E-MapReduce: Stream Kafka data to Alibaba Cloud OSS using Flink

Last Updated: Mar 26, 2026

This tutorial shows you how to run a Flink streaming job on an E-MapReduce (EMR) Dataflow cluster that reads from a Kafka topic and writes to Object Storage Service (OSS) with exactly-once semantics. The job uses JindoFS — available in Dataflow clusters from EMR V3.37.1 — to write to OSS using oss:// paths, the same way you write to Hadoop Distributed File System (HDFS).

Prerequisites

Before you begin, make sure you have:

  • EMR and OSS activated on your Alibaba Cloud account

  • The required permissions granted to your Alibaba Cloud account or RAM user. For details, see Assign roles.

How it works

The Flink job uses StreamingFileSink with OnCheckpointRollingPolicy. Under this policy, the sink rolls its in-progress part files at every checkpoint and commits them to OSS only after that checkpoint completes, not continuously. With a 30-second checkpoint interval, new output files therefore appear roughly every 30 seconds while records are arriving. Because this is a streaming job, it runs continuously and accumulates output files until you stop it.

JindoFS handles OSS writes. If the OSS bucket and the Dataflow cluster belong to the same Alibaba Cloud account, JindoFS reads from and writes to the bucket in password-free mode — no credentials are needed in the job code.

Important

Because output files accumulate with every checkpoint, stop the job after you verify the output. See "Stop the job" in Step 5.
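
To get a feel for how quickly files accumulate, the following sketch estimates a rough upper bound on the number of part files finalized per hour. It assumes that every sink subtask receives data in every checkpoint interval and that only one output directory is active at a time. The class name, the method name, and the sink parallelism of 3 are hypothetical and do not come from the demo project.

```java
final class CheckpointFileEstimate {

    private static final long MILLIS_PER_HOUR = 3_600_000L;

    /**
     * Rough upper bound on part files finalized per hour.
     * Assumption: each sink subtask rolls one file per checkpoint.
     */
    static long maxFilesPerHour(long checkpointIntervalMs, int sinkParallelism) {
        if (checkpointIntervalMs <= 0 || sinkParallelism <= 0) {
            throw new IllegalArgumentException("Interval and parallelism must be positive.");
        }
        long checkpointsPerHour = MILLIS_PER_HOUR / checkpointIntervalMs;
        return checkpointsPerHour * sinkParallelism;
    }

    public static void main(String[] args) {
        // 30000 ms matches env.enableCheckpointing(30000, ...) in the sample job.
        // A parallelism of 3 is an assumed value for illustration only.
        System.out.println(maxFilesPerHour(30_000L, 3));
    }
}
```

In practice the number is lower whenever a subtask receives no records during an interval, because the sink creates part files only when it has data to write.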

Step 1: Prepare the environment

Note

This tutorial uses EMR V3.43.1.

  1. Create a Dataflow cluster with the Flink and Kafka services. For details, see Create a cluster.

  2. Create an OSS bucket in the same region as the Dataflow cluster. For details, see Create buckets.

Step 2: Prepare the JAR package

The sample code creates a Kafka source using the FLIP-27 source API and an OSS sink backed by StreamingFileSink. The output path must start with oss://. Checkpointing is enabled with CheckpointingMode.EXACTLY_ONCE and a 30-second interval — this is what drives the OnCheckpointRollingPolicy and determines how frequently files are committed.

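// Imports for the Flink classes used below. Event and EventDeSerializationSchema
// come from the dataflow-demo project.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.util.Preconditions;

public class OssDemoJob {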

    public static void main(String[] args) throws Exception {
        ...

        // Check output oss dir
        Preconditions.checkArgument(
                params.get(OUTPUT_OSS_DIR).startsWith("oss://"),
                "outputOssDir should start with 'oss://'.");

        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint is required
        env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);

        String outputPath = params.get(OUTPUT_OSS_DIR);

        // Build Kafka source with new Source API based on FLIP-27
        KafkaSource<Event> kafkaSource =
                KafkaSource.<Event>builder()
                        .setBootstrapServers(params.get(KAFKA_BROKERS_ARG))
                        .setTopics(params.get(INPUT_TOPIC_ARG))
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG))
                        .setDeserializer(new EventDeSerializationSchema())
                        .build();
        // DataStream Source
        DataStreamSource<Event> source =
                env.fromSource(
                        kafkaSource,
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                        "Kafka Source");

        StreamingFileSink<Event> sink =
                StreamingFileSink.forRowFormat(
                                new Path(outputPath), new SimpleStringEncoder<Event>("UTF-8"))
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();
        source.addSink(sink);

        // Compile and submit the job
        env.execute();
    }
}

Note

This snippet shows the main program structure. Modify it as needed — for example, add a package name or adjust the checkpoint interval — before building. For instructions on building a JAR package, see the Flink official documentation.
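
The sample job only checks that the output path starts with oss://. As an illustration, a stricter check could also split the path into a bucket name and a key prefix before the job starts. The following is a minimal sketch in plain Java. OssLocation and its parse method are hypothetical names, not part of the demo project or of JindoFS.

```java
final class OssLocation {

    private static final String SCHEME = "oss://";

    final String bucket;
    final String keyPrefix;

    private OssLocation(String bucket, String keyPrefix) {
        this.bucket = bucket;
        this.keyPrefix = keyPrefix;
    }

    /** Splits oss://bucket/prefix into its bucket and key prefix. */
    static OssLocation parse(String path) {
        if (path == null || !path.startsWith(SCHEME)) {
            throw new IllegalArgumentException("outputOssDir should start with 'oss://'.");
        }
        String rest = path.substring(SCHEME.length());
        int slash = rest.indexOf('/');
        String bucket = slash < 0 ? rest : rest.substring(0, slash);
        String keyPrefix = slash < 0 ? "" : rest.substring(slash + 1);
        if (bucket.isEmpty()) {
            throw new IllegalArgumentException("outputOssDir is missing a bucket name: " + path);
        }
        return new OssLocation(bucket, keyPrefix);
    }

    public static void main(String[] args) {
        // my-bucket is a placeholder bucket name.
        OssLocation location = parse("oss://my-bucket/oss_kafka_test");
        System.out.println(location.bucket + " " + location.keyPrefix);
    }
}
```

Failing fast on a malformed path is cheaper than discovering the problem at the first checkpoint, when the sink first tries to commit a file.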

Download the complete source from dataflow-demo, or use the pre-built dataflow-oss-demo-1.0-SNAPSHOT.jar directly.

To build the JAR yourself:

  1. Go to the root directory of the downloaded project.

  2. Run the following command:

    mvn clean package

    Maven writes the JAR to dataflow-demo/dataflow-oss-demo/target/. The file name dataflow-oss-demo-1.0-SNAPSHOT.jar is derived from the artifactId and version in pom.xml.

Step 3: Create a Kafka topic and generate data

  1. Log in to the Dataflow cluster over SSH. For details, see Log on to a cluster.

  2. Create a Kafka topic for testing:

    kafka-topics.sh --create  --bootstrap-server core-1-1:9092 \
        --replication-factor 2  \
        --partitions 3  \
        --topic kafka-test-topic

    On success, the CLI returns:

    Created topic kafka-test-topic.

  3. Write test data to the topic.

    1. Open the Kafka producer console:

      kafka-console-producer.sh --broker-list core-1-1:9092 --topic kafka-test-topic

    2. Enter the following five records:

      1,Ken,0,1,1662022777000
      1,Ken,0,2,1662022777000
      1,Ken,0,3,1662022777000
      1,Ken,0,4,1662022777000
      1,Ken,0,5,1662022777000

    3. Press Ctrl+C to exit the Kafka producer console.
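
The records above are comma-separated lines. The demo project's Event class and EventDeSerializationSchema define how the job actually decodes them, so the following plain-Java sketch only illustrates the idea. It assumes that the last column is the event time in epoch milliseconds, which matches the sample values but is not confirmed here. SampleRecord is a hypothetical name.

```java
import java.time.Instant;
import java.util.Arrays;

final class SampleRecord {

    final String[] columns;
    final long eventTimeMillis;

    private SampleRecord(String[] columns, long eventTimeMillis) {
        this.columns = columns;
        this.eventTimeMillis = eventTimeMillis;
    }

    /** Splits one comma-separated line and reads the last column as a timestamp. */
    static SampleRecord parse(String line) {
        String[] columns = line.trim().split(",", -1);
        if (columns.length < 2) {
            throw new IllegalArgumentException("Expected comma-separated columns: " + line);
        }
        // Assumption: the last column holds the event time in epoch milliseconds.
        long eventTimeMillis = Long.parseLong(columns[columns.length - 1].trim());
        return new SampleRecord(columns, eventTimeMillis);
    }

    public static void main(String[] args) {
        SampleRecord record = parse("1,Ken,0,1,1662022777000");
        System.out.println(Arrays.toString(record.columns));
        System.out.println(Instant.ofEpochMilli(record.eventTimeMillis));
    }
}
```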

Step 4: Run a Flink job

  1. Log in to the Dataflow cluster over SSH. For details, see Log on to a cluster.

  2. Upload dataflow-oss-demo-1.0-SNAPSHOT.jar to the root directory of the Dataflow cluster.

    Note

    This example uploads to the root directory. Choose a different directory if needed.

  3. Submit the Flink job in Per-Job mode:

    flink run -t yarn-per-job -d -c com.alibaba.ververica.dataflow.demo.oss.OssDemoJob \
        /dataflow-oss-demo-1.0-SNAPSHOT.jar  \
        --outputOssDir oss://xung****-flink-dlf-test/oss_kafka_test \
        --kafkaBrokers core-1-1:9092 \
        --inputTopic kafka-test-topic \
        --inputTopicGroup my-group

    | Parameter | Description | Valid values | Value in this example |
    | --- | --- | --- | --- |
    | --outputOssDir | OSS path to write data to. Must start with oss://. | Any valid OSS path beginning with oss:// | oss://xung****-flink-dlf-test/oss_kafka_test |
    | --kafkaBrokers | Kafka broker address | host:port | core-1-1:9092 |
    | --inputTopic | Kafka topic to read from | Any existing topic name | kafka-test-topic |
    | --inputTopicGroup | Kafka consumer group | Any valid consumer group name | my-group |

    For other submission modes, see Basic usage.

  4. Verify the job is running:

    flink list -t yarn-per-job -Dyarn.application.id=<appId>

    Replace <appId> with the application ID returned after job submission. In this example, the application ID is application_1670236019397_0003.

Step 5: View the output

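The job commits output files on every checkpoint (every 30 seconds). Because the Kafka source starts from the latest offsets (OffsetsInitializer.latest()), it reads only records produced after the job starts. If the output directory stays empty, repeat the producer step in Step 3 while the job is running and wait for the next checkpoint. View the output, then stop the job.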

View output in the OSS console:

  1. Log in to the OSS console.

  2. In the left-side navigation pane, click Buckets. On the Buckets page, click the name of the bucket you created.

  3. On the Objects page, navigate to the output directory you specified in --outputOssDir.

View output from the CLI:

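Run the following command on the Dataflow cluster. Replace <YOUR_TARGET_BUCKET> with the name of your bucket and <DATE_DIR> with the date directory that the job created under the output path: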

hdfs dfs -cat oss://<YOUR_TARGET_BUCKET>/oss_kafka_test/<DATE_DIR>/part-0-0
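
The <DATE_DIR> part of the path comes from the sink's bucket assigner. The sample code sets no bucket assigner, so the sketch below assumes Flink's default DateTimeBucketAssigner, which names directories with the pattern yyyy-MM-dd--HH based on processing time in the system time zone, and the default part-<subtaskIndex>-<counter> file naming. OutputPathSketch and its methods are hypothetical helpers for building the path to pass to hdfs dfs -cat. Verify the actual directory names in the OSS console.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class OutputPathSketch {

    // Assumption: the sink uses Flink's default bucket format.
    private static final DateTimeFormatter BUCKET_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH");

    /** Directory name for records processed at the given time. */
    static String bucketDir(Instant processingTime, ZoneId zone) {
        return BUCKET_FORMAT.withZone(zone).format(processingTime);
    }

    /** Full path of one finished part file under the output directory. */
    static String partFile(String outputDir, String bucketDir, int subtaskIndex, long partCounter) {
        String base = outputDir.endsWith("/")
                ? outputDir.substring(0, outputDir.length() - 1)
                : outputDir;
        return base + "/" + bucketDir + "/part-" + subtaskIndex + "-" + partCounter;
    }

    public static void main(String[] args) {
        // Replace the placeholder with your bucket name before using the printed path.
        String dir = bucketDir(Instant.now(), ZoneId.systemDefault());
        System.out.println(partFile("oss://<YOUR_TARGET_BUCKET>/oss_kafka_test", dir, 0, 0));
    }
}
```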

Stop the job:

After verifying the output, stop the job to prevent further file accumulation:

yarn application -kill <appId>