全部產品
Search
文件中心

DataHub:Flume外掛程式

更新時間:Mar 13, 2025

Flume-DataHub外掛程式是基於Flume開發的DataHub資料訂閱/發布外掛程式,可以將採集到的資料寫入DataHub,也可以從DataHub讀取資料寫入其他系統。該外掛程式遵守Flume外掛程式開發規範,安裝方便,可以很方便的向DataHub發布/訂閱資料。

安裝Flume外掛程式

安裝限制

  • JDK版本在 1.8及以上版本

  • Apache Maven 版本3.x

  • Flume-NG 版本1.x

安裝Flume

  1. 下載Flume(如已下載,可跳過該步驟)

    $ tar zxvf apache-flume-1.11.0-bin.tar.gz
    說明

    為後續方便描述,以下介紹以${FLUME_HOME}表示Flume主目錄位置。

  2. 安裝Flume-datahub。

    • 直接安裝。

      1. 下載Flume-datahub外掛程式aliyun-flume-datahub-sink-2.0.9.tar.gz

      2. 解壓flume外掛程式並放在${FLUME_HOME}/plugins.d目錄下

        $ tar aliyun-flume-datahub-sink-x.x.x.tar.gz
        $ cd aliyun-flume-datahub-sink-x.x.x
        $ mkdir ${FLUME_HOME}/plugins.d
        $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
    • 源碼安裝。

      1. 下載源碼aliyun-maxcompute-data-collectors

      2. 編譯並安裝。

        $ cd aliyun-maxcompute-data-collectors
        $ mvn clean package -DskipTests=true  -Dmaven.javadoc.skip=true
        $ cd flume-plugin/target
        $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz
        $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d

參數介紹

sink參數介紹

名稱

預設值

是否必須

描述

datahub.endPoint

-

必須

阿里雲datahub的服務地址

datahub.accessId

-

必須

阿里雲access id

datahub.accessKey

-

必須

阿里雲access key

datahub.project

-

必須

datahub專案名稱

datahub.topic

-

必須

datahub topic名稱

datahub.shard.ids

所有shard

可選

寫入datahub的指定shard列表,以”,”分隔,例如 0,1,2。每次從shard列表隨機播放一個shard寫入DataHub。在發生shard分裂或者合并時,如果沒有指定該參數,那麼shard分裂或者合并後,flume會自動調整shard列表,否則需要使用者手動修改設定檔。

datahub.enablePb

true

可選

是否使用pb傳輸,部分專有雲不支援需要手動設定為false

datahub.compressType

none

可選

是否壓縮傳輸,目前支援 LZ4, DEFLATE

datahub.batchSize

1000

可選

datahub每次發送的最巨量資料量

datahub.maxBufferSize

2*1024*1024

可選

datahub單次請求寫入資料量的最大值(單位:Byte)。不建議修改該參數,單次寫入資料量過大可能寫入失敗

datahub.batchTimeout

5

可選

如果資料量沒有達到batchSize,向datahub同步資料之前等待的時間(單位:秒)

datahub.retryTimes

3

可選

資料同步失敗重試次數

datahub.retryInterval

5

可選

資料同步失敗稍候再試(單位:秒)

datahub.dirtyDataContinue

true

可選

遇到髒資料是否繼續處理,為true時,會自動將髒資料以,分隔字元寫入髒資料檔案,不影響後續資料的處理

datahub.dirtyDataFile

DataHub-Flume-dirty-file

可選

髒資料檔案

serializer

-

必須

