すべてのプロダクト
Search
ドキュメントセンター

Object Storage Service:EMRクラスターでApache Flinkを使用してOSS-HDFSにデータを書き込む

最終更新日:Dec 20, 2023

再開可能な書き込み機能により、EXACTLY_ONCEセマンティクスを使用してデータをストレージメディアに書き込むことができます。 このトピックでは、E-MapReduce (EMR) クラスターでApache Flinkを使用して、再開可能な方法でOSS-HDFSにデータを書き込む方法について説明します。

前提条件

  • EMR 3.42.0以降、またはEMR 5.8.0以降のクラスターが作成されます。 詳細については、「クラスターの作成」をご参照ください。

  • バケットのOSS-HDFSが有効になり、OSS-HDFSにアクセスする権限が付与されます。 詳細については、「OSS-HDFSの有効化とアクセス許可の付与」をご参照ください。

Flinkジョブを使用してOSS-HDFSにデータを書き込む

  1. 一般的な設定を行います。

    1回限りのセマンティクスを使用してデータをOSS-HDFSに書き込むには、次の操作を実行する必要があります。

    1. Apache Flinkのチェックポイントメカニズムを有効にします。

      次のコマンドを実行して、StreamExecutionEnvironmentクラスを作成します。

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    2. 次のコマンドを実行してチェックポイントメカニズムを有効にします。
      env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
    3. Kafkaなど、データの再送信をサポートするデータソースを使用します。
  2. Flinkを使用するようにクイック設定を構成します。

    プレフィックスoss:// で始まるパスを含め、Flinkを有効にするOSS-HDFSにバケットとエンドポイントを使用できます。 このメソッドは、追加の依存関係を必要としません。

    1. シンクを追加します。

      次のサンプルコードは、OutputStreamを使用してDataStream<String> オブジェクトをOSS-HDFSに書き込む方法の例を示しています。

      String outputPath = "oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>"
      StreamingFileSink<String> sink = StreamingFileSink.forRowFormat (
              新しいパス (outputPath) 、
              新しいSimpleStringEncoder<String>("UTF-8")
      ).build();
      outputStream.addSink (シンク); 
      重要 前の例では、を含む要素です。<oss-hdfs-endpoint> はオプションです。 この要素を省略する場合は、FlinkコンポーネントまたはHadoopコンポーネントでOSS-HDFSの正しいエンドポイントを指定してください。
    2. Flinkジョブを実行するには、env.exe cute() を使用します。

(オプション) カスタム設定の構成

Flinkジョブを送信するときに、カスタムパラメーターを設定して、特定の機能を有効または管理できます。

次のサンプルコードは、-yDを使用してYARNクラスターに基づいてFlinkジョブ送信の設定を構成する方法の例を示しています。

<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2...

エントロピー注入を有効にできます。 エントロピー注入機能を使用すると、宛先パスの特定の文字列をランダムな文字列に置き換えることができます。 このように、書き込まれたデータは、指定されたパスに基づいて異なるパーティションに分散され、書き込みパフォーマンスが向上します。

OSS-HDFSにデータを書き込むには、次の設定を完了する必要があります。

oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length> 

オブジェクトにデータを書き込むと、書き込みパスの <user-defined-key> 文字列がランダムな文字列に置き換えられます。 ランダム文字列の長さは、<user-defined-length> の値と同じである必要があります。 <user-defined-length> の値は0より大きくなければなりません。