logstash-input-oss プラグインは、Alibaba Cloud Logstash を Simple Message Queue (旧称:MNS) (SMQ) を介して Object Storage Service (OSS) に接続します。OSS オブジェクトが更新されると、SMQ はプラグインに通知を配信し、Logstash が OSS から最新データを読み取るようにトリガーします。
logstash-input-oss はオープンソースプラグインです。詳細については、GitHub リポジトリをご参照ください。
仕組み
-
OSS オブジェクトが作成または更新されます (例: PutObject または AppendObject を介して)。
-
OSS はイベント通知を SMQ キューに送信します。
-
プラグインは SMQ 通知を受信し、オブジェクトキーを抽出します。
-
Logstash はオブジェクト内のすべてのデータを読み取り、パイプラインを通じてダウンストリームに送信します。
注意事項
-
SMQ 通知を受信した後、Logstash は関連するオブジェクト内のすべてのデータを同期します。差分のみではありません。
-
.gzまたは.gzip形式のオブジェクトは gzip として処理されます。その他のすべての形式はプレーンテキストとして処理されます。 -
バイナリ形式 (例:
.jarまたは.bin) のオブジェクトは、文字化けして読み取られる場合があります。
前提条件
開始する前に、以下を確認してください。
-
logstash-input-oss プラグインがインストールされていること。「Logstash プラグインのインストールまたは削除」をご参照ください。
-
OSS と SMQ がアクティブ化されており、OSS バケットが SMQ キューまたはトピックと同じリージョンにあること。「OSS のアクティブ化」および「SMQ のアクティブ化と RAM ユーザーへの SMQ アクセス権限付与」をご参照ください。
-
OSS でイベント通知ルールが構成されていること。「イベント通知ルールの構成」をご参照ください。
パイプラインの作成
「構成ファイルを使用したパイプラインの管理」で説明されているように、構成ファイルを使用してパイプラインを作成します。「パラメーター」セクションでパラメーターを構成し、パイプラインを保存してデプロイします。
次の例は、OSS バケットからデータを読み取り、Alibaba Cloud Elasticsearch に書き込む方法を示しています。
input {
oss {
endpoint => "oss-cn-hangzhou-internal.aliyuncs.com"
bucket => "zl-ossou****"
access_key_id => "******"
access_key_secret => "*********"
prefix => "file-sample-prefix"
mns_settings => {
endpoint => "******.mns.cn-hangzhou-internal.aliyuncs.com"
queue => "aliyun-es-sample-mns"
}
codec => json {
charset => "UTF-8"
}
}
}
output {
elasticsearch {
hosts => ["http://es-cn-***.elasticsearch.aliyuncs.com:9200"]
index => "aliyun-es-sample"
user => "elastic"
password => "changeme"
}
}
SMQ エンドポイントは内部エンドポイントである必要があり、http をプレフィックスとして含めてはいけません。外部エンドポイントを使用するとエラーが発生します。
パラメーター
必須パラメーター
| パラメーター | タイプ | 説明 |
|---|---|---|
endpoint |
string | OSS にアクセスするために使用されるエンドポイント。「リージョン、エンドポイント、およびオープンポート」をご参照ください。 |
bucket |
string | OSS バケットの名前。 |
access_key_id |
string | ご利用の Alibaba Cloud アカウントの AccessKey ID。 |
access_key_secret |
string | ご利用の Alibaba Cloud アカウントの AccessKey Secret。 |
mns_settings |
hash | SMQ 構成。「mns_settings サブパラメーター」をご参照ください。 |
オプションパラメーター
| パラメーター | タイプ | デフォルト | 説明 |
|---|---|---|---|
prefix |
string | — | 名前プレフィックスでオブジェクトをフィルタリングします。Logstash は、この値で始まる名前のオブジェクトのみを読み取ります。正規表現ではありません。このパラメーターを使用して、バケット内の特定のディレクトリから読み取ります。 |
additional_oss_settings |
hash | — | 追加の OSS クライアント設定。サポートされているサブパラメーター: secure_connection_enabled (セキュアな接続を有効にする) および max_connections_to_oss (OSS への最大接続数)。 |
delete |
boolean | false |
読み取り後に処理されたオブジェクトをソース OSS バケットから削除するかどうか。 |
backup_to_bucket |
string | — | 処理されたオブジェクトがバックアップされる OSS バケットの名前。 |
backup_to_dir |
string | — | 処理されたファイルがバックアップされるローカルディレクトリ。 |
backup_add_prefix |
string | — | 処理後にオブジェクトキーに追加されるプレフィックス。キーは、OSS 内のオブジェクト名を含む完全なパスです。このパラメーターを使用して、同じバケット内または異なるバケット内の特定のフォルダにバックアップを保存します。 |
include_object_properties |
boolean | — | last_modified、content_type、および metadata などの OSS オブジェクトプロパティを [@metadata][oss] に含めるかどうか。設定されていない場合、[@metadata][oss][key] のみが利用可能です。 |
exclude_pattern |
string | — | スキップするオブジェクトキーに一致する Ruby 正規表現。例: "\/logs\/debug\/" は、パスに /logs/debug/ が含まれるすべてのオブジェクトを除外します。 |
mns_settings サブパラメーター
| サブパラメーター | 必須 | デフォルト | 説明 |
|---|---|---|---|
endpoint |
はい | — | SMQ エンドポイント。内部エンドポイントである必要があり、http を含めてはなりません。 |
queue |
はい | — | SMQ キューの名前。 |
poll_interval_seconds |
いいえ | 10 |
キューが空の場合の ReceiveMessage リクエストの最大待機時間 (秒単位)。「ReceiveMessage」をご参照ください。 |
wait_seconds |
いいえ | — | ReceiveMessage リクエストの最大ポーリング待機時間 (秒単位)。 |
オブジェクトメタデータ
include_object_properties が有効な場合、プラグインは [@metadata][oss] に次のフィールドを公開します。これらのフィールドをフィルターまたは出力ステージで使用して、イベントをエンリッチしたり、条件付きロジックを適用したりできます。
| フィールド | タイプ | 説明 |
|---|---|---|
[@metadata][oss][key] |
string | include_object_properties に関係なく、常に利用可能な OSS 内の完全なオブジェクトキー (パス)。 |
[@metadata][oss][last_modified] |
string | オブジェクトが最後に変更されたタイムスタンプ。include_object_properties が true の場合に利用可能です。 |
[@metadata][oss][content_type] |
string | オブジェクトの MIME タイプ。include_object_properties が true の場合に利用可能です。 |
[@metadata][oss][metadata] |
hash | オブジェクトにアタッチされたカスタムユーザー定義メタデータ。include_object_properties が true の場合に利用可能です。 |
よくある質問
なぜプラグインは OSS を直接ポーリングするのではなく、SMQ に基づいているのですか?
OSS オブジェクト更新イベントは SMQ とシームレスに統合されており、メッセージベースの通知は自然な適合です。代替案である ListObjects API を使用するには、ローカルストレージでどのオブジェクトが処理済みで、どのオブジェクトが未処理であるかを追跡する必要があります。オブジェクト数が増加すると、ListObjects のパフォーマンスが低下します。より広範なオブジェクトストレージエコシステム (Amazon S3 オープンソースコミュニティを含む) も、ListObjects からメッセージ通知メカニズムへと移行しています。
OSS がまだオブジェクトにデータを書き込んでいる間に Logstash がトリガーされた場合、どうなりますか?
プラグインは、SMQ キューに書き込まれたオブジェクトを記録し、Logstash パイプラインを通じてそれらを送信します。まだ OSS に書き込まれていないデータは引き続き書き込まれます。Logstash が次回トリガーされると、プラグインは OSS から残りのデータを読み取ります。