
Object Storage Service: Use Flink on an EMR cluster to write data to the OSS-HDFS service

Last Updated: Mar 20, 2026

When streaming jobs fail and restart, standard file writes can lose or duplicate records. Flink's resumable writing feature addresses this by writing data to OSS-HDFS with EXACTLY_ONCE semantics — no records are lost or duplicated even after a job failure.

Prerequisites

Before you begin, ensure that you have:

  • OSS-HDFS enabled for a bucket, with permissions granted to a RAM role. See Enable OSS-HDFS and grant access permissions

  • Permissions to connect E-MapReduce (EMR) clusters to OSS-HDFS. An Alibaba Cloud account has these permissions by default. If you use a RAM user, grant the required permissions first. See Grant a RAM user permissions to connect EMR clusters to OSS-HDFS

  • A replayable data source, such as Apache Kafka. EXACTLY_ONCE semantics requires the data source to support replay so that in-flight records can be re-emitted after a job restart. Data sources that do not support replay will produce lost or duplicate records after a failure
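The replayable-source requirement above can be sketched with a Kafka source. This is a minimal sketch, assuming the KafkaSource connector from the flink-connector-kafka artifact; the broker address, topic name, and consumer group below are hypothetical placeholders:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReplayableSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka retains records on the broker, so after a failure Flink can
        // rewind to the offsets stored in the last successful checkpoint and
        // re-emit in-flight records, which is what EXACTLY_ONCE relies on.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker1:9092")   // hypothetical broker address
                .setTopics("input-topic")              // hypothetical topic
                .setGroupId("flink-oss-writer")        // hypothetical consumer group
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> outputStream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
    }
}
```

A non-replayable source (for example, a plain socket stream) would still run, but records consumed between the last checkpoint and the failure could not be re-read, so the delivery guarantee would degrade.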

Write data to OSS-HDFS

Step 1: Enable checkpointing

Create a StreamExecutionEnvironment and enable EXACTLY_ONCE checkpointing:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
Important

Checkpointing is required for EXACTLY_ONCE delivery. Without it, part files stay in the in-progress or pending state and downstream systems cannot safely read the data.
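Beyond the interval, a few CheckpointConfig settings are commonly tuned so that slow or failed checkpoints do not destabilize the job. The values below are illustrative assumptions, not recommendations from this guide:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 60 s checkpoint interval with EXACTLY_ONCE semantics (illustrative value).
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig cfg = env.getCheckpointConfig();
        cfg.setCheckpointTimeout(120_000L);          // abort checkpoints that run longer than 2 min
        cfg.setMinPauseBetweenCheckpoints(30_000L);  // guarantee progress between checkpoints
        cfg.setTolerableCheckpointFailureNumber(3);  // tolerate a few failed checkpoints before failing the job
    }
}
```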

Step 2: Add a sink

Use an oss:// path to point the StreamingFileSink at your OSS-HDFS bucket. No extra dependencies are needed.

String outputPath = "oss://{user-defined-oss-hdfs-bucket}.{oss-hdfs-endpoint}/{user-defined-dir}";
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
        new Path(outputPath),
        new SimpleStringEncoder<String>("UTF-8")
).build();
outputStream.addSink(sink);

Replace the placeholders:

  • {user-defined-oss-hdfs-bucket}: the name of the OSS bucket with OSS-HDFS enabled. Example: my-bucket

  • .{oss-hdfs-endpoint}: the OSS-HDFS endpoint suffix. Example: .cn-hangzhou.oss-hdfs.aliyuncs.com

  • {user-defined-dir}: the target directory path in the bucket. Example: output/streaming

The .{oss-hdfs-endpoint} suffix is optional. If you omit it, configure the OSS-HDFS endpoint in your Flink or Hadoop component settings instead.
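The sink above uses default rolling behavior. If you need explicit control over when part files roll, a rolling policy can be attached to the row-format builder. This is a hedged sketch using the long-valued builder methods (newer Flink versions take Duration and MemorySize instead), and the sizes and intervals are illustrative:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class RollingPolicySketch {
    public static void main(String[] args) {
        // Placeholder path as described in Step 2.
        String outputPath = "oss://{user-defined-oss-hdfs-bucket}.{oss-hdfs-endpoint}/{user-defined-dir}";

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(15 * 60 * 1000L)   // roll at least every 15 min
                                .withInactivityInterval(5 * 60 * 1000L)  // roll after 5 min without writes
                                .withMaxPartSize(128 * 1024 * 1024L)     // roll once a part reaches 128 MiB
                                .build())
                .build();
    }
}
```

Note that bulk formats (for example, Parquet) roll only on checkpoints regardless of policy; the policy above applies to row formats such as the string encoder used in this guide.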

Step 3: Run the job

Call env.execute() to submit the Flink job.

(Optional) Configure entropy injection

Entropy injection improves write performance by replacing a specific string in the output path with a random character sequence, distributing writes across multiple storage partitions.

To enable it, set the following parameters when submitting your Flink job on YARN:

<flink_home>/bin/flink run -m yarn-cluster \
  -yD oss.entropy.key=<user-defined-key> \
  -yD oss.entropy.length=<user-defined-length> \
  ...
  • <user-defined-key>: the string in the write path to replace with random characters. It must appear in the path string.

  • <user-defined-length>: the length of the random string that replaces the key. It must be greater than 0.

When Flink creates a file, it replaces every occurrence of <user-defined-key> in the write path with a random string of length <user-defined-length>.
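The substitution described above can be illustrated in plain Java. The helper below is hypothetical (it is not part of the Flink or OSS API) and only mimics the effect: every occurrence of the entropy key in the path is replaced with one random alphanumeric string of the configured length:

```java
import java.security.SecureRandom;

public class EntropyDemo {
    // Hypothetical helper mimicking entropy injection: replace every
    // occurrence of `key` in `path` with one random string of `length` chars.
    static String injectEntropy(String path, String key, int length) {
        String alphabet = "abcdefghijklmnopqrstuvwxyz0123456789";
        SecureRandom rnd = new SecureRandom();
        StringBuilder entropy = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            entropy.append(alphabet.charAt(rnd.nextInt(alphabet.length())));
        }
        return path.replace(key, entropy.toString());
    }

    public static void main(String[] args) {
        // "_entropy_" plays the role of <user-defined-key>, 4 of <user-defined-length>.
        String path = "oss://my-bucket/_entropy_/output/streaming/part-0";
        System.out.println(injectEntropy(path, "_entropy_", 4));
    }
}
```

Because the random segment lands early in the object key, writes from parallel subtasks are spread across different storage partitions instead of hitting one hot prefix.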
