すべてのプロダクト
Search
ドキュメントセンター

Object Storage Service:EMR クラスター上の Flink を使用して OSS-HDFS サービスにデータを書き込む

最終更新日:Nov 09, 2025

再開可能な書き込み機能により、EXACTLY_ONCE セマンティクスを使用してストレージメディアにデータを書き込むことができます。このトピックでは、E-MapReduce (EMR) クラスター上の Apache Flink を使用して、再開可能な方法で OSS-HDFS にデータを書き込む方法について説明します。

前提条件

  • バケットで OSS-HDFS が有効になっており、RAM ロールに OSS-HDFS へのアクセス権限が付与されていること。詳細については、「OSS-HDFS を有効にしてアクセス権限を付与する」をご参照ください。

  • デフォルトでは、Alibaba Cloud アカウントには、EMR クラスターを OSS-HDFS に接続し、OSS-HDFS に関連する一般的な操作を実行する権限があります。必要な権限が付与された RAM ユーザーが作成されます。RAM ユーザーを使用して EMR クラスターを OSS-HDFS に接続する場合、その RAM ユーザーは必要な権限を持っている必要があります。詳細については、「RAM ユーザーに EMR クラスターを OSS-HDFS に接続する権限を付与する」をご参照ください。

  1. 一般的な構成

    EXACTLY_ONCE セマンティクスを使用して OSS-HDFS サービスにデータを書き込むには、次の構成を完了する必要があります:

    1. Flink のチェックポイントを有効にします。

      例:

      1. 次のように StreamExecutionEnvironment オブジェクトを作成します。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      2. 次のコマンドを実行してチェックポイントを有効にします。

        env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
    2. Kafka など、リプレイ可能なデータソースを使用します。

  2. 簡単な使用方法

    追加の依存関係は必要ありません。データを書き込むには、oss:// プレフィックスで始まるパスを使用し、OSS-HDFS サービスのバケットとエンドポイントを指定します。

    1. シンクを追加します。

      次の例は、DataStream<String> オブジェクトを OSS-HDFS サービスに書き込む方法を示しています。

      String outputPath = "oss://{user-defined-oss-hdfs-bucket.oss-hdfs-endpoint}/{user-defined-dir}";
      StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
              new Path(outputPath),
              new SimpleStringEncoder<String>("UTF-8")
      ).build();
      outputStream.addSink(sink);
      重要

      パスの .{oss-hdfs-endpoint} 部分はオプションです。この部分を省略する場合は、Flink または Hadoop コンポーネントで OSS-HDFS サービスのエンドポイントを正しく構成する必要があります。

    2. env.execute() を使用して Flink ジョブを実行します。

(オプション) カスタム構成

Flink ジョブを送信するときに、パラメーターをカスタマイズして特定の機能を有効化または制御できます。

次の例は、-yD パラメーターを使用して yarn-cluster モードで Flink ジョブを送信する方法を示しています:

<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...

エントロピーインジェクション機能を有効にできます。エントロピーインジェクションは、書き込みパス内の特定の文字列を照合し、それをランダムな文字列に置き換えます。これにより、データのホットスポットが緩和され、書き込みパフォーマンスが向上します。

OSS-HDFS に書き込むときは、次の構成を適用します。

oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>

新しいファイルを書き込むとき、パス内で <user-defined-key> に一致する文字列はランダムな文字列に置き換えられます。ランダムな文字列の長さは <user-defined-length> で指定されます。<user-defined-length> の値は 0 より大きくなければなりません。