Logstash は、入力、フィルター、出力のパイプラインを通じてデータを処理します。ApsaraMQ for Kafka インスタンスを入力として接続することで、Logstash は Kafka のトピックからメッセージを消費し、Elasticsearch、ファイルストレージ、または標準出力 (stdout) などの送信先に転送します。
このアーキテクチャには、次の 2 つの利点があります:
非同期処理: Kafka がメッセージをバッファリングするため、Logstash は自身のペースでメッセージを処理でき、トラフィックスパイク時のデータ損失を防ぎます。
デカップリング: Elasticsearch などのダウンストリームシステムがオフラインになっても、Kafka はシステムが回復するまでメッセージを保持します。アップストリームのプロデューサーは影響を受けません。
このトピックでは、仮想プライベートクラウド (VPC) 接続を介して ApsaraMQ for Kafka インスタンスからメッセージを消費するように Logstash を設定する方法について説明します。
前提条件
開始する前に、以下をご確認ください:
ApsaraMQ for Kafka インスタンスが購入およびデプロイされていること。詳細については、「VPC 接続インスタンスの購入とデプロイ」をご参照ください。
Logstash がインストールされていること。詳細については、「Logstash のダウンロード」をご参照ください。
Java 開発キット (JDK) 8 がインストールされていること。詳細については、「Java 8」のダウンロードページをご参照ください。
Logstash の Kafka 入力プラグインが利用可能であること。次のコマンドを実行して確認します。プラグインが表示されない場合は、インストールしてください:
bin/logstash-plugin list | grep logstash-input-kafkabin/logstash-plugin install logstash-input-kafka
ステップ 1:エンドポイントの取得
Logstash は VPC エンドポイントを介して ApsaraMQ for Kafka に接続します。ApsaraMQ for Kafka は、2 種類の VPC エンドポイントを提供します:
| エンドポイントタイプ | ポート | 使用するケース |
|---|---|---|
| デフォルトエンドポイント | 9092 | 認証なしの標準アクセス |
| Simple Authentication and Security Layer (SASL) エンドポイント | 9094 | 認証アクセス。アクセス制御リスト (ACL) 機能を有効にする必要があります。詳細については、「ACL 機能の有効化」をご参照ください。 |
詳細については、「エンドポイント間の比較」をご参照ください。
エンドポイントを取得するには:
ApsaraMQ for Kafka コンソールにログインします。
[概要] ページの [リソース配布] セクションで、インスタンスがデプロイされているリージョンを選択します。
[インスタンス] ページで、ターゲットインスタンスの名前をクリックします。
[インスタンス詳細] ページで、[エンドポイント情報] セクションのエンドポイントを見つけます。SASL エンドポイントを使用する場合は、[設定情報] セクションの [ユーザー名] と [パスワード] の値をメモしておきます。

