すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:VPC 内で ApsaraMQ for Kafka を Logstash の入力として使用

最終更新日:Mar 12, 2026

Logstash は、入力、フィルター、出力のパイプラインを通じてデータを処理します。ApsaraMQ for Kafka インスタンスを入力として接続することで、Logstash は Kafka のトピックからメッセージを消費し、Elasticsearch、ファイルストレージ、または標準出力 (stdout) などの送信先に転送します。

このアーキテクチャには、次の 2 つの利点があります:

  • 非同期処理: Kafka がメッセージをバッファリングするため、Logstash は自身のペースでメッセージを処理でき、トラフィックスパイク時のデータ損失を防ぎます。

  • デカップリング: Elasticsearch などのダウンストリームシステムがオフラインになっても、Kafka はシステムが回復するまでメッセージを保持します。アップストリームのプロデューサーは影響を受けません。

このトピックでは、仮想プライベートクラウド (VPC) 接続を介して ApsaraMQ for Kafka インスタンスからメッセージを消費するように Logstash を設定する方法について説明します。

前提条件

開始する前に、以下をご確認ください:

  • ApsaraMQ for Kafka インスタンスが購入およびデプロイされていること。詳細については、「VPC 接続インスタンスの購入とデプロイ」をご参照ください。

  • Logstash がインストールされていること。詳細については、「Logstash のダウンロード」をご参照ください。

  • Java 開発キット (JDK) 8 がインストールされていること。詳細については、「Java 8」のダウンロードページをご参照ください。

  • Logstash の Kafka 入力プラグインが利用可能であること。次のコマンドを実行して確認します。プラグインが表示されない場合は、インストールしてください:

      bin/logstash-plugin list | grep logstash-input-kafka
      bin/logstash-plugin install logstash-input-kafka

ステップ 1:エンドポイントの取得

Logstash は VPC エンドポイントを介して ApsaraMQ for Kafka に接続します。ApsaraMQ for Kafka は、2 種類の VPC エンドポイントを提供します:

エンドポイントタイプポート使用するケース
デフォルトエンドポイント9092認証なしの標準アクセス
Simple Authentication and Security Layer (SASL) エンドポイント9094認証アクセス。アクセス制御リスト (ACL) 機能を有効にする必要があります。詳細については、「ACL 機能の有効化」をご参照ください。

詳細については、「エンドポイント間の比較」をご参照ください。

エンドポイントを取得するには:

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. [概要] ページの [リソース配布] セクションで、インスタンスがデプロイされているリージョンを選択します。

  3. [インスタンス] ページで、ターゲットインスタンスの名前をクリックします。

  4. [インスタンス詳細] ページで、[エンドポイント情報] セクションのエンドポイントを見つけます。SASL エンドポイントを使用する場合は、[設定情報] セクションの [ユーザー名][パスワード] の値をメモしておきます。

endpoint

ステップ 2:トピックの作成

Logstash が消費するメッセージを保持するためのトピックを作成します。

重要

Elastic Compute Service (ECS) インスタンスと同じリージョンにトピックを作成してください。トピックはリージョンをまたいで使用することはできません。たとえば、プロデューサーとコンシューマーが中国 (北京) リージョンの ECS インスタンスで実行されている場合、トピックも中国 (北京) リージョンに存在する必要があります。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. [概要] ページの [リソース配布] セクションで、インスタンスがデプロイされているリージョンを選択します。

  3. [インスタンス] ページで、ターゲットインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、[Topics] をクリックします。

  5. [Topics] ページで、[トピックの作成] をクリックします。

  6. [トピックの作成] パネルで、次のパラメーターを設定し、[OK] をクリックします。

