Flume 用 DataHub プラグインは、データのサブスクライブとパブリッシュを行うための DataHub プラグインです。このプラグインは Apache Flume をベースに開発されています。このプラグインを使用すると、収集したデータを DataHub に書き込んだり、DataHub からデータを読み取って他のシステムに書き込んだりできます。このプラグインは Apache Flume プラグインの開発規則に準拠しており、インストールも簡単です。このプラグインを使用すると、DataHub にデータをパブリッシュしたり、DataHub 内のデータをサブスクライブしたりできます。
Flume 用 DataHub プラグインのインストール
インストールの制限事項
Java 開発キット ( JDK ) のバージョンは 1.8 以降である必要があります。
Apache Maven のバージョンは 3.X である必要があります。
Flume-NG のバージョンは
1.X
である必要があります。
Flume 用 DataHub プラグインのインストール
Apache Flume をダウンロードします。Apache Flume がすでにダウンロードされている場合は、この手順をスキップします。
$ tar zxvf apache-flume-1.11.0-bin.tar.gz
説明説明を簡単にするために、以下の情報では
${FLUME_HOME}
を使用して Apache Flume のホームディレクトリを指定します。Flume 用 DataHub プラグインをインストールします。
Flume 用 DataHub プラグインを直接インストールします。
Flume 用 DataHub プラグインをダウンロードします。
パッケージから Flume 用 DataHub プラグインを抽出し、 ディレクトリに保存します。
${FLUME_HOME}/plugins.d
ディレクトリ。$ tar aliyun-flume-datahub-sink-x.x.x.tar.gz $ cd aliyun-flume-datahub-sink-x.x.x $ mkdir ${FLUME_HOME}/plugins.d $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
ソースコードを使用して Flume 用 DataHub プラグインをインストールします。
aliyun-maxcompute-data-collectors からソースコードをダウンロードします。
ソースコードをコンパイルしてプラグインをインストールします。
$ cd aliyun-maxcompute-data-collectors $ mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true $ cd flume-plugin/target $ tar zxvf aliyun-flume-datahub-sink-x.x.x.tar.gz $ mv aliyun-flume-datahub-sink ${FLUME_HOME}/plugins.d
パラメーター
Sink 関連のパラメーター
Source 関連のパラメーター
ケースの説明
Sink のユースケース
ケース 1:DELIMITED シリアライザー
ケース 2:REGEX シリアライザー
ケース 3:Flume taildir ソース
ケース 4:JSON シリアライザー
Source のユースケース
DataHub から他のシステムへのデータの読み取り
Flume メトリック
Flume 用 DataHub プラグインは、Flume の組み込みカウントメトリックをサポートしています。メトリックに基づいてプラグインの操作を監視できます。Flume 用 DataHub プラグインのシンクとソースでは、異なるメトリックがサポートされています。次の表に、DataHub 関連のメトリックを示します。他のメトリックの詳細については、「使用可能なコンポーネントメトリック」をご参照ください。
DatahubSink
DatahubSource
Flume モニタリング
Apache Flume はさまざまなモニタリング方法を提供しています。このパートでは、HTTP モニタリングを有効にする方法について説明します。他のモニタリング方法の詳細については、「モニタリング」をご参照ください。HTTP モニタリングを有効にするには、Flume 用 DataHub プラグインを起動するときに次の 2 つのパラメーターを追加します。Dflume.monitoring.type=http and Dflume.monitoring.port=1234
。Dflume.monitoring.type パラメーターの値 http は HTTP モニタリングを示し、Dflume.monitoring.port パラメーターの値 1234 はポート番号を示します。次のコードは、プラグインを起動する例を示しています。
bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=1234
プラグインが起動したら、Web ページにログインしてメトリックを表示できます。URL は https://ip:1234/metrics です。
他のモニタリング方法の詳細については、「Flume 1.9.0 ユーザーガイド」をご参照ください。
FAQ
Flume 用 DataHub プラグインを起動すると「org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight」エラーが報告された場合はどうすればよいですか?
Flume 用 DataHub プラグインのデフォルトのヒープメモリは 20 MB です。一度に書き込むように指定されたレコードの数が多い場合、Flume 用 DataHub プラグインで使用されるヒープメモリが 20 MB を超えます。次のいずれかの解決策を使用して、問題を解決できます。
解決策 1:batchSize パラメーターの値を減らします。
解決策 2:Flume 用 DataHub プラグインの最大ヒープメモリを増やします。
$ vim bin/flume-ng
JAV**A_OPTS**="-Xmx20m" ==> JAV**A_OPTS**="-Xmx1024m"
Flume 用 DataHub プラグインは JSON 形式をサポートしていますか?
いいえ。ただし、カスタム正規表現を使用してデータを解析したり、Flume 用 DataHub プラグインのコードを変更して JSONEvent を追加して JSON 形式をサポートしたりできます。
Flume 用 DataHub プラグインは、データ型が BLOB のトピックをサポートしていますか?
いいえ。Flume 用 DataHub プラグインは、データ型が TUPLE のトピックのみをサポートしています。
Flume 用 DataHub プラグインが「org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1 full, consider committing more frequently, increasing capacity or increasing thread count」エラーを報告するのはなぜですか?
Flume チャネルがいっぱいで、ソースが Flume チャネルにデータを書き込めません。構成ファイルのチャネル容量を変更し、batchSize パラメーターの値を適切に減らすことで、この問題を解決できます。
次の場合はどうすればよいですか?以前のバージョンの Apache Flume を使用するとエラーが発生します。JAR パッケージの競合により、Flume 用 DataHub プラグインを起動できません。
たとえば、Apache Flume V1.6 を使用すると、
java.lang.NoSuchMethodError:com.fasterxml.jackson.databind.ObjectMapper.readerFor(Lcom/fasterxml/jackson/databind/JavaType;)Lcom/fasterxml/jackson/databind/ObjectReader;
エラーが報告されます。新しいバージョンのプラグインと Apache Flume V1.6 は、異なるバージョンの JAR パッケージに依存しています。Apache Flume V1.6 は以前のバージョンの JAR パッケージに依存しているため、新しいバージョンのプラグインによって提供されるメソッドが見つかりません。${FLUME_HOME}/lib ディレクトリにある次の 3 つの JAR パッケージを削除します。
jackson-annotations-2.3.0.jar
jackson-databind-2.3.1.jar
jackson-annotations-2.3.0.jar
Flume 用 DataHub プラグインを使用してデータを収集すると、空の文字列が自動的に NULL に変換されるのはなぜですか?
Flume 用 DataHub プラグイン V2.0.2 では、trim() メソッドが空でない文字列に使用され、空の文字列は直接 NULL に変換されます。このロジックは、Flume 用 DataHub プラグイン V2.0.3 では削除されています。空の文字列は、DataHub に書き込まれた後、NULL に変換されずに保持されます。
「com.aliyun.datahub.client.impl.batch.avro.AvroSchemaCache.schemaCache」が null であるため、「com.google.common.cache.LoadingCache.get(Object)」を呼び出すことができません。com.google.common.ca「 com.aliyun.datahub.client.impl.batch.avro.AvroSchemaCache.schemaCache 」が null であるため、「 che. LoadingCache.get(Object) 」が発生しました。
Flume の lib ディレクトリにある guava.jar ファイルと zstd.jar ファイルを削除します。