ステップ 2:トピックの作成
Logstash が消費するメッセージを保持するためのトピックを作成します。
Elastic Compute Service (ECS) インスタンスと同じリージョンにトピックを作成してください。トピックはリージョンをまたいで使用することはできません。たとえば、プロデューサーとコンシューマーが中国 (北京) リージョンの ECS インスタンスで実行されている場合、トピックも中国 (北京) リージョンに存在する必要があります。
ApsaraMQ for Kafka コンソールにログインします。
[概要] ページの [リソース配布] セクションで、インスタンスがデプロイされているリージョンを選択します。
[インスタンス] ページで、ターゲットインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、[Topics] をクリックします。
[Topics] ページで、[トピックの作成] をクリックします。
[トピックの作成] パネルで、次のパラメーターを設定し、[OK] をクリックします。
| パラメーター | 説明 | 例 |
|---|---|---|
| 名前 | トピック名。 | demo |
| 説明 | トピックの説明。 | demo test |
| パーティション | パーティションの数。 | 12 |
| ストレージエンジン | ストレージエンジンのタイプ。Professional Edition インスタンスでのみ利用可能です。Standard Edition インスタンスは、デフォルトでクラウドストレージを使用します。オプション: [Cloud Storage] -- 分散モードで 3 つのレプリカを持つ Alibaba Cloud ディスクを使用します。低レイテンシー、高性能、高信頼性を提供します。インスタンスエディションが [Standard (高書き込み)] の場合に必要です。[Local Storage] -- 分散モードで 3 つのレプリカを持つオープンソースの Apache Kafka の同期レプリカ (ISR) アルゴリズムを使用します。 | クラウド ストレージ |
| メッセージタイプ | メッセージの順序タイプ。[Normal Message] -- 同じキーを持つメッセージは、送信順に同じパーティションに保存されます。ブローカーの障害発生時にパーティションの順序が維持されない場合があります。[Storage Engine] が [Cloud Storage] の場合に自動的に選択されます。[Partitionally Ordered Message] -- 同じキーを持つメッセージは、ブローカーの障害発生時でも同じパーティション内で順序が維持されます。一部のパーティションが一時的に利用できなくなる場合があります。[Storage Engine] が [Local Storage] の場合に自動的に選択されます。 | 通常メッセージ |
| ログクリーンアップポリシー | [Storage Engine] が [Local Storage] (Professional Edition のみ) の場合にのみ利用可能なログクリーンアップポリシー。[Delete] -- デフォルトのポリシー。最大保持期間に基づいてメッセージを保持します。ストレージ使用量が 85% を超えると、最も古いメッセージから削除されます。[Compact] -- 各メッセージキーの最新の値のみを保持します。Kafka Connect や Confluent Schema Registry などのコンポーネントで使用されます。詳細については、「aliware-kafka-demos」をご参照ください。 重要 ログ圧縮トピックは、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。 | [Compact] |
| タグ | トピックにアタッチするタグ。 | demo |
トピックが作成されると、[Topics] ページに表示されます。
ステップ 3:テストメッセージの送信
テストメッセージを送信して、トピックが消費可能な状態であることを確認します。
ApsaraMQ for Kafka コンソールにログインします。
ターゲットインスタンスに移動し、左側のナビゲーションウィンドウで [Topics] をクリックします。
トピック名をクリックし、[トピック詳細] ページの右上隅にある [メッセージの送信] をクリックします。
[メッセージの送受信を開始] パネルで、[送信方法] ([コンソール]、[Docker]、または [SDK]) を選択してテストメッセージを送信します。 メッセージを特定のパーティションに送信する場合は、パーティション ID を指定できます。 パーティション ID のクエリ方法については、「パーティションステータスの表示」をご参照ください。
ステップ 4:コンシューマーグループの作成
Logstash が消費オフセットを追跡するためのコンシューマーグループを作成します。
ApsaraMQ for Kafka コンソールにログインします。
ターゲットインスタンスに移動し、左側のナビゲーションウィンドウで [Groups] をクリックします。
[Groups] ページで、[グループの作成] をクリックします。
[グループの作成] パネルで、[グループ ID] と [説明] を入力し、必要に応じてタグをアタッチして、[OK] をクリックします。
コンシューマーグループが作成されると、[Groups] ページに表示されます。
ステップ 5:Logstash の設定と実行
Logstash のインストールディレクトリに移動します。
binディレクトリにinput.confという名前の設定ファイルを作成します:
input {
kafka {
bootstrap_servers => "<your-kafka-endpoint>" # VPC エンドポイント (例:alikafka-pre-cn-zv****-1-vpc.alikafka.aliyuncs.com:9092)
group_id => "<your-consumer-group-id>" # ステップ 4 で作成したコンシューマーグループ
topics => ["<your-topic-name>"] # ステップ 2 で作成したトピック
consumer_threads => 12 # トピックのパーティション数に合わせてください
auto_offset_reset => "earliest" # 最も古いメッセージから開始
}
}
output {
stdout { codec => rubydebug }
}Logstash を実行します:
./logstash -f input.conf設定パラメーター
| パラメーター | 説明 | 例 |
|---|---|---|
bootstrap_servers | ApsaraMQ for Kafka インスタンスの VPC エンドポイントのカンマ区切りリスト。デフォルトエンドポイントまたは SASL エンドポイント。 | alikafka-pre-cn-zv\*\*\*\*-1-vpc.alikafka.aliyuncs.com:9092 |
group_id | コンシューマーグループの識別子。 | logstash_group |
topics | 消費するトピック名。 | logstash_test |
consumer_threads | コンシューマースレッドの数。トピックのパーティション数に設定することを推奨します。 | 12 |
auto_offset_reset | オフセットリセット戦略。earliest:利用可能な最初のメッセージから消費します。latest:新しいメッセージのみを消費します。 | earliest |
メッセージ消費の確認
Logstash が起動すると、消費されたメッセージが Ruby デバッグ形式で標準出力 (stdout) に出力されます:

関連情報
Kafka 入力プラグイン -- Logstash の Kafka 入力プラグインの完全なパラメーターリファレンス。
エンドポイント間の比較 -- ApsaraMQ for Kafkaのエンドポイントタイプの違い。