パラメーター説明
名前トピック名。demo
説明トピックの説明。demo test
パーティションパーティションの数。12
ストレージエンジンストレージエンジンのタイプ。Professional Edition インスタンスでのみ利用可能です。Standard Edition インスタンスは、デフォルトでクラウドストレージを使用します。オプション: [Cloud Storage] -- 分散モードで 3 つのレプリカを持つ Alibaba Cloud ディスクを使用します。低レイテンシー、高性能、高信頼性を提供します。インスタンスエディションが [Standard (高書き込み)] の場合に必要です。[Local Storage] -- 分散モードで 3 つのレプリカを持つオープンソースの Apache Kafka の同期レプリカ (ISR) アルゴリズムを使用します。クラウド ストレージ
メッセージタイプメッセージの順序タイプ。[Normal Message] -- 同じキーを持つメッセージは、送信順に同じパーティションに保存されます。ブローカーの障害発生時にパーティションの順序が維持されない場合があります。[Storage Engine][Cloud Storage] の場合に自動的に選択されます。[Partitionally Ordered Message] -- 同じキーを持つメッセージは、ブローカーの障害発生時でも同じパーティション内で順序が維持されます。一部のパーティションが一時的に利用できなくなる場合があります。[Storage Engine][Local Storage] の場合に自動的に選択されます。通常メッセージ
ログクリーンアップポリシー[Storage Engine][Local Storage] (Professional Edition のみ) の場合にのみ利用可能なログクリーンアップポリシー。[Delete] -- デフォルトのポリシー。最大保持期間に基づいてメッセージを保持します。ストレージ使用量が 85% を超えると、最も古いメッセージから削除されます。[Compact] -- 各メッセージキーの最新の値のみを保持します。Kafka Connect や Confluent Schema Registry などのコンポーネントで使用されます。詳細については、「aliware-kafka-demos」をご参照ください。
重要

ログ圧縮トピックは、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。

[Compact]
タグトピックにアタッチするタグ。demo

トピックが作成されると、[Topics] ページに表示されます。

ステップ 3:テストメッセージの送信

テストメッセージを送信して、トピックが消費可能な状態であることを確認します。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. ターゲットインスタンスに移動し、左側のナビゲーションウィンドウで [Topics] をクリックします。

  3. トピック名をクリックし、[トピック詳細] ページの右上隅にある [メッセージの送信] をクリックします。

  4. [メッセージの送受信を開始] パネルで、[送信方法] ([コンソール][Docker]、または [SDK]) を選択してテストメッセージを送信します。 メッセージを特定のパーティションに送信する場合は、パーティション ID を指定できます。 パーティション ID のクエリ方法については、「パーティションステータスの表示」をご参照ください。

ステップ 4:コンシューマーグループの作成

Logstash が消費オフセットを追跡するためのコンシューマーグループを作成します。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. ターゲットインスタンスに移動し、左側のナビゲーションウィンドウで [Groups] をクリックします。

  3. [Groups] ページで、[グループの作成] をクリックします。

  4. [グループの作成] パネルで、[グループ ID][説明] を入力し、必要に応じてタグをアタッチして、[OK] をクリックします。

コンシューマーグループが作成されると、[Groups] ページに表示されます。

ステップ 5:Logstash の設定と実行

  1. Logstash のインストールディレクトリに移動します。

  2. bin ディレクトリに input.conf という名前の設定ファイルを作成します:

input {
  kafka {
    bootstrap_servers => "<your-kafka-endpoint>"       # VPC エンドポイント (例:alikafka-pre-cn-zv****-1-vpc.alikafka.aliyuncs.com:9092)
    group_id          => "<your-consumer-group-id>"     # ステップ 4 で作成したコンシューマーグループ
    topics            => ["<your-topic-name>"]          # ステップ 2 で作成したトピック
    consumer_threads  => 12                             # トピックのパーティション数に合わせてください
    auto_offset_reset => "earliest"                     # 最も古いメッセージから開始
  }
}
output {
  stdout { codec => rubydebug }
}
  1. Logstash を実行します:

./logstash -f input.conf

設定パラメーター

パラメーター説明
bootstrap_serversApsaraMQ for Kafka インスタンスの VPC エンドポイントのカンマ区切りリスト。デフォルトエンドポイントまたは SASL エンドポイント。alikafka-pre-cn-zv\*\*\*\*-1-vpc.alikafka.aliyuncs.com:9092
group_idコンシューマーグループの識別子。logstash_group
topics消費するトピック名。logstash_test
consumer_threadsコンシューマースレッドの数。トピックのパーティション数に設定することを推奨します。12
auto_offset_resetオフセットリセット戦略。earliest:利用可能な最初のメッセージから消費します。latest:新しいメッセージのみを消費します。earliest

メッセージ消費の確認

Logstash が起動すると、消費されたメッセージが Ruby デバッグ形式で標準出力 (stdout) に出力されます:

logstash_5

関連情報