すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Flume を使用したログデータの取得

最終更新日:Feb 28, 2026

Simple Log Service (SLS) は、Apache Flume と統合するための aliyun-log-flume プラグインを提供しています。このプラグインを使用すると、他のデータソースから SLS にログデータを書き込むことや、SLS からログデータを取得して Hadoop 分散ファイルシステム (HDFS) や Kafka などのダウンストリームシステムに送信することができます。

仕組み

Apache Flume は、Source-Channel-Sink のデータフローモデルを採用しています。aliyun-log-flume プラグインは、SLS を Flume パイプラインに接続するためのカスタム Sink およびカスタム Source を提供します。

  • Sink: Flume Channel からデータを受け取り、SLS Logstore に書き込みます。Flume を介して他のシステムから SLS にデータを取り込む場合にこのコンポーネントを使用します。

  • Source: SLS Logstore からログデータを取得し、Flume Channel に送信します。Flume を介して SLS のログデータを他のシステムに送信する場合にこのコンポーネントを使用します。

Channel は Source と Sink の間のバッファーとして機能します。Flume には Memory Channel や File Channel など、組み込みのチャネルタイプが用意されています。詳細については、「Apache Flume User Guide」をご参照ください。

プラグインのソースコードおよびリリースノートについては、「GitHub 上の aliyun-log-flume」をご参照ください。

前提条件

作業を開始する前に、以下の要件を満たしていることを確認してください。

  • Java: JDK 1.8 以降がインストールされていること。

  • Apache Flume: Flume 1.8.0 以降がインストールされていること。Flume のダウンロード方法については、「Apache Flume ダウンロードページ」をご参照ください。

  • SLS リソース: SLS プロジェクトおよび Logstore が作成済みであること。詳細については、SLS ドキュメントをご参照ください。

  • AccessKey ペア: AccessKey ID および AccessKey Secret を取得済みであること。セキュリティの観点から、RAM ユーザーの AccessKey ペアを使用することを推奨します。詳細については、「AccessKey ペア」をご参照ください。

プラグインのインストール

  1. Flume をダウンロードしてインストールします。詳細については、「Apache Flume ダウンロードページ」をご参照ください。

  2. aliyun-log-flume プラグインの JAR ファイルをダウンロードし、<FLUME_HOME>/lib ディレクトリに保存します。ダウンロードリンク:aliyun-log-flume-1.9.jar

  3. <FLUME_HOME>/conf ディレクトリに flumejob.conf という名前の構成ファイルを作成します。Sink の構成については「Sink の構成」をご参照ください。Source の構成については「Source の構成」をご参照ください。

  4. Flume を起動します。

       bin/flume-ng agent -n agent -c conf -f conf/flumejob.conf

Sink の構成

SLS Sink を使用して、他のシステムから Flume を介して SLS Logstore にデータを書き込みます。Sink は、Flume イベントを SLS ログエントリに変換するための次の 3 つのシリアル化モードをサポートしています。

モード動作
SIMPLE各 Flume イベント本文が、単一フィールドとして SLS に書き込まれます。
DELIMITED各 Flume イベント本文がデリミタに基づいてフィールドに分割され、設定されたカラム名にマッピングされます。
JSON各 Flume イベント本文が JSON として解析されます。

Sink パラメーター

接続パラメーター

パラメーター必須説明
typeはいSink のタイプです。com.aliyun.loghub.flume.sink.LoghubSink に設定します。
endpointはいSLS プロジェクトのエンドポイントです。例:http://cn-qingdao.log.aliyuncs.com。プロジェクトのリージョンに基づいてエンドポイントを選択してください。詳細については、「エンドポイント」をご参照ください。
projectはいSLS プロジェクトの名前です。
logstoreはいLogstore の名前です。
accessKeyIdはいAlibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID です。セキュリティの観点から、RAM ユーザーの AccessKey ペアを使用することを推奨します。詳細については、「AccessKey ペア」をご参照ください。
accessKeyはいAlibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret です。セキュリティの観点から、RAM ユーザーの AccessKey ペアを使用することを推奨します。詳細については、「AccessKey ペア」をご参照ください。

バッチ処理パラメーター

パラメーター必須説明
batchSizeいいえ1 回のバッチで SLS に書き込むログエントリの数です。デフォルト値:1000
maxBufferSizeいいえ内部バッファーキューに許容されるログエントリの最大数です。デフォルト値:1000

シリアル化パラメーター

