Simple Log Service (SLS) は、Apache Flume と統合するための aliyun-log-flume プラグインを提供しています。このプラグインを使用すると、他のデータソースから SLS にログデータを書き込むことや、SLS からログデータを取得して Hadoop 分散ファイルシステム (HDFS) や Kafka などのダウンストリームシステムに送信することができます。
仕組み
Apache Flume は、Source-Channel-Sink のデータフローモデルを採用しています。aliyun-log-flume プラグインは、SLS を Flume パイプラインに接続するためのカスタム Sink およびカスタム Source を提供します。
Sink: Flume Channel からデータを受け取り、SLS Logstore に書き込みます。Flume を介して他のシステムから SLS にデータを取り込む場合にこのコンポーネントを使用します。
Source: SLS Logstore からログデータを取得し、Flume Channel に送信します。Flume を介して SLS のログデータを他のシステムに送信する場合にこのコンポーネントを使用します。
Channel は Source と Sink の間のバッファーとして機能します。Flume には Memory Channel や File Channel など、組み込みのチャネルタイプが用意されています。詳細については、「Apache Flume User Guide」をご参照ください。
プラグインのソースコードおよびリリースノートについては、「GitHub 上の aliyun-log-flume」をご参照ください。
前提条件
作業を開始する前に、以下の要件を満たしていることを確認してください。
Java: JDK 1.8 以降がインストールされていること。
Apache Flume: Flume 1.8.0 以降がインストールされていること。Flume のダウンロード方法については、「Apache Flume ダウンロードページ」をご参照ください。
SLS リソース: SLS プロジェクトおよび Logstore が作成済みであること。詳細については、SLS ドキュメントをご参照ください。
AccessKey ペア: AccessKey ID および AccessKey Secret を取得済みであること。セキュリティの観点から、RAM ユーザーの AccessKey ペアを使用することを推奨します。詳細については、「AccessKey ペア」をご参照ください。
プラグインのインストール
Flume をダウンロードしてインストールします。詳細については、「Apache Flume ダウンロードページ」をご参照ください。
aliyun-log-flume プラグインの JAR ファイルをダウンロードし、
<FLUME_HOME>/libディレクトリに保存します。ダウンロードリンク:aliyun-log-flume-1.9.jar<FLUME_HOME>/confディレクトリにflumejob.confという名前の構成ファイルを作成します。Sink の構成については「Sink の構成」をご参照ください。Source の構成については「Source の構成」をご参照ください。Flume を起動します。
bin/flume-ng agent -n agent -c conf -f conf/flumejob.conf
Sink の構成
SLS Sink を使用して、他のシステムから Flume を介して SLS Logstore にデータを書き込みます。Sink は、Flume イベントを SLS ログエントリに変換するための次の 3 つのシリアル化モードをサポートしています。
| モード | 動作 |
|---|---|
| SIMPLE | 各 Flume イベント本文が、単一フィールドとして SLS に書き込まれます。 |
| DELIMITED | 各 Flume イベント本文がデリミタに基づいてフィールドに分割され、設定されたカラム名にマッピングされます。 |
| JSON | 各 Flume イベント本文が JSON として解析されます。 |
Sink パラメーター
接続パラメーター
| パラメーター | 必須 | 説明 |
|---|---|---|
| type | はい | Sink のタイプです。com.aliyun.loghub.flume.sink.LoghubSink に設定します。 |
| endpoint | はい | SLS プロジェクトのエンドポイントです。例:http://cn-qingdao.log.aliyuncs.com。プロジェクトのリージョンに基づいてエンドポイントを選択してください。詳細については、「エンドポイント」をご参照ください。 |
| project | はい | SLS プロジェクトの名前です。 |
| logstore | はい | Logstore の名前です。 |
| accessKeyId | はい | Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID です。セキュリティの観点から、RAM ユーザーの AccessKey ペアを使用することを推奨します。詳細については、「AccessKey ペア」をご参照ください。 |
| accessKey | はい | Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret です。セキュリティの観点から、RAM ユーザーの AccessKey ペアを使用することを推奨します。詳細については、「AccessKey ペア」をご参照ください。 |
バッチ処理パラメーター
| パラメーター | 必須 | 説明 |
|---|---|---|
| batchSize | いいえ | 1 回のバッチで SLS に書き込むログエントリの数です。デフォルト値:1000。 |
| maxBufferSize | いいえ | 内部バッファーキューに許容されるログエントリの最大数です。デフォルト値:1000。 |
シリアル化パラメーター
| パラメーター | 必須 | 説明 |
|---|---|---|
| serializer | いいえ | Flume イベントを SLS ログエントリに変換するためのシリアル化モードです。有効な値:SIMPLE(デフォルト)、DELIMITED、JSON、または完全修飾カスタムシリアル化クラス名。 |
| columns | いいえ | カラム名のカンマ区切りリストです。serializer が DELIMITED に設定されている場合に必須です。カラムは、各レコード内のフィールドの出現順にマッピングされます。 |
| separatorChar | いいえ | フィールドを分割するために使用されるデリミタ文字です。1 文字である必要があります。serializer が DELIMITED に設定されている場合に必須です。デフォルト値:,(カンマ)。 |
| quoteChar | いいえ | フィールドを囲むために使用される引用符です。serializer が DELIMITED に設定されている場合に必須です。デフォルト値:"(二重引用符)。 |
| escapeChar | いいえ | エスケープ文字です。serializer が DELIMITED に設定されている場合に必須です。デフォルト値:"(二重引用符)。 |
タイムスタンプパラメーター
| パラメーター | 必須 | 説明 |
|---|---|---|
| useRecordTime | いいえ | SLS への書き込み時に、データエントリ内のタイムスタンプフィールドをログ時間として使用するかどうかを指定します。デフォルト値:false。false に設定されている場合、現在のシステム時間がログ時間として使用されます。 |
Sink 構成例
次の例では、Avro Source からデータを読み取り、DELIMITED シリアル化を使用して SLS Logstore に書き込みます。
# コンポーネントに名前を付けます
agent.sources = avroSrc
agent.channels = memCh
agent.sinks = slsSink
# Avro Source を構成します
agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = 0.0.0.0
agent.sources.avroSrc.port = 4141
agent.sources.avroSrc.channels = memCh
# Memory Channel を構成します
agent.channels.memCh.type = memory
agent.channels.memCh.capacity = 1000
agent.channels.memCh.transactionCapacity = 100
# SLS Sink を構成します
agent.sinks.slsSink.type = com.aliyun.loghub.flume.sink.LoghubSink
agent.sinks.slsSink.channel = memCh
agent.sinks.slsSink.endpoint = http://cn-hangzhou.log.aliyuncs.com
agent.sinks.slsSink.project = your-project
agent.sinks.slsSink.logstore = your-logstore
agent.sinks.slsSink.accessKeyId = your-access-key-id
agent.sinks.slsSink.accessKey = your-access-key-secret
agent.sinks.slsSink.batchSize = 1000
agent.sinks.slsSink.serializer = DELIMITED
agent.sinks.slsSink.columns = col1,col2,col3
agent.sinks.slsSink.separatorChar = ,その他の構成例については、「GitHub 上の Sink の例」をご参照ください。
Source の構成
SLS Source を使用して、SLS Logstore からログデータを取得し、Flume を介してダウンストリームシステムに配信します。Source は、SLS ログエントリを Flume イベントに変換するための次の 2 つの逆シリアル化モードをサポートしています。
| モード | 動作 |
|---|---|
| DELIMITED | ログフィールドがデリミタで結合され、Flume イベント本文として書き込まれます。 |
| JSON | ログエントリが JSON としてシリアル化され、Flume イベント本文として書き込まれます。 |
Source パラメーター
接続パラメーター
| パラメーター | 必須 | 説明 |
|---|---|---|
| type | はい | Source のタイプです。com.aliyun.loghub.flume.source.LoghubSource に設定します。 |
| endpoint | はい | SLS プロジェクトのエンドポイントです。例:http://cn-qingdao.log.aliyuncs.com。プロジェクトのリージョンに基づいてエンドポイントを選択してください。詳細については、「エンドポイント」をご参照ください。 |
| project | はい | SLS プロジェクトの名前です。 |
| logstore | はい | Logstore の名前です。 |
| accessKeyId | はい | Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID です。セキュリティの観点から、RAM ユーザーの AccessKey ペアを使用することを推奨します。詳細については、「AccessKey ペア」をご参照ください。 |
| accessKey | はい | Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret です。セキュリティの観点から、RAM ユーザーの AccessKey ペアを使用することを推奨します。詳細については、「AccessKey ペア」をご参照ください。 |
使用者グループパラメーター
| パラメーター | 必須 | 説明 |
|---|---|---|
| consumerGroup | いいえ | 複数のコンシューマー間での消費を調整するために使用される使用者グループの名前です。このパラメーターを指定しない場合、使用者グループ名がランダムに生成されます。 |
| heartbeatIntervalMs | いいえ | コンシューマークライアントが SLS にハートビートメッセージを送信する間隔(ミリ秒単位)です。デフォルト値:30000。 |
| fetchIntervalMs | いいえ | SLS への連続するデータフェッチリクエスト間の間隔(ミリ秒単位)です。デフォルト値:100。 |
| fetchInOrder | いいえ | SLS への書き込み順にログデータを取得するかどうかを指定します。デフォルト値:false。 |
| batchSize | いいえ | 1 回のリクエストでフェッチするログエントリの数です。デフォルト値:100。 |
| initialPosition | いいえ | データ取得の開始位置です。有効な値:begin(デフォルト)、end、および timestamp。注記:指定された使用者グループに対して SLS にチェックポイントが存在する場合、この設定よりもチェックポイントが優先されます。 |
| timestamp | いいえ | データ取得を開始する時点を指定する UNIX タイムスタンプです。initialPosition が timestamp に設定されている場合に必須です。 |
逆シリアル化パラメーター
| パラメーター | 必須 | 説明 |
|---|---|---|
| deserializer | はい | SLS ログエントリを Flume イベントに変換するための逆シリアル化モードです。有効な値:DELIMITED(デフォルト)、JSON、または完全修飾カスタム逆シリアル化クラス名。 |
| columns | いいえ | カラム名のカンマ区切りリストです。deserializer が DELIMITED に設定されている場合に必須です。カラムは、各レコード内のフィールドの出現順にマッピングされます。 |
| separatorChar | いいえ | フィールドを結合するために使用されるデリミタ文字です。1 文字である必要があります。deserializer が DELIMITED に設定されている場合に必須です。デフォルト値:,(カンマ)。 |
| quoteChar | いいえ | フィールドを囲むために使用される引用符です。deserializer が DELIMITED に設定されている場合に必須です。デフォルト値:"(二重引用符)。 |
| escapeChar | いいえ | エスケープ文字です。deserializer が DELIMITED に設定されている場合に必須です。デフォルト値:"(二重引用符)。 |
| appendTimestamp | いいえ | ログタイムスタンプを追加フィールドとして付加するかどうかを指定します。deserializer が DELIMITED に設定されている場合に適用されます。デフォルト値:false。 |
JSON フィールドオプション
次のパラメーターは、deserializer が JSON に設定されている場合にのみ適用されます。
| パラメーター | 必須 | 説明 |
|---|---|---|
| sourceAsField | いいえ | ログソースを __source__ という名前のフィールドとして含めるかどうかを指定します。デフォルト値:false。 |
| tagAsField | いいえ | ログタグをフィールドとして含めるかどうかを指定します。各タグは __tag__:{タグ名} という名前のフィールドとして追加されます。デフォルト値:false。 |
| timeAsField | いいえ | ログ時間を __time__ という名前のフィールドとして含めるかどうかを指定します。デフォルト値:false。 |
タイムスタンプパラメーター
| パラメーター | 必須 | 説明 |
|---|---|---|
| useRecordTime | いいえ | ログエントリ内のタイムスタンプフィールドを Flume イベントのタイムスタンプとして使用するかどうかを指定します。デフォルト値:false。false に設定されている場合、現在のシステム時間が使用されます。 |
SPL 処理パラメーター
次のパラメーターを使用すると、SLS Search Processing Language (SPL) を使用して取得中にログデータをフィルターまたは変換できます。
| パラメーター | 必須 | 説明 |
|---|---|---|
| processor | いいえ | 取得中にログデータをフィルターまたは変換するために使用される SLS SPL 式です。SPL 構文の詳細については、SLS ドキュメントをご参照ください。 |
| query | いいえ | SLS SPL クエリ式です。非推奨:processor を代わりに使用してください。 |
Source 構成例
次の例では、JSON 逆シリアル化を使用して SLS Logstore からデータを取得し、ローカルログファイルに書き込みます。
# コンポーネントに名前を付けます
agent.sources = slsSrc
agent.channels = memCh
agent.sinks = loggerSink
# SLS Source を構成します
agent.sources.slsSrc.type = com.aliyun.loghub.flume.source.LoghubSource
agent.sources.slsSrc.channels = memCh
agent.sources.slsSrc.endpoint = http://cn-hangzhou.log.aliyuncs.com
agent.sources.slsSrc.project = your-project
agent.sources.slsSrc.logstore = your-logstore
agent.sources.slsSrc.accessKeyId = your-access-key-id
agent.sources.slsSrc.accessKey = your-access-key-secret
agent.sources.slsSrc.deserializer = JSON
agent.sources.slsSrc.sourceAsField = true
agent.sources.slsSrc.tagAsField = true
agent.sources.slsSrc.timeAsField = true
agent.sources.slsSrc.consumerGroup = flume-consumer
agent.sources.slsSrc.initialPosition = begin
# Memory Channel を構成します
agent.channels.memCh.type = memory
agent.channels.memCh.capacity = 1000
agent.channels.memCh.transactionCapacity = 100
# Logger Sink を構成します
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memChその他の構成例については、「GitHub 上の Source の例」をご参照ください。
Channel の構成
Channel は、Flume パイプラインにおいて Source と Sink を接続するバッファーです。aliyun-log-flume プラグインは、標準の Flume Channel タイプすべてで動作します。最も一般的なオプションは次の 2 つです。
Memory Channel: イベントをメモリに格納します。高スループットを実現しますが、Flume エージェントプロセスが再起動するとイベントが失われます。データ損失が許容されるユースケースに適しています。
agent.channels.memCh.type = memory agent.channels.memCh.capacity = 10000 agent.channels.memCh.transactionCapacity = 1000File Channel: イベントをディスクに永続化します。スループットは低下しますが、耐久性を確保できます。データ損失が許容されない本番ワークロードに適しています。
agent.channels.fileCh.type = file agent.channels.fileCh.checkpointDir = /var/flume/checkpoint agent.channels.fileCh.dataDirs = /var/flume/data agent.channels.fileCh.capacity = 1000000 agent.channels.fileCh.transactionCapacity = 10000
重要: Channel の transactionCapacity は、Sink または Source で設定された batchSize 以上の値に設定してください。トランザクション容量がバッチサイズより小さい場合、Sink または Source は 1 つのトランザクション内でバッチを完了できず、エラーが発生します。