すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:Azure Event Hubs から Alibaba Cloud Elasticsearch クラスタにデータを同期するために Logstash を使用する

最終更新日:Jun 11, 2025

このトピックでは、Alibaba Cloud Logstash を使用して Azure Event Hubs から Alibaba Cloud Elasticsearch クラスタにデータを同期する方法について説明します。

手順

  1. 手順 1:準備を行う

  2. 手順 2:Logstash パイプラインを作成および構成する

  3. 手順 3:結果を確認する

手順 1:準備を行う

  1. Alibaba Cloud Elasticsearch クラスタを作成し、クラスタの自動インデックス作成機能を有効にします。この例では、V7.10 クラスタを使用しています。

    詳細については、「Alibaba Cloud Elasticsearch クラスタを作成する」および「YML ファイルを構成する」をご参照ください。

  2. Alibaba Cloud Logstash クラスタを作成します。この例では、V7.4 クラスタを使用しています。

    詳細については、「Alibaba Cloud Logstash クラスタを作成する」をご参照ください。

    Alibaba Cloud Logstash クラスタは、仮想プライベートクラウド(VPC)にデプロイされます。インターネット経由でクラスタを Azure Event Hubs に接続するには、ネットワークアドレス変換(NAT)ゲートウェイを構成し、ゲートウェイを使用してクラスタをインターネットに接続する必要があります。詳細については、「インターネット経由のデータ転送用に NAT ゲートウェイを構成する」をご参照ください。

    説明

    セルフマネージド Logstash クラスタを使用している場合は、Alibaba Cloud Elasticsearch クラスタと同じ VPC に存在する Elastic Compute Service(ECS)インスタンスを購入する必要があります。このような ECS インスタンスがある場合は、エラスティック IP アドレス(EIP)を ECS インスタンスにバインドします。

  3. Azure Event Hubs 用のセルフマネージド環境を準備します。

    Azure Event Hubs のドキュメントを参照してください。

