すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:インターネット経由で Logstash に入力として接続する

最終更新日:Nov 09, 2025

ApsaraMQ for Kafka インスタンスを Logstash に入力として接続できます。このトピックでは、Logstash を使用してインターネット経由で ApsaraMQ for Kafka インスタンスからメッセージを消費する方法について説明します。

前提条件

開始する前に、次のタスクを完了してください:

ステップ 1: アクセス情報を取得する

Logstash は、エンドポイントを使用して ApsaraMQ for Kafka に接続します。認証には、ApsaraMQ for Kafka インスタンスのユーザー名とパスワードが必要です。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. インスタンスリスト ページで、Logstash に入力として接続するインスタンスの名前をクリックします。

  4. インスタンスの詳細 ページの アクセスポイント情報 セクションで、インスタンスのエンドポイントを表示します。設定情報 セクションで、ユーザー名 および パスワード パラメーターの値を取得します。

    endpoint

    説明

    さまざまなタイプのエンドポイントの違いについては、「エンドポイントの比較」をご参照ください。

ステップ 2: Topic を作成する

メッセージを保存するための Topic を作成します。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

    重要

    Elastic Compute Service (ECS) インスタンスがデプロイされているリージョンに Topic を作成する必要があります。Topic はリージョンをまたいで使用することはできません。たとえば、メッセージのプロデューサーとコンシューマーが中国 (北京) リージョンにデプロイされている ECS インスタンス上で実行されている場合、Topic も中国 (北京) リージョンに作成する必要があります。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、トピック管理 をクリックします。

  5. トピック管理 ページで、トピックの作成 をクリックします。

  6. トピックの作成 パネルで、Topic のプロパティを指定し、[OK] をクリックします。

    パラメーター

    説明

    名前

    Topic 名。

    demo

    記述

    Topic の説明。

    demo test

    パーティションの数

    Topic 内のパーティションの数。

    12

    ストレージエンジン

    説明

    ストレージエンジンのタイプを指定できるのは、非サーバーレスの Professional Edition インスタンスを使用する場合のみです。他のタイプのインスタンスでは、デフォルトで [クラウドストレージ] が選択されます。

    Topic 内のメッセージを保存するために使用されるストレージエンジンのタイプ。

    ApsaraMQ for Kafka は、次のタイプのストレージエンジンをサポートしています:

    • クラウドストレージ: この値を選択すると、システムは Topic に Alibaba Cloud ディスクを使用し、分散モードで 3 つのレプリカにデータを保存します。このストレージエンジンは、低レイテンシー、高性能、高耐久性、高信頼性を特徴としています。インスタンスを作成するときに 仕様タイプ パラメーターを Standard Edition (High Write) に設定した場合、このパラメーターは クラウドストレージ にのみ設定できます。

    • ローカルストレージ: この値を選択すると、システムはオープンソースの Apache Kafka の In-Sync Replicas (ISR) アルゴリズムを使用し、分散モードで 3 つのレプリカにデータを保存します。

    クラウドストレージ

    メッセージタイプ

    Topic のメッセージタイプ。有効な値:

    • 通常のメッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに保存されます。クラスター内のブローカーに障害が発生した場合、パーティションに保存されているメッセージの順序は保持されないことがあります。ストレージエンジン パラメーターを クラウドストレージ に設定すると、このパラメーターは自動的に 通常のメッセージ に設定されます。

    • パーティション順位メッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに保存されます。クラスター内のブローカーに障害が発生した場合でも、メッセージは送信された順序でパーティションに保存されます。一部のパーティションのメッセージは、パーティションが復元されるまで送信できません。ストレージエンジン パラメーターを ローカルストレージ に設定すると、このパラメーターは自動的に パーティション順位メッセージ に設定されます。

    通常のメッセージ

    ログリリースポリシー

    Topic で使用されるログクリーンアップポリシー。

    ストレージエンジン パラメーターを ローカルストレージ に設定する場合は、ログリリースポリシー パラメーターを設定する必要があります。ストレージエンジンパラメーターをローカルストレージに設定できるのは、ApsaraMQ for Kafka Professional Edition インスタンスを使用する場合のみです。

    ApsaraMQ for Kafka は、次のログクリーンアップポリシーを提供します:

    • Delete: デフォルトのログクリーンアップポリシー。システムに十分なストレージ容量がある場合、メッセージは最大保持期間に基づいて保持されます。ストレージ使用量が 85% を超えると、システムはサービスの可用性を確保するために最も古いメッセージを削除します。

    • Compact: Apache Kafka で使用されるログ圧縮ポリシー。ログ圧縮により、同じキーを持つメッセージの最新の値が保持されます。このポリシーは、障害が発生したシステムの復元や、システム再起動後のキャッシュの再読み込みなどのシナリオに適しています。たとえば、Kafka Connect または Confluent Schema Registry を使用する場合、システムステータスと構成に関する情報をログ圧縮された Topic に保存する必要があります。

      重要

      ログ圧縮された Topic は、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。

    Compact

    タグ

    Topic にアタッチするタグ。

    demo

    Topic が作成されると、トピック管理 ページで Topic を表示できます。

ステップ 3: メッセージを送信する

