This tutorial shows you how to run a Flink streaming job on an E-MapReduce (EMR) Dataflow cluster that reads from a Kafka topic and writes to Object Storage Service (OSS) with exactly-once semantics. The job uses JindoFS — available in Dataflow clusters from EMR V3.37.1 — to write to OSS using oss:// paths, the same way you write to Hadoop Distributed File System (HDFS).
Prerequisites
Before you begin, make sure you have:
-
EMR and OSS activated on your Alibaba Cloud account
-
The required permissions granted to your Alibaba Cloud account or RAM user. For details, see Assign roles.
How it works
The Flink job uses StreamingFileSink with OnCheckpointRollingPolicy. Under this policy, output files are committed to OSS only when a checkpoint completes — not continuously. With a 30-second checkpoint interval, this means new output files appear roughly every 30 seconds. Because this is a streaming job, it runs continuously and accumulates output files until you stop it.
JindoFS handles OSS writes. If the OSS bucket and the Dataflow cluster belong to the same Alibaba Cloud account, JindoFS reads from and writes to the bucket in password-free mode — no credentials are needed in the job code.
Because output files accumulate with every checkpoint, stop the job after you verify the output. See "Stop the job" in Step 5.
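The commit-on-checkpoint behavior described above can be pictured with a small toy model. This is not Flink code: the class CheckpointCommitModel and its methods are hypothetical names, and the model only illustrates the idea that written records stay invisible until a checkpoint completes.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of commit-on-checkpoint output. NOT Flink's implementation:
 * all names here are hypothetical and exist only for illustration.
 */
final class CheckpointCommitModel {
    // Records written since the last completed checkpoint (not yet visible).
    private final List<String> pending = new ArrayList<>();
    // Files made visible so far; each inner list stands for one committed file.
    private final List<List<String>> committed = new ArrayList<>();

    void write(String record) {
        pending.add(record);
    }

    // Called when a checkpoint completes: the buffered records become one
    // finished file. Nothing is committed if no records arrived.
    void onCheckpointComplete() {
        if (!pending.isEmpty()) {
            committed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    int committedFileCount() {
        return committed.size();
    }

    int pendingRecordCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        CheckpointCommitModel sink = new CheckpointCommitModel();
        sink.write("1,Ken,0,1,1662022777000");
        sink.write("1,Ken,0,2,1662022777000");
        System.out.println(sink.committedFileCount()); // 0: no checkpoint has completed yet
        sink.onCheckpointComplete();
        System.out.println(sink.committedFileCount()); // 1: file visible after the checkpoint
    }
}
```

In the real job, the 30-second checkpoint interval plays the role of the calls to onCheckpointComplete, which is why output appears in OSS in roughly 30-second steps rather than record by record.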
Step 1: Prepare the environment
This tutorial uses EMR V3.43.1.
-
Create a Dataflow cluster with the Flink and Kafka services. For details, see Create a cluster.
-
Create an OSS bucket in the same region as the Dataflow cluster. For details, see Create buckets.
Step 2: Prepare the JAR package
The sample code creates a Kafka source using the FLIP-27 source API and an OSS sink backed by StreamingFileSink. The output path must start with oss://. Checkpointing is enabled with CheckpointingMode.EXACTLY_ONCE and a 30-second interval — this is what drives the OnCheckpointRollingPolicy and determines how frequently files are committed.
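import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.util.Preconditions;

// Event and EventDeSerializationSchema are classes from the demo project.
public class OssDemoJob {
    public static void main(String[] args) throws Exception {
        ...
        // Validate that the output directory is an OSS path
        Preconditions.checkArgument(
                params.get(OUTPUT_OSS_DIR).startsWith("oss://"),
                "outputOssDir should start with 'oss://'.");
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required: files are committed only when a checkpoint completes
        env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
        String outputPath = params.get(OUTPUT_OSS_DIR);
        // Build the Kafka source with the new Source API based on FLIP-27
        KafkaSource<Event> kafkaSource =
                KafkaSource.<Event>builder()
                        .setBootstrapServers(params.get(KAFKA_BROKERS_ARG))
                        .setTopics(params.get(INPUT_TOPIC_ARG))
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG))
                        .setDeserializer(new EventDeSerializationSchema())
                        .build();
        // Create the DataStream source; timestamps come from the event itself
        DataStreamSource<Event> source =
                env.fromSource(
                        kafkaSource,
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                        "Kafka Source");
        // Row-format file sink that rolls files on every checkpoint
        StreamingFileSink<Event> sink =
                StreamingFileSink.forRowFormat(
                                new Path(outputPath), new SimpleStringEncoder<Event>("UTF-8"))
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();
        source.addSink(sink);
        // Compile and submit the job
        env.execute();
    }
}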
This snippet shows the main program structure. Modify it as needed — for example, add a package name or adjust the checkpoint interval — before building. For instructions on building a JAR package, see the Flink official documentation.
Download the complete source from dataflow-demo, or use the pre-built dataflow-oss-demo-1.0-SNAPSHOT.jar directly.
To build the JAR yourself:
-
Go to the root directory of the downloaded project.
-
Run the following command:
mvn clean package
The output JAR dataflow-oss-demo-1.0-SNAPSHOT.jar is placed in dataflow-demo/dataflow-oss-demo/target/ based on the artifactId in pom.xml.
Step 3: Create a Kafka topic and generate data
-
Log in to the Dataflow cluster over SSH. For details, see Log on to a cluster.
-
Create a Kafka topic for testing:
kafka-topics.sh --create --bootstrap-server core-1-1:9092 \
    --replication-factor 2 \
    --partitions 3 \
    --topic kafka-test-topic
On success, the CLI returns:
Created topic kafka-test-topic.
-
Write test data to the topic.
-
Open the Kafka producer console:
kafka-console-producer.sh --broker-list core-1-1:9092 --topic kafka-test-topic
-
Enter the following five records:
1,Ken,0,1,1662022777000
1,Ken,0,2,1662022777000
1,Ken,0,3,1662022777000
1,Ken,0,4,1662022777000
1,Ken,0,5,1662022777000
-
Press Ctrl+C to exit the Kafka producer console.
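To make the shape of the test records concrete, here is a minimal sketch that splits one record and reads its last field. The class SampleRecord is hypothetical and is not the demo project's Event class. The tutorial does not document what each field means, so treating the last field as an event time in epoch milliseconds is an assumption based on its value.

```java
import java.time.Instant;

/**
 * Hypothetical holder for one test record. The field meanings are not
 * documented in the tutorial; only the raw fields are kept here.
 */
final class SampleRecord {
    final String[] fields;       // raw comma-separated fields
    final long eventTimeMillis;  // ASSUMPTION: last field is epoch milliseconds

    private SampleRecord(String[] fields, long eventTimeMillis) {
        this.fields = fields;
        this.eventTimeMillis = eventTimeMillis;
    }

    static SampleRecord parse(String line) {
        String[] parts = line.trim().split(",");
        if (parts.length != 5) {
            throw new IllegalArgumentException("expected 5 fields: " + line);
        }
        return new SampleRecord(parts, Long.parseLong(parts[4]));
    }

    public static void main(String[] args) {
        SampleRecord r = parse("1,Ken,0,1,1662022777000");
        // Interpreted as epoch milliseconds, the last field is a UTC instant.
        System.out.println(Instant.ofEpochMilli(r.eventTimeMillis)); // 2022-09-01T08:59:37Z
    }
}
```

All five records carry the same last field, so only the fourth field distinguishes them.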
Step 4: Run a Flink job
-
Log in to the Dataflow cluster over SSH. For details, see Log on to a cluster.
-
Upload dataflow-oss-demo-1.0-SNAPSHOT.jar to the root directory of the Dataflow cluster.
Note: This example uploads to the root directory. Choose a different directory if needed.
-
Submit the Flink job in Per-Job mode:
flink run -t yarn-per-job -d -c com.alibaba.ververica.dataflow.demo.oss.OssDemoJob \
    /dataflow-oss-demo-1.0-SNAPSHOT.jar \
    --outputOssDir oss://xung****-flink-dlf-test/oss_kafka_test \
    --kafkaBrokers core-1-1:9092 \
    --inputTopic kafka-test-topic \
    --inputTopicGroup my-group
| Parameter | Description | Valid values | Value in this example |
| --outputOssDir | OSS path to write data to. Must start with oss://. | Any valid OSS path beginning with oss:// | oss://xung****-flink-dlf-test/oss_kafka_test |
| --kafkaBrokers | Kafka broker address | host:port | core-1-1:9092 |
| --inputTopic | Kafka topic to read from | Any existing topic name | kafka-test-topic |
| --inputTopicGroup | Kafka consumer group | Any valid consumer group name | my-group |
For other submission modes, see Basic usage.

-
Verify the job is running:
flink list -t yarn-per-job -Dyarn.application.id=<appId>
Replace <appId> with the application ID returned after job submission. In this example, the application ID is application_1670236019397_0003.
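The sample code in Step 2 elides how the submit-command flags reach the job. As a rough illustration only, the sketch below maps "--key value" pairs into a map and applies the same oss:// check with a guard for a missing flag. JobArgs, parse, and requireOssDir are hypothetical names, not the demo project's code, which may parse its arguments differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical argument helper; not part of the demo project. */
final class JobArgs {
    // Turns ["--inputTopic", "kafka-test-topic", ...] into {inputTopic=kafka-test-topic, ...}.
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("expected a --flag, got: " + args[i]);
            }
            if (i + 1 >= args.length) {
                throw new IllegalArgumentException("missing value for " + args[i]);
            }
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    // Same rule as the check in the sample job, but also rejects a missing flag
    // instead of failing with a NullPointerException.
    static String requireOssDir(Map<String, String> params) {
        String dir = params.get("outputOssDir");
        if (dir == null || !dir.startsWith("oss://")) {
            throw new IllegalArgumentException("outputOssDir should start with 'oss://'.");
        }
        return dir;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(new String[] {
            "--outputOssDir", "oss://<YOUR_TARGET_BUCKET>/oss_kafka_test",
            "--kafkaBrokers", "core-1-1:9092",
            "--inputTopic", "kafka-test-topic",
            "--inputTopicGroup", "my-group"
        });
        System.out.println(requireOssDir(params));
        System.out.println(params.get("kafkaBrokers"));
    }
}
```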
Step 5: View the output
The job commits output files on every checkpoint (every 30 seconds). View the output, then stop the job.
View output in the OSS console:
-
Log in to the OSS console.
-
In the left-side navigation pane, click Buckets. On the Buckets page, click the name of the bucket you created.
-
On the Objects page, navigate to the output directory you specified in --outputOssDir.
View output from the CLI:
Run the following command on the Dataflow cluster:
hdfs dfs -cat oss://<YOUR_TARGET_BUCKET>/oss_kafka_test/<DATE_DIR>/part-0-0
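The <DATE_DIR> and part-0-0 components of that path follow a naming pattern. The sketch below shows how such a path is composed, assuming the job keeps Flink's default bucket assigner for StreamingFileSink (the sample code does not set one), which names directories with the pattern yyyy-MM-dd--HH from processing time rather than from the timestamps inside the records. OutputPathSketch and partFilePath are hypothetical names, and the time zone is passed in explicitly here, whereas the real directory name depends on the cluster's zone.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that composes an expected output path; not Flink code. */
final class OutputPathSketch {
    // ASSUMPTION: default hourly bucket format of the date-time bucket assigner.
    private static final DateTimeFormatter BUCKET_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH");

    static String partFilePath(
            String baseDir, Instant processingTime, ZoneId zone, int subtaskIndex, long partCounter) {
        String dateDir = BUCKET_FORMAT.withZone(zone).format(processingTime);
        // Part files are named part-<subtask index>-<counter within that subtask>.
        return baseDir + "/" + dateDir + "/part-" + subtaskIndex + "-" + partCounter;
    }

    public static void main(String[] args) {
        String path = partFilePath(
                "oss://<YOUR_TARGET_BUCKET>/oss_kafka_test",
                Instant.parse("2022-12-05T10:15:00Z"),
                ZoneId.of("UTC"),
                0,
                0);
        System.out.println(path);
        // oss://<YOUR_TARGET_BUCKET>/oss_kafka_test/2022-12-05--10/part-0-0
    }
}
```

If part-0-0 does not exist, list the output directory first: the subtask index and counter depend on the job's parallelism and on how many checkpoints have produced files.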
Stop the job:
After verifying the output, stop the job to prevent further file accumulation:
yarn application -kill <appId>