すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Flume との連携

最終更新日:Mar 26, 2026

aliyun-log-flume プラグインを使用して Log Service と Flume を連携させ、ログデータの書き込みと消費を行います。

背景情報

aliyun-log-flume プラグインは Log Service と Flume を連携させ、HDFS や Kafka などの他のデータシステムとの接続を可能にします。このプラグインは、シンクとソースを提供します。

  • シンク:Flume が他のデータソースからデータを読み取り、Log Service に書き込みます。

  • ソース:Flume が Log Service からログデータを消費し、他のシステムに書き込みます。

詳細については、「aliyun-log-flume」をご参照ください。

操作手順

  1. Flume をダウンロードしてインストールします。

    詳細については、「Flume」をご参照ください。

  2. aliyun-log-flume プラグインをダウンロードし、<FLUME_HOME>/lib ディレクトリに配置します。

    詳細については、「aliyun-log-flume-1.3.jar」をご参照ください。

  3. <FLUME_HOME>/conf ディレクトリに、flumejob.conf という名前の設定ファイルを作成します。

    • シンクの設定と例については、「Sink」をご参照ください。

    • ソースの設定と例については、「Source」をご参照ください。

  4. Flume を起動します。

Sink

シンクを使用して、他のソースからのデータを Flume 経由で Log Service に書き込みます。以下の 2 つの解析形式がサポートされています:

  • SIMPLE:Flume イベント全体を単一のフィールドとして Log Service に書き込みます。

  • DELIMITED:Flume イベント全体を区切り文字データとして扱います。データは、設定されたカラム名に基づいてフィールドに解析され、Log Service に書き込まれます。

次の表に、シンク設定のパラメーターを示します。

パラメーター

必須

説明

type

はい

シンクのクラス名。値を com.aliyun.Loghub.flume.sink.LoghubSink に設定します。

endpoint

はい

プロジェクトのサービスエンドポイント。例:http://cn-qingdao.log.aliyuncs.com。例を実際のサービスエンドポイントに置き換えてください。詳細については、「サービスエンドポイント」をご参照ください。

project

はい

プロジェクトの名前。

LogStore

はい

LogStore の名前。

accessKeyId

はい

ユーザーを識別するために使用される AccessKey ID。セキュリティのため、RAM ユーザーの AccessKey ペアを使用してください。AccessKey ペアの取得方法については、「AccessKey ペア」をご参照ください。

accessKey

はい

ユーザーを認証するために使用される AccessKey Secret。セキュリティのため、RAM ユーザーの AccessKey ペアを使用してください。AccessKey ペアの取得方法については、「AccessKey ペア」をご参照ください。

batchSize

いいえ

各バッチで Log Service に書き込むデータエントリの数。デフォルト値:1000。

maxBufferSize

いいえ

キャッシュキューのサイズ。デフォルト値:1000。

serializer

いいえ

Flume イベントのシリアル化形式。有効な値:

  • [DELIMITED]:区切り文字形式でデータを解析します。

  • [SIMPLE]:単一行形式でデータを解析します。これがデフォルトの形式です。

  • [JSON]:JSON 形式でデータを解析します。

  • カスタム [serializer]:カスタムシリアライザーの完全修飾クラス名に設定します。

columns

いいえ

このパラメーターは、serializer[DELIMITED] に設定されている場合に必須です。カラム名をコンマ区切りで指定します。カラム名は、データ内のフィールドと同じ順序である必要があります。

separatorChar

いいえ

serializer[DELIMITED] に設定されている場合、このパラメーターはフィールドの区切り文字を指定します。値は単一の文字である必要があります。デフォルト値はコンマ (,) です。

quoteChar

いいえ

