すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:VPC 環境で Logstash にアウトプットとして接続する

最終更新日:Nov 09, 2025

ApsaraMQ for Kafka は、アウトプットとして Logstash に接続できます。このトピックでは、VPC 環境で Logstash を使用して ApsaraMQ for Kafka にメッセージを送信する方法について説明します。

前提条件

  • ApsaraMQ for Kafka インスタンスを購入してデプロイ済みであること。このトピックでは、非サーバーレスインスタンスを例として使用します。詳細については、「VPC タイプのインスタンスに接続する」をご参照ください。

  • Logstash をダウンロードしてインストール済みであること。詳細については、「Logstash のダウンロード」をご参照ください。

  • JDK 8 をダウンロードしてインストール済みであること。詳細については、「JDK 8 のダウンロード」をご参照ください。

ステップ 1: エンドポイントの取得

Logstash は ApsaraMQ for Kafka エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続します。

説明

ApsaraMQ for Kafka は、次の VPC エンドポイントをサポートしています。

  • デフォルトエンドポイント: ポート番号は 9092 です。

  • SASL エンドポイント: ポート番号は 9094 です。SASL エンドポイントを使用するには、アクセス制御リスト (ACL) 機能を有効にする必要があります。詳細については、「ACL 機能を有効にする」をご参照ください。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが配置されているリージョンを選択します。

  3. インスタンスリスト ページで、Logstash にアウトプットとして接続するインスタンスの名前をクリックします。

  4. インスタンスの詳細 ページの アクセスポイント情報 セクションで、インスタンスのエンドポイントを表示します。設定情報 セクションで、ユーザー名 および パスワード パラメーターの値を取得します。

    endpoint

    説明

    さまざまなタイプのエンドポイントの違いについては、「エンドポイントの比較」をご参照ください。

ステップ 2: Topic の作成

メッセージを保存するための Topic を作成します。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが配置されているリージョンを選択します。

    重要

    Elastic Compute Service (ECS) インスタンスがデプロイされているリージョンに Topic を作成する必要があります。Topic はリージョンをまたいで使用することはできません。たとえば、メッセージのプロデューサーとコンシューマーが中国 (北京) リージョンにデプロイされている ECS インスタンス上で実行されている場合、Topic も中国 (北京) リージョンに作成する必要があります。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、トピック管理 をクリックします。

  5. トピック管理 ページで、トピックの作成 をクリックします。

  6. トピックの作成 パネルで、Topic のプロパティを指定し、[OK] をクリックします。

    パラメーター

    説明

    名前

    Topic 名。

    demo

    記述

    Topic の説明。

    demo test

    パーティションの数

    Topic のパーティション数。

    12

    ストレージエンジン

    説明

    ストレージエンジンのタイプは、非サーバーレスのプロフェッショナル版インスタンスを使用する場合にのみ指定できます。他のタイプのインスタンスでは、デフォルトで [クラウドストレージ] が選択されます。

    Topic 内のメッセージを保存するために使用されるストレージエンジンのタイプ。

    ApsaraMQ for Kafka は、次のタイプのストレージエンジンをサポートしています。

    • クラウドストレージ: この値を選択すると、システムは Topic に Alibaba Cloud ディスクを使用し、分散モードで 3 つのレプリカにデータを保存します。このストレージエンジンは、低レイテンシー、高性能、高耐久性、高信頼性を特徴としています。インスタンス作成時に 仕様タイプ パラメーターを Standard Edition (High Write) に設定した場合、このパラメーターは クラウドストレージ にしか設定できません。

    • ローカルストレージ: この値を選択すると、システムはオープンソースの Apache Kafka の In-Sync Replicas (ISR) アルゴリズムを使用し、分散モードで 3 つのレプリカにデータを保存します。

    クラウドストレージ

    メッセージタイプ

    Topic のメッセージタイプ。有効な値:

    • 通常のメッセージ: デフォルトでは、同じキーを持つメッセージは、送信された順序で同じパーティションに保存されます。クラスター内のブローカーに障害が発生した場合、パーティションに保存されているメッセージの順序は保持されないことがあります。ストレージエンジン パラメーターを クラウドストレージ に設定した場合、このパラメーターは自動的に 通常のメッセージ に設定されます。

    • パーティション順位メッセージ: デフォルトでは、同じキーを持つメッセージは、送信された順序で同じパーティションに保存されます。クラスター内のブローカーに障害が発生した場合でも、メッセージは送信された順序でパーティションに保存されます。一部のパーティションのメッセージは、パーティションが復元されるまで送信できません。ストレージエンジン パラメーターを ローカルストレージ に設定した場合、このパラメーターは自動的に パーティション順位メッセージ に設定されます。

    通常のメッセージ

    ログリリースポリシー

    Topic で使用されるログクリーンアップポリシー。

    ストレージエンジン パラメーターを ローカルストレージ に設定した場合は、ログリリースポリシー パラメーターを設定する必要があります。ストレージエンジンパラメーターをローカルストレージに設定できるのは、ApsaraMQ for Kafka プロフェッショナル版インスタンスを使用する場合のみです。

    ApsaraMQ for Kafka は、次のログクリーンアップポリシーを提供します。

    • Delete: デフォルトのログクリーンアップポリシー。システムに十分なストレージ容量がある場合、メッセージは最大保持期間に基づいて保持されます。ストレージ使用量が 85% を超えると、システムはサービスの可用性を確保するために最も古いメッセージを削除します。

    • Compact: Apache Kafka で使用されるログ圧縮ポリシー。ログ圧縮により、同じキーを持つメッセージの最新の値が保持されることが保証されます。このポリシーは、障害が発生したシステムの復元や、システム再起動後のキャッシュの再読み込みなどのシナリオに適しています。たとえば、Kafka Connect または Confluent Schema Registry を使用する場合、システムの状態と構成に関する情報をログ圧縮された Topic に保存する必要があります。

      重要

      ログ圧縮された Topic は、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。

    Compact

    タグ

    Topic にアタッチするタグ。

    demo

    Topic が作成されると、トピック管理 ページで Topic を表示できます。

