ApsaraMQ for Kafka を出力として Filebeat に接続できます。このトピックでは、Filebeat を使用してインターネット経由で ApsaraMQ for Kafka にメッセージを送信する方法について説明します。
前提条件
開始する前に、次の操作を完了してください。
ApsaraMQ for Kafka インスタンスを購入してデプロイします。このトピックでは、非サーバーレスインスタンスを例として使用します。詳細については、「インターネットおよび VPC 接続インスタンスに接続する」をご参照ください。
Filebeat をダウンロードしてインストールします。詳細については、「Download Filebeat」をご参照ください。
Java Development Kit (JDK) 8 をダウンロードしてインストールします。詳細については、「Download JDK 8」をご参照ください。
ステップ 1: エンドポイント、ユーザー名、パスワードの取得
Filebeat は ApsaraMQ for Kafka エンドポイントを使用して ApsaraMQ for Kafka に接続します。
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが配置されているリージョンを選択します。
インスタンスリスト ページで、Filebeat に接続するインスタンスの名前をクリックします。
インスタンスの詳細 ページの アクセスポイント情報 セクションで、インスタンスのエンドポイントを表示します。設定情報 セクションで、ユーザー名 および パスワード パラメーターの値を取得します。
説明さまざまな種類のエンドポイントの違いについては、「エンドポイントの比較」をご参照ください。
ステップ 2: Topic の作成
メッセージを保存するための Topic を作成します。
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが配置されているリージョンを選択します。
重要Elastic Compute Service (ECS) インスタンスがデプロイされているリージョンに Topic を作成する必要があります。Topic はリージョンをまたいで使用することはできません。たとえば、メッセージのプロデューサーとコンシューマーが中国 (北京) リージョンにデプロイされている ECS インスタンスで実行されている場合、Topic も中国 (北京) リージョンに作成する必要があります。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、トピック管理 をクリックします。
トピック管理 ページで、トピックの作成 をクリックします。
トピックの作成 パネルで、Topic のプロパティを指定し、[OK] をクリックします。
パラメーター
説明
例
名前
Topic 名。
demo
記述
Topic の説明。
demo test
パーティションの数
Topic 内のパーティションの数。
12
ストレージエンジン
説明ストレージエンジンの種類は、非サーバーレスの Professional Edition インスタンスを使用する場合にのみ指定できます。他の種類のインスタンスでは、デフォルトで [クラウドストレージ] が選択されます。
Topic 内のメッセージを保存するために使用されるストレージエンジンの種類。
ApsaraMQ for Kafka は、次の種類のストレージエンジンをサポートしています。
クラウドストレージ: この値を選択すると、システムは Topic に Alibaba Cloud ディスクを使用し、分散モードで 3 つのレプリカにデータを保存します。このストレージエンジンは、低レイテンシー、高性能、高耐久性、高信頼性を特徴としています。インスタンスの作成時に 仕様タイプ パラメーターを Standard Edition (High Write) に設定した場合、このパラメーターは クラウドストレージ にのみ設定できます。
ローカルストレージ: この値を選択すると、システムはオープンソースの Apache Kafka の In-Sync Replicas (ISR) アルゴリズムを使用し、分散モードで 3 つのレプリカにデータを保存します。
クラウドストレージ
メッセージタイプ
Topic のメッセージタイプ。有効な値:
通常のメッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに保存されます。クラスター内のブローカーに障害が発生した場合、パーティションに保存されているメッセージの順序が維持されないことがあります。ストレージエンジン パラメーターを クラウドストレージ に設定した場合、このパラメーターは自動的に 通常のメッセージ に設定されます。
パーティション順位メッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに保存されます。クラスター内のブローカーに障害が発生した場合でも、メッセージは送信された順序でパーティションに保存されます。一部のパーティションのメッセージは、パーティションが復元されるまで送信できません。ストレージエンジン パラメーターを ローカルストレージ に設定した場合、このパラメーターは自動的に パーティション順位メッセージ に設定されます。
通常のメッセージ
ログリリースポリシー
Topic で使用されるログクリーンアップポリシー。
ストレージエンジン パラメーターを ローカルストレージ に設定した場合は、ログリリースポリシー パラメーターを設定する必要があります。ストレージエンジンパラメーターをローカルストレージに設定できるのは、ApsaraMQ for Kafka Professional Edition インスタンスを使用する場合のみです。
ApsaraMQ for Kafka は、次のログクリーンアップポリシーを提供します。
Delete: デフォルトのログクリーンアップポリシー。システムに十分なストレージ容量がある場合、メッセージは最大保持期間に基づいて保持されます。ストレージ使用量が 85% を超えると、システムはサービスの可用性を確保するために最も早く保存されたメッセージを削除します。
Compact: Apache Kafka で使用されるログ圧縮ポリシー。ログ圧縮により、同じキーを持つメッセージの最新の値が保持されます。このポリシーは、障害が発生したシステムの復元や、システム再起動後のキャッシュの再読み込みなどのシナリオに適しています。たとえば、Kafka Connect または Confluent Schema Registry を使用する場合、システムステータスと構成に関する情報をログ圧縮された Topic に保存する必要があります。
重要ログ圧縮された Topic は、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。
Compact
タグ
Topic にアタッチするタグ。
demo
Topic が作成されると、トピック管理 ページで Topic を表示できます。
ステップ 3: Filebeat を使用してメッセージを送信する
インストールされているマシンで Filebeat を起動し、作成した Topic にメッセージを送信します。
cd コマンドを実行して、Filebeat のインストールディレクトリに切り替えます。
次のコマンドを実行して CA 証明書ファイルをダウンロードします。
wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20220826/ytsw/only-4096-ca-certoutput.conf 設定ファイルを作成します。
vim output.confコマンドを実行して設定ファイルを作成し、開きます。i キーを押して挿入モードに入ります。
次の内容を入力します。
filebeat.inputs: - type: stdin output.kafka: hosts: ["alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093", "alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093", "alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093"] username: "alikafka_pre-cn-v641e1d***" password: "aeN3WLRoMPRXmAP2jvJuGk84Kuuo***" topic: 'filebeat_test' partition.round_robin: reachable_only: false ssl.certificate_authorities: ["/home/admin/filebeat/filebeat-7.7.0-linux-x86_64/tasks/vpc_ssl/ca-cert"] ssl.verification_mode: none required_acks: 1 compression: none max_message_bytes: 1000000パラメーター
説明
例
hosts
ApsaraMQ for Kafka によって提供されるパブリックエンドポイントは SSL エンドポイントです。
alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093, alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093, alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
username
インターネットおよび VPC 接続インスタンスのユーザー名。
alikafka_pre-cn-v641e1d***
password
インターネットおよび VPC 接続インスタンスのパスワード。
aeN3WLRoMPRXmAP2jvJuGk84Kuuo***
topic
Topic の名前。
filebeat_test
reachable_only
メッセージが利用可能なパーティションにのみ送信されるかどうかを指定します。有効な値:
true: プライマリパーティションが利用できない場合、出力がブロックされる可能性があります。
false: プライマリパーティションが利用できない場合でも、出力はブロックされません。
false
ssl.certificate_authorities
CA 証明書が保存されているパス。
/home/admin/filebeat/filebeat-7.7.0-linux-x86_64/ca-cert
ssl.verification_mode
認証モード。
none
required_acks
確認応答 (ACK) の信頼性レベル。有効な値:
0: 応答は不要です。
1: システムはローカルコミットを待ちます。
-1: システムはすべてのレプリカがコミットするのを待ちます。
デフォルト値: 1。
1
compression
データ圧縮エンコーダー。デフォルト値: gzip。有効な値:
None。
snappy: 圧縮および展開用の C++ 開発パッケージ。
lz4: 圧縮および展開速度に重点を置いたロスレスデータ圧縮アルゴリズム。
gzip: GNU フリーソフトウェア用のファイル圧縮プログラム。
none
max_message_bytes
最大メッセージサイズ (バイト単位)。デフォルト値: 1000000。この値は、ApsaraMQ for Kafka インスタンスに設定した最大メッセージサイズより小さくする必要があります。
1000000
パラメーターの詳細については、「Kafka output plugin」をご参照ください。
Esc キーを押して CLI モードに戻ります。
: キーを押してボトムラインモードに入ります。wq と入力して Enter キーを押し、ファイルを保存して終了します。
作成した Topic にメッセージを送信します。
./filebeat -c ./output.ymlを実行します。test と入力して Enter キーを押します。
ステップ 4: Topic パーティションの表示
Topic に送信されたメッセージのステータスを表示できます。
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが配置されているリージョンを選択します。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、トピック管理 をクリックします。
トピック管理 ページで、管理する Topic の名前をクリックします。トピックの詳細 ページで、パーテーションステータス タブをクリックします。名前
表 1. パーティションステータスに含まれるパラメーター
パラメーター
説明
パーティション ID
パーティション ID。
最小オフセット
パーティション内の最小オフセット。
最大オフセット
パーティション内の最大オフセット。
メッセージ
パーティション内のメッセージの数。
最終更新日時
パーティション内の最後のメッセージが保存された時刻。

