すべてのプロダクト
Search
ドキュメントセンター

DataHub:Flume 用 DataHub プラグイン

最終更新日:Jan 25, 2025

Flume 用 DataHub プラグインは、データのサブスクライブとパブリッシュを行うための DataHub プラグインです。このプラグインは Apache Flume をベースに開発されています。このプラグインを使用すると、収集したデータを DataHub に書き込んだり、DataHub からデータを読み取って他のシステムに書き込んだりできます。このプラグインは Apache Flume プラグインの開発規則に準拠しており、インストールも簡単です。このプラグインを使用すると、DataHub にデータをパブリッシュしたり、DataHub 内のデータをサブスクライブしたりできます。

Flume 用 DataHub プラグインのインストール

インストールの制限事項

  • Java 開発キット ( JDK ) のバージョンは 1.8 以降である必要があります。

  • Apache Maven のバージョンは 3.X である必要があります。

  • Flume-NG のバージョンは 1.X である必要があります。

Flume 用 DataHub プラグインのインストール

  1. Apache Flume をダウンロードします。Apache Flume がすでにダウンロードされている場合は、この手順をスキップします。

    $ tar zxvf apache-flume-1.11.0-bin.tar.gz
    説明

    説明を簡単にするために、以下の情報では ${FLUME_HOME} を使用して Apache Flume のホームディレクトリを指定します。

  2. Flume 用 DataHub プラグインをインストールします。

    • Flume 用 DataHub プラグインを直接インストールします。

      1. Flume 用 DataHub プラグインをダウンロードします。

      2. パッケージから Flume 用 DataHub プラグインを抽出し、 ディレクトリに保存します。${FLUME_HOME}/plugins.d ディレクトリ。

        $ tar aliyun-flume-datahub-sink-x.x.x.tar.gz
        $ cd aliyun-flume-datahub-sink-x.x.x
        $ mkdir ${FLUME_HOME}/plugins.d
        $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
    • ソースコードを使用して Flume 用 DataHub プラグインをインストールします。

      1. aliyun-maxcompute-data-collectors からソースコードをダウンロードします。

      2. ソースコードをコンパイルしてプラグインをインストールします。

        $ cd aliyun-maxcompute-data-collectors
        $ mvn clean package -DskipTests=true  -Dmaven.javadoc.skip=true
        $ cd flume-plugin/target
        $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
        $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d

パラメーター

Sink 関連のパラメーター

パラメーター

デフォルト値

必須

説明

datahub.endPoint

-

はい

DataHub のエンドポイント。

datahub.accessId

-

はい

Alibaba Cloud アカウントの AccessKey ID。

datahub.accessKey

-

はい

Alibaba Cloud アカウントの AccessKey シークレット。

datahub.project

-

はい

DataHub プロジェクトの名前。

datahub.topic

-

はい

DataHub トピックの名前。

datahub.shard.ids

すべてのシャードの ID

いいえ

データを書き込む DataHub 内のシャードの ID。複数の ID はカンマ ( , ) で区切ります (例: 0,1,2 )。指定されたシャードリストからシャードがランダムに選択されるたびに、DataHub 内のシャードにデータが書き込まれます。このパラメーターが設定されておらず、シャードがマージまたは分割された場合、プラグインはシャードリストを自動的に調整します。構成ファイルを修正して、シャードリストを調整することもできます。

datahub.enablePb

true

いいえ

データ転送に Protobuf を有効にするかどうかを指定します。Apsara Stack を使用しているときに Protobuf がサポートされていない場合は、このパラメーターを false に設定する必要があります。

datahub.compressType

none

いいえ

データ転送に圧縮を有効にするかどうかを指定します。LZ4 および DEFLATE 圧縮形式がサポートされています。

datahub.batchSize

1000

いいえ

一度に DataHub に書き込むことができるレコードの最大数。

datahub.maxBufferSize

2*1024*1024

いいえ

一度に DataHub に書き込むことができるデータの最大量。単位: バイト。このパラメーターを大きい値に変更しないことをお勧めします。一度に書き込むデータ量が大きい場合、書き込み操作が失敗する可能性があります。

datahub.batchTimeout

5

いいえ

DataHub からデータが読み取られるまで待機する時間。単位: 秒。このパラメーターは、データ量が batchSize パラメーターで指定されたしきい値に達していない場合にのみ有効になります。

