すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:VPC で Filebeat を出力として接続する

最終更新日:Nov 09, 2025

ApsaraMQ for Kafka を Filebeat に出力として接続できます。このトピックでは、Virtual Private Cloud (VPC) 環境で Filebeat を使用して ApsaraMQ for Kafka にメッセージを送信する方法について説明します。

前提条件

開始する前に、次のタスクを完了してください:

  • ApsaraMQ for Kafka インスタンスを購入してデプロイします。このトピックでは、非サーバーレスインスタンスを例として使用します。詳細については、「VPC タイプのインスタンスに接続する」をご参照ください。

  • Filebeat をダウンロードしてインストールします。詳細については、「Filebeat のダウンロード」をご参照ください。

  • JDK 8 をダウンロードしてインストールします。詳細については、「JDK 8 のダウンロード」をご参照ください。

ステップ 1: エンドポイントの取得

Filebeat は、ApsaraMQ for Kafka インスタンスのエンドポイントを使用して ApsaraMQ for Kafka に接続します。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. インスタンスリスト ページで、Filebeat に接続するインスタンスの名前をクリックします。

  4. インスタンスの詳細 ページの アクセスポイント情報 セクションで、インスタンスのエンドポイントを表示します。設定情報 セクションで、ユーザー名 および パスワード パラメーターの値を取得します。

    endpoint

    説明

    さまざまなタイプのエンドポイントの違いについては、「エンドポイントの比較」をご参照ください。

ステップ 2: Topic の作成

メッセージを保存するための Topic を作成します。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

    重要

    Topic は、Elastic Compute Service (ECS) インスタンスがデプロイされているリージョンに作成する必要があります。Topic はリージョンをまたいで使用することはできません。たとえば、メッセージのプロデューサーとコンシューマーが中国 (北京) リージョンにデプロイされた ECS インスタンスで実行されている場合、Topic も中国 (北京) リージョンに作成する必要があります。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、トピック管理 をクリックします。

  5. トピック管理 ページで、トピックの作成 をクリックします。

  6. トピックの作成 パネルで、Topic のプロパティを指定し、[OK] をクリックします。

    パラメーター

    説明

    名前

    Topic 名。

    demo

    記述

    Topic の説明。

    demo test

    パーティションの数

    Topic 内のパーティションの数。

    12

    ストレージエンジン

    説明

    ストレージエンジンのタイプは、非サーバーレスの Professional Edition インスタンスを使用する場合にのみ指定できます。他のタイプのインスタンスでは、デフォルトで [クラウドストレージ] が選択されます。

    Topic 内のメッセージを保存するために使用されるストレージエンジンのタイプ。

    ApsaraMQ for Kafka は、次のタイプのストレージエンジンをサポートしています:

    • クラウドストレージ: この値を選択すると、システムは Topic に Alibaba Cloud ディスクを使用し、分散モードで 3 つのレプリカにデータを保存します。このストレージエンジンは、低レイテンシー、高性能、長期耐久性、高信頼性を特徴としています。インスタンス作成時に 仕様タイプ パラメーターを Standard Edition (High Write) に設定した場合、このパラメーターは クラウドストレージ にしか設定できません。

    • ローカルストレージ: この値を選択すると、システムはオープンソースの Apache Kafka の In-Sync Replicas (ISR) アルゴリズムを使用し、分散モードで 3 つのレプリカにデータを保存します。

    クラウドストレージ

    メッセージタイプ

    Topic のメッセージタイプ。有効な値:

    • 通常のメッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに保存されます。クラスター内のブローカーに障害が発生した場合、パーティションに保存されているメッセージの順序は維持されない可能性があります。ストレージエンジン パラメーターを クラウドストレージ に設定した場合、このパラメーターは自動的に 通常のメッセージ に設定されます。

    • パーティション順位メッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに保存されます。クラスター内のブローカーに障害が発生した場合でも、メッセージは送信された順序でパーティションに保存されます。一部のパーティションのメッセージは、パーティションが復元されるまで送信できません。ストレージエンジン パラメーターを ローカルストレージ に設定した場合、このパラメーターは自動的に パーティション順位メッセージ に設定されます。

    通常のメッセージ

    ログリリースポリシー

    Topic で使用されるログクリーンアップポリシー。

    ストレージエンジン パラメーターを ローカルストレージ に設定した場合は、ログリリースポリシー パラメーターを設定する必要があります。ストレージエンジンパラメーターをローカルストレージに設定できるのは、ApsaraMQ for Kafka Professional Edition インスタンスを使用する場合のみです。

    ApsaraMQ for Kafka は、次のログクリーンアップポリシーを提供します:

    • Delete: デフォルトのログクリーンアップポリシー。システムに十分なストレージ容量がある場合、メッセージは最大保存期間に基づいて保持されます。ストレージ使用量が 85% を超えると、システムはサービスの可用性を確保するために最も古いメッセージを削除します。

    • Compact: Apache Kafka で使用されるログ圧縮ポリシー。ログ圧縮により、同じキーを持つメッセージの最新の値が保持されることが保証されます。このポリシーは、障害が発生したシステムの復元や、システム再起動後のキャッシュの再読み込みなどのシナリオに適しています。たとえば、Kafka Connect または Confluent Schema Registry を使用する場合、システムの状態と構成に関する情報をログ圧縮された Topic に保存する必要があります。

      重要

      ログ圧縮された Topic は、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。

    Compact

    タグ

    Topic にアタッチするタグ。

    demo

    Topic が作成されると、トピック管理 ページで Topic を表示できます。