資料解析方式,目前支援DELIMITED(分,JSON(每行為單層Json)和REGEX(Regex)

serializer.delimiter

,

可選

資料欄位分割符,如果要使用特殊字元需要添加雙引號,例如”\t”

serializer.regex

(.*)

可選

資料解析的Regex,每個欄位的資料被解析成一個group

serializer.fieldnames

-

必須

輸入資料欄位到datahub欄位的映射,以輸入的順序標示欄位,如果要跳過某個欄位, 不指定列名即可,例如 c1,c2,,c3,表示將輸入資料的第一、二、四欄位和datahub的c1,c2,c3欄位進行匹配。

serializer.charset

UTF-8

可選

資料解析編碼格式

Source 參數

名稱

預設值

是否必須

描述

datahub.endPoint

-

必須

阿里雲datahub的服務地址

datahub.accessId

-

必須

阿里雲access id

datahub.accessKey

-

必須

阿里雲access key

datahub.project

-

必須

datahub專案名稱

datahub.topic

-

必須

datahub topic名稱

datahub.subId

-

必須

datahub 訂閱 id

datahub.startTime

-

可選

datahub 指定時間點進行讀資料,格式為yyyy-MM-dd HH:mm:ss,使用該參數會首先重設訂閱,然後根據訂閱讀取資料。

datahub.shard.ids

-

可選

讀取datahub的指定shard列表,以”,”分隔,例如 0,1,2。每次讀資料時會從shard列表隨機播放一個shard進行消費。如不指定,則採用協同消費進行資料讀取。不建議使用該參數,如果配置了多個source的情況下,不指定該參數時,協同消費會自動分配shard,儘可能保證每個source負載平衡。

datahub.enablePb

true

可選

是否使用pb傳輸,部分專有雲不支援需要手動設定為false

datahub.compressType

none

可選

是否壓縮傳輸,目前支援 LZ4, DEFLATE

datahub.batchSize

1000

可選

DataHub每次讀取的最巨量資料量

datahub.batchTimeout

5

可選

如果資料量沒有達到batchSize,向datahub同步資料之前等待的時間(單位:秒)

datahub.retryTimes

3

可選

資料讀取失敗重試次數,稍候再試預設為1S,不可調整

datahub.autoCommit

true

可選

設為true表示由consumer自動認可點位,可能發生資料未消費但是點位被提交的可能,修改為false表示資料被提交到flume channel之後才會提交該點位

datahub.offsetCommitTimeout

30

可選

自動認可點位時間間隔(單位:秒)

datahub.sessionTimeout

60

可選

source功能採取協同消費實現,協同消費逾時沒有發送心跳,則session會自動關閉

serializer

-

必須

資料解析方式,目前支援DELIMITED(分隔字元),資料的每個欄位將會以datahub schema順序寫成一行,並以delimiter進行分隔

serializer.delimiter

,

可選

資料欄位分割符,如果要使用特殊字元需要添加雙引號,例如”\t”

serializer.charset

UTF-8

可選

資料解析編碼格式

案例介紹

Sink使用案例

案例一:DELIMITED serializer

  1. 準備測試資料。

    DELIMITED解析資料時將每一行作為一條Record,並以給定的分隔字元對資料進行解析。下面以csv檔案為例,說明如何使用Flume將批量csv檔案准即時上傳到DataHub。請將以下內容儲存至本地檔案/temp/test.csv中。

    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
    1,hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V,false,1254275.1144637289,1573206062763,1254275.1144637289
    2,vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l,true,1254275.1144637289,1573206062763,1254275.1144637289
    3,t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs,false,1254275.1144637289,1573206062763,1254275.1144637289
    4,MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI,true,1254275.1144637289,1573206062763,1254275.1144637289
    5,bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV,false,1254275.1144637289,1573206062763,1254275.1144637289
    6,wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX,true,1254275.1144637289,1573206062763,1254275.1144637289
    7,whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8,false,1254275.1144637289,1573206062763,1254275.1144637289
    8,OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ,true,1254275.1144637289,1573206062763,1254275.1144637289

    測試資料對應的DataHub Schema為以下內容:

    欄位名稱

    欄位類型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. 配置Flume檔案。

    在目錄 ${FLUME_HOME}/conf 下建立檔案名稱為datahub_basic.conf的檔案,然後將以下內容寫入檔案,本執行個體採用Exec Source作為資料來源,更多Source可以參考Flume官方文檔

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.csv
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = ,
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    a1.sinks.k1.datahub.enablePb = true
    a1.sinks.k1.datahub.compressType = DEFLATE
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    說明

    ExecSource源可能發生資料丟失,因為ExecSource無法保證將事件放入Channel,在這種情況下,資料將丟失。例如,tail命令擷取資料時,此時flume channel已滿,而這部分資料將會丟失。建議使用Spooling Directory Source或者Taildir Source。這裡將靜態檔案/temp/test.csv作為資料來源,如果檔案為動態寫入的記錄檔,可使用命令tail -F logFile進行即時採集。

  3. 啟動Flume。

    Dflume.root.logger=INFO,console選項可以將日誌即時輸出到控制台,如需更多資訊可採用DEBUG模式。使用如下命令啟動Flume,即可完成CSV檔案資料擷取進入DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

案例二:REGEX serializer

  1. 準備測試資料。

    REGEX解析資料時將每一行作為一條Record,並以給定的Regex對資料進行解析,一條Record的資訊的多個內容以分組表示。下面以記錄檔為例,說明flume如何利用Regex準時時上傳到DataHub。請將以下測試資料儲存在本地檔案/temp/test.csv中:

    1. [2019-11-12 15:20:08] 0,j4M6PhzL1DXVTQawdfk306N2KnCDxtR0KK1pke5O,true,1254409.5059812006,1573543208698,1254409.5059819978
    2. [2019-11-12 15:22:35] 0,mYLF8UzIYCCFUm1jYs9wzd2Hl6IMr2N7GPYXZSZy,true,1254409.5645912462,1573543355740,1254409.5645920434
    3. [2019-11-12 15:23:14] 0,MOemUZur37n4SGtdUQyMohgmM6cxZRBXjJ34HzqX,true,1254409.5799291395,1573543394219,1254409.579929538
    4. [2019-11-12 15:23:30] 0,EAFc1VTOvC9rYzPl9zJYa6cc8uJ089EaFd79B25i,true,1254409.5862723626,1573543410134,1254409.5862731598
    5. [2019-11-12 15:23:53] 0,zndVraA4GP7FP8p4CkQFsKJkxwtYK3zXjDdkhmRk,true,1254409.5956010541,1573543433538,1254409.5956018514
    6. [2019-11-12 15:24:00] 0,9YrjjoALEfyZm07J7OuNvDVNyspIzrbOOAGnZtHx,true,1254409.598201082,1573543440061,1254409.5982018793
    7. [2019-11-12 15:24:23] 0,mWsFgFlUnXKQQR6RpbAYDF9OhGYgU8mljvGCtZ26,true,1254409.6073950487,1573543463126,1254409.607395447
    8. [2019-11-12 15:26:51] 0,5pZRRzkW3WDLdYLOklNgTLFX0Q0uywZ8jhw7RYfI,true,1254409.666525653,1573543611475,1254409.6665264503
    9. [2019-11-12 15:29:11] 0,hVgGQrXpBtTJm6sovVK4YGjfNMdQ3z9pQHxD5Iqd,true,1254409.7222845491,1573543751364,1254409.7222853464
    10. [2019-11-12 15:29:52] 0,7wQOQmxoaEl6Cxl1OSo6cr8MAc1AdJWJQaTPT5xs,true,1254409.7387664048,1573543792714,1254409.738767202
    11. [2019-11-12 15:30:30] 0,a3Th5Q6a8Vy2h1zfWLEP7MdPhbKyTY3a4AfcOJs2,true,1254409.7538966285,1573543830673,1254409.7538974257
    12. [2019-11-12 15:34:54] 0,d0yQAugqJ8M8OtmVQYMTYR8hi3uuX5WsH9VQRBpP,true,1254409.8589555968,1573544094247,1254409.8589563938

    以上測試資料對應的DataHub Schema為:

    欄位名稱

    欄位類型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. 配置Flume檔案。

    在目錄 ${FLUME_HOME}/conf 下建立檔案名稱為datahub_basic.conf的檔案,然後將以下內容寫入檔案。本執行個體採用Exec Source作為資料來源,更多Source可以參考Flume官方文檔

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.csv
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = REGEX
    a1.sinks.k1.serializer.regex = \\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\] (\\d+),(\\S+),([a-z]+),([-+]?[0-9]*\\.?[0-9]*),(\\d+),([-+]?[0-9]*\\.?[0-9]*)
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    說明

    ExecSource源可能發生資料丟失,因為EeecSource無法保證將事件放入Channel,在這種情況下,資料將丟失。例如,tail命令擷取資料時,此時flume channel已滿,而這部分資料將會丟失。建議使用Spooling Directory Source或者Taildir Source。這裡將靜態檔案/temp/test.csv作為資料來源,如果檔案為動態寫入的記錄檔,可使用命令tail -F logFile進行即時採集。

  3. 啟動Flume。

    Dflume.root.logger=INFO,console選項可以將日誌即時輸出到控制台,如需更多資訊可採用DEBUG模式。使用如下命令啟動Flume,即可完成CSV檔案資料擷取進入DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