datahub.retryTimes

3

いいえ

書き込み失敗後に許可される最大再試行回数。

datahub.retryInterval

5

いいえ

データ書き込み失敗後、2 回の連続する再試行の間隔。単位: 秒。

datahub.dirtyDataContinue

true

いいえ

ダーティレコードを無視するかどうかを指定します。このパラメーターを true に設定すると、デリミタ (カンマ ( , ) ) が追加され、ダーティレコードはカンマの後にダーティレコードを格納するファイルに自動的に書き込まれます。この操作は、後続のデータ処理には影響しません。

datahub.dirtyDataFile

DataHub-Flume-dirty-file

いいえ

ダーティレコードを格納するファイルの名前。

serializer

-

はい

データ解析方法。有効な値: DELIMITED、JSON、REGEX。DELIMITED を選択した場合、データは指定されたデリミタに基づいて解析されます。JSON を選択した場合、各行は単一レベルの JSON 配列として解析されます。REGEX を選択した場合、データは指定された正規表現に基づいて解析されます。

serializer.delimiter

,

いいえ

フィールドを区切るために使用されるデリミタ。特殊文字を使用する場合は、二重引用符 ( " " ) で囲む必要があります (例: "\t" )。

serializer.regex

(.*)

いいえ

データ解析に使用される正規表現。各フィールドの値はグループに解析されます。

serializer.fieldnames

-

はい

入力フィールドと DataHub フィールド間のマッピング。入力フィールドは入力順に基づいてマークされます。入力フィールドをマッピングしない場合は、フィールド名を空のままにします。たとえば、c1,c2,,c4 は、1 番目、2 番目、4 番目の入力フィールドが DataHub の c1、c2、c3 フィールドにマッピングされていることを示します。

serializer.charset

UTF-8

いいえ

データ解析のエンコード形式。

Source 関連のパラメーター

パラメーター

デフォルト値

必須

説明

datahub.endPoint

-

はい

DataHub のエンドポイント。

datahub.accessId

-

はい

Alibaba Cloud アカウントの AccessKey ID。

datahub.accessKey

-

はい

Alibaba Cloud アカウントの AccessKey シークレット。

datahub.project

-

はい

DataHub プロジェクトの名前。

datahub.topic

-

はい

DataHub トピックの名前。

datahub.subId

-

はい

DataHub のサブスクリプション ID。

datahub.startTime

-

いいえ

DataHub からデータの読み取りを開始する時点。yyyy-MM-dd HH:mm:ss の形式で時間を指定します。このパラメーターを設定すると、サブスクリプションオフセットがリセットされ、サブスクリプションに基づいてデータが読み取られます。

datahub.shard.ids

-

いいえ

データを読み取る DataHub 内のシャードの ID。複数の ID はカンマ ( , ) で区切ります (例: 0,1,2 )。指定されたシャードリストからシャードがランダムに選択されるたびに、DataHub 内のシャードからデータが読み取られます。このパラメーターを空のままにすると、データ読み取り操作で共同消費が有効になります。このパラメーターを設定しないことをお勧めします。このパラメーターを設定せずに複数のソースを構成すると、共同消費が有効になり、シャードが自動的に割り当てられます。これにより、各ソースの負荷が均等になります。

datahub.enablePb

true

いいえ

データ転送に Protobuf を有効にするかどうかを指定します。Apsara Stack を使用しているときに Protobuf がサポートされていない場合は、このパラメーターを false に設定する必要があります。

datahub.compressType

none

いいえ

データ転送に圧縮を有効にするかどうかを指定します。LZ4 および DEFLATE 圧縮形式がサポートされています。

datahub.batchSize

1000

いいえ

一度に DataHub から読み取ることができるレコードの最大数。

datahub.batchTimeout

5

いいえ

DataHub からデータが読み取られるまで待機する時間。単位: 秒。このパラメーターは、データ量が batchSize パラメーターで指定されたしきい値に達していない場合にのみ有効になります。

datahub.retryTimes

3

いいえ

読み取り失敗後に許可される最大再試行回数。デフォルトでは、2 回の連続する再試行の間隔は 1 秒で、変更できません。

datahub.autoCommit

true

いいえ