パラメーター必須説明
serializerいいえFlume イベントを SLS ログエントリに変換するためのシリアル化モードです。有効な値:SIMPLE(デフォルト)、DELIMITEDJSON、または完全修飾カスタムシリアル化クラス名。
columnsいいえカラム名のカンマ区切りリストです。serializerDELIMITED に設定されている場合に必須です。カラムは、各レコード内のフィールドの出現順にマッピングされます。
separatorCharいいえフィールドを分割するために使用されるデリミタ文字です。1 文字である必要があります。serializerDELIMITED に設定されている場合に必須です。デフォルト値:,(カンマ)。
quoteCharいいえフィールドを囲むために使用される引用符です。serializerDELIMITED に設定されている場合に必須です。デフォルト値:"(二重引用符)。
escapeCharいいえエスケープ文字です。serializerDELIMITED に設定されている場合に必須です。デフォルト値:"(二重引用符)。

タイムスタンプパラメーター

パラメーター必須説明
useRecordTimeいいえSLS への書き込み時に、データエントリ内のタイムスタンプフィールドをログ時間として使用するかどうかを指定します。デフォルト値:falsefalse に設定されている場合、現在のシステム時間がログ時間として使用されます。

Sink 構成例

次の例では、Avro Source からデータを読み取り、DELIMITED シリアル化を使用して SLS Logstore に書き込みます。

# コンポーネントに名前を付けます
agent.sources = avroSrc
agent.channels = memCh
agent.sinks = slsSink

# Avro Source を構成します
agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = 0.0.0.0
agent.sources.avroSrc.port = 4141
agent.sources.avroSrc.channels = memCh

# Memory Channel を構成します
agent.channels.memCh.type = memory
agent.channels.memCh.capacity = 1000
agent.channels.memCh.transactionCapacity = 100

# SLS Sink を構成します
agent.sinks.slsSink.type = com.aliyun.loghub.flume.sink.LoghubSink
agent.sinks.slsSink.channel = memCh
agent.sinks.slsSink.endpoint = http://cn-hangzhou.log.aliyuncs.com
agent.sinks.slsSink.project = your-project
agent.sinks.slsSink.logstore = your-logstore
agent.sinks.slsSink.accessKeyId = your-access-key-id
agent.sinks.slsSink.accessKey = your-access-key-secret
agent.sinks.slsSink.batchSize = 1000
agent.sinks.slsSink.serializer = DELIMITED
agent.sinks.slsSink.columns = col1,col2,col3
agent.sinks.slsSink.separatorChar = ,

その他の構成例については、「GitHub 上の Sink の例」をご参照ください。

Source の構成

SLS Source を使用して、SLS Logstore からログデータを取得し、Flume を介してダウンストリームシステムに配信します。Source は、SLS ログエントリを Flume イベントに変換するための次の 2 つの逆シリアル化モードをサポートしています。

モード動作
DELIMITEDログフィールドがデリミタで結合され、Flume イベント本文として書き込まれます。
JSONログエントリが JSON としてシリアル化され、Flume イベント本文として書き込まれます。

Source パラメーター

接続パラメーター

パラメーター必須説明
typeはいSource のタイプです。com.aliyun.loghub.flume.source.LoghubSource に設定します。
endpointはいSLS プロジェクトのエンドポイントです。例:http://cn-qingdao.log.aliyuncs.com。プロジェクトのリージョンに基づいてエンドポイントを選択してください。詳細については、「エンドポイント」をご参照ください。
projectはいSLS プロジェクトの名前です。
logstoreはいLogstore の名前です。
accessKeyIdはいAlibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID です。セキュリティの観点から、RAM ユーザーの AccessKey ペアを使用することを推奨します。詳細については、「AccessKey ペア」をご参照ください。
accessKeyはいAlibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret です。セキュリティの観点から、RAM ユーザーの AccessKey ペアを使用することを推奨します。詳細については、「AccessKey ペア」をご参照ください。

使用者グループパラメーター

パラメーター必須説明
consumerGroupいいえ複数のコンシューマー間での消費を調整するために使用される使用者グループの名前です。このパラメーターを指定しない場合、使用者グループ名がランダムに生成されます。
heartbeatIntervalMsいいえコンシューマークライアントが SLS にハートビートメッセージを送信する間隔(ミリ秒単位)です。デフォルト値:30000
fetchIntervalMsいいえSLS への連続するデータフェッチリクエスト間の間隔(ミリ秒単位)です。デフォルト値:100
fetchInOrderいいえSLS への書き込み順にログデータを取得するかどうかを指定します。デフォルト値:false
batchSizeいいえ1 回のリクエストでフェッチするログエントリの数です。デフォルト値:100
initialPositionいいえデータ取得の開始位置です。有効な値:begin(デフォルト)、end、および timestamp注記:指定された使用者グループに対して SLS にチェックポイントが存在する場合、この設定よりもチェックポイントが優先されます。
timestampいいえデータ取得を開始する時点を指定する UNIX タイムスタンプです。initialPositiontimestamp に設定されている場合に必須です。

逆シリアル化パラメーター