ステップ 5: オフセットによるメッセージのクエリ
パーティション ID とオフセットによって送信されたメッセージをクエリできます。
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが配置されているリージョンを選択します。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、メッセージクエリ をクリックします。
メッセージクエリ ページで、クエリモード ドロップダウンリストから 位置によるクエリ を選択します。
Topic ドロップダウンリストから Topic を選択し、パーティション ドロップダウンリストからパーティションを選択し、開始点 フィールドにオフセット値を入力してから、クエリ をクリックします。
指定したオフセット値以上のオフセット値を持つメッセージが表示されます。たとえば、Partition パラメーターと Offset パラメーターの値として 5 を指定した場合、システムはパーティション 5 からオフセットが 5 以上のメッセージをクエリします。
表 2. メッセージクエリ結果に含まれるパラメーター
パラメーター
説明
パーティション
メッセージが取得されたパーティション。
オフセット
メッセージのオフセット。
Key
メッセージキー。キーは文字列に変換されます。
Value
メッセージ値。メッセージ内容とも呼ばれます。メッセージ値は文字列に変換されます。
メッセージ作成時間
メッセージが送信された時点。値は、メッセージが送信されたときにクライアントが記録したタイムスタンプ、または
ProducerRecordオブジェクトに指定したタイムスタンプフィールドの値です。説明タイムスタンプフィールドに値を指定した場合、指定した値が表示されます。
タイムスタンプフィールドに値を指定しなかった場合、メッセージが送信されたときのシステム時刻が表示されます。
1970/x/x x:x:x 形式の値は、タイムスタンプフィールドが 0 または無効な値として指定されていることを示します。
ApsaraMQ for Kafka バージョン 0.9 以前のクライアントでは、タイムスタンプフィールドを指定できません。
操作
キーのダウンロード をクリックしてメッセージキーをダウンロードします。
値のダウンロード をクリックしてメッセージ内容をダウンロードします。
重要取得した各メッセージの最大 1 KB のコンテンツが ApsaraMQ for Kafka コンソールに表示されます。取得したメッセージのサイズが 1 KB を超える場合、システムは自動的にコンテンツを切り捨てます。完全なメッセージ内容を表示するには、メッセージをダウンロードしてください。
一度に最大 10 MB の取得済みメッセージをダウンロードできます。取得したメッセージのサイズが 10 MB を超える場合、最初の 10 MB のメッセージ内容のみがダウンロードできます。