コンシューマーが消費オフセットを自動的に送信するかどうかを指定します。このパラメーターを true に設定すると、コンシューマーは消費オフセットを自動的に送信します。この場合、データは消費されない可能性がありますが、消費オフセットは送信されます。このパラメーターを false に設定すると、消費オフセットはデータが Flume チャネルに送信された後に送信されます。

datahub.offsetCommitTimeout

30

いいえ

コンシューマーが消費オフセットを自動的に送信する間隔。単位: 秒。

datahub.sessionTimeout

60

いいえ

セッションタイムアウト期間。ソースは共同消費モードで動作します。共同消費がタイムアウトし、ハートビートメッセージが送信されない場合、セッションは自動的にキャンセルされます。

serializer

-

はい

データ解析方法。値を DELIMITED に設定します。行内のフィールドは DataHub スキーマで指定された順序で書き込まれ、指定されたデリミタで区切られます。

serializer.delimiter

,

いいえ

フィールドを区切るために使用されるデリミタ。特殊文字を使用する場合は、二重引用符 ( " " ) で囲む必要があります (例: "\t" )。

serializer.charset

UTF-8

いいえ

データ解析のエンコード形式。

ケースの説明

Sink のユースケース

ケース 1:DELIMITED シリアライザー

  1. テストデータを準備します。

    データ解析方法として DELIMITED を選択すると、各行はレコードと見なされ、データは指定されたデリミタに基づいて解析されます。このケースでは、Flume 用 DataHub プラグインを使用して CSV ファイルを DataHub にほぼリアルタイムで書き込む方法を示します。次のデータを test.csv という名前のファイルとしてローカルディレクトリ /temp/ に保存します。

    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
    1,hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V,false,1254275.1144637289,1573206062763,1254275.1144637289
    2,vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l,true,1254275.1144637289,1573206062763,1254275.1144637289
    3,t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs,false,1254275.1144637289,1573206062763,1254275.1144637289
    4,MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI,true,1254275.1144637289,1573206062763,1254275.1144637289
    5,bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV,false,1254275.1144637289,1573206062763,1254275.1144637289
    6,wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX,true,1254275.1144637289,1573206062763,1254275.1144637289
    7,whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8,false,1254275.1144637289,1573206062763,1254275.1144637289
    8,OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ,true,1254275.1144637289,1573206062763,1254275.1144637289

    次の表は、上記のデータに対応する DataHub トピックのスキーマを示しています。

    フィールド名

    データ型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. Apache Flume ファイルを構成します。

    ${FLUME_HOME}/conf ディレクトリに datahub_basic.conf という名前のファイルを作成し、ファイルに次の内容を入力します。このケースでは exec ソースを使用しています。他のソースの詳細については、「Flume 1.9.0 ユーザーガイド」をご参照ください。

    # DataHub 用の単一ノード Flume 構成
    # このエージェントのコンポーネントに名前を付ける
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # ソースを記述/構成する
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.csv
    # シンクを記述する
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = ,
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    a1.sinks.k1.datahub.enablePb = true
    a1.sinks.k1.datahub.compressType = DEFLATE
    # メモリ内にイベントをバッファリングするチャネルを使用する
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # ソースとシンクをチャネルにバインドする
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    説明

    exec ソースはソースイベントが Flume チャネルに送信されることを保証できないため、exec ソースを使用するとデータが失われる可能性があります。たとえば、tail コマンドを実行して特定のデータを収集し、Flume チャネルがいっぱいになると、収集されたデータは失われます。Spooling Directory Source または Taildir Source を使用することをお勧めします。このケースでは、/temp/ ディレクトリにある静的ファイル test.csv がデータソースとして使用されます。ログデータがファイルに動的に書き込まれる場合は、tail -F logFile コマンドを実行してログデータをリアルタイムで収集できます。

  3. Flume 用 DataHub プラグインを起動します。

    Dflume.root.logger=INFO,console オプションを使用すると、ログを DataHub コンソールにリアルタイムでエクスポートできます。詳細が必要な場合は、デバッグモードを使用できます。次のコマンドを実行して Flume 用 DataHub プラグインを起動し、CSV ファイルを DataHub に書き込みます。

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

