Flume-DataHub外掛程式是基於Flume開發的DataHub資料訂閱/發布外掛程式,可以將採集到的資料寫入DataHub,也可以從DataHub讀取資料寫入其他系統。該外掛程式遵守Flume外掛程式開發規範,安裝方便,可以很方便的向DataHub發布/訂閱資料。
安裝Flume外掛程式
安裝限制
JDK版本在 1.8及以上版本。
Apache Maven 版本3.x。
Flume-NG 版本
1.x。
安裝Flume
下載Flume(如已下載,可跳過該步驟)
$ tar zxvf apache-flume-1.11.0-bin.tar.gz說明為後續方便描述,以下介紹以
${FLUME_HOME}表示Flume主目錄位置。安裝Flume-datahub。
直接安裝。
下載Flume-datahub外掛程式aliyun-flume-datahub-sink-2.0.9.tar.gz
解壓flume外掛程式並放在
${FLUME_HOME}/plugins.d目錄下$ tar aliyun-flume-datahub-sink-x.x.x.tar.gz $ cd aliyun-flume-datahub-sink-x.x.x $ mkdir ${FLUME_HOME}/plugins.d $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
源碼安裝。
編譯並安裝。
$ cd aliyun-maxcompute-data-collectors $ mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true $ cd flume-plugin/target $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
參數介紹
sink參數介紹
Source 參數
案例介紹
Sink使用案例
案例一:DELIMITED serializer
案例二:REGEX serializer
案例三:Flume Taildir Source
案例四:JSON serializer
Source使用案例
讀取DataHub中資料至其它系統
Flume metric
DataHub-Flume 支援Flume的內建計數監控器,使用者可以利用監控器來監控自己的Flume外掛程式的運行情況。DataHub-Flume外掛程式的Sink和Source都支援metric資訊顯示,具體參數含義可查看下錶(只含DataHub相關的參數,更多參數含義參考:Flume官方文檔)。
DatahubSink
DatahubSource
Flume監控
Flume提供了多種監控方法,本文以HTTP監控為例,介紹Flume監控工具的使用,使用HTTP方式監控,只需要在Flume外掛程式啟動時增加兩個參數即可,-Dflume.monitoring.type=http -Dflume.monitoring.port=1234,其中type將監控方式指定為http,port為指定的連接埠號碼。使用樣本如下:
bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234外掛程式成功啟動之後,便可以登入Web介面進行查看。地址為 https://ip:1234/metrics
更多的監控方法可以參考Flume官方文檔 。
常見問題
flume啟動報錯org.apache.flume.ChannelFullException: Space for commit to queue couldn’t be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
flume預設堆記憶體20MB,配置的batchSize過大時,flume使用的堆記憶體會超出20MB。
解決方案1:調小batchSize。
解決方案2:調大flume最大堆記憶體。
$ vim bin/flume-ngJAV**A_OPTS**="-Xmx20m" ==> JAV**A_OPTS**="-Xmx1024m"
DataHub-Flume外掛程式是否支援JSON格式?
目前不支援,不過使用者可以通過自訂Regex進行資料解析,或者修改DataHub-Flume外掛程式代碼,添加JSONEvent進行支援。
DataHub-Flume外掛程式支援Blob Topic嗎?
目前DataHub-Flume外掛程式僅支援Tuple Topic,暫不支援blob。
flume 報錯 org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count
channel已滿,source資料寫入channel失敗。可以在設定檔中修改channel capacity解決,並且可以適當降低datahub source的batchSize。
使用舊版本flume時報錯,可能會因為jar包衝突導致無法正常啟動。
錯誤情境:使用flume1.6時,啟動時報錯:
java.lang.NoSuchMethodError:com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader;。因為新版本的外掛程式依賴的jar包和flume本身依賴的jar包版本不一致,使用了flume依賴的舊版本jar包導致新版本的method找不到。如何處理:刪除${FLUME_HOME}/lib目錄下的三個jar包即可。
jackson-annotations-2.3.0.jarjackson-databind-2.3.1.jarjackson-annotations-2.3.0.jar
使用flume採集資料時,Null 字元串自動轉為null
在flume外掛程式2.0.2中對於非Null 字元串會做trim,Null 字元串直接轉為null。flume外掛程式2.0.3中已經最佳化掉,非Null 字元串寫入DataHub依舊為空白字串。
啟動報錯Cannot invoke "com.google.common.cache.LoadingCache.get(Object)" because"com.aliyun.datahub.client.impl.batch.avro.AvroSchemaCache.schemaCache" is null]
刪除Flume lib檔案夾中的guava 、zstd的 jar包檔案,重新啟動。