再開可能な書き込み機能により、EXACTLY_ONCE セマンティクスを使用してストレージメディアにデータを書き込むことができます。このトピックでは、E-MapReduce (EMR) クラスター上の Apache Flink を使用して、再開可能な方法で OSS-HDFS にデータを書き込む方法について説明します。
前提条件
バケットで OSS-HDFS が有効になっており、RAM ロールに OSS-HDFS へのアクセス権限が付与されていること。詳細については、「OSS-HDFS を有効にしてアクセス権限を付与する」をご参照ください。
デフォルトでは、Alibaba Cloud アカウントには、EMR クラスターを OSS-HDFS に接続し、OSS-HDFS に関連する一般的な操作を実行する権限があります。必要な権限が付与された RAM ユーザーが作成されます。RAM ユーザーを使用して EMR クラスターを OSS-HDFS に接続する場合、その RAM ユーザーは必要な権限を持っている必要があります。詳細については、「RAM ユーザーに EMR クラスターを OSS-HDFS に接続する権限を付与する」をご参照ください。
例
一般的な構成
EXACTLY_ONCE セマンティクスを使用して OSS-HDFS サービスにデータを書き込むには、次の構成を完了する必要があります:
Flink のチェックポイントを有効にします。
例:
次のように StreamExecutionEnvironment オブジェクトを作成します。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();次のコマンドを実行してチェックポイントを有効にします。
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
Kafka など、リプレイ可能なデータソースを使用します。
簡単な使用方法
追加の依存関係は必要ありません。データを書き込むには、oss:// プレフィックスで始まるパスを使用し、OSS-HDFS サービスのバケットとエンドポイントを指定します。
シンクを追加します。
次の例は、DataStream<String> オブジェクトを OSS-HDFS サービスに書き込む方法を示しています。
String outputPath = "oss://{user-defined-oss-hdfs-bucket.oss-hdfs-endpoint}/{user-defined-dir}"; StreamingFileSink<String> sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder<String>("UTF-8") ).build(); outputStream.addSink(sink);重要パスの
.{oss-hdfs-endpoint}部分はオプションです。この部分を省略する場合は、Flink または Hadoop コンポーネントで OSS-HDFS サービスのエンドポイントを正しく構成する必要があります。env.execute()を使用して Flink ジョブを実行します。
(オプション) カスタム構成
Flink ジョブを送信するときに、パラメーターをカスタマイズして特定の機能を有効化または制御できます。
次の例は、-yD パラメーターを使用して yarn-cluster モードで Flink ジョブを送信する方法を示しています:
<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...エントロピーインジェクション機能を有効にできます。エントロピーインジェクションは、書き込みパス内の特定の文字列を照合し、それをランダムな文字列に置き換えます。これにより、データのホットスポットが緩和され、書き込みパフォーマンスが向上します。
OSS-HDFS に書き込むときは、次の構成を適用します。
oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>新しいファイルを書き込むとき、パス内で <user-defined-key> に一致する文字列はランダムな文字列に置き換えられます。ランダムな文字列の長さは <user-defined-length> で指定されます。<user-defined-length> の値は 0 より大きくなければなりません。