ケース 2:REGEX シリアライザー

  1. テストデータを準備します。

    データ解析方法として REGEX を選択すると、各行はレコードと見なされ、データは指定された正規表現に基づいて解析されます。レコードの内容は複数のグループで表されます。このケースでは、Flume 用 DataHub プラグインが正規表現を使用して CSV ファイルを DataHub にほぼリアルタイムで書き込む方法を示します。次のデータを という名前のファイルとしてローカルディレクトリ /temp/ に保存します。test.csv

    1. [2019-11-12 15:20:08] 0,j4M6PhzL1DXVTQawdfk306N2KnCDxtR0KK1pke5O,true,1254409.5059812006,1573543208698,1254409.5059819978
    2. [2019-11-12 15:22:35] 0,mYLF8UzIYCCFUm1jYs9wzd2Hl6IMr2N7GPYXZSZy,true,1254409.5645912462,1573543355740,1254409.5645920434
    3. [2019-11-12 15:23:14] 0,MOemUZur37n4SGtdUQyMohgmM6cxZRBXjJ34HzqX,true,1254409.5799291395,1573543394219,1254409.579929538
    4. [2019-11-12 15:23:30] 0,EAFc1VTOvC9rYzPl9zJYa6cc8uJ089EaFd79B25i,true,1254409.5862723626,1573543410134,1254409.5862731598
    5. [2019-11-12 15:23:53] 0,zndVraA4GP7FP8p4CkQFsKJkxwtYK3zXjDdkhmRk,true,1254409.5956010541,1573543433538,1254409.5956018514
    6. [2019-11-12 15:24:00] 0,9YrjjoALEfyZm07J7OuNvDVNyspIzrbOOAGnZtHx,true,1254409.598201082,1573543440061,1254409.5982018793
    7. [2019-11-12 15:24:23] 0,mWsFgFlUnXKQQR6RpbAYDF9OhGYgU8mljvGCtZ26,true,1254409.6073950487,1573543463126,1254409.607395447
    8. [2019-11-12 15:26:51] 0,5pZRRzkW3WDLdYLOklNgTLFX0Q0uywZ8jhw7RYfI,true,1254409.666525653,1573543611475,1254409.6665264503
    9. [2019-11-12 15:29:11] 0,hVgGQrXpBtTJm6sovVK4YGjfNMdQ3z9pQHxD5Iqd,true,1254409.7222845491,1573543751364,1254409.7222853464
    10. [2019-11-12 15:29:52] 0,7wQOQmxoaEl6Cxl1OSo6cr8MAc1AdJWJQaTPT5xs,true,1254409.7387664048,1573543792714,1254409.738767202
    11. [2019-11-12 15:30:30] 0,a3Th5Q6a8Vy2h1zfWLEP7MdPhbKyTY3a4AfcOJs2,true,1254409.7538966285,1573543830673,1254409.7538974257
    12. [2019-11-12 15:34:54] 0,d0yQAugqJ8M8OtmVQYMTYR8hi3uuX5WsH9VQRBpP,true,1254409.8589555968,1573544094247,1254409.8589563938

    次の表は、上記のデータに対応する DataHub トピックのスキーマを示しています。

    フィールド名

    データ型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. Apache Flume ファイルを構成します。

    ${FLUME_HOME}/conf ディレクトリに datahub_basic.conf という名前のファイルを作成し、ファイルに次の内容を入力します。このケースでは exec ソースを使用しています。他のソースの詳細については、「Flume 1.9.0 ユーザーガイド」をご参照ください。

    # DataHub 用の単一ノード Flume 構成
    # このエージェントのコンポーネントに名前を付ける
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # ソースを記述/構成する
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.csv
    # シンクを記述する
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = REGEX
    a1.sinks.k1.serializer.regex = \\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\] (\\d+),(\\S+),([a-z]+),([-+]?[0-9]*\\.?[0-9]*),(\\d+),([-+]?[0-9]*\\.?[0-9]*)
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    # メモリ内にイベントをバッファリングするチャネルを使用する
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # ソースとシンクをチャネルにバインドする
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    説明

    exec ソースはソースイベントが Flume チャネルに送信されることを保証できないため、exec ソースを使用するとデータが失われる可能性があります。たとえば、tail コマンドを実行して特定のデータを収集し、Flume チャネルがいっぱいになると、収集されたデータは失われます。 または を使用することをお勧めします。このケースでは、/temp/ ディレクトリにある静的ファイル がデータソースとして使用されます。ログデータがファイルに動的に書き込まれる場合は、 コマンドを実行してログデータをリアルタイムで収集できます。スプーリングディレクトリソースまたはTaildir ソース。静的ファイル test.csvtail -F logFile

  3. Flume 用 DataHub プラグインを起動します。

    Dflume.root.logger=INFO,console オプションを使用すると、ログを DataHub コンソールにリアルタイムでエクスポートできます。詳細が必要な場合は、デバッグモードを使用できます。次のコマンドを実行して Flume 用 DataHub プラグインを起動し、CSV ファイルを DataHub に書き込みます。

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