案例三:Flume Taildir Source

Flume使用exec source時,可能會導致資料丟失,所以在實際生產環境中並不建議使用。如果想要採集本地日誌,可以使用Taildir Source或者Spooling Directory Source。下面以Taildir為例,介紹記錄檔的採集。Taildir將會可以指定檔案組,然後觀察指定的檔案,並在檢測到新行添加到每個檔案後,幾乎即時的進行讀取。如果正在寫入新行,則此源將重試讀取它們,以等待寫入完成。 Taildir Source會把每個檔案的已經讀到的位置資訊以JSON格式儲存在positionFile檔案中,source event 放入channel失敗,已讀位置不會更新,所以Taildir Source是可靠的。

  1. 準備測試資料。

    所有的日誌將以如下格式追加到檔案末尾。記錄檔命名格式為 *.log

    0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289

    以上測試資料對應的DataHub Schema為:

    欄位名稱

    欄位類型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. 配置Flume設定檔。

    在目錄 ${FLUME_HOME}/conf 下建立檔案名稱為datahub_basic.conf的檔案,然後將以下內容寫入檔案。

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /temp/taildir_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /temp/.*log
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = ,
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    a1.sinks.k1.datahub.enablePb = true
    a1.sinks.k1.datahub.compressType = DEFLATE
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. 啟動Flume。

    Dflume.root.logger=INFO,console選項可以將日誌即時輸出到控制台,如需更多資訊可採用DEBUG模式。使用如下命令啟動Flume,即可完成CSV檔案資料擷取進入DataHub:

    1. $ cd ${FLUME_HOME}
    2. $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