手順 2:Logstash パイプラインを作成および構成する

  1. Alibaba Cloud Elasticsearch コンソールの Logstash クラスタページに移動します。

  2. 目的のクラスタに移動します。

    1. 上部のナビゲーションバーで、クラスタが存在するリージョンを選択します。

    2. [Logstash クラスタ] ページで、クラスタを見つけて ID をクリックします。

  3. 表示されるページの左側のナビゲーションウィンドウで、[パイプライン] をクリックします。

  4. パイプラインページで、[パイプラインの作成] をクリックします。

  5. [作成] ウィザードで、パイプライン ID を入力し、パイプラインを構成します。

    この例では、パイプラインに次の構成を使用します。

    input {
      azure_event_hubs {
         event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"]
         initial_position => "beginning"
         threads => 2
         decorate_events => true
         consumer_group => "group-kl"
         storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn"
         storage_container => "lettie_container"
       }
    }
    filter {
    
    }
    output {
      elasticsearch {
        hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"]
        index => "test-log"
        password => "xxxxxx"
        user => "elastic"
      }
    }

    表 1. 入力部分のパラメータ

    パラメータ

    説明

    event_hub_connections

    データを同期するイベントハブの接続文字列。各接続文字列には EntityPath が含まれている必要があります。詳細については、event_hub_connections を参照してください。

    説明

    event_hub_connections パラメータは、イベントハブごとに定義されます。他のすべてのパラメータは、イベントハブ間で共有されます。

    initial_position

    データの読み取りを開始する位置。有効な値:beginning、end、look_back。デフォルト値:beginning。詳細については、initial_position を参照してください。

    threads

    データの処理に使用されるスレッドの総数。詳細については、threads を参照してください。

    decorate_events

    イベントハブのメタデータ(名前、consumer_group、processor_host、パーティション、オフセット、シーケンス、タイムスタンプ、event_size など)を同期するかどうかを指定します。詳細については、decorate_events を参照してください。

    consumer_group

    イベントハブからデータを読み取るために使用されるコンシューマーグループ。Logstash クラスタ専用のコンシューマーグループを作成し、Logstash クラスタのすべてのノードがそのコンシューマーグループを使用していることを確認する必要があります。これにより、ノードは連携して動作できます。詳細については、consumer_group を参照してください。

    storage_connection

    BLOB アカウントストレージの接続文字列。BLOB アカウントストレージは、再起動間のオフセットを永続化し、Logstash クラスタ内のノードが異なるパーティションを処理することを保証します。このパラメータを構成すると、再起動は処理が中断された位置から再開されます。このパラメータを構成しない場合、再起動は initial_position パラメータの値で示される位置から再開されます。詳細については、storage_connection を参照してください。

    storage_container

    オフセットを永続化し、Logstash クラスタ内の複数のノードが連携して動作できるようにするために使用されるストレージコンテナの名前。詳細については、storage_container を参照してください。

    説明

    オフセットの上書きを防ぐために、異なる ストレージコンテナ を使用できます。同じデータを異なるサービスに書き込む場合は、複数のストレージコンテナを指定する必要があります。

    表 2. 出力部分のパラメータ

    パラメータ

    説明

    hosts

    Elasticsearch クラスタのエンドポイント。このパラメータの値は、http://<Elasticsearch クラスタ ID>.elasticsearch.aliyuncs.com:9200 の形式である必要があります。

    index

    デスティネーションインデックスの名前。

    user

    Elasticsearch クラスタにアクセスするために使用されるユーザー名。デフォルトのユーザー名は elastic です。

    password

    elastic アカウントのパスワード。パスワードは、クラスタの作成時に指定します。パスワードを忘れた場合は、リセットできます。パスワードをリセットする手順と注意事項については、「Elasticsearch クラスタのアクセスパスワードをリセットする」をご参照ください。

    詳細については、「Logstash 構成ファイル」をご参照ください。

  6. [次へ] をクリックして、パイプラインパラメータを構成します。

    管道参数配置

    パラメータ

    説明

    パイプラインワーカー

    パイプラインのフィルターおよび出力プラグインを並列で実行する ワーカースレッド の数。イベントのバックログが存在する場合、または一部の CPU リソースが使用されていない場合は、スレッド数を増やして CPU 使用率を最大化することをお勧めします。このパラメータのデフォルト値は vCPU の数です。

    パイプラインバッチサイズ

    単一の ワーカースレッド がフィルターおよび出力プラグインの実行を試みる前に、入力プラグインから収集できるイベントの最大数。このパラメータを大きな値に設定すると、単一の ワーカースレッド はより多くのイベントを収集できますが、より多くのメモリを消費します。 ワーカースレッド により多くのイベントを収集するための十分なメモリがあることを確認するには、LS_HEAP_SIZE 変数を指定して Java 仮想マシン(JVM)ヒープサイズを増やします。デフォルト値:125。

    パイプラインバッチ遅延

    イベントの待機時間。この時間は、小さなバッチを パイプライン ワーカースレッド に割り当てる前、およびパイプラインイベントのバッチタスクを作成した後に発生します。デフォルト値:50。単位:ミリ秒。

    キュータイプ

    イベントをバッファリングするための内部 キュー モデル。有効な値:

    • メモリ:従来のメモリベースの キュー 。これがデフォルト値です。

    • 永続化:ディスクベースの ACKed キュー 。永続 キュー です。

    キュー最大バイト数

    キュー の最大データサイズ。単位:MB。有効な値:1 から 253 - 1 の範囲の整数。デフォルト値:1024。

    説明

    値はディスクの合計容量よりも小さくなければなりません。

    キューチェックポイント書き込み

    永続 キュー が有効になっている場合、チェックポイントが適用される前に書き込まれるイベントの最大数。値 0 は制限がないことを示します。デフォルト値:1024。

    警告

    パラメータを構成した後、設定を保存してパイプラインをデプロイする必要があります。これにより、Logstash クラスタの再起動がトリガーされます。続行する前に、再起動がビジネスに影響を与えないことを確認してください。

  7. [保存] または [保存してデプロイ] をクリックします。

    • [保存]:このボタンをクリックすると、システムはパイプライン設定を保存し、クラスタの変更をトリガーします。ただし、設定は有効になりません。[保存] をクリックすると、[パイプライン] ページが表示されます。[パイプライン] ページで、作成したパイプラインを見つけて、[アクション] 列の [今すぐデプロイ] をクリックします。その後、システムは Logstash クラスタを再起動して、設定を有効にします。

    • [保存してデプロイ]:このボタンをクリックすると、システムは Logstash クラスタを再起動して、設定を有効にします。

手順 3:結果を確認する

  1. Elasticsearch クラスタの Kibana コンソールにログインし、プロンプトに従って Kibana コンソールのホームページに移動します。

    Kibana コンソールへのログイン方法の詳細については、「Kibana コンソールにログインする」をご参照ください。

    説明

    この例では、Elasticsearch V7.10.0 クラスタを使用しています。他のバージョンのクラスタでの操作は異なる場合があります。コンソールでの実際の操作が優先されます。

  2. 表示されるページの右上隅にある [Dev Tools] をクリックします。

  3. 表示されるページの [コンソール] タブで、次のコマンドを実行して同期されたデータを表示します。

    GET test-log3/_search
    {
      "query":{
        "match":{
          "message":"L23"
         }
       }
    }

    コマンドが正常に実行されると、図に示されている結果が返されます。Expected result