ケース 3:Flume taildir ソース

前述のように、Flume 用 DataHub プラグインで exec ソースを使用すると、データが失われる可能性があります。そのため、実際の運用環境では exec ソースを使用しないことをお勧めします。ローカルログを収集するには、Taildir Source または Spooling Directory Source を使用できます。このケースでは、taildir ソースを使用してログデータを収集する方法を示します。 taildir ソースは、指定されたファイルを監視し、ほぼリアルタイムでファイルに追加された新しい行を読み取ります。新しい行が書き込まれている場合、taildir ソースは書き込み操作が完了するまで行の読み取りを再試行します。 taildir ソースは、各ファイルの最後の読み取り位置を positionFile ファイルに JSON 形式で保存します。ソースイベントが Flume チャンネルに送信されなかった場合、記録された読み取り位置は更新されません。そのため、taildir ソースは信頼性が高いと言えます。

  1. テストデータを準備します。

    すべてのログデータを次の形式でログファイルの末尾に追加します。ログファイルの名前は 形式です。*.log

    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289

    次の表は、上記のデータに対応する DataHub トピックのスキーマを示しています。

    フィールド名

    データ型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. Apache Flume ファイルを構成します。

    という名前のファイルを作成しますdatahub_basic.conf${FLUME_HOME}/conf ディレクトリを作成し、ファイルに次のコンテンツを入力します。

    # DataHub 用の単一ノード Flume 構成
    # このエージェントのコンポーネントに名前を付ける
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # ソースを記述/構成する
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /temp/taildir_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /temp/.*log
    # シンクを記述する
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = ,
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    a1.sinks.k1.datahub.enablePb = true
    a1.sinks.k1.datahub.compressType = DEFLATE
    # メモリ内にイベントをバッファリングするチャネルを使用する
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # ソースとシンクをチャネルにバインドする
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. Flume 用 DataHub プラグインを起動します。

    Dflume.root.logger=INFO,console オプションを使用すると、ログを DataHub コンソールにリアルタイムでエクスポートできます。詳細が必要な場合は、デバッグモードを使用できます。次のコマンドを実行して Flume 用 DataHub プラグインを起動し、CSV ファイルを DataHub に書き込みます。

    1. $ cd ${FLUME_HOME}
    2. $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

ケース 4:JSON シリアライザー

データ解析方法として JSON を選択すると、各行は単一レベルの JSON 配列形式のレコードと見なされます。最初のレベルにある要素のネストされた内容は文字列と見なされます。最初のレベルにある name フィールドの値が パラメーターで指定されたマッピングにある場合、name フィールドの値は宛先 DataHub トピックのフィールドにマッピングされます。このケースでは、Flume 用 DataHub プラグインが JSON 解析方法を使用してログファイルを DataHub にほぼリアルタイムで書き込む方法を示します。serializer.fieldnames パラメーターの場合、name フィールドの値は、宛先 DataHub Topic のフィールドにマッピングされます。このケースでは、Flume 用 DataHub プラグインが JSON 解析メソッドを使用して、ログファイルをほぼリアルタイムで DataHub に書き込む方法を示します。

  1. テストデータを準備します。

    次のデータをローカルディレクトリ /temp/ 内の test.json ファイルとして保存します。同期されるデータは、日付より後のコンテンツです。

    {"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V","id":1,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l","id":2,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs","id":3,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI","id":4,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV","id":5,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX","id":6,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8","id":7,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"gender":true,"name":{"a":"OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ"},"id":8,"salary":1254275.1144637289,"decimal":1254275.1144637289}

    次の表は、上記のデータに対応する DataHub トピックのスキーマを示しています。

    フィールド名

    データ型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. Apache Flume ファイルを構成します。

    ${FLUME_HOME}/conf ディレクトリに datahub_basic.conf という名前のファイルを作成し、次の内容を入力します。 このケースでは、exec ソースを使用しています。他のソースの詳細については、「Flume 1.9.0 ユーザーガイド」をご参照ください。

    # DataHub 用の単一ノード Flume 構成
    # このエージェントのコンポーネントに名前を付ける
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # ソースを記述/構成する
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.json
    # シンクを記述する
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = JSON
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    # メモリ内にイベントをバッファリングするチャネルを使用する
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # ソースとシンクをチャネルにバインドする
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. Flume 用 DataHub プラグインを起動します。

    Dflume.root.logger=INFO,console オプションを使用すると、ログを DataHub コンソールにリアルタイムでエクスポートできます。詳細が必要な場合は、デバッグモードを使用できます。次のコマンドを実行して、Flume 用 DataHub プラグインを起動し、CSV ファイルを DataHub に書き込みます。

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

