すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:logstash-input-oss プラグインの使用

最終更新日:Mar 27, 2026

logstash-input-oss プラグインは、Alibaba Cloud Logstash を Simple Message Queue (旧称:MNS) (SMQ) を介して Object Storage Service (OSS) に接続します。OSS オブジェクトが更新されると、SMQ はプラグインに通知を配信し、Logstash が OSS から最新データを読み取るようにトリガーします。

logstash-input-oss はオープンソースプラグインです。詳細については、GitHub リポジトリをご参照ください。

仕組み

  1. OSS オブジェクトが作成または更新されます (例: PutObject または AppendObject を介して)。

  2. OSS はイベント通知を SMQ キューに送信します。

  3. プラグインは SMQ 通知を受信し、オブジェクトキーを抽出します。

  4. Logstash はオブジェクト内のすべてのデータを読み取り、パイプラインを通じてダウンストリームに送信します。

注意事項

  • SMQ 通知を受信した後、Logstash は関連するオブジェクト内のすべてのデータを同期します。差分のみではありません。

  • .gz または .gzip 形式のオブジェクトは gzip として処理されます。その他のすべての形式はプレーンテキストとして処理されます。

  • バイナリ形式 (例: .jar または .bin) のオブジェクトは、文字化けして読み取られる場合があります。

前提条件

開始する前に、以下を確認してください。

パイプラインの作成

構成ファイルを使用したパイプラインの管理」で説明されているように、構成ファイルを使用してパイプラインを作成します。「パラメーター」セクションでパラメーターを構成し、パイプラインを保存してデプロイします。

次の例は、OSS バケットからデータを読み取り、Alibaba Cloud Elasticsearch に書き込む方法を示しています。

input {
  oss {
    endpoint => "oss-cn-hangzhou-internal.aliyuncs.com"
    bucket => "zl-ossou****"
    access_key_id => "******"
    access_key_secret => "*********"
    prefix => "file-sample-prefix"
    mns_settings => {
      endpoint => "******.mns.cn-hangzhou-internal.aliyuncs.com"
      queue => "aliyun-es-sample-mns"
    }
    codec => json {
      charset => "UTF-8"
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"]
    index => "aliyun-es-sample"
    user => "elastic"
    password => "changeme"
  }
}
重要

SMQ エンドポイントは内部エンドポイントである必要があり、http をプレフィックスとして含めてはいけません。外部エンドポイントを使用するとエラーが発生します。

パラメーター

必須パラメーター

パラメーター タイプ 説明
endpoint string OSS にアクセスするために使用されるエンドポイント。「リージョン、エンドポイント、およびオープンポート」をご参照ください。
bucket string OSS バケットの名前。
access_key_id string ご利用の Alibaba Cloud アカウントの AccessKey ID。
access_key_secret string ご利用の Alibaba Cloud アカウントの AccessKey Secret。
mns_settings hash SMQ 構成。「mns_settings サブパラメーター」をご参照ください。

オプションパラメーター

パラメーター タイプ デフォルト 説明
prefix string 名前プレフィックスでオブジェクトをフィルタリングします。Logstash は、この値で始まる名前のオブジェクトのみを読み取ります。正規表現ではありません。このパラメーターを使用して、バケット内の特定のディレクトリから読み取ります。
additional_oss_settings hash 追加の OSS クライアント設定。サポートされているサブパラメーター: secure_connection_enabled (セキュアな接続を有効にする) および max_connections_to_oss (OSS への最大接続数)。
delete boolean false 読み取り後に処理されたオブジェクトをソース OSS バケットから削除するかどうか。
backup_to_bucket string 処理されたオブジェクトがバックアップされる OSS バケットの名前。
backup_to_dir string 処理されたファイルがバックアップされるローカルディレクトリ。
backup_add_prefix string 処理後にオブジェクトキーに追加されるプレフィックス。キーは、OSS 内のオブジェクト名を含む完全なパスです。このパラメーターを使用して、同じバケット内または異なるバケット内の特定のフォルダにバックアップを保存します。
include_object_properties boolean last_modifiedcontent_type、および metadata などの OSS オブジェクトプロパティを [@metadata][oss] に含めるかどうか。設定されていない場合、[@metadata][oss][key] のみが利用可能です。
exclude_pattern string スキップするオブジェクトキーに一致する Ruby 正規表現。例: "\/logs\/debug\/" は、パスに /logs/debug/ が含まれるすべてのオブジェクトを除外します。

mns_settings サブパラメーター

サブパラメーター 必須 デフォルト 説明
endpoint はい SMQ エンドポイント。内部エンドポイントである必要があり、http を含めてはなりません。
queue はい SMQ キューの名前。
poll_interval_seconds いいえ 10 キューが空の場合の ReceiveMessage リクエストの最大待機時間 (秒単位)。「ReceiveMessage」をご参照ください。
wait_seconds いいえ ReceiveMessage リクエストの最大ポーリング待機時間 (秒単位)。

オブジェクトメタデータ

include_object_properties が有効な場合、プラグインは [@metadata][oss] に次のフィールドを公開します。これらのフィールドをフィルターまたは出力ステージで使用して、イベントをエンリッチしたり、条件付きロジックを適用したりできます。

フィールド タイプ 説明
[@metadata][oss][key] string include_object_properties に関係なく、常に利用可能な OSS 内の完全なオブジェクトキー (パス)。
[@metadata][oss][last_modified] string オブジェクトが最後に変更されたタイムスタンプ。include_object_propertiestrue の場合に利用可能です。
[@metadata][oss][content_type] string オブジェクトの MIME タイプ。include_object_propertiestrue の場合に利用可能です。
[@metadata][oss][metadata] hash オブジェクトにアタッチされたカスタムユーザー定義メタデータ。include_object_propertiestrue の場合に利用可能です。

よくある質問

なぜプラグインは OSS を直接ポーリングするのではなく、SMQ に基づいているのですか?

OSS オブジェクト更新イベントは SMQ とシームレスに統合されており、メッセージベースの通知は自然な適合です。代替案である ListObjects API を使用するには、ローカルストレージでどのオブジェクトが処理済みで、どのオブジェクトが未処理であるかを追跡する必要があります。オブジェクト数が増加すると、ListObjects のパフォーマンスが低下します。より広範なオブジェクトストレージエコシステム (Amazon S3 オープンソースコミュニティを含む) も、ListObjects からメッセージ通知メカニズムへと移行しています。

OSS がまだオブジェクトにデータを書き込んでいる間に Logstash がトリガーされた場合、どうなりますか?

プラグインは、SMQ キューに書き込まれたオブジェクトを記録し、Logstash パイプラインを通じてそれらを送信します。まだ OSS に書き込まれていないデータは引き続き書き込まれます。Logstash が次回トリガーされると、プラグインは OSS から残りのデータを読み取ります。

次のステップ