ApsaraMQ for Kafka インスタンスは、Filebeat に出力として接続できます。このトピックでは、仮想プライベートクラウド (VPC) で Filebeat を使用して ApsaraMQ for Kafka にメッセージを送信する方法について説明します。
前提条件
このチュートリアルを開始する前に、以下の操作が完了していることを確認してください。
ApsaraMQ for Kafka インスタンスが購入され、デプロイされていること。詳細については、「VPC 接続インスタンスの購入とデプロイ」をご参照ください。
Filebeat がダウンロードされ、インストールされていること。詳細については、「Filebeat のダウンロード」をご参照ください。
Java Development Kit (JDK) 8 がダウンロードされ、インストールされていること。詳細については、「JDK 8 のダウンロード」をご参照ください。
手順 1:エンドポイントを取得する
Filebeat は、ApsaraMQ for Kafka エンドポイントを使用して ApsaraMQ for Kafka への接続を確立します。
ApsaraMQ for Kafka コンソール にログインします。
リソースの分布概要 ページの セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、Filebeat に出力として接続するインスタンスの名前をクリックします。
アクセスポイント情報 セクションの インスタンスの詳細 ページで、インスタンスのエンドポイントを表示します。設定情報 セクションで、ユーザー名 パラメーターと パスワード パラメーターの値を取得します。
説明さまざまなタイプのエンドポイントの違いについては、「エンドポイントの比較」をご参照ください。
手順 2:トピックを作成する
メッセージを格納するためのトピックを作成するには、次の操作を実行します。
ApsaraMQ for Kafka コンソール にログインします。
リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
重要Elastic Compute Service ( ECS ) インスタンスがデプロイされているリージョンにトピックを作成する必要があります。トピックはリージョンをまたがって使用することはできません。たとえば、メッセージのプロデューサーとコンシューマーが中国 ( 北京 ) リージョンにデプロイされている ECS インスタンスで実行されている場合、トピックも中国 ( 北京 ) リージョンに作成する必要があります。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションペインで、トピック管理 をクリックします。
トピック管理 ページで、トピックの作成 をクリックします。
トピックの作成 パネルで、トピックのプロパティを指定し、[OK] をクリックします。
パラメーター
説明
例
名前
トピック名。
demo
記述
トピックの説明。
demo test
パーティションの数
トピック内のパーティション数。
12
ストレージエンジン
説明Professional Editionインスタンスを使用している場合のみ、ストレージエンジンの種類を指定できます。Standard Editionインスタンスを使用している場合、クラウドストレージがデフォルトで選択されます。
トピックにメッセージを格納するために使用されるストレージエンジンの種類。
ApsaraMQ for Kafka は、次の種類のストレージエンジンをサポートしています。
クラウドストレージ: この値を選択すると、システムはトピックにAlibaba Cloudディスクを使用し、分散モードで3つのレプリカにデータを格納します。このストレージエンジンは、低レイテンシ、高パフォーマンス、長期の耐久性、高信頼性を備えています。インスタンスの作成時に 仕様タイプ パラメーターを Standard Edition (High Write) に設定した場合、このパラメーターは クラウドストレージ にのみ設定できます。
ローカルストレージ: この値を選択すると、システムはオープンソースのApache Kafkaの同期レプリカ(ISR)アルゴリズムを使用し、分散モードで3つのレプリカにデータを格納します。
クラウドストレージ
メッセージタイプ
トピックのメッセージタイプ。有効な値:
通常のメッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに格納されます。クラスター内のブローカーに障害が発生した場合、パーティションに格納されているメッセージの順序は保持されない場合があります。ストレージエンジン パラメーターを クラウドストレージ に設定すると、このパラメーターは自動的に 通常のメッセージ に設定されます。
パーティション順位メッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに格納されます。クラスター内のブローカーに障害が発生した場合でも、メッセージは送信された順序でパーティションに格納されます。一部のパーティションのメッセージは、パーティションが復元されるまで送信できません。ストレージエンジン パラメーターを ローカルストレージ に設定すると、このパラメーターは自動的に パーティション順位メッセージ に設定されます。
通常のメッセージ
ログリリースポリシー
トピックで使用されるログクリーンアップポリシー。
ストレージエンジン パラメーターを ローカルストレージ に設定する場合は、ログリリースポリシー パラメーターを設定する必要があります。ApsaraMQ for Kafka Professional Editionインスタンスを使用している場合のみ、ストレージエンジンパラメーターをローカルストレージに設定できます。
ApsaraMQ for Kafka は、次のログクリーンアップポリシーを提供します。
Delete: デフォルトのログクリーンアップポリシー。システムに十分なストレージ容量がある場合、メッセージは最大保存期間に基づいて保持されます。ストレージ使用量が 85% を超えると、システムはサービスの可用性を確保するために最も古い格納済みメッセージを削除します。
Compact: Apache Kafkaで使用されるログ圧縮ポリシー。ログ圧縮により、同じキーを持つメッセージの最新の値が保持されます。このポリシーは、障害が発生したシステムの復元や、システムの再起動後のキャッシュの再読み込みなどのシナリオに適しています。たとえば、Kafka ConnectまたはConfluent Schema Registryを使用する場合、システムの状態と構成に関する情報をログ圧縮トピックに格納する必要があります。
重要ログ圧縮トピックは、Kafka ConnectやConfluent Schema Registryなどの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。
Compact
タグ
トピックに添付するタグ。
demo
トピックが作成されると、トピック管理 ページでトピックを表示できます。
ステップ 3:Filebeat を使用してメッセージを送信する
Filebeat がインストールされているサーバーで Filebeat を起動し、作成したトピックにメッセージを送信します。
cd コマンドを実行して、Filebeat のインストールディレクトリに切り替えます。
output.conf という名前の設定ファイルを作成します。
vim output.conf
コマンドを実行して、空の設定ファイルを作成します。i キーを押して、挿入モードに入ります。
次のコンテンツを入力します。
filebeat.inputs: - type: stdin output.kafka: hosts: ["alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092", "alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092", "alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092"] topic: 'filebeat_test' required_acks: 1 compression: none max_message_bytes: 1000000
パラメーター
説明
例
hosts
ApsaraMQ for Kafka インスタンスの VPC エンドポイント。ApsaraMQ for Kafka は、次の VPC エンドポイントをサポートしています。
デフォルトエンドポイント
Simple Authentication and Security Layer(SASL)エンドポイント
alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092, alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092, alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
topic
トピックの名前。
filebeat_test
required_acks
確認応答(ACK)の信頼性レベル。有効な値:
0: 応答なし
1: ローカルコミットを待機
-1: すべてのレプリカがコミットするのを待機
デフォルト値:1。
1
compression
データ圧縮コーデック。デフォルト値:gzip。有効な値:
none: なし
snappy: 圧縮と解凍に使用される C ++ 開発パッケージ
lz4: より高速な圧縮と解凍を可能にする可逆データ圧縮アルゴリズム
gzip: GNU フリーソフトウェア用のファイル圧縮プログラム
none
max_message_bytes
メッセージの最大サイズ。単位:バイト。デフォルト値:1000000。ApsaraMQ for Kafka に指定した最大メッセージサイズよりも小さい値である必要があります。
1000000
パラメーター設定の詳細については、「Kafka output plugin」をご参照ください。
Esc キーを押して、CLI モードに戻ります。
: キーを押して、ボトムラインモードに入ります。wq と入力し、Enter キーを押してファイルを保存して終了します。
作成したトピックにメッセージを送信します。
./filebeat -c ./output.yml
コマンドを実行します。test と入力し、Enter キーを押します。
手順 4: トピックのパーティションを表示する
トピックに送信されたメッセージを表示するには、次の操作を実行します。
ApsaraMQ for Kafka コンソールにログインします。
リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションペインで、トピック管理 をクリックします。
トピック管理 ページで、管理するトピックの名前をクリックします。トピックの詳細 ページで、パーテーションステータス タブをクリックします。
表 1. パーティションステータスに含まれるパラメーター
パラメーター
説明
Partition ID
パーティション ID。
Minimum Offset
パーティションの最小オフセット。
Maximum Offset
パーティションの最大オフセット。
Messages
パーティション内のメッセージ数。
Last Updated At
パーティション内の最後のメッセージが保存された時刻。
ステップ 5:オフセットでメッセージを照会する
送信されたメッセージを、パーティション ID とオフセット情報に基づいて照会できます。
ApsaraMQ for Kafka コンソールにログオンします。
リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションペインで、メッセージクエリ をクリックします。
メッセージクエリ ページで、位置によるクエリクエリモード ドロップダウンリストから を選択します。
Topic ドロップダウンリストからトピックを選択し、パーティション ドロップダウンリストからパーティションを選択し、開始点 フィールドにオフセット値を入力し、クエリ をクリックします。
指定したオフセット値以上のオフセット値を持つメッセージが表示されます。たとえば、[パーティション] パラメーターと [オフセット] パラメーターの値として 5 を指定すると、システムはパーティション 5 からオフセットが 5 以上のメッセージを照会します。
表 2. メッセージクエリ結果に含まれるパラメーター
パラメーター
説明
パーティション
メッセージが取得されるパーティション。
オフセット
メッセージのオフセット。
Key
メッセージキー。キーは文字列に変換されます。
Value
メッセージ値。メッセージコンテンツとも呼ばれます。メッセージ値は文字列に変換されます。
メッセージ作成時間
メッセージが送信された時点。値は、メッセージが送信されたときにクライアントが記録したタイムスタンプ、または
ProducerRecord
オブジェクトに指定したタイムスタンプフィールドの値です。説明タイムスタンプフィールドに値を指定した場合、指定した値が表示されます。
タイムスタンプフィールドに値を指定しなかった場合、メッセージが送信されたときのローカルシステム時刻が表示されます。
1970/x/x x:x:x 形式の値は、タイムスタンプフィールドが 0 または無効な値として指定されていることを示します。
ApsaraMQ for Kafka バージョン 0.9 以前のクライアントでは、タイムスタンプフィールドを指定できません。
操作
キーのダウンロード をクリックして、メッセージキーをダウンロードします。
値のダウンロード をクリックして、メッセージコンテンツをダウンロードします。
重要取得した各メッセージの最大 1 KB のコンテンツを ApsaraMQ for Kafka コンソールに表示できます。取得したメッセージのサイズが 1 KB を超える場合、システムはコンテンツを自動的に切り捨てます。完全なメッセージコンテンツを表示する場合は、メッセージをダウンロードしてください。
取得したメッセージは、一度に最大 10 MB までダウンロードできます。取得したメッセージのサイズが 10 MB を超える場合は、メッセージコンテンツの最初の 10 MB のみダウンロードできます。