オープンソース Flink は、EXACTLY_ONCE セマンティクスでの OSS-HDFS サービスへのストリーミング書き込みをサポートしていません。この機能を有効にするには、JindoSDK を使用する必要があります。
Flink を使用して OSS-HDFS サービスにデータをストリーミングする際に JindoSDK をデプロイしたくない場合は、Alibaba Cloud Realtime Compute for Flink を使用して OSS-HDFS サービスの読み書きができます。詳細については、「Realtime Compute for Flink を使用して OSS または OSS-HDFS からのデータの読み取り、または OSS または OSS-HDFS へのデータの書き込み」をご参照ください。
前提条件
ECS インスタンスが作成されていること。詳細については、「ECS インスタンスの作成」をご参照ください。
OSS-HDFS サービスが有効化され、アクセスが承認されていること。詳細については、「OSS-HDFS サービスを有効にする」をご参照ください。
オープンソース Flink 1.10.1 以降がダウンロードされ、インストールされていること。Flink 1.16.0 以降との互換性は検証されていません。Apache Flink のインストールパッケージとバージョンについては、「Apache Flink」をご参照ください。
JindoSDK の構成
作成した ECS インスタンスにログインします。詳細については、「ECS インスタンスへの接続」をご参照ください。
最新の JindoSDK JAR パッケージをダウンロードして解凍します。パッケージをダウンロードするには、「GitHub」をご参照ください。
解凍した JindoSDK パッケージから plugins/flink/jindo-flink-${version}-full.jar ファイルを Flink のルートディレクトリにある `lib` フォルダに移動します。
mv plugins/flink/jindo-flink-${version}-full.jar lib/
Apache Flink の Flink OSS Connector が存在する場合は、それを削除する必要があります。これを行うには、Flink の
libフォルダまたはplugins/oss-fs-hadoopパスからflink-oss-fs-hadoop-${flink-version}.jarファイルを削除します。JindoSDK を構成した後、Flink のストリーミングジョブで使用するための追加の構成は必要ありません。
oss://プレフィックスを使用して、OSS-HDFS サービスと OSS サービスの両方に書き込みます。JindoSDK は書き込み先を自動的に検出します。
例
一般的な構成
EXACTLY_ONCE セマンティクスで OSS-HDFS に書き込むには、次の構成を適用します。
Flink のチェックポイントを有効にします。
例:
次のように StreamExecutionEnvironment を作成します。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();次のコマンドを実行してチェックポイントを有効にします。
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
Kafka などの再生可能なデータソースを使用します。
簡単な使用法
追加の依存関係は必要ありません。Flink を使用するには、oss:// プレフィックス、バケット、および OSS-HDFS サービスのエンドポイントを含むパスを指定します。
シンクを追加します。
次の例は、`outputStream` という名前の `DataStream<String>` オブジェクトを OSS-HDFS に書き込む方法を示しています。
String outputPath = "oss://{user-defined-oss-hdfs-bucket.oss-hdfs-endpoint}/{user-defined-dir}"; StreamingFileSink<String> sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder<String>("UTF-8") ).build(); outputStream.addSink(sink);重要OSS-HDFS バケット名の
.{oss-hdfs-endpoint}フィールドはオプションです。このフィールドを省略する場合は、Flink または Hadoop コンポーネントで OSS-HDFS エンドポイントが正しく構成されていることを確認してください。env.execute()を使用して Flink ジョブを実行します。
(オプション) カスタム構成
Flink ジョブを送信するときに、パラメーターをカスタマイズして特定の機能を有効にしたり、制御したりできます。
次の例は、-yD パラメーターを使用して yarn-cluster モードで Flink ジョブを送信する方法を示しています。
<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...エントロピーインジェクション機能を有効にできます。エントロピーインジェクションは、書き込みパス内の特定の文字列を照合し、それをランダムな文字列に置き換えます。これにより、データのホットスポット化が緩和され、書き込みパフォーマンスが向上します。
OSS-HDFS に書き込むときは、次の構成を適用します。
oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>新しいファイルを書き込むとき、パス内で <user-defined-key> に一致する文字列はランダムな文字列に置き換えられます。ランダムな文字列の長さは <user-defined-length> で指定されます。<user-defined-length> の値は 0 より大きい必要があります。