When streaming jobs fail and restart, standard file writes can lose or duplicate records. Flink's resumable writing feature addresses this by writing data to OSS-HDFS with EXACTLY_ONCE semantics — no records are lost or duplicated even after a job failure.
Prerequisites
Before you begin, ensure that you have:
- OSS-HDFS enabled for a bucket, with permissions granted to a RAM role. See Enable OSS-HDFS and grant access permissions.
- Permissions to connect E-MapReduce (EMR) clusters to OSS-HDFS. An Alibaba Cloud account has these permissions by default. If you use a RAM user, grant the required permissions first. See Grant a RAM user permissions to connect EMR clusters to OSS-HDFS.
- A replayable data source, such as Apache Kafka; a minimal source sketch follows this list. EXACTLY_ONCE semantics requires the data source to support replay so that in-flight records can be re-emitted after a job restart. Data sources that cannot replay will produce lost or duplicated records after a failure.
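For illustration, a minimal sketch of a replayable Kafka source, assuming the flink-connector-kafka dependency is on the classpath; the broker address, consumer group, and topic name are placeholders (imports omitted here, shown in the complete example under Step 3):
Properties props = new Properties();
props.setProperty("bootstrap.servers", "<kafka-broker:9092>");  // placeholder broker address
props.setProperty("group.id", "<consumer-group>");              // placeholder consumer group

// FlinkKafkaConsumer stores its offsets in Flink checkpoints, so records
// can be replayed from the last successful checkpoint after a restart.
FlinkKafkaConsumer<String> source =
        new FlinkKafkaConsumer<>("<topic>", new SimpleStringSchema(), props);
// Attach it with env.addSource(source) once the environment exists (Step 1).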
Write data to OSS-HDFS
Step 1: Enable checkpointing
Create a StreamExecutionEnvironment and enable EXACTLY_ONCE checkpointing:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
Checkpointing is required for EXACTLY_ONCE delivery. Without it, part files stay in the in-progress or pending state and downstream systems cannot safely read the data.
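As a reference point, one possible concrete configuration; the 60-second interval and the tuning values below are illustrative assumptions, not recommendations:
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE); // checkpoint every 60 s (illustrative)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // leave time for regular processing between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(120_000);         // abort checkpoints that take longer than 2 minutes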
Step 2: Add a sink
Use an oss:// path to point the StreamingFileSink at your OSS-HDFS bucket. No extra dependencies are needed.
String outputPath = "oss://{user-defined-oss-hdfs-bucket.oss-hdfs-endpoint}/{user-defined-dir}";
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
new Path(outputPath),
new SimpleStringEncoder<String>("UTF-8")
).build();
outputStream.addSink(sink);
Replace the placeholders:
| Placeholder | Description | Example |
|---|---|---|
| {user-defined-oss-hdfs-bucket} | Name of the OSS bucket with OSS-HDFS enabled | my-bucket |
| .{oss-hdfs-endpoint} | OSS-HDFS endpoint suffix (optional) | .cn-hangzhou.oss-hdfs.aliyuncs.com |
| {user-defined-dir} | Target directory path in the bucket | output/streaming |
The .{oss-hdfs-endpoint} suffix is optional. If you omit it, configure the OSS-HDFS endpoint in your Flink or Hadoop component settings instead.
Step 3: Run the job
Call env.execute() to submit the Flink job.
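Putting the three steps together, a minimal end-to-end job might look like the following sketch. The Kafka settings and the output path placeholders are the same as above; the class name and job name are made up for illustration:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OssHdfsExactlyOnceJob {  // hypothetical class name
    public static void main(String[] args) throws Exception {
        // Step 1: enable EXACTLY_ONCE checkpointing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // Prerequisite: a replayable source (Kafka)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "<kafka-broker:9092>");
        props.setProperty("group.id", "<consumer-group>");
        DataStream<String> outputStream = env.addSource(
                new FlinkKafkaConsumer<>("<topic>", new SimpleStringSchema(), props));

        // Step 2: StreamingFileSink writing to OSS-HDFS
        String outputPath = "oss://{user-defined-oss-hdfs-bucket.oss-hdfs-endpoint}/{user-defined-dir}";
        StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
                new Path(outputPath),
                new SimpleStringEncoder<String>("UTF-8")
        ).build();
        outputStream.addSink(sink);

        // Step 3: submit the job
        env.execute("oss-hdfs-exactly-once-sink");  // hypothetical job name
    }
}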
(Optional) Configure entropy injection
Entropy injection improves write performance by replacing a specific string in the output path with a random character sequence, distributing writes across multiple storage partitions.
To enable it, set the following parameters when submitting your Flink job on YARN:
<flink_home>/bin/flink run -m yarn-cluster \
-yD oss.entropy.key=<user-defined-key> \
-yD oss.entropy.length=<user-defined-length> \
...
| Placeholder | Description | Constraint |
|---|---|---|
| <user-defined-key> | The string in the write path to replace with random characters | Must appear in the path string |
| <user-defined-length> | Length of the random string that replaces the key | Must be greater than 0 |
When Flink creates a file, it replaces every occurrence of <user-defined-key> in the write path with a random string of length <user-defined-length>.
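A hypothetical before-and-after, assuming oss.entropy.key is set to __entropy__ and oss.entropy.length to 4; the key name and the random string are made up for illustration:
// Sink path as configured in the job (contains the entropy key):
String outputPath = "oss://my-bucket/data/__entropy__/";
// Path Flink actually writes to, with the key replaced by 4 random characters, for example:
//   oss://my-bucket/data/x3Qz/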