ステップ 3: Filebeat を使用してメッセージを送信する

Filebeat がインストールされているマシンで Filebeat を起動し、作成した Topic にメッセージを送信します。

  1. cd コマンドを実行して、Filebeat のインストールディレクトリに切り替えます。

  2. output.conf 設定ファイルを作成できます。

    1. コマンド vim output.conf を実行して、空の設定ファイルを作成できます。

    2. i キーを押して挿入モードに入ります。

    3. 次の内容を入力します。

      filebeat.inputs:
      - type: stdin
      
      output.kafka:
        hosts: ["alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092", "alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092", "alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092"]
      
        topic: 'filebeat_test'
      
        required_acks: 1
        compression: none
        max_message_bytes: 1000000

      パラメーター

      説明

      hosts

      ApsaraMQ for Kafka は、次の VPC エンドポイントを提供します:

      • デフォルトのエンドポイント

      • SASL エンドポイント

      alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092, alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092, alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092

      topic

      Topic の名前。

      filebeat_test

      required_acks

      確認応答 (ACK) の信頼性レベル。有効な値:

      • 0: 応答は不要です。

      • 1: システムはローカルコミットを待ちます。

      • -1: システムはすべてのレプリカのコミットを待ちます。

      デフォルト値: 1。

      1

      compression

      データ圧縮エンコーダー。デフォルト値: gzip。有効な値:

      • None

      • snappy: 圧縮と展開のための C++ 開発パッケージ。

      • lz4: 圧縮と展開の速度に重点を置いた可逆データ圧縮アルゴリズム。

      • gzip: GNU フリーソフトウェア用のファイル圧縮プログラム。

      none

      max_message_bytes

      最大メッセージサイズ (バイト単位)。デフォルト値: 1000000。この値は、ApsaraMQ for Kafka インスタンスに設定した最大メッセージサイズより小さくする必要があります。

      1000000

      パラメーターの詳細については、「Kafka 出力プラグイン」をご参照ください。

    4. Esc キーを押して CLI モードに戻ります。

    5. : キーを押してボトムラインモードに入ります。wq と入力して Enter キーを押し、ファイルを保存して終了します。

  3. 作成した Topic にメッセージを送信します。

    1. ./filebeat -c ./output.yml コマンドを実行します。

    2. test と入力して Enter キーを押します。

ステップ 4: Topic パーティションの表示

Topic に送信したメッセージのステータスを表示できます。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、トピック管理 をクリックします。

  5. トピック管理 ページで、管理する Topic の名前をクリックします。トピックの詳細 ページで、パーテーションステータス タブをクリックします。名前

    表 1. パーティションステータスに含まれるパラメーター

    パラメーター

    説明

    パーティション ID

    パーティション ID。

    最小オフセット

    パーティション内の最小オフセット。

    最大オフセット

    パーティション内の最大オフセット。

    メッセージ

    パーティション内のメッセージ数。

    最終更新日時

    パーティション内の最後のメッセージが保存された時刻。

    分区状态信息

ステップ 5: オフセットによるメッセージのクエリ

送信したメッセージを、そのパーティション ID とオフセットに基づいてクエリできます。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. 概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、メッセージクエリ をクリックします。

  5. メッセージクエリ ページで、クエリモード ドロップダウンリストから 位置によるクエリ を選択します。

  6. Topic ドロップダウンリストから Topic を選択し、パーティション ドロップダウンリストからパーティションを選択し、開始点 フィールドにオフセット値を入力してから、クエリ をクリックします。

    指定したオフセット値以上のオフセット値を持つメッセージが表示されます。たとえば、Partition パラメーターと Offset パラメーターの値として 5 を指定すると、システムはパーティション 5 からオフセットが 5 以上のメッセージをクエリします。

    表 2. メッセージクエリ結果に含まれるパラメーター

    パラメーター

    説明

    パーティション

    メッセージが取得されたパーティション。

    オフセット

    メッセージのオフセット。

    Key

    メッセージキー。キーは文字列に変換されます。

    Value

    メッセージの値。メッセージコンテンツとも呼ばれます。メッセージの値は文字列に変換されます。

    メッセージ作成時間

    メッセージが送信された時点。値は、メッセージが送信されたときにクライアントが記録したタイムスタンプ、または ProducerRecord オブジェクトに指定したタイムスタンプフィールドの値です。

    説明
    • タイムスタンプフィールドに値を指定した場合、指定した値が表示されます。

    • タイムスタンプフィールドに値を指定しなかった場合、メッセージが送信されたときのシステム時刻が表示されます。

    • 1970/x/x x:x:x 形式の値は、タイムスタンプフィールドが 0 または無効な値として指定されていることを示します。

    • ApsaraMQ for Kafka バージョン 0.9 以前のクライアントでは、タイムスタンプフィールドを指定できません。

    操作

    • キーのダウンロード をクリックしてメッセージキーをダウンロードします。

    • 値のダウンロード をクリックしてメッセージコンテンツをダウンロードします。

    重要
    • 取得した各メッセージのコンテンツは、ApsaraMQ for Kafka コンソールに最大 1 KB まで表示できます。取得したメッセージが 1 KB を超える場合、システムは自動的にコンテンツを切り捨てます。完全なメッセージコンテンツを表示したい場合は、メッセージをダウンロードしてください。

    • 一度に最大 10 MB の取得済みメッセージをダウンロードできます。取得したメッセージが 10 MB を超える場合、最初の 10 MB のメッセージコンテンツのみがダウンロードできます。