ステップ 3: Logstash を使用してメッセージを送信する

Logstash をインストールしたマシンで、Logstash を起動し、作成した Topic にメッセージを送信します。

  1. cd コマンドを実行して、Logstash の bin ディレクトリに移動します。

  2. output.conf 設定ファイルを作成します。

    1. vim output.conf コマンドを実行して、空の設定ファイルを作成します。

    2. i キーを押して挿入モードに入ります。

    3. 次の内容を入力します。

      input {
          input {
            stdin{}
        }
      }
      
      output {
         kafka {
              bootstrap_servers => "alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092"
              topic_id => "logstash_test"
             }
      }

      パラメーター

      説明

      bootstrap_servers

      ApsaraMQ for Kafka は、次の VPC エンドポイントを提供します。

      • デフォルトエンドポイント

      • SASL エンドポイント

      alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092

      topic_id

      Topic の名前。

      logstash_test

    4. Esc キーを押して CLI モードに戻ります。

    5. : キーを押してボトムラインモードに入ります。wq と入力して Enter キーを押し、ファイルを保存して終了します。

  3. 作成した Topic にメッセージを送信します。

    1. ./logstash -f output.conf を実行します。

    2. test と入力して Enter キーを押します。

      次の結果が返されます。

      result

ステップ 4: Topic パーティションの表示

パーティションのステータスを表示して、Topic に送信されたメッセージを確認します。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが配置されているリージョンを選択します。

  3. インスタンスリスト ページで、Logstash にアウトプットとして接続したインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、トピック管理 をクリックします。

  5. トピック管理 ページで、管理する Topic の名前をクリックします。トピックの詳細 ページで、パーテーションステータス タブをクリックします。名前

    表 1. パーティションステータスに含まれるパラメーター

    パラメーター

    説明

    パーティション ID

    パーティション ID。

    最小オフセット

    パーティション内の最小オフセット。

    最大オフセット

    パーティション内の最大オフセット。

    メッセージ

    パーティション内のメッセージ数。

    最終更新日時

    パーティション内の最後のメッセージが保存された時刻。

    分区状态信息

ステップ 5: オフセットによるメッセージのクエリ

パーティション ID とオフセットに基づいて、送信されたメッセージをクエリできます。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが配置されているリージョンを選択します。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、メッセージクエリ をクリックします。

  5. メッセージクエリ ページで、クエリモード ドロップダウンリストから 位置によるクエリ を選択します。

  6. Topic ドロップダウンリストから Topic を選択し、パーティション ドロップダウンリストからパーティションを選択し、開始点 フィールドにオフセット値を入力してから、クエリ をクリックします。

    オフセット値が指定されたオフセット値以上のメッセージが表示されます。たとえば、Partition パラメーターと Offset パラメーターの値として 5 を指定すると、システムはパーティション 5 からオフセットが 5 以上のメッセージをクエリします。

    表 2. メッセージクエリ結果に含まれるパラメーター

    パラメーター

    説明

    パーティション

    メッセージが取得されたパーティション。

    オフセット

    メッセージのオフセット。

    Key

    メッセージキー。キーは文字列に変換されます。

    Value

    メッセージの値。メッセージコンテンツとも呼ばれます。メッセージの値は文字列に変換されます。

    メッセージ作成時間

    メッセージが送信された時点。値は、メッセージが送信されたときにクライアントが記録したタイムスタンプ、または ProducerRecord オブジェクトに指定したタイムスタンプフィールドの値です。

    説明
    • タイムスタンプフィールドに値を指定した場合、指定した値が表示されます。

    • タイムスタンプフィールドに値を指定しなかった場合、メッセージが送信されたときのシステム時刻が表示されます。

    • 1970/x/x x:x:x 形式の値は、タイムスタンプフィールドが 0 または無効な値として指定されていることを示します。

    • ApsaraMQ for Kafka バージョン 0.9 以前のクライアントでは、タイムスタンプフィールドを指定できません。

    操作

    • キーのダウンロード をクリックしてメッセージキーをダウンロードします。

    • 値のダウンロード をクリックしてメッセージコンテンツをダウンロードします。

    重要
    • 取得した各メッセージのコンテンツは、ApsaraMQ for Kafka コンソールに最大 1 KB まで表示できます。取得したメッセージのサイズが 1 KB を超える場合、システムは自動的にコンテンツを切り捨てます。完全なメッセージコンテンツを表示したい場合は、メッセージをダウンロードしてください。

    • 一度にダウンロードできる取得済みメッセージは最大 10 MB です。取得したメッセージのサイズが 10 MB を超える場合、最初の 10 MB のメッセージコンテンツのみがダウンロードできます。

詳細情報

パラメーター設定の詳細については、「Kafka output plugin」をご参照ください。