ApsaraMQ for Kafka インスタンスを Filebeat に入力として接続できます。このトピックでは、VPC (Virtual Private Cloud) 環境で Filebeat を使用して ApsaraMQ for Kafka インスタンスからメッセージをコンシュームする方法について説明します。
背景情報
開始する前に、次の操作を完了してください:
ApsaraMQ for Kafka インスタンスを購入してデプロイします。このトピックでは、非サーバーレスインスタンスを例として使用します。詳細については、「VPC タイプのインスタンスに接続する」をご参照ください。
Filebeat をダウンロードしてインストールします。詳細については、「Filebeat のダウンロード」をご参照ください。
JDK 8 をダウンロードしてインストールします。詳細については、「JDK 8 のダウンロード」をご参照ください。
ステップ 1: エンドポイントの取得
Filebeat は、ApsaraMQ for Kafka インスタンスのエンドポイントを介して ApsaraMQ for Kafka インスタンスに接続します。
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、使用するインスタンスの名前をクリックします。
インスタンスの詳細 ページの アクセスポイント情報 セクションで、インスタンスのエンドポイントを表示します。設定情報 セクションで、ユーザー名 および パスワード パラメーターの値を取得します。
説明さまざまな種類のエンドポイントの違いについては、「エンドポイント間の比較」をご参照ください。
ステップ 2: Topic の作成
メッセージを格納するための Topic を作成します。
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
重要Elastic Compute Service (ECS) インスタンスがデプロイされているリージョンに Topic を作成する必要があります。Topic はリージョンをまたいで使用することはできません。たとえば、メッセージのプロデューサーとコンシューマーが中国 (北京) リージョンにデプロイされた ECS インスタンスで実行されている場合、Topic も中国 (北京) リージョンに作成する必要があります。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、トピック管理 をクリックします。
トピック管理 ページで、トピックの作成 をクリックします。
トピックの作成 パネルで、Topic のプロパティを指定し、[OK] をクリックします。
パラメーター
説明
例
名前
Topic 名。
demo
記述
Topic の説明。
demo test
パーティションの数
Topic 内のパーティション数。
12
ストレージエンジン
説明サーバーレスでない Professional Edition インスタンスを使用する場合にのみ、ストレージエンジンのタイプを指定できます。他のタイプのインスタンスでは、デフォルトで [クラウドストレージ] が選択されます。
Topic 内のメッセージを格納するために使用されるストレージエンジンのタイプ。
ApsaraMQ for Kafka は、次のタイプのストレージエンジンをサポートしています:
クラウドストレージ: この値を選択すると、システムは Topic に Alibaba Cloud ディスクを使用し、分散モードで 3 つのレプリカにデータを格納します。このストレージエンジンは、低レイテンシー、高性能、高耐久性、高信頼性を特徴としています。インスタンス作成時に 仕様タイプ パラメーターを Standard Edition (High Write) に設定した場合、このパラメーターは クラウドストレージ にしか設定できません。
ローカルストレージ: この値を選択すると、システムはオープンソースの Apache Kafka の In-Sync Replicas (ISR) アルゴリズムを使用し、分散モードで 3 つのレプリカにデータを格納します。
クラウドストレージ
メッセージタイプ
Topic のメッセージタイプ。有効な値:
通常のメッセージ: デフォルトでは、同じキーを持つメッセージは、送信された順序で同じパーティションに格納されます。クラスター内のブローカーに障害が発生した場合、パーティションに格納されているメッセージの順序が維持されないことがあります。ストレージエンジン パラメーターを クラウドストレージ に設定した場合、このパラメーターは自動的に 通常のメッセージ に設定されます。
パーティション順位メッセージ: デフォルトでは、同じキーを持つメッセージは、送信された順序で同じパーティションに格納されます。クラスター内のブローカーに障害が発生した場合でも、メッセージは送信された順序でパーティションに格納されます。一部のパーティションのメッセージは、パーティションが復元されるまで送信できません。ストレージエンジン パラメーターを ローカルストレージ に設定した場合、このパラメーターは自動的に パーティション順位メッセージ に設定されます。
通常のメッセージ
ログリリースポリシー
Topic で使用されるログクリーンアップポリシー。
ストレージエンジン パラメーターを ローカルストレージ に設定した場合は、ログリリースポリシー パラメーターを設定する必要があります。ストレージエンジンパラメーターをローカルストレージに設定できるのは、ApsaraMQ for Kafka Professional Edition インスタンスを使用する場合のみです。
ApsaraMQ for Kafka は、次のログクリーンアップポリシーを提供します:
Delete: デフォルトのログクリーンアップポリシー。システムに十分なストレージ容量がある場合、メッセージは最大保持期間に基づいて保持されます。ストレージ使用量が 85% を超えると、システムはサービスの可用性を確保するために最も古いメッセージを削除します。
Compact: Apache Kafka で使用されるログ圧縮ポリシー。ログ圧縮により、同じキーを持つメッセージの最新の値が保持されることが保証されます。このポリシーは、障害が発生したシステムの復元や、システム再起動後のキャッシュの再読み込みなどのシナリオに適しています。たとえば、Kafka Connect または Confluent Schema Registry を使用する場合、システムの状態と構成に関する情報をログ圧縮された Topic に格納する必要があります。
重要ログ圧縮された Topic は、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。
Compact
タグ
Topic にアタッチするタグ。
demo
Topic が作成されると、トピック管理 ページで Topic を表示できます。
ステップ 3: メッセージの送信
作成した Topic にメッセージを送信します。
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、トピック管理 をクリックします。
トピック管理 ページで、管理する Topic の名前をクリックします。トピックの詳細 ページの右上隅にある メッセージの送信を体験する をクリックします。名前
メッセージ送受信のクイック体験 パネルで、テスト用のメッセージを送信するためのパラメーターを設定します。
送信方法 パラメーターを コンソール に設定した場合は、次の手順を実行します:
メッセージキー フィールドに、メッセージキーを入力します。例: demo。
メッセージの内容 フィールドに、メッセージの内容を入力します。例: {"key": "test"}。
指定されたパーティションに送信 パラメーターを設定して、テストメッセージを特定のパーティションに送信するかどうかを指定します。
テストメッセージを特定のパーティションに送信する場合は、はい をクリックし、パーティション ID フィールドにパーティション ID を入力します。例: 0。パーティション ID のクエリ方法については、「パーティションステータスの表示」をご参照ください。
テストメッセージを特定のパーティションに送信しない場合は、いいえ をクリックします。
ApsaraMQ for Kafka SDK を使用するか、[メッセージの送受信を開始] パネルに表示される Docker コマンドを実行して、テストメッセージをサブスクライブします。
送信方法 パラメーターを [Docker] に設定した場合は、次の手順を実行して Docker コンテナーを実行します:
Docker コンテナーを実行してサンプルメッセージを生成する セクションに表示される Docker コマンドを実行して、テストメッセージを送信します。
送信後にメッセージを消費するにはどうすればよいですか? セクションに表示される Docker コマンドを実行して、テストメッセージをサブスクライブします。
送信方法 パラメーターを [SDK] に設定した場合は、必要なプログラミング言語またはフレームワークの SDK とアクセス方法を選択して、テストメッセージを送信およびサブスクライブします。
ステップ 4: グループの作成
Filebeat 用の グループを作成します。
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、Group の管理 をクリックします。
Group の管理 ページで、グループの作成 をクリックします。
グループの作成 パネルで、Group ID フィールドにグループ名を入力し、記述 フィールドにグループの説明を入力し、グループにタグをアタッチしてから、[OK] をクリックします。
コンシューマーグループを作成すると、Group の管理 ページでコンシューマーグループを表示できます。
ステップ 5: Filebeat を使用したメッセージのコンシューム
Filebeat がインストールされているサーバーで Filebeat を起動し、作成した Topic からメッセージをコンシュームします。
cd コマンドを実行して Filebeat のインストールディレクトリに移動します。
input.yml 設定ファイルを作成します。
vim input.ymlコマンドを実行して設定ファイルを作成します。i キーを押して挿入モードに入ります。
次の内容を入力します。
filebeat.inputs: - type: kafka hosts: - alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092 - alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092 - alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092 topics: ["filebeat_test"] group_id: "filebeat_group" output.console: pretty: trueパラメーター
説明
例
type
Filebeat の入力タイプ。
kafka
hosts
ApsaraMQ for Kafka は、次の VPC エンドポイントを提供します:
デフォルトのエンドポイント
SASL エンドポイント
- alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092 - alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092 - alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092topics
Topic の名前。
filebeat_test
group_id
グループの名前。
filebeat_group
パラメーターの詳細については、「Kafka 入力プラグイン」をご参照ください。
Esc キーを押して CLI モードに戻ります。
: キーを押してボトムラインモードに入ります。wq と入力して Enter キーを押し、ファイルを保存して終了します。
次のコマンドを実行してメッセージをコンシュームします。
./filebeat -c ./input.yml