ApsaraMQ for Kafka インスタンスを Logstash に入力として接続できます。このトピックでは、Logstash を使用してインターネット経由で ApsaraMQ for Kafka インスタンスからメッセージを消費する方法について説明します。
前提条件
開始する前に、次のタスクを完了してください:
ApsaraMQ for Kafka インスタンスを購入してデプロイします。このトピックでは、非サーバーレスインスタンスを例として使用します。詳細については、「インターネットおよび VPC 経由でインスタンスに接続する」をご参照ください。
Logstash をダウンロードしてインストールします。詳細については、「Logstash のダウンロード」をご参照ください。
JDK 8 をダウンロードしてインストールします。詳細については、「JDK 8 のダウンロード」をご参照ください。
ステップ 1: アクセス情報を取得する
Logstash は、エンドポイントを使用して ApsaraMQ for Kafka に接続します。認証には、ApsaraMQ for Kafka インスタンスのユーザー名とパスワードが必要です。
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、Logstash に入力として接続するインスタンスの名前をクリックします。
インスタンスの詳細 ページの アクセスポイント情報 セクションで、インスタンスのエンドポイントを表示します。設定情報 セクションで、ユーザー名 および パスワード パラメーターの値を取得します。
説明さまざまなタイプのエンドポイントの違いについては、「エンドポイントの比較」をご参照ください。
ステップ 2: Topic を作成する
メッセージを保存するための Topic を作成します。
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
重要Elastic Compute Service (ECS) インスタンスがデプロイされているリージョンに Topic を作成する必要があります。Topic はリージョンをまたいで使用することはできません。たとえば、メッセージのプロデューサーとコンシューマーが中国 (北京) リージョンにデプロイされている ECS インスタンス上で実行されている場合、Topic も中国 (北京) リージョンに作成する必要があります。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、トピック管理 をクリックします。
トピック管理 ページで、トピックの作成 をクリックします。
トピックの作成 パネルで、Topic のプロパティを指定し、[OK] をクリックします。
パラメーター
説明
例
名前
Topic 名。
demo
記述
Topic の説明。
demo test
パーティションの数
Topic 内のパーティションの数。
12
ストレージエンジン
説明ストレージエンジンのタイプを指定できるのは、非サーバーレスの Professional Edition インスタンスを使用する場合のみです。他のタイプのインスタンスでは、デフォルトで [クラウドストレージ] が選択されます。
Topic 内のメッセージを保存するために使用されるストレージエンジンのタイプ。
ApsaraMQ for Kafka は、次のタイプのストレージエンジンをサポートしています:
クラウドストレージ: この値を選択すると、システムは Topic に Alibaba Cloud ディスクを使用し、分散モードで 3 つのレプリカにデータを保存します。このストレージエンジンは、低レイテンシー、高性能、高耐久性、高信頼性を特徴としています。インスタンスを作成するときに 仕様タイプ パラメーターを Standard Edition (High Write) に設定した場合、このパラメーターは クラウドストレージ にのみ設定できます。
ローカルストレージ: この値を選択すると、システムはオープンソースの Apache Kafka の In-Sync Replicas (ISR) アルゴリズムを使用し、分散モードで 3 つのレプリカにデータを保存します。
クラウドストレージ
メッセージタイプ
Topic のメッセージタイプ。有効な値:
通常のメッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに保存されます。クラスター内のブローカーに障害が発生した場合、パーティションに保存されているメッセージの順序は保持されないことがあります。ストレージエンジン パラメーターを クラウドストレージ に設定すると、このパラメーターは自動的に 通常のメッセージ に設定されます。
パーティション順位メッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに保存されます。クラスター内のブローカーに障害が発生した場合でも、メッセージは送信された順序でパーティションに保存されます。一部のパーティションのメッセージは、パーティションが復元されるまで送信できません。ストレージエンジン パラメーターを ローカルストレージ に設定すると、このパラメーターは自動的に パーティション順位メッセージ に設定されます。
通常のメッセージ
ログリリースポリシー
Topic で使用されるログクリーンアップポリシー。
ストレージエンジン パラメーターを ローカルストレージ に設定する場合は、ログリリースポリシー パラメーターを設定する必要があります。ストレージエンジンパラメーターをローカルストレージに設定できるのは、ApsaraMQ for Kafka Professional Edition インスタンスを使用する場合のみです。
ApsaraMQ for Kafka は、次のログクリーンアップポリシーを提供します:
Delete: デフォルトのログクリーンアップポリシー。システムに十分なストレージ容量がある場合、メッセージは最大保持期間に基づいて保持されます。ストレージ使用量が 85% を超えると、システムはサービスの可用性を確保するために最も古いメッセージを削除します。
Compact: Apache Kafka で使用されるログ圧縮ポリシー。ログ圧縮により、同じキーを持つメッセージの最新の値が保持されます。このポリシーは、障害が発生したシステムの復元や、システム再起動後のキャッシュの再読み込みなどのシナリオに適しています。たとえば、Kafka Connect または Confluent Schema Registry を使用する場合、システムステータスと構成に関する情報をログ圧縮された Topic に保存する必要があります。
重要ログ圧縮された Topic は、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。
Compact
タグ
Topic にアタッチするタグ。
demo
Topic が作成されると、トピック管理 ページで Topic を表示できます。
ステップ 3: メッセージを送信する
作成した Topic にメッセージを送信します。
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、トピック管理 をクリックします。
トピック管理 ページで、管理する Topic の名前をクリックします。トピックの詳細 ページの右上隅にある メッセージの送信を体験する をクリックします。名前
メッセージ送受信のクイック体験 パネルで、テスト用のメッセージを送信するためのパラメーターを設定します。
送信方法 パラメーターを コンソール に設定した場合は、次の手順を実行します:
メッセージキー フィールドにメッセージキーを入力します。例: demo。
メッセージの内容 フィールドにメッセージ内容を入力します。例: {"key": "test"}。
指定されたパーティションに送信 パラメーターを設定して、テストメッセージを特定のパーティションに送信するかどうかを指定します。
テストメッセージを特定のパーティションに送信する場合は、はい をクリックし、パーティション ID フィールドにパーティション ID を入力します。例: 0。パーティション ID のクエリ方法については、「パーティションステータスの表示」をご参照ください。
テストメッセージを特定のパーティションに送信しない場合は、いいえ をクリックします。
ApsaraMQ for Kafka SDK を使用するか、[メッセージの送受信を開始] パネルに表示される Docker コマンドを実行して、テストメッセージをサブスクライブします。
送信方法 パラメーターを [Docker] に設定した場合は、次の手順を実行して Docker コンテナーを実行します:
Docker コンテナーを実行してサンプルメッセージを生成する セクションに表示される Docker コマンドを実行して、テストメッセージを送信します。
送信後にメッセージを消費するにはどうすればよいですか? セクションに表示される Docker コマンドを実行して、テストメッセージをサブスクライブします。
送信方法 パラメーターを [SDK] に設定した場合は、必要なプログラミング言語またはフレームワークの SDK とアクセス方法を選択して、テストメッセージを送信およびサブスクライブします。
ステップ 4: グループを作成する
Logstash 用の Group を作成します。
ApsaraMQ for Kafka コンソールにログインします。
概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、Group の管理 をクリックします。
Group の管理 ページで、グループの作成 をクリックします。
グループの作成 パネルで、Group ID フィールドにグループ名、記述 フィールドにグループの説明を入力し、グループにタグをアタッチしてから、[OK] をクリックします。
コンシューマーグループを作成すると、Group の管理 ページでコンシューマーグループを表示できます。
ステップ 5: Logstash を使用してメッセージを消費する
Logstash がインストールされているマシンで Logstash を起動し、作成した Topic からメッセージを消費します。
cd コマンドを実行して、Logstash の bin ディレクトリに切り替えます。
次のコマンドを実行して、kafka.client.truststore.jks 証明書ファイルをダウンロードします。
wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/blob/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jksjaas.conf という名前の構成ファイルを作成します。
vim jaas.confコマンドを実行して、空の構成ファイルを作成します。i キーを押して挿入モードに入ります。
次の内容を入力します。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX"; };パラメーター
説明
例
username
インターネットおよび VPC タイプのインスタンスのユーザー名。
alikafka_pre-cn-v0h1***
password
インターネットおよび VPC タイプのインスタンスのパスワード。
GQiSmqbQVe3b9hdKLDcIlkrBK6***
Esc キーを押して CLI モードに戻ります。
: キーを押してボトムラインモードに入ります。wq と入力して Enter キーを押し、ファイルを保存して終了します。
input.conf という名前の構成ファイルを作成します。
vim input.confコマンドを実行して、空の構成ファイルを作成します。i キーを押して挿入モードに入ります。
次の内容を入力します。
input { kafka { bootstrap_servers => "alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093" topics => ["logstash_test"] security_protocol => "SASL_SSL" sasl_mechanism => "PLAIN" jaas_path => "/home/logstash-7.6.2/bin/jaas.conf" ssl_truststore_password => "KafkaOnsClient" ssl_truststore_location => "/home/logstash-7.6.2/bin/kafka.client.truststore.jks" ssl_endpoint_identification_algorithm => "" group_id => "logstash_group" consumer_threads => 3 auto_offset_reset => "earliest" } } output { stdout { codec => rubydebug } }パラメーター
説明
例
bootstrap_servers
ApsaraMQ for Kafka によって提供されるインターネットエンドポイントは SSL エンドポイントです。
alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
topics
Topic の名前。
logstash_test
security_protocol
セキュリティプロトコル。デフォルト値は SASL_SSL です。この値を変更する必要はありません。
SASL_SSL
sasl_mechanism
セキュリティ認証メカニズム。デフォルト値は PLAIN です。この値を変更する必要はありません。
PLAIN
jaas_path
jaas.conf 構成ファイルのパス。
/home/logstash-7.6.2/bin/jaas.conf
ssl_truststore_password
kafka.client.truststore.jks 証明書のパスワード。デフォルト値は KafkaOnsClient です。この値を変更する必要はありません。
KafkaOnsClient
ssl_truststore_location
kafka.client.truststore.jks 証明書のパス。
/home/logstash-7.6.2/bin/kafka.client.truststore.jks
ssl_endpoint_identification_algorithm
このパラメーターは Logstash 6.x 以降で必要です。
空の値
group_id
コンシューマーグループの名前。
logstash_group
consumer_threads
コンシューマースレッドの数。このパラメーターは、Topic のパーティション数と同じ値に設定することをお勧めします。
3
auto_offset_reset
オフセットをリセットします。有効な値:
earliest: 最も古いメッセージを読み取ります。
latest: 最新のメッセージを読み取ります。
earliest
Esc キーを押して CLI モードに戻ります。
: キーを押してボトムラインモードに入ります。wq と入力して Enter キーを押し、ファイルを保存して終了します。
次のコマンドを実行してメッセージを消費します。
./logstash -f input.conf次の出力が返されます。

詳細情報
パラメーター設定の詳細については、「Kafka 入力プラグイン」をご参照ください。