案例四:JSON serializer

SON解析資料時將每一行作為一條Record,只做一層JSON解析,嵌套的內容直接當作string,第一層的name若在配置的serializer.fieldnames中,就會加入到對應的列中。下面以記錄檔為例,說明flume如何利用JSON解析方式準時時上傳到DataHub。

  1. 準備測試資料。

    將以下內容儲存在本地檔案/temp/test.json中。其中需要同步的資料內容為日期後面的詳細資料。

    {"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0","id":0,"salary":1254275.1144629316,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V","id":1,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l","id":2,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs","id":3,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI","id":4,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV","id":5,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":true,"name":"wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX","id":6,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"my_time":1573206062763,"gender":false,"name":"whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8","id":7,"salary":1254275.1144637289,"decimal":1254275.1144637289}
    {"gender":true,"name":{"a":"OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ"},"id":8,"salary":1254275.1144637289,"decimal":1254275.1144637289}

    以上測試資料對應的DataHub Schema為:

    欄位名稱

    欄位類型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. 配置Flume檔案。

    在目錄 ${FLUME_HOME}/conf 下建立檔案名稱為datahub_basic.conf的檔案,然後將以下內容寫入檔案。本執行個體採用Exec Source作為資料來源,更多Source可以參考Flume官方文檔

    # A single-node Flume configuration for DataHub
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = cat /temp/test.json
    # Describe the sink
    a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
    a1.sinks.k1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
    a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
    a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
    a1.sinks.k1.datahub.project = datahub_project_test
    a1.sinks.k1.datahub.topic = test_topic
    a1.sinks.k1.serializer = JSON
    a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
    a1.sinks.k1.serializer.charset = UTF-8
    a1.sinks.k1.datahub.retryTimes = 5
    a1.sinks.k1.datahub.retryInterval = 5
    a1.sinks.k1.datahub.batchSize = 100
    a1.sinks.k1.datahub.batchTimeout = 5
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. 啟動Flume。

    Dflume.root.logger=INFO,console選項可以將日誌即時輸出到控制台,如需更多資訊可採用DEBUG模式。使用如下命令啟動Flume,即可完成CSV檔案資料擷取進入DataHub:

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

Source使用案例

讀取DataHub中資料至其它系統

DataHub-Flume Source可以將DataHub中的資料讀取出來,並且移動到另外的系統中,本文以logger(直接輸出到控制台)為例,介紹DataHub-Flume Source的使用方法。

  1. 如下欄位topic Schema為例。

    欄位名稱

    欄位類型

    id

    BIGINT

    name

    STRING

    gender

    BOOLEAN

    salary

    DOUBLE

    my_time

    TIMESTAMP

    decimal

    DECIMAL

  2. 配置Flume檔案。

    在目錄 ${FLUME_HOME}/conf 下建立檔案名稱為datahub_source.conf的檔案,然後將以下內容寫入檔案。

     # A single-node Flume configuration for DataHub
     # Name the components on this agent
     a1.sources = r1
     a1.sinks = k1
     a1.channels = c1
    
     # Describe/configure the source
     a1.sources.r1.type = com.aliyun.datahub.flume.sink.DatahubSource
     a1.sources.r1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ENDPOINT}
     a1.sources.r1.datahub.accessId = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
     a1.sources.r1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
     a1.sources.r1.datahub.project = datahub_test
     a1.sources.r1.datahub.topic = test_flume
     a1.sources.r1.datahub.subId = {YOUR_ALIYUN_DATAHUB_SUB_ID}
     a1.sources.r1.serializer = DELIMITED
     a1.sources.r1.serializer.delimiter = ,
     a1.sources.r1.serializer.charset = UTF-8
     a1.sources.r1.datahub.retryTimes = 3
     a1.sources.r1.datahub.batchSize = 1000
     a1.sources.r1.datahub.batchTimeout = 5
     a1.sources.r1.datahub.enablePb = false
    
     # Describe the sink
     a1.sinks.k1.type = logger
    
     # Use a channel which buffers events in memory
     a1.channels.c1.type = memory
     a1.channels.c1.capacity = 10000
     a1.channels.c1.transactionCapacity = 10000
    
     # Bind the source and sink to the channel
     a1.sources.r1.channels = c1
     a1.sinks.k1.channel = c1
  3. 啟動Flume。

    $ cd ${FLUME_HOME}
    $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_source.conf -Dflume.root.logger=INFO,console