Source のユースケース

DataHub から他のシステムへのデータの読み取り

DataHub-Flume Source は DataHub からデータを読み取り、信頼性の高い方法で別のシステムに書き込むことができます。このケースでは、DataHub-Flume Source を使用してログデータを宛先コンソールにエクスポートする方法を示します。

  1. 次の表は、上記のデータに対応する DataHub トピックのスキーマを示しています。

    フィールド名

    データ型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. Apache Flume ファイルを構成します。

    という名前のファイルを作成しますdatahub_source.conf 内の ${FLUME_HOME}/conf ディレクトリを作成し、ファイルに次のコンテンツを入力します。

     # DataHub 用の単一ノード Flume 構成
     # このエージェントのコンポーネントに名前を付ける
     a1.sources = r1
     a1.sinks = k1
     a1.channels = c1
    
     # ソースを記述/構成する
     a1.sources.r1.type = com.aliyun.datahub.flume.sink.DatahubSource
     a1.sources.r1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
     a1.sources.r1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
     a1.sources.r1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
     a1.sources.r1.datahub.project = datahub_test
     a1.sources.r1.datahub.topic = test_flume
     a1.sources.r1.datahub.subId = {YOUR_ALIYUN_DATAHUB_SUB_ID}
     a1.sources.r1.serializer = DELIMITED
     a1.sources.r1.serializer.delimiter = ,
     a1.sources.r1.serializer.charset = UTF-8
     a1.sources.r1.datahub.retryTimes = 3
     a1.sources.r1.datahub.batchSize = 1000
     a1.sources.r1.datahub.batchTimeout = 5
     a1.sources.r1.datahub.enablePb = false
    
     # シンクを記述する
     a1.sinks.k1.type = logger
    
     # メモリ内にイベントをバッファリングするチャネルを使用する
     a1.channels.c1.type = memory
     a1.channels.c1.capacity = 10000
     a1.channels.c1.transactionCapacity = 10000
    
     # ソースとシンクをチャネルにバインドする
     a1.sources.r1.channels = c1
     a1.sinks.k1.channel = c1
  3. Flume 用 DataHub プラグインを起動します。

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_source.conf -Dflume.root.logger=INFO,console

Flume メトリック

Flume 用 DataHub プラグインは、Flume の組み込みカウントメトリックをサポートしています。メトリックに基づいてプラグインの操作を監視できます。Flume 用 DataHub プラグインのシンクとソースでは、異なるメトリックがサポートされています。次の表に、DataHub 関連のメトリックを示します。他のメトリックの詳細については、「使用可能なコンポーネントメトリック」をご参照ください。

DatahubSink

メトリック

説明

BatchEmptyCount

DataHub にデータを書き込むまで待機する時間が経過したときに、DataHub に書き込む必要のあるデータがない回数。待機時間は、データ量が batchSize パラメーターで指定されたしきい値に達していない場合にのみ有効になります。

BatchCompleteCount

リクエストされたすべてのレコードが DataHub に書き込まれた書き込み操作の成功回数。

EventDrainAttemptCount

プラグインが DataHub に書き込もうとした解析済みレコードの数。

BatchUnderflowCount

DataHub に書き込まれたデータ量が、書き込まれるデータ量よりも少ない回数。このようなシナリオでは、データは解析されますが、一部またはすべてのデータが DataHub に書き込まれません。

EventDrainSuccessCount

DataHub に書き込まれたレコードの数。

DatahubSource

メトリック

説明

EventReceivedCount

