Object Storage Service: Write data to OSS-HDFS by using open source Flink

Last Updated: Sep 17, 2025

Open source Flink does not support streaming writes to the OSS-HDFS service with EXACTLY_ONCE semantics. You must use JindoSDK to enable this capability.

Note

If you do not want to deploy JindoSDK when you use Flink to stream data to the OSS-HDFS service, you can use Alibaba Cloud Realtime Compute for Flink to read from and write to the OSS-HDFS service. For more information, see Read data from or write data to OSS or OSS-HDFS using Realtime Compute for Flink.

Prerequisites

  • An ECS instance is created. For more information, see Create an ECS instance.

  • The OSS-HDFS service is enabled, and access is authorized. For instructions, see Enable the OSS-HDFS service.

  • Open source Flink 1.10.1 or later is downloaded and installed. Compatibility with Flink 1.16.0 and later has not been verified. For information about Apache Flink installation packages and versions, see Apache Flink.

Configure JindoSDK

  1. Log on to the ECS instance that you created. For more information, see Connect to an ECS instance.

  2. Download and decompress the latest JindoSDK JAR package. To download the package, see GitHub.

  3. Move the plugins/flink/jindo-flink-${version}-full.jar file from the decompressed JindoSDK package to the `lib` folder in the Flink root directory.

    mv plugins/flink/jindo-flink-${version}-full.jar <flink_home>/lib/
Important
  • If the Flink OSS Connector from Apache Flink is present, you must remove it. To do this, delete the flink-oss-fs-hadoop-${flink-version}.jar file from the Flink lib folder or the plugins/oss-fs-hadoop path.

  • After you configure JindoSDK, no additional configuration is required to use it for Flink streaming jobs. Use the oss:// prefix to write to both the OSS-HDFS service and the OSS service. JindoSDK automatically detects the write destination.

Examples

  1. General configurations

    To write to OSS-HDFS with EXACTLY_ONCE semantics, apply the following configurations:

    1. Enable Flink checkpoints.

      Example:

      1. Create a StreamExecutionEnvironment as follows.

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      2. Enable checkpoints in EXACTLY_ONCE mode. Replace <userDefinedCheckpointInterval> with the checkpoint interval in milliseconds.

        env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
    2. Use a replayable data source, such as Kafka.

  2. Simple usage

    No additional dependencies are required. In your Flink job, specify an output path that starts with the oss:// prefix and includes the bucket name and the Endpoint of the OSS-HDFS service.

    1. Add a sink.

      The following example shows how to write a `DataStream<String>` object named `outputStream` to OSS-HDFS.

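      import org.apache.flink.api.common.serialization.SimpleStringEncoder;
      import org.apache.flink.core.fs.Path;
      import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

      // Output location in OSS-HDFS; replace the placeholders with your own values.
      String outputPath = "oss://{user-defined-oss-hdfs-bucket.oss-hdfs-endpoint}/{user-defined-dir}";
      // Row-format sink that writes each record as a UTF-8 encoded line.
      StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
              new Path(outputPath),
              new SimpleStringEncoder<String>("UTF-8")
      ).build();
      outputStream.addSink(sink);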
      Important

      The .{oss-hdfs-endpoint} field in the OSS-HDFS bucket name is optional. If you omit this field, ensure that the OSS-HDFS Endpoint is correctly configured in your Flink or Hadoop component.

    2. Use env.execute() to run the Flink job.
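
The output path format used in the sink can also be assembled in code. The following is a minimal sketch, assuming a hypothetical helper class named OssHdfsPaths that is not part of Flink or JindoSDK; the bucket, endpoint, and directory values in main are placeholders. It omits the endpoint suffix when none is given, in which case the OSS-HDFS Endpoint must be configured in your Flink or Hadoop component.

```java
import java.util.Objects;

/** Hypothetical helper (not part of Flink or JindoSDK) that assembles an OSS-HDFS output path. */
final class OssHdfsPaths {

    private OssHdfsPaths() {}

    /**
     * Builds "oss://<bucket>[.<endpoint>]/<dir>".
     * Pass null or an empty endpoint to omit the suffix; the endpoint must then
     * be configured in the Flink or Hadoop component instead.
     */
    static String outputPath(String bucket, String endpoint, String dir) {
        Objects.requireNonNull(bucket, "bucket");
        Objects.requireNonNull(dir, "dir");
        if (bucket.isEmpty()) {
            throw new IllegalArgumentException("bucket must not be empty");
        }
        StringBuilder sb = new StringBuilder("oss://").append(bucket);
        if (endpoint != null && !endpoint.isEmpty()) {
            sb.append('.').append(endpoint);
        }
        // Strip leading slashes so the directory is joined with exactly one "/".
        String trimmedDir = dir.replaceAll("^/+", "");
        return sb.append('/').append(trimmedDir).toString();
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own bucket, endpoint, and directory.
        System.out.println(outputPath("examplebucket", "oss-hdfs-endpoint.example", "flink/out"));
        System.out.println(outputPath("examplebucket", null, "/flink/out"));
    }
}
```

The returned string can be passed to new Path(...) when the sink is created.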

(Optional) Custom configurations

When you submit a Flink job, you can customize parameters to enable or control specific features.

The following example shows how to use the -yD parameter to submit a Flink job in yarn-cluster mode:

<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...
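
If job submission is scripted from a Java tool, the same argument list can be built programmatically. The sketch below assumes a hypothetical class named FlinkSubmitArgs and a placeholder Flink home directory. It only reproduces the part of the command shown above; the job JAR and any other arguments must still be appended by the caller.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that assembles the "flink run" prefix for yarn-cluster mode. */
final class FlinkSubmitArgs {

    private FlinkSubmitArgs() {}

    /** Returns the command prefix with one "-yD key=value" pair per entry, in map order. */
    static List<String> yarnClusterArgs(String flinkHome, Map<String, String> dynamicProps) {
        List<String> cmd = new ArrayList<>();
        cmd.add(flinkHome + "/bin/flink");
        cmd.add("run");
        cmd.add("-m");
        cmd.add("yarn-cluster");
        for (Map.Entry<String, String> e : dynamicProps.entrySet()) {
            cmd.add("-yD");
            cmd.add(e.getKey() + "=" + e.getValue());
        }
        return cmd;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the insertion order of the parameters.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("key1", "value1");
        props.put("key2", "value2");
        // "/opt/flink" is a placeholder for <flink_home>.
        System.out.println(String.join(" ", yarnClusterArgs("/opt/flink", props)));
    }
}
```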

You can enable the Entropy Injection feature. Entropy injection matches a specific string in the write path and replaces it with a random string. This mitigates data hotspotting and improves write performance.

To enable entropy injection for writes to OSS-HDFS, set the following parameters when you submit the job.

oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>

When you write a new file, any string in the path that matches <user-defined-key> is replaced with a random string. The length of the random string is specified by <user-defined-length>. The value of <user-defined-length> must be greater than 0.
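
To illustrate the replacement rule, the following sketch simulates entropy injection on a path string. It is not the JindoSDK implementation: the class name, the lowercase alphanumeric alphabet, and the choice to reuse one random string for every occurrence of the key are assumptions made for illustration only.

```java
import java.security.SecureRandom;

/** Illustrative simulation of entropy injection; not the JindoSDK implementation. */
final class EntropyInjectionSketch {

    // Assumed character set for the random string; the real one is not documented here.
    private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz0123456789";
    private static final SecureRandom RANDOM = new SecureRandom();

    private EntropyInjectionSketch() {}

    /** Replaces each occurrence of entropyKey in path with a random string of entropyLength characters. */
    static String injectEntropy(String path, String entropyKey, int entropyLength) {
        if (entropyKey == null || entropyKey.isEmpty()) {
            throw new IllegalArgumentException("entropy key must not be empty");
        }
        if (entropyLength <= 0) {
            throw new IllegalArgumentException("entropy length must be greater than 0");
        }
        if (!path.contains(entropyKey)) {
            return path; // Nothing to replace.
        }
        StringBuilder random = new StringBuilder(entropyLength);
        for (int i = 0; i < entropyLength; i++) {
            random.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
        }
        // Assumption: the same random string is used for every occurrence of the key.
        return path.replace(entropyKey, random.toString());
    }

    public static void main(String[] args) {
        // "_entropy_" stands in for <user-defined-key>, and 4 for <user-defined-length>.
        System.out.println(injectEntropy("oss://examplebucket/_entropy_/part-0", "_entropy_", 4));
    }
}
```

Because the random segment differs from file to file, writes that would otherwise share one path prefix are spread across many prefixes.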