すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:Logstash を介した Azure Event Hubs から Alibaba Cloud Elasticsearch へのデータ同期

最終更新日:Mar 03, 2026

このガイドでは、Alibaba Cloud Logstash を使用して Azure Event Hubs からデータを取り込み、Elasticsearch クラスターに同期する方法を説明します。

前提条件

ステップ1:Logstash パイプラインの作成と設定

  1. [Logstash クラスターページ] に移動します。

  2. 対象のクラスターに移動します。

    1. 上部のナビゲーションバーで、クラスターが存在するリージョンを選択します。

    2. Logstash クラスター ページで、対象のクラスターを見つけて、その ID をクリックします。

  3. 左側のナビゲーションメニューで、パイプライン管理 をクリックします。

  4. パイプラインの作成 をクリックします。

  5. タスクの作成 ページで、パイプライン ID を入力し、次のコードを Config Settings に貼り付けます:

    input {
      azure_event_hubs {
         event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"]
         initial_position => "beginning"
         threads => 2
         decorate_events => true
         consumer_group => "group-kl"
         storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn"
         storage_container => "lettie_container"
       }
    }
    filter {
    
    }
    output {
      elasticsearch {
        hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"]
        index => "test-log"
        password => "xxxxxx"
        user => "elastic"
      }
    }

    パラメーター

    カテゴリ

    パラメーター

    説明

    Azure

    event_hub_connections

    ハブの接続文字列。EntityPath (ハブ名) を含みます。詳細については、「event_hub_connections」をご参照ください。

    説明

    event_hub_connections パラメーターはイベントハブごとに定義されます。他のパラメーターはすべてのイベントハブで共有されます。

    initial_position

    イベントハブのデータを読み取る開始位置。有効な値:beginning (デフォルト)、end、look_back。詳細については、「initial_position」をご参照ください。

    threads

    イベント処理のためのスレッドの総数。詳細については、「threads」をご参照ください。

    decorate_events

    イベントハブのメタデータを同期するかどうかを指定します。メタデータには、イベントハブ名、consumer_group、processor_host、パーティション、オフセット、シーケンス、タイムスタンプ、event_size が含まれます。詳細については、「decorate_events」をご参照ください。

    consumer_group

    Logstash 専用のグループを使用します。このグループ内の複数の Logstash ノードが負荷を共有します。詳細については、「consumer_group」をご参照ください。

    storage_connection

    Azure Blob Storage の接続文字列。これによりオフセットが永続化され、Logstash は再起動後に中断したところから再開できます。詳細については、「storage_connection」をご参照ください。

    storage_container

    オフセットを永続化し、複数の Logstash ノードが連携して動作できるようにするために使用されるストレージコンテナーの名前。詳細については、「storage_container」をご参照ください。

    説明

    オフセットの上書きを避けるために、異なる storage_container 名を使用してください。同じデータが異なるサービスに書き込まれる場合は、このパラメーターを異なる名前に設定する必要があります。

    Elasticsearch

    hosts

    ご利用の Elasticsearch エンドポイント。値を http://<Alibaba Cloud Elasticsearch インスタンス ID>.elasticsearch.aliyuncs.com:9200 に設定します。

    index

    Elasticsearch のターゲットインデックス名。

    user

    Elasticsearch にアクセスするためのユーザー名。デフォルト:elastic

    password

    Elasticsearch ユーザーのパスワード。

    詳細については、「Logstash 設定ファイル」をご参照ください。

  6. [次へ] をクリックし、パイプラインパラメーターを設定します。

    管道参数配置

    パラメーター

    説明

    Pipeline Workers

    パイプラインのフィルターおよび出力ステージを並行して実行するワーカースレッドの数。イベントがバックログされている場合や CPU が飽和していない場合は、スレッド数を増やして CPU 処理能力をより有効に活用することを検討してください。デフォルト値:インスタンスの CPU コア数。

    Pipeline Batch Size

    単一のワーカースレッドが、フィルターと出力を実行しようとする前に入力から収集できるイベントの最大数。バッチサイズを大きくすると、メモリのオーバーヘッドが増加する可能性があります。この値を効果的に使用するには、LS_HEAP_SIZE 変数を設定して JVM ヒープサイズを増やすことができます。デフォルト値:125。

    Pipeline Batch Delay

    小さなバッチをパイプラインのワーカースレッドにディスパッチする前に、各イベントを待機する時間 (ミリ秒単位)。デフォルト値:50 ms。

    Queue Type

    イベントバッファリングのための内部キューイングモデル。有効な値:

    • MEMORY:デフォルト。従来のメモリ内キュー。

    • PERSISTED:ディスクベースの ACK 付きキュー (永続キュー)。

    Queue Max Bytes

    キューが格納できる最大データ量(単位:MB)。この値は、1 から 2<sup>53</sup>-1 の範囲の整数である必要があります。デフォルト値は 1024 MB です。

    説明

    この値が合計ディスク容量より小さいことを確認してください。

    Queue Checkpoint Writes

    永続キューが有効な場合、これはチェックポイントが強制される前に書き込むことができるイベントの最大数です。値が 0 の場合は制限がないことを意味します。デフォルト値:1024。

    警告

    パイプラインをデプロイまたは更新すると、Logstash クラスターの再起動がトリガーされます。これがメンテナンスウィンドウと一致することを確認してください。

  7. 保存 または 保存とデプロイ をクリックします。

    • 保存:パイプライン構成を Logstash に保存します。構成はデプロイされるまで有効になりません。保存後、パイプラインリスト ページに戻ります。操作 列の デプロイ をクリックしてインスタンスを再起動し、構成を適用します。

    • 保存とデプロイ:構成を保存してデプロイします。これにより、インスタンスが再起動され、構成が適用されます。

ステップ3:データ同期の検証

  1. ご利用の Elasticsearch クラスターの Kibana コンソールにログインし、Kibana ホームページに移動します。

  2. 左側のナビゲーションメニューで、[開発ツール] をクリックします。

  3. [コンソール] で、次のコマンドを実行してデータが流れていることを確認します:

    GET test-log3/_search
    {
      "query":{
        "match":{
          "message":"L23"
         }
       }
    }

    期待される結果:预期结果