ソースが受信した DataHub レコードの数。

EventAcceptedCount

ソースがチャネルに送信した DataHub レコードの数。

Flume モニタリング

Apache Flume はさまざまなモニタリング方法を提供しています。このパートでは、HTTP モニタリングを有効にする方法について説明します。他のモニタリング方法の詳細については、「モニタリング」をご参照ください。HTTP モニタリングを有効にするには、Flume 用 DataHub プラグインを起動するときに次の 2 つのパラメーターを追加します。Dflume.monitoring.type=http and Dflume.monitoring.port=1234。Dflume.monitoring.type パラメーターの値 http は HTTP モニタリングを示し、Dflume.monitoring.port パラメーターの値 1234 はポート番号を示します。次のコードは、プラグインを起動する例を示しています。

bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234

プラグインが起動したら、Web ページにログインしてメトリックを表示できます。URL は https://ip:1234/metrics です。

説明

他のモニタリング方法の詳細については、「Flume 1.9.0 ユーザーガイド」をご参照ください。

FAQ

Flume 用 DataHub プラグインを起動すると「org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight」エラーが報告された場合はどうすればよいですか?

Flume 用 DataHub プラグインのデフォルトのヒープメモリは 20 MB です。一度に書き込むように指定されたレコードの数が多い場合、Flume 用 DataHub プラグインで使用されるヒープメモリが 20 MB を超えます。次のいずれかの解決策を使用して、問題を解決できます。

解決策 1:batchSize パラメーターの値を減らします。

解決策 2:Flume 用 DataHub プラグインの最大ヒープメモリを増やします。

  • $ vim bin/flume-ng

  • JAV**A_OPTS**="-Xmx20m" ==> JAV**A_OPTS**="-Xmx1024m"

Flume 用 DataHub プラグインは JSON 形式をサポートしていますか?

いいえ。ただし、カスタム正規表現を使用してデータを解析したり、Flume 用 DataHub プラグインのコードを変更して JSONEvent を追加して JSON 形式をサポートしたりできます。

Flume 用 DataHub プラグインは、データ型が BLOB のトピックをサポートしていますか?

いいえ。Flume 用 DataHub プラグインは、データ型が TUPLE のトピックのみをサポートしています。

Flume 用 DataHub プラグインが「org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count」エラーを報告するのはなぜですか?

Flume チャネルがいっぱいで、ソースが Flume チャネルにデータを書き込めません。構成ファイルのチャネル容量を変更し、batchSize パラメーターの値を適切に減らすことで、この問題を解決できます。

次の場合はどうすればよいですか?以前のバージョンの Apache Flume を使用するとエラーが発生します。JAR パッケージの競合により、Flume 用 DataHub プラグインを起動できません。

  • たとえば、Apache Flume V1.6 を使用すると、java.lang.NoSuchMethodError:com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader; エラーが報告されます。新しいバージョンのプラグインと Apache Flume V1.6 は、異なるバージョンの JAR パッケージに依存しています。Apache Flume V1.6 は以前のバージョンの JAR パッケージに依存しているため、新しいバージョンのプラグインによって提供されるメソッドが見つかりません。

  • ${FLUME_HOME}/lib ディレクトリにある次の 3 つの JAR パッケージを削除します。

    • jackson-annotations-2.3.0.jar

    • jackson-databind-2.3.1.jar

    • jackson-annotations-2.3.0.jar

Flume 用 DataHub プラグインを使用してデータを収集すると、空の文字列が自動的に NULL に変換されるのはなぜですか?

Flume 用 DataHub プラグイン V2.0.2 では、trim() メソッドが空でない文字列に使用され、空の文字列は直接 NULL に変換されます。このロジックは、Flume 用 DataHub プラグイン V2.0.3 では削除されています。空の文字列は、DataHub に書き込まれた後、NULL に変換されずに保持されます。

「com.aliyun.datahub.client.impl.batch.avro.AvroSchemaCache.schemaCache」が null であるため、「com.google.common.cache.LoadingCache.get(Object)」を呼び出すことができません。com.google.common.ca「 com.aliyun.datahub.client.impl.batch.avro.AvroSchemaCache.schemaCache 」が null であるため、「 che. LoadingCache.get(Object) 」が発生しました。

Flume の lib ディレクトリにある guava.jar ファイルと zstd.jar ファイルを削除します。