パラメーター必須説明
deserializerはいSLS ログエントリを Flume イベントに変換するための逆シリアル化モードです。有効な値:DELIMITED(デフォルト)、JSON、または完全修飾カスタム逆シリアル化クラス名。
columnsいいえカラム名のカンマ区切りリストです。deserializerDELIMITED に設定されている場合に必須です。カラムは、各レコード内のフィールドの出現順にマッピングされます。
separatorCharいいえフィールドを結合するために使用されるデリミタ文字です。1 文字である必要があります。deserializerDELIMITED に設定されている場合に必須です。デフォルト値:,(カンマ)。
quoteCharいいえフィールドを囲むために使用される引用符です。deserializerDELIMITED に設定されている場合に必須です。デフォルト値:"(二重引用符)。
escapeCharいいえエスケープ文字です。deserializerDELIMITED に設定されている場合に必須です。デフォルト値:"(二重引用符)。
appendTimestampいいえログタイムスタンプを追加フィールドとして付加するかどうかを指定します。deserializerDELIMITED に設定されている場合に適用されます。デフォルト値:false

JSON フィールドオプション

次のパラメーターは、deserializerJSON に設定されている場合にのみ適用されます。

パラメーター必須説明
sourceAsFieldいいえログソースを __source__ という名前のフィールドとして含めるかどうかを指定します。デフォルト値:false
tagAsFieldいいえログタグをフィールドとして含めるかどうかを指定します。各タグは __tag__:{タグ名} という名前のフィールドとして追加されます。デフォルト値:false
timeAsFieldいいえログ時間を __time__ という名前のフィールドとして含めるかどうかを指定します。デフォルト値:false

タイムスタンプパラメーター

パラメーター必須説明
useRecordTimeいいえログエントリ内のタイムスタンプフィールドを Flume イベントのタイムスタンプとして使用するかどうかを指定します。デフォルト値:falsefalse に設定されている場合、現在のシステム時間が使用されます。

SPL 処理パラメーター

次のパラメーターを使用すると、SLS Search Processing Language (SPL) を使用して取得中にログデータをフィルターまたは変換できます。

パラメーター必須説明
processorいいえ取得中にログデータをフィルターまたは変換するために使用される SLS SPL 式です。SPL 構文の詳細については、SLS ドキュメントをご参照ください。
queryいいえSLS SPL クエリ式です。非推奨processor を代わりに使用してください。

Source 構成例

次の例では、JSON 逆シリアル化を使用して SLS Logstore からデータを取得し、ローカルログファイルに書き込みます。

# コンポーネントに名前を付けます
agent.sources = slsSrc
agent.channels = memCh
agent.sinks = loggerSink

# SLS Source を構成します
agent.sources.slsSrc.type = com.aliyun.loghub.flume.source.LoghubSource
agent.sources.slsSrc.channels = memCh
agent.sources.slsSrc.endpoint = http://cn-hangzhou.log.aliyuncs.com
agent.sources.slsSrc.project = your-project
agent.sources.slsSrc.logstore = your-logstore
agent.sources.slsSrc.accessKeyId = your-access-key-id
agent.sources.slsSrc.accessKey = your-access-key-secret
agent.sources.slsSrc.deserializer = JSON
agent.sources.slsSrc.sourceAsField = true
agent.sources.slsSrc.tagAsField = true
agent.sources.slsSrc.timeAsField = true
agent.sources.slsSrc.consumerGroup = flume-consumer
agent.sources.slsSrc.initialPosition = begin

# Memory Channel を構成します
agent.channels.memCh.type = memory
agent.channels.memCh.capacity = 1000
agent.channels.memCh.transactionCapacity = 100

# Logger Sink を構成します
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memCh

その他の構成例については、「GitHub 上の Source の例」をご参照ください。

Channel の構成

Channel は、Flume パイプラインにおいて Source と Sink を接続するバッファーです。aliyun-log-flume プラグインは、標準の Flume Channel タイプすべてで動作します。最も一般的なオプションは次の 2 つです。

  • Memory Channel: イベントをメモリに格納します。高スループットを実現しますが、Flume エージェントプロセスが再起動するとイベントが失われます。データ損失が許容されるユースケースに適しています。

      agent.channels.memCh.type = memory
      agent.channels.memCh.capacity = 10000
      agent.channels.memCh.transactionCapacity = 1000
  • File Channel: イベントをディスクに永続化します。スループットは低下しますが、耐久性を確保できます。データ損失が許容されない本番ワークロードに適しています。

      agent.channels.fileCh.type = file
      agent.channels.fileCh.checkpointDir = /var/flume/checkpoint
      agent.channels.fileCh.dataDirs = /var/flume/data
      agent.channels.fileCh.capacity = 1000000
      agent.channels.fileCh.transactionCapacity = 10000

重要: Channel の transactionCapacity は、Sink または Source で設定された batchSize 以上の値に設定してください。トランザクション容量がバッチサイズより小さい場合、Sink または Source は 1 つのトランザクション内でバッチを完了できず、エラーが発生します。

参考情報