Flume metric

DataHub-Flume 支援Flume的內建計數監控器,使用者可以利用監控器來監控自己的Flume外掛程式的運行情況。DataHub-Flume外掛程式的Sink和Source都支援metric資訊顯示,具體參數含義可查看下錶(只含DataHub相關的參數,更多參數含義參考:Flume官方文檔)。

DatahubSink

名稱

描述

BatchEmptyCount

batch timeout時沒有資料需要寫入DataHub發生的次數

BatchCompleteCount

Batch處理成功次數,僅包含全部寫入成功的情況

EventDrainAttemptCount

嘗試寫入DataHub的資料數量(解析成功數量)

BatchUnderflowCount

成功寫入DataHub的資料數量小於需要寫入的資料量發生的次數。資料解析完成,但寫入DataHub時部分失敗或全部失敗。

EventDrainSuccessCount

成功寫入DataHub的資料量

DatahubSource

名稱

描述

EventReceivedCount

Source接收到的DataHub的資料數量

EventAcceptedCount

Source將DataHub資料成功寫入channel的數量

Flume監控

Flume提供了多種監控方法,本文以HTTP監控為例,介紹Flume監控工具的使用,使用HTTP方式監控,只需要在Flume外掛程式啟動時增加兩個參數即可,-Dflume.monitoring.type=http -Dflume.monitoring.port=1234,其中type將監控方式指定為http,port為指定的連接埠號碼。使用樣本如下:

bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234

外掛程式成功啟動之後,便可以登入Web介面進行查看。地址為 https://ip:1234/metrics

說明

更多的監控方法可以參考Flume官方文檔

常見問題

flume啟動報錯org.apache.flume.ChannelFullException: Space for commit to queue couldn’t be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight

flume預設堆記憶體20MB,配置的batchSize過大時,flume使用的堆記憶體會超出20MB。

解決方案1:調小batchSize。

解決方案2:調大flume最大堆記憶體。

  • $ vim bin/flume-ng

  • JAV**A_OPTS**="-Xmx20m" ==> JAV**A_OPTS**="-Xmx1024m"

DataHub-Flume外掛程式是否支援JSON格式?

目前不支援,不過使用者可以通過自訂Regex進行資料解析,或者修改DataHub-Flume外掛程式代碼,添加JSONEvent進行支援。

DataHub-Flume外掛程式支援Blob Topic嗎?

目前DataHub-Flume外掛程式僅支援Tuple Topic,暫不支援blob。

flume 報錯 org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count

channel已滿,source資料寫入channel失敗。可以在設定檔中修改channel capacity解決,並且可以適當降低datahub source的batchSize。

使用舊版本flume時報錯,可能會因為jar包衝突導致無法正常啟動。

  • 錯誤情境:使用flume1.6時,啟動時報錯:java.lang.NoSuchMethodError:com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader;。因為新版本的外掛程式依賴的jar包和flume本身依賴的jar包版本不一致,使用了flume依賴的舊版本jar包導致新版本的method找不到。

  • 如何處理:刪除${FLUME_HOME}/lib目錄下的三個jar包即可。

    • jackson-annotations-2.3.0.jar

    • jackson-databind-2.3.1.jar

    • jackson-annotations-2.3.0.jar

使用flume採集資料時,Null 字元串自動轉為null

在flume外掛程式2.0.2中對於非Null 字元串會做trim,Null 字元串直接轉為null。flume外掛程式2.0.3中已經最佳化掉,非Null 字元串寫入DataHub依舊為空白字串。

啟動報錯Cannot invoke "com.google.common.cache.LoadingCache.get(Object)" because"com.aliyun.datahub.client.impl.batch.avro.AvroSchemaCache.schemaCache" is null]

刪除Flume lib檔案夾中的guava 、zstd的 jar包檔案,重新啟動。