すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:logstash-input-sls プラグインの使用

最終更新日:Jun 24, 2025

logstash-input-sls は、Alibaba Cloud Logstash の組み込み入力プラグインです。このプラグインは、ログサービスからログを取得します。

説明

logstash-input-sls は、Alibaba Cloud によってメンテナンスされているオープンソースプラグインです。詳細については、「logstash-input-logservice」をご参照ください。

機能

  • 分散協調消費: 複数のサーバーを構成して、ログストア内のログデータを同時に消費できます。

    説明

    複数の Logstash サーバーを使用して分散協調消費を実装する場合は、logstash-input-sls がインストールされているパイプラインが各サーバーに 1 つだけデプロイされていることを確認してください。これは、logstash-input-sls プラグインの制限によるものです。 logstash-input-sls がインストールされている複数のパイプラインが単一のサーバーにデプロイされている場合、出力に重複データが存在する可能性があります。

  • 高性能: Java コンシューマーグループを使用する場合、シングルコア CPU の消費速度は 20 MB/s に達する可能性があります。

  • 高信頼性: logstash-input-sls は、サーバー上の消費進捗状況を保存します。サーバーが例外から回復した場合、最後の消費チェックポイントからデータの消費を続行できます。

  • 自動ロードバランシング: シャードは、コンシューマーグループ内のコンシューマーの数に基づいて自動的に割り当てられます。コンシューマーがコンシューマーグループに参加または脱退すると、logstash-input-sls はシャードを自動的に再割り当てします。

前提条件

  • logstash-input-sls プラグインがインストールされていること。

    詳細については、「プラグインのインストールと削除」をご参照ください。

  • ログサービス プロジェクトとログストアが作成されていること。データが収集されていること。

    詳細については、「クイックスタート」をご参照ください。

logstash-input-sls アプリケーション

前提条件が満たされたら、「構成ファイルを使用したパイプラインの管理」に記載されている手順に基づいてパイプラインを作成できます。パイプラインを作成するときは、「パラメーター」セクションの表に記載されている説明に基づいてパイプライン パラメーターを構成します。パラメーターを構成したら、設定を保存してパイプラインをデプロイする必要があります。その後、Alibaba Cloud Logstash がトリガーされ、ログサービスからデータを取得します。

重要

RAM ユーザーが logstash-input-sls プラグインを使用する前に、コンシューマーグループ関連の権限を RAM ユーザーに付与する必要があります。詳細については、「コンシューマーグループを使用したログの消費」をご参照ください。

次の例は、Alibaba Cloud Logstash を使用してログストア内のログデータを消費し、そのデータを Alibaba Cloud Elasticsearch に転送する方法を示しています。

input {
 logservice{
  endpoint => "your project endpoint" // プロジェクトエンドポイント
  access_id => "your access id" // アクセスID
  access_key => "your access key" // アクセスキー
  project => "your project name" // プロジェクト名
  logstore => "your logstore name" // ログストア名
  consumer_group => "consumer group name" // コンシューマーグループ名
  consumer_name => "consumer name" // コンシューマー名
  position => "end" // 開始位置
  checkpoint_second => 30 // チェックポイントの間隔
  include_meta => true // メタデータを含めるかどうか
  consumer_name_with_ip => true // コンシューマー名にIPアドレスを含めるかどうか
  }
}

output {
  elasticsearch {
    hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"]
    index => "<your_index>"
    user => "elastic"
    password => "changeme"
  }
}

次の仮定を行います。ログストアには 10 個のシャードがあります。各シャードのデータ トラフィックは 1 MB/s です。各 Logstash サーバーの処理能力は 3 MB/s です。 5 台の Logstash サーバーが割り当てられ、logstash-input-sls がインストールされた 1 つのパイプラインが各サーバーにデプロイされます。各サーバーは、consumer_groupconsumer_name の同じ設定を使用します。consumer_name_with_ip は true に設定されています。

