すべてのプロダクト
Search
ドキュメントセンター

Object Storage Service:オープンソース Flink を使用して OSS-HDFS にデータを書き込む

最終更新日:Nov 09, 2025

オープンソース Flink は、EXACTLY_ONCE セマンティクスでの OSS-HDFS サービスへのストリーミング書き込みをサポートしていません。この機能を有効にするには、JindoSDK を使用する必要があります。

説明

Flink を使用して OSS-HDFS サービスにデータをストリーミングする際に JindoSDK をデプロイしたくない場合は、Alibaba Cloud Realtime Compute for Flink を使用して OSS-HDFS サービスの読み書きができます。詳細については、「Realtime Compute for Flink を使用して OSS または OSS-HDFS からのデータの読み取り、または OSS または OSS-HDFS へのデータの書き込み」をご参照ください。

前提条件

  • ECS インスタンスが作成されていること。詳細については、「ECS インスタンスの作成」をご参照ください。

  • OSS-HDFS サービスが有効化され、アクセスが承認されていること。詳細については、「OSS-HDFS サービスを有効にする」をご参照ください。

  • オープンソース Flink 1.10.1 以降がダウンロードされ、インストールされていること。Flink 1.16.0 以降との互換性は検証されていません。Apache Flink のインストールパッケージとバージョンについては、「Apache Flink」をご参照ください。

JindoSDK の構成

  1. 作成した ECS インスタンスにログインします。詳細については、「ECS インスタンスへの接続」をご参照ください。

  2. 最新の JindoSDK JAR パッケージをダウンロードして解凍します。パッケージをダウンロードするには、「GitHub」をご参照ください。

  3. 解凍した JindoSDK パッケージから plugins/flink/jindo-flink-${version}-full.jar ファイルを Flink のルートディレクトリにある `lib` フォルダに移動します。

    mv plugins/flink/jindo-flink-${version}-full.jar lib/
重要
  • Apache Flink の Flink OSS Connector が存在する場合は、それを削除する必要があります。これを行うには、Flink の lib フォルダまたは plugins/oss-fs-hadoop パスから flink-oss-fs-hadoop-${flink-version}.jar ファイルを削除します。

  • JindoSDK を構成した後、Flink のストリーミングジョブで使用するための追加の構成は必要ありません。oss:// プレフィックスを使用して、OSS-HDFS サービスと OSS サービスの両方に書き込みます。JindoSDK は書き込み先を自動的に検出します。

  1. 一般的な構成

    EXACTLY_ONCE セマンティクスで OSS-HDFS に書き込むには、次の構成を適用します。

    1. Flink のチェックポイントを有効にします。

      例:

      1. 次のように StreamExecutionEnvironment を作成します。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      2. 次のコマンドを実行してチェックポイントを有効にします。

        env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
    2. Kafka などの再生可能なデータソースを使用します。

  2. 簡単な使用法

    追加の依存関係は必要ありません。Flink を使用するには、oss:// プレフィックス、バケット、および OSS-HDFS サービスのエンドポイントを含むパスを指定します。

    1. シンクを追加します。

      次の例は、`outputStream` という名前の `DataStream<String>` オブジェクトを OSS-HDFS に書き込む方法を示しています。

      String outputPath = "oss://{user-defined-oss-hdfs-bucket.oss-hdfs-endpoint}/{user-defined-dir}";
      StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
              new Path(outputPath),
              new SimpleStringEncoder<String>("UTF-8")
      ).build();
      outputStream.addSink(sink);
      重要

      OSS-HDFS バケット名の .{oss-hdfs-endpoint} フィールドはオプションです。このフィールドを省略する場合は、Flink または Hadoop コンポーネントで OSS-HDFS エンドポイントが正しく構成されていることを確認してください。

    2. env.execute() を使用して Flink ジョブを実行します。

(オプション) カスタム構成

Flink ジョブを送信するときに、パラメーターをカスタマイズして特定の機能を有効にしたり、制御したりできます。

次の例は、-yD パラメーターを使用して yarn-cluster モードで Flink ジョブを送信する方法を示しています。

<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...

エントロピーインジェクション機能を有効にできます。エントロピーインジェクションは、書き込みパス内の特定の文字列を照合し、それをランダムな文字列に置き換えます。これにより、データのホットスポット化が緩和され、書き込みパフォーマンスが向上します。

OSS-HDFS に書き込むときは、次の構成を適用します。

oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>

新しいファイルを書き込むとき、パス内で <user-defined-key> に一致する文字列はランダムな文字列に置き換えられます。ランダムな文字列の長さは <user-defined-length> で指定されます。<user-defined-length> の値は 0 より大きい必要があります。