serializer[DELIMITED] に設定されている場合、このパラメーターは引用符を指定します。デフォルト値は二重引用符 (") です。

escapeChar

いいえ

serializer[DELIMITED] に設定されている場合、このパラメーターはエスケープ文字を指定します。デフォルト値は二重引用符 (") です。

useRecordTime

いいえ

データから timestamp フィールドをログ時間として使用するかどうかを指定します。false に設定した場合、現在のシステム時刻が使用されます。デフォルト値は false です。

シンクの設定例については、「GitHub」をご参照ください。

Source

ソースを使用して Log Service からログデータを消費し、Flume を介して他のデータソースに送信します。以下の 2 つの出力形式がサポートされています:

  • DELIMITED:区切り文字形式のログフォーマットで Flume にデータを出力します。

  • JSON:JSON 形式のログフォーマットで Flume にデータを出力します。

次の表に、ソース設定のパラメーターを示します。

パラメーター

必須

説明

type

はい

ソースのクラス名。値を com.aliyun.Loghub.flume.source.LoghubSource に設定します。

endpoint

はい

プロジェクトのサービスエンドポイント。例:http://cn-qingdao.log.aliyuncs.com。例を実際のサービスエンドポイントに置き換えてください。詳細については、「サービスエンドポイント」をご参照ください。

project

はい

プロジェクトの名前。

LogStore

はい

LogStore の名前。

accessKeyId

はい

ユーザーを識別するために使用される AccessKey ID。セキュリティのため、RAM ユーザーの AccessKey ペアを使用してください。AccessKey ペアの取得方法については、「AccessKey ペア」をご参照ください。

accessKey

はい

ユーザーを認証するために使用される AccessKey Secret。セキュリティのため、RAM ユーザーの AccessKey ペアを使用してください。AccessKey ペアの取得方法については、「AccessKey ペア」をご参照ください。

heartbeatIntervalMs

いいえ

クライアントと Log Service 間のハートビート間隔。デフォルト値は 30,000 ミリ秒です。

fetchIntervalMs

いいえ

データフェッチの間隔。デフォルト値は 100 ミリ秒です。

fetchInOrder

いいえ

データを順次消費するかどうかを指定します。デフォルト値は false です。

batchSize

いいえ

各バッチで読み取るデータエントリの数。デフォルト値は 100 です。

consumerGroup

いいえ

コンシューマーグループの名前。

initialPosition

いいえ

データ消費の開始位置。有効な値は [begin][end][timestamp] です。デフォルト値は [begin] です。

説明

サーバー側にチェックポイントが存在する場合、それが優先されます。

timestamp

いいえ

このパラメーターは、initialPosition[タイムスタンプ] に設定されている場合に必須です。開始時刻を UNIX タイムスタンプ形式で指定します。

deserializer

はい

Flume イベントの逆シリアル化形式。有効な値:

  • [DELIMITED]:区切り文字形式でデータを解析します。これがデフォルトの形式です。

  • [JSON]:JSON 形式でデータを解析します。

  • カスタムデシリアライザー: カスタムデシリアライザーの完全修飾クラス名を設定します。

columns

いいえ

このパラメーターは、deserializer[DELIMITED] に設定されている場合に必須です。カラム名をコンマ区切りで指定します。カラム名は、データ内のフィールドと同じ順序である必要があります。

separatorChar

いいえ

deserializer[DELIMITED] に設定されている場合、このパラメーターはフィールドの区切り文字を指定します。値は単一の文字である必要があります。デフォルト値はコンマ (,) です。

quoteChar

いいえ

deserializer[DELIMITED] に設定されている場合、このパラメーターは引用符を指定します。デフォルト値は二重引用符 (") です。

escapeChar

いいえ

deserializer[DELIMITED] に設定されている場合、このパラメーターはエスケープ文字を指定します。デフォルト値は二重引用符 (") です。

appendTimestamp

いいえ

deserializer[DELIMITED] に設定されている場合、このパラメーターはタイムスタンプを各行の末尾にフィールドとして自動的に追加するかどうかを指定します。デフォルト値は false です。

sourceAsField

いいえ

deserializer[JSON] に設定されている場合、ログソースを __source__ という名前のフィールドとして追加するかどうかを指定します。デフォルト値は false です。

tagAsField

いいえ

deserializer[JSON] に設定されている場合、ログタグをフィールドとして追加するかどうかを指定します。各タグは、__tag__:{tag_name} の形式の名前を持つ個別のフィールドとして追加されます。デフォルト値は false です。

timeAsField

いいえ

deserializer[JSON] に設定されている場合、ログ時間を __time__ という名前のフィールドとして追加するかどうかを指定します。デフォルト値は false です。

useRecordTime

いいえ

ログの元のタイムスタンプを使用するかどうかを指定します。false に設定した場合、現在のシステム時刻が使用されます。デフォルト値は false です。

ソースの設定例については、「GitHub」をご参照ください。