すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Logstash からインターネット経由で ApsaraMQ for Kafka にメッセージを送信

最終更新日:Mar 12, 2026

Logstash は、SASL_SSL 認証を使用して、インターネット経由でログおよびイベントデータを ApsaraMQ for Kafka インスタンスに転送できます。このガイドでは、エンドポイントの取得、Topic の作成、Logstash の設定、メッセージの検証について説明します。

前提条件

開始する前に、以下をご確認ください:

ステップ 1: エンドポイントと認証情報の取得

Logstash は SSL エンドポイントを介して ApsaraMQ for Kafka に接続します。コンソールからエンドポイントと SASL 認証情報を取得します。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. [概要] ページの [リソース配布] セクションで、インスタンスが存在するリージョンを選択します。

  3. [インスタンス] ページで、ターゲットインスタンスの名前をクリックします。

  4. [インスタンス詳細] ページで、以下を収集します: endpoint

    • [エンドポイント情報] セクション: SSL エンドポイント (ポート 9093) をコピーします。

    • [設定情報] セクション: [ユーザー名][パスワード] をメモします。

説明

エンドポイントタイプに関する詳細については、「エンドポイントの比較」をご参照ください。

ステップ 2: Topic の作成

Logstash が送信するメッセージを受信するための Topic を作成します。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. [概要] ページの [リソース配布] セクションで、インスタンスが存在するリージョンを選択します。

    重要

    ご利用の Elastic Compute Service (ECS) インスタンスと同じリージョンに Topic を作成してください。Topic はリージョンをまたいで使用することはできません。たとえば、プロデューサーとコンシューマーが中国 (北京) リージョンの ECS インスタンスで実行されている場合、Topic も中国 (北京) に作成する必要があります。

  3. [インスタンス] ページで、ターゲットインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、[Topic] をクリックします。

  5. [トピック] ページで、[トピックの作成] をクリックします。

  6. [Topic の作成] パネルで、Topic のプロパティを設定し、[OK] をクリックします。Topic が作成されると、[Topic] ページに表示されます。

    パラメーター説明
    名前トピック名。demo
    説明トピックの簡単な説明。demo test
    パーティションパーティションの数。12
    ストレージエンジンストレージエンジンのタイプ。Professional Edition インスタンスでのみ設定可能です。Standard Edition はデフォルトで Cloud Storage になります。
    - Cloud Storage:3 つの分散レプリカを持つ Alibaba Cloud ディスクを使用します。低レイテンシー、高パフォーマンス、高い耐久性、高い信頼性を備えています。Standard (高書き込み) インスタンスに必須です。
    - Local Storage:オープンソースの Apache Kafka の同期レプリカ (ISR) アルゴリズムを使用し、3 つの分散レプリカを持ちます。




    Cloud Storage
    メッセージタイプメッセージの順序付け動作。
    - 通常メッセージ:同じキーを持つメッセージは、送信順に同じパーティションに送信されます。ブローカーに障害が発生した場合、順序は保証されないことがあります。ストレージエンジンCloud Storage の場合、自動的に選択されます。
    - パーティション順序付きメッセージ:同じキーを持つメッセージは、送信順に同じパーティションに送信されます。ブローカーに障害が発生した場合でも順序は保証されますが、影響を受けるパーティションは復元されるまで利用できません。ストレージエンジンLocal Storage の場合、自動的に選択されます。




    通常メッセージ
    ログクリーンアップポリシーストレージエンジンLocal Storage (Professional Edition) の場合にのみ設定可能です。
    - 削除:デフォルトのポリシーです。メッセージを最大保持期間まで保持します。ストレージ使用量が 85% を超えると、最も古いメッセージから削除されます。
    - 圧縮:各キーの最新の値のみを保持します。ログ圧縮されたトピックは、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。




    圧縮
    タグトピックにアタッチするオプションのタグ。demo

ステップ 3: Logstash の設定と実行

サーバー上で SSL 証明書、SASL 認証情報、Logstash 出力設定を行い、テストメッセージを送信します。

SSL 証明書のダウンロード

Logstash の bin ディレクトリに切り替え、トラストストア証明書をダウンロードします:

cd <logstash-install-dir>/bin
wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/raw/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jks

JAAS 設定ファイルの作成

Logstash の bin ディレクトリに jaas.conf という名前のファイルを作成し、次の内容を記述します:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<your-username>"
  password="<your-password>";
};

プレースホルダーを実際の値に置き換えます:

プレースホルダー説明
<your-username>[設定情報] セクションからの [ユーザー名]alikafka_pre-cn-v0h1\*\*\*
<your-password>[設定情報] セクションの [パスワード]GQiSmqbQVe3b9hdKLDcIlkrBK6\*\*\*
説明