この場合、各サーバーに 2 つのシャードが割り当てられ、各シャードは 2 MB/s でデータを処理します。

パラメーター

次の表は、logstash-input-sls でサポートされているパラメーターについて説明しています。

パラメーター

タイプ

必須

説明

endpoint

String

はい

仮想プライベートクラウド (VPC) 内のログサービス プロジェクトのエンドポイント。詳細については、「エンドポイント」をご参照ください。

access_id

String

はい

コンシューマーグループに対する権限を持つアカウントの AccessKey ID。詳細については、「コンシューマーグループを使用したログの消費」をご参照ください。

access_key

String

はい

コンシューマーグループに対する権限を持つアカウントの AccessKey シークレット。詳細については、「コンシューマーグループを使用したログの消費」をご参照ください。

project

String

はい

ログサービス プロジェクトの名前。

logstore

String

はい

ログストアの名前。

consumer_group

String

はい

コンシューマーグループの名前。カスタマイズできます。

consumer_name

String

はい

コンシューマーの名前。カスタマイズできます。各コンシューマーの名前は、コンシューマーグループ内で一意である必要があります。そうでない場合、未定義の動作が発生する可能性があります。

position

String

はい

ログデータの消費を開始する位置。有効な値:

  • begin: ログストアに書き込まれた最初のログエントリからデータが消費されます。

  • end: 現在の時刻からデータが消費されます。

  • yyyy-MM-dd HH:mm:ss: 指定された時刻からデータが消費されます。

checkpoint_second

Number

いいえ

チェックポイントをトリガーする間隔。 10 秒から 60 秒の範囲の値に設定することをお勧めします。最小値: 10。デフォルト値: 30。単位: 秒。

include_meta

Boolean

いいえ

入力ログに、ログソース、時間、タグ、トピックなどのメタデータを含めるかどうかを指定します。デフォルト値: true。

consumer_name_with_ip

Boolean

いいえ

コンシューマー名に IP アドレスを含めるかどうかを指定します。デフォルト値: true。分散協調消費の場合は、値を true に設定します。

パフォーマンステスト

  • テスト環境

    • CPU: Intel(R) Xeon(R) Platinum 8163 プロセッサー @ 2.50 GHz、4 コア

    • メモリ: 8 GB

    • オペレーティング システム: Linux

  • Logstash パイプラインの構成

    input {
      logservice{
      endpoint => "cn-hangzhou-intranet.log.aliyuncs.com" // エンドポイント
      access_id => "***" // アクセスID
      access_key => "***" // アクセスキー
      project => "test-project" // プロジェクト名
      logstore => "logstore1" // ログストア名
      consumer_group => "consumer_group1" // コンシューマーグループ名
      consumer_name => "consumer1" // コンシューマー名
      position => "end" // 開始位置
      checkpoint_second => 30 // チェックポイントの間隔
      include_meta => true // メタデータを含めるかどうか
      consumer_name_with_ip => true // コンシューマー名にIPアドレスを含めるかどうか
      }
    }
    output {
      elasticsearch {
        hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"]
        index => "myindex"
        user => "elastic"
        password => "changeme"
      }
    }
  • テスト手順

    1. Java プロデューサーを使用して、2 MB、4 MB、8 MB、16 MB、および 32 MB のデータを毎秒ログストアに送信します。

      各ログエントリには 10 個のキーと値のペアが含まれており、長さは約 500 バイトです。

    2. Logstash を起動して、ログストア内のデータを消費します。消費レイテンシが増加せず、消費速度が生成速度に追いつくことができることを確認します。

  • テスト結果

    トラフィック (MB/s)

    CPU 使用率 (%)

    メモリ使用量 (GB)

    32

    170.3

    1.3

    16

    83.3

    1.3

    8

    41.5

    1.3

    4

    21.0

    1.3

    2

    11.3

    1.3