作成した Topic にメッセージを送信します。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、トピック管理 をクリックします。

  5. トピック管理 ページで、管理する Topic の名前をクリックします。トピックの詳細 ページの右上隅にある メッセージの送信を体験する をクリックします。名前

  6. メッセージ送受信のクイック体験 パネルで、テスト用のメッセージを送信するためのパラメーターを設定します。

    • 送信方法 パラメーターを コンソール に設定した場合は、次の手順を実行します:

      1. メッセージキー フィールドにメッセージキーを入力します。例: demo。

      2. メッセージの内容 フィールドにメッセージ内容を入力します。例: {"key": "test"}。

      3. 指定されたパーティションに送信 パラメーターを設定して、テストメッセージを特定のパーティションに送信するかどうかを指定します。

        • テストメッセージを特定のパーティションに送信する場合は、はい をクリックし、パーティション ID フィールドにパーティション ID を入力します。例: 0。パーティション ID のクエリ方法については、「パーティションステータスの表示」をご参照ください。

        • テストメッセージを特定のパーティションに送信しない場合は、いいえ をクリックします。

      4. ApsaraMQ for Kafka SDK を使用するか、[メッセージの送受信を開始] パネルに表示される Docker コマンドを実行して、テストメッセージをサブスクライブします。

    • 送信方法 パラメーターを [Docker] に設定した場合は、次の手順を実行して Docker コンテナーを実行します:

      1. Docker コンテナーを実行してサンプルメッセージを生成する セクションに表示される Docker コマンドを実行して、テストメッセージを送信します。

      2. 送信後にメッセージを消費するにはどうすればよいですか? セクションに表示される Docker コマンドを実行して、テストメッセージをサブスクライブします。

    • 送信方法 パラメーターを [SDK] に設定した場合は、必要なプログラミング言語またはフレームワークの SDK とアクセス方法を選択して、テストメッセージを送信およびサブスクライブします。

ステップ 4: グループを作成する

Logstash 用の Group を作成します。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、Group の管理 をクリックします。

  5. Group の管理 ページで、グループの作成 をクリックします。

  6. グループの作成 パネルで、Group ID フィールドにグループ名、記述 フィールドにグループの説明を入力し、グループにタグをアタッチしてから、[OK] をクリックします。

    コンシューマーグループを作成すると、Group の管理 ページでコンシューマーグループを表示できます。

ステップ 5: Logstash を使用してメッセージを消費する

Logstash がインストールされているマシンで Logstash を起動し、作成した Topic からメッセージを消費します。

  1. cd コマンドを実行して、Logstash の bin ディレクトリに切り替えます。

  2. 次のコマンドを実行して、kafka.client.truststore.jks 証明書ファイルをダウンロードします。

    wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/blob/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jks
  3. jaas.conf という名前の構成ファイルを作成します。

    1. vim jaas.conf コマンドを実行して、空の構成ファイルを作成します。

    2. i キーを押して挿入モードに入ります。

    3. 次の内容を入力します。

      KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="XXX"
        password="XXX";
      };

      パラメーター

      説明

      username

      インターネットおよび VPC タイプのインスタンスのユーザー名。

      alikafka_pre-cn-v0h1***

      password

      インターネットおよび VPC タイプのインスタンスのパスワード。

      GQiSmqbQVe3b9hdKLDcIlkrBK6***

    4. Esc キーを押して CLI モードに戻ります。

    5. : キーを押してボトムラインモードに入ります。wq と入力して Enter キーを押し、ファイルを保存して終了します。

  4. input.conf という名前の構成ファイルを作成します。

    1. vim input.conf コマンドを実行して、空の構成ファイルを作成します。

    2. i キーを押して挿入モードに入ります。

    3. 次の内容を入力します。

      input {
          kafka {
              bootstrap_servers => "alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093"
              topics => ["logstash_test"]
      
              security_protocol => "SASL_SSL"
              sasl_mechanism => "PLAIN"
      
              jaas_path => "/home/logstash-7.6.2/bin/jaas.conf"
      
              ssl_truststore_password => "KafkaOnsClient"
              ssl_truststore_location => "/home/logstash-7.6.2/bin/kafka.client.truststore.jks"
      
              ssl_endpoint_identification_algorithm => ""
      
              group_id => "logstash_group"
              consumer_threads => 3
              auto_offset_reset => "earliest"
          }
      }
      
      output {
          stdout {
              codec => rubydebug
          }
      }

      パラメーター

      説明

      bootstrap_servers

      ApsaraMQ for Kafka によって提供されるインターネットエンドポイントは SSL エンドポイントです。

      alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093

      topics

      Topic の名前。

      logstash_test

      security_protocol

      セキュリティプロトコル。デフォルト値は SASL_SSL です。この値を変更する必要はありません。

      SASL_SSL

      sasl_mechanism

      セキュリティ認証メカニズム。デフォルト値は PLAIN です。この値を変更する必要はありません。

      PLAIN

      jaas_path

      jaas.conf 構成ファイルのパス。

      /home/logstash-7.6.2/bin/jaas.conf

      ssl_truststore_password

      kafka.client.truststore.jks 証明書のパスワード。デフォルト値は KafkaOnsClient です。この値を変更する必要はありません。

      KafkaOnsClient

      ssl_truststore_location

      kafka.client.truststore.jks 証明書のパス。

      /home/logstash-7.6.2/bin/kafka.client.truststore.jks

      ssl_endpoint_identification_algorithm

      このパラメーターは Logstash 6.x 以降で必要です。

      空の値

      group_id

      コンシューマーグループの名前。

      logstash_group

      consumer_threads

      コンシューマースレッドの数。このパラメーターは、Topic のパーティション数と同じ値に設定することをお勧めします。

      3

      auto_offset_reset

      オフセットをリセットします。有効な値:

      • earliest: 最も古いメッセージを読み取ります。

      • latest: 最新のメッセージを読み取ります。

      earliest

    4. Esc キーを押して CLI モードに戻ります。

    5. : キーを押してボトムラインモードに入ります。wq と入力して Enter キーを押し、ファイルを保存して終了します。

  5. 次のコマンドを実行してメッセージを消費します。

    ./logstash -f input.conf

    次の出力が返されます。

    result

詳細情報

パラメーター設定の詳細については、「Kafka 入力プラグイン」をご参照ください。