Object Storage Service: Use Flink on an EMR cluster to write data to the OSS-HDFS service

Last Updated: Sep 17, 2025

The resumable writing feature allows data to be written with EXACTLY_ONCE semantics. This topic describes how to use Apache Flink on an E-MapReduce (EMR) cluster to write data to OSS-HDFS in a resumable manner.

Prerequisites

  • OSS-HDFS is enabled for a bucket and permissions are granted to a RAM role to access OSS-HDFS. For more information, see Enable OSS-HDFS and grant access permissions.

  • By default, an Alibaba Cloud account has the permissions to connect EMR clusters to OSS-HDFS and perform common operations related to OSS-HDFS. If you want to use a RAM user instead, the RAM user must be granted the required permissions. For more information, see Grant a RAM user permissions to connect EMR clusters to OSS-HDFS.

Example

  1. General configurations

    To write data to the OSS-HDFS service using EXACTLY_ONCE semantics, you must complete the following configurations:

    1. Enable checkpoints for Flink.

      Example:

      1. Create a StreamExecutionEnvironment object as follows.

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      2. Call the following method to enable checkpointing.

        env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
    2. Use a data source that can be replayed, such as Kafka, as shown in the sketch below.
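
    For instance, a replayable Kafka source can be wired into the checkpoint-enabled environment as follows. This is a minimal sketch: the broker address, topic name, and group ID are hypothetical values, and it assumes the Flink Kafka connector dependency is available on the cluster.

      import java.util.Properties;

      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

      // Hypothetical Kafka connection settings; replace them with your own values.
      Properties kafkaProps = new Properties();
      kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
      kafkaProps.setProperty("group.id", "oss-hdfs-demo");

      // Kafka offsets are stored in Flink checkpoints, so the source can replay
      // records after a failure, which is what makes EXACTLY_ONCE delivery possible.
      DataStream<String> outputStream = env.addSource(
              new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), kafkaProps));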

  2. Simple usage

    No extra dependencies are required. To write data, use a path that starts with the oss:// prefix and specify the bucket and Endpoint of the OSS-HDFS service.

    1. Add a sink.

      The following example shows how to write a DataStream<String> object to the OSS-HDFS service.

      import org.apache.flink.api.common.serialization.SimpleStringEncoder;
      import org.apache.flink.core.fs.Path;
      import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

      // Replace the bucket, endpoint, and directory placeholders with your own values.
      String outputPath = "oss://{user-defined-oss-hdfs-bucket.oss-hdfs-endpoint}/{user-defined-dir}";
      StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
              new Path(outputPath),
              new SimpleStringEncoder<String>("UTF-8")
      ).build();
      outputStream.addSink(sink);
      Important

      The .{oss-hdfs-endpoint} part of the path is optional. If you omit this part, you must correctly configure the Endpoint of the OSS-HDFS service in the Flink or Hadoop component.

    2. Use env.execute() to execute the Flink job, as shown in the combined sketch below.
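
    Putting the preceding steps together, a complete job might look like the following minimal sketch. The Kafka settings, path placeholders, and job name are hypothetical, and the sketch assumes the Flink Kafka connector dependency is available on the EMR cluster.

      import java.util.Properties;

      import org.apache.flink.api.common.serialization.SimpleStringEncoder;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.core.fs.Path;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

      public class OssHdfsExactlyOnceDemo {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              // Enable checkpoints with EXACTLY_ONCE semantics, for example every 60 seconds.
              env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);

              // Replayable source: a Kafka topic (connection settings are hypothetical).
              Properties kafkaProps = new Properties();
              kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
              kafkaProps.setProperty("group.id", "oss-hdfs-demo");
              DataStream<String> outputStream = env.addSource(
                      new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), kafkaProps));

              // OSS-HDFS sink; replace the bucket, endpoint, and directory placeholders.
              String outputPath = "oss://{user-defined-oss-hdfs-bucket.oss-hdfs-endpoint}/{user-defined-dir}";
              StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
                      new Path(outputPath),
                      new SimpleStringEncoder<String>("UTF-8")
              ).build();
              outputStream.addSink(sink);

              // Submit the job; the job name is arbitrary.
              env.execute("oss-hdfs-exactly-once-demo");
          }
      }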

(Optional) Configure custom settings

When you submit Flink jobs, you can configure custom parameters to enable or manage specific features.

The following sample code provides an example of how to use -yD to configure settings when you submit Flink jobs to YARN clusters.

<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...

You can enable entropy injection, which replaces a specific string in the destination path with a random string. This distributes the written data across different partitions, which improves write performance.

To enable entropy injection when you write data to OSS-HDFS, configure the following parameters:

oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>

When data is written to an object, the <user-defined-key> string in the write path is replaced by a random string whose length equals <user-defined-length>. The value of <user-defined-length> must be greater than 0.
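
For example, the following sketch submits a job with entropy injection enabled by passing the two parameters through -yD; the length value 4 and the trailing arguments are placeholders:

<flink_home>/bin/flink run -m yarn-cluster \
    -yD oss.entropy.key=<user-defined-key> \
    -yD oss.entropy.length=4 \
    ...

With these settings, a sink path such as oss://{bucket.endpoint}/<user-defined-key>/output has <user-defined-key> replaced by a random four-character string, for example oss://{bucket.endpoint}/a1b2/output, so that written objects are spread across different partitions.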