jaas_path の設定は JVM 全体に適用されます。単一の Logstash インスタンスで複数の Kafka 出力を実行し、それぞれに異なる認証情報が必要な場合は、代わりにインラインの sasl_jaas_config パラメーターを使用してください。詳細については、「Kafka 出力プラグイン」リファレンスをご参照ください。

Logstash 出力設定ファイルの作成

Logstash の bin ディレクトリに output.conf という名前のファイルを作成し、次の内容を記述します:

input {
    stdin {}
}

output {
    stdout { codec => json }
    kafka {
        bootstrap_servers => "<your-endpoint>"
        topic_id => "<your-topic>"
        security_protocol => "SASL_SSL"
        sasl_mechanism => "PLAIN"
        jaas_path => "<logstash-install-dir>/bin/jaas.conf"
        ssl_truststore_password => "KafkaOnsClient"
        ssl_truststore_location => "<logstash-install-dir>/bin/kafka.client.truststore.jks"
        ssl_endpoint_identification_algorithm => ""
    }
}

これらのプレースホルダーを実際の値に置き換えます:

プレースホルダー説明
<your-endpoint>[エンドポイント情報] セクションの SSL エンドポイント (ポート 9093)。alikafka-pre-cn-zv\*\*\*\*\*-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv\*\*\*\*\*-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv\*\*\*\*\*-3.alikafka.aliyuncs.com:9093
<your-topic>ステップ 2 で作成した Topic の名前。logstash_test
<logstash-install-dir>Logstash のインストールディレクトリへの絶対パス。/home/logstash-7.6.2

残りのパラメーターは固定値を使用します。変更しないでください:

パラメーター固定値説明
security_protocolSASL_SSLインターネット接続用のセキュリティプロトコル。
sasl_mechanismPLAINSASL 認証メカニズム。
ssl_truststore_passwordKafkaOnsClientトラストストア証明書のパスワード。
ssl_endpoint_identification_algorithm"" (空の文字列)Logstash 6.x 以降で必須。ホスト名検証を無効にします。

テストメッセージの送信

  1. 出力設定を使用して Logstash を起動します:

       ./logstash -f output.conf
  2. Logstash が起動したら、test と入力して Enter キーを押します。stdout 出力はメッセージをローカルに JSON 形式で表示し、kafka 出力はメッセージをご利用の ApsaraMQ for Kafka インスタンスに送信します。

    output_result

結果の確認

コンソールでパーティションステータスの確認とメッセージのクエリを行い、メッセージがご利用の ApsaraMQ for Kafka インスタンスに到達したことを確認します。

パーティションステータスの確認

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. [概要] ページの [リソース配布] セクションで、インスタンスが存在するリージョンを選択します。

  3. [インスタンス] ページで、ターゲットインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、[Topic] をクリックします。

  5. Topic 名をクリックし、[Topic 詳細] ページで [パーティションステータス] タブをクリックします。

    パラメーター説明
    パーティション IDパーティションの ID。
    最小オフセットパーティション内の最も古いオフセット。
    最大オフセットパーティション内の最新のオフセット。
    メッセージパーティション内のメッセージの総数。
    最終更新日時最新のメッセージが保存された時刻。

    partition status

オフセットによるメッセージのクエリ

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. [概要] ページの [リソース配布] セクションで、インスタンスが存在するリージョンを選択します。

  3. [インスタンス] ページで、ターゲットインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、[メッセージクエリ] をクリックします。

  5. [検索方法] ドロップダウンリストから、[オフセットで検索] を選択します。

  6. [Topic] ドロップダウンリストから Topic を選択し、[パーティション] ドロップダウンリストからパーティションを選択し、[オフセット] フィールドにオフセット値を入力して、[検索] をクリックします。コンソールは、指定された値以上のオフセットを持つすべてのメッセージを返します。たとえば、[パーティション]5 に、[オフセット]5 に設定すると、パーティション 5 からオフセット 5 以上のすべてのメッセージが返されます。

    パラメーター説明
    パーティションメッセージが保存されているパーティション。
    オフセットパーティション内のメッセージオフセット。
    キーメッセージキー。文字列として表示されます。
    [値]メッセージ本文。文字列として表示されます。
    [作成日時]メッセージが送信されたときのタイムスタンプ。ProducerRecord のタイムスタンプフィールドに値を指定した場合、その値が表示されます。値を指定しなかった場合は、メッセージが送信されたときのローカルシステム時刻が表示されます。タイムスタンプフィールドが 0 または無効な値に設定されている場合、時刻は 1970/x/x x:x:x 形式で表示されます。ApsaraMQ for Kafka バージョン 0.9 以前のクライアントでは、このフィールドを設定できません。
    [操作][キーのダウンロード]: メッセージキーをダウンロードします。[値のダウンロード]: メッセージ本文をダウンロードします。コンソールにはメッセージごとに最大 1 KB まで表示されます。1 KB を超える内容を表示するには、メッセージをダウンロードしてください。一度にダウンロードできるメッセージは最大 10 MB です。

参考文献