このガイドでは、Alibaba Cloud Logstash を使用して Azure Event Hubs からデータを取り込み、Elasticsearch クラスターに同期する方法を説明します。
前提条件
Alibaba Cloud Elasticsearch クラスターが作成済みであること。
この例では、バージョン 7.10 を使用します。
Elasticsearch クラスターの YML 設定で自動インデックス作成が有効になっていること。
これにより、Logstash は宛先インデックスを自動的に作成できます。詳細については、「YML パラメーターの設定」をご参照ください。
Alibaba Cloud Logstash インスタンスが作成済みであること。
この例では、バージョン 7.4 を使用します。
Logstash VPC 用に SNAT ルールを持つ NAT Gateway が設定されていること。
これにより、Logstash (VPC 内) と Azure Event Hubs (パブリックネットワーク上) が通信できるようになります。詳細については、「インターネットデータ伝送のための NAT Gateway の設定」をご参照ください。
Azure Event Hubs 環境を準備済み:
アクティブな Event Hub の名前空間とインスタンス。
オフセット追跡 (チェックポイント) のための接続文字列 (プライマリキー) と Blob ストレージアカウント。
ステップ1:Logstash パイプラインの作成と設定
[Logstash クラスターページ] に移動します。
対象のクラスターに移動します。
上部のナビゲーションバーで、クラスターが存在するリージョンを選択します。
Logstash クラスター ページで、対象のクラスターを見つけて、その ID をクリックします。
左側のナビゲーションメニューで、パイプライン管理 をクリックします。
パイプラインの作成 をクリックします。
タスクの作成 ページで、パイプライン ID を入力し、次のコードを Config Settings に貼り付けます:
input { azure_event_hubs { event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"] initial_position => "beginning" threads => 2 decorate_events => true consumer_group => "group-kl" storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn" storage_container => "lettie_container" } } filter { } output { elasticsearch { hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"] index => "test-log" password => "xxxxxx" user => "elastic" } }パラメーター
カテゴリ
パラメーター
説明
Azure
event_hub_connections
ハブの接続文字列。
EntityPath(ハブ名) を含みます。詳細については、「event_hub_connections」をご参照ください。説明event_hub_connections パラメーターはイベントハブごとに定義されます。他のパラメーターはすべてのイベントハブで共有されます。
initial_position
イベントハブのデータを読み取る開始位置。有効な値:beginning (デフォルト)、end、look_back。詳細については、「initial_position」をご参照ください。
threads
イベント処理のためのスレッドの総数。詳細については、「threads」をご参照ください。
decorate_events
イベントハブのメタデータを同期するかどうかを指定します。メタデータには、イベントハブ名、consumer_group、processor_host、パーティション、オフセット、シーケンス、タイムスタンプ、event_size が含まれます。詳細については、「decorate_events」をご参照ください。
consumer_group
Logstash 専用のグループを使用します。このグループ内の複数の Logstash ノードが負荷を共有します。詳細については、「consumer_group」をご参照ください。
storage_connection
Azure Blob Storage の接続文字列。これによりオフセットが永続化され、Logstash は再起動後に中断したところから再開できます。詳細については、「storage_connection」をご参照ください。
storage_container
オフセットを永続化し、複数の Logstash ノードが連携して動作できるようにするために使用されるストレージコンテナーの名前。詳細については、「storage_container」をご参照ください。
説明オフセットの上書きを避けるために、異なる storage_container 名を使用してください。同じデータが異なるサービスに書き込まれる場合は、このパラメーターを異なる名前に設定する必要があります。
Elasticsearch
hosts
ご利用の Elasticsearch エンドポイント。値を
http://<Alibaba Cloud Elasticsearch インスタンス ID>.elasticsearch.aliyuncs.com:9200に設定します。index
Elasticsearch のターゲットインデックス名。
user
Elasticsearch にアクセスするためのユーザー名。デフォルト:
elastic。password
Elasticsearch ユーザーのパスワード。
詳細については、「Logstash 設定ファイル」をご参照ください。
[次へ] をクリックし、パイプラインパラメーターを設定します。

パラメーター
説明
Pipeline Workers
パイプラインのフィルターおよび出力ステージを並行して実行するワーカースレッドの数。イベントがバックログされている場合や CPU が飽和していない場合は、スレッド数を増やして CPU 処理能力をより有効に活用することを検討してください。デフォルト値:インスタンスの CPU コア数。
Pipeline Batch Size
単一のワーカースレッドが、フィルターと出力を実行しようとする前に入力から収集できるイベントの最大数。バッチサイズを大きくすると、メモリのオーバーヘッドが増加する可能性があります。この値を効果的に使用するには、LS_HEAP_SIZE 変数を設定して JVM ヒープサイズを増やすことができます。デフォルト値:125。
Pipeline Batch Delay
小さなバッチをパイプラインのワーカースレッドにディスパッチする前に、各イベントを待機する時間 (ミリ秒単位)。デフォルト値:50 ms。
Queue Type
イベントバッファリングのための内部キューイングモデル。有効な値:
MEMORY:デフォルト。従来のメモリ内キュー。
PERSISTED:ディスクベースの ACK 付きキュー (永続キュー)。
Queue Max Bytes
キューが格納できる最大データ量(単位:
MB)。この値は、1から2<sup>53</sup>-1の範囲の整数である必要があります。デフォルト値は1024 MBです。説明この値が合計ディスク容量より小さいことを確認してください。
Queue Checkpoint Writes
永続キューが有効な場合、これはチェックポイントが強制される前に書き込むことができるイベントの最大数です。値が 0 の場合は制限がないことを意味します。デフォルト値:1024。
警告パイプラインをデプロイまたは更新すると、Logstash クラスターの再起動がトリガーされます。これがメンテナンスウィンドウと一致することを確認してください。
保存 または 保存とデプロイ をクリックします。
保存:パイプライン構成を Logstash に保存します。構成はデプロイされるまで有効になりません。保存後、パイプラインリスト ページに戻ります。操作 列の デプロイ をクリックしてインスタンスを再起動し、構成を適用します。
保存とデプロイ:構成を保存してデプロイします。これにより、インスタンスが再起動され、構成が適用されます。
ステップ3:データ同期の検証
ご利用の Elasticsearch クラスターの Kibana コンソールにログインし、Kibana ホームページに移動します。
左側のナビゲーションメニューで、[開発ツール] をクリックします。
[コンソール] で、次のコマンドを実行してデータが流れていることを確認します:
GET test-log3/_search { "query":{ "match":{ "message":"L23" } } }期待される結果:
