In a common scenario, Kafka data is imported to a data lake storage service, such as Alibaba Cloud Object Storage Service (OSS), in real time to reduce storage costs or to enable query and analysis. In E-MapReduce (EMR) V3.37.1 and later, Dataflow clusters provide built-in JindoFS-related dependencies. You can run a Flink job in a Dataflow cluster to write Kafka data to OSS in streaming mode with exactly-once semantics. This topic describes how to write the code of such a Flink job and how to run the job in a Dataflow cluster.

Prerequisites

  • EMR and OSS are activated.
  • The Alibaba Cloud account or RAM user that you want to use is granted the required permissions. For more information, see Assign roles.

Procedure

  1. Step 1: Prepare the environment
  2. Step 2: Prepare the JAR package
  3. Step 3: Create a Kafka topic and generate data
  4. Step 4: Run a Flink job
  5. Step 5: View the output

Step 1: Prepare the environment

  1. Create a Dataflow cluster that contains the Flink and Kafka services. For more information, see Create a cluster.
    Note In this topic, EMR V3.43.1 is used.
  2. Create an OSS bucket that resides in the same region as the Dataflow cluster. For more information, see Create buckets.

Step 2: Prepare the JAR package

  1. Download the demo code. For more information, see dataflow-demo.

    You can run a Flink job to write data to OSS in streaming mode based on JindoFS. The procedure is similar to writing data to Hadoop Distributed File System (HDFS), except that the destination path must start with oss://. In the following example, Flink uses a StreamingFileSink with exactly-once semantics to write Kafka data to OSS after checkpointing is enabled.

    The following code snippet shows how to create a Kafka source and an OSS sink. You can download the complete code from GitHub. For more information, see dataflow-demo.
    Important If the OSS bucket and the Dataflow cluster in which JindoFS is deployed are created by using the same Alibaba Cloud account, JindoFS allows you to read data from and write data to the OSS bucket in password-free mode.
    public class OssDemoJob {
    
        public static void main(String[] args) throws Exception {
            ...
    
            // Check output oss dir
            Preconditions.checkArgument(
                    params.get(OUTPUT_OSS_DIR).startsWith("oss://"),
                    "outputOssDir should start with 'oss://'.");
    
            // Set up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint is required
            env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
    
            String outputPath = params.get(OUTPUT_OSS_DIR);
    
            // Build Kafka source with new Source API based on FLIP-27
            KafkaSource<Event> kafkaSource =
                    KafkaSource.<Event>builder()
                            .setBootstrapServers(params.get(KAFKA_BROKERS_ARG))
                            .setTopics(params.get(INPUT_TOPIC_ARG))
                            .setStartingOffsets(OffsetsInitializer.latest())
                            .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG))
                            .setDeserializer(new EventDeSerializationSchema())
                            .build();
            // DataStream Source
            DataStreamSource<Event> source =
                    env.fromSource(
                            kafkaSource,
                            WatermarkStrategy.<Event>forMonotonousTimestamps()
                                    .withTimestampAssigner((event, ts) -> event.getEventTime()),
                            "Kafka Source");
    
            StreamingFileSink<Event> sink =
                    StreamingFileSink.forRowFormat(
                                    new Path(outputPath), new SimpleStringEncoder<Event>("UTF-8"))
                            .withRollingPolicy(OnCheckpointRollingPolicy.build())
                            .build();
            source.addSink(sink);
    
            // Compile and submit the job
            env.execute();
        }
    }
    Note The preceding snippet shows only the main program. You can modify it based on your business requirements and then compile it. For example, you can add a package name or change the checkpoint interval. For information about how to build a JAR package for a Flink job, see the Flink official documentation. A hypothetical sketch of the Event type and the deserializer that the job references is provided at the end of this step.
  2. In the CLI, go to the root directory of the downloaded project and run the following command to package the project:
    mvn clean package

    Based on the artifactId in the pom.xml file, the dataflow-oss-demo-1.0-SNAPSHOT.jar package is generated in the dataflow-demo/dataflow-oss-demo/target directory of the project.
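
The job depends on an Event type and an EventDeSerializationSchema class that are defined in the dataflow-demo project. The following is a minimal, hypothetical sketch of what these classes might look like, based on the comma-separated test records used in Step 3; the field layout is an assumption for illustration, and the actual implementation in the GitHub project may differ.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    // Hypothetical deserializer: converts each Kafka record into an Event.
    public class EventDeSerializationSchema implements KafkaRecordDeserializationSchema<Event> {

        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Event> out)
                throws IOException {
            // The test records in Step 3 are comma-separated lines whose last field
            // is a millisecond timestamp, for example "1,Ken,0,1,1662022777000".
            String line = new String(record.value(), StandardCharsets.UTF_8);
            String[] fields = line.split(",");
            long eventTime = Long.parseLong(fields[fields.length - 1]);
            out.collect(new Event(line, eventTime));
        }

        @Override
        public TypeInformation<Event> getProducedType() {
            return TypeInformation.of(Event.class);
        }
    }

    // Hypothetical Event type: keeps the raw line so that SimpleStringEncoder writes it
    // to OSS unchanged, and exposes the event time for the watermark strategy.
    class Event {
        public String payload;
        public long eventTime;

        public Event() {}

        public Event(String payload, long eventTime) {
            this.payload = payload;
            this.eventTime = eventTime;
        }

        public long getEventTime() {
            return eventTime;
        }

        @Override
        public String toString() {
            return payload;
        }
    }

In the actual project, Event may be a structured POJO with separate fields. The sketch keeps the raw line so that the records written to OSS match the input records.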

Step 3: Create a Kafka topic and generate data

  1. Log on to the Dataflow cluster in SSH mode. For information about how to log on to an EMR cluster, see Log on to a cluster.
  2. Run the following command to create a Kafka topic for testing:
    kafka-topics.sh --create  --bootstrap-server core-1-1:9092 \
        --replication-factor 2  \
        --partitions 3  \
        --topic kafka-test-topic
    After the Kafka topic is created, the following information is returned in the CLI:
    Created topic kafka-test-topic.
  3. Write data to the Kafka topic.
    1. Run the following command in the CLI to go to the Kafka producer console:
      kafka-console-producer.sh --broker-list core-1-1:9092 --topic kafka-test-topic
    2. Enter the following five test records. A programmatic alternative that uses the Kafka Java producer is sketched at the end of this step.
      1,Ken,0,1,1662022777000
      1,Ken,0,2,1662022777000
      1,Ken,0,3,1662022777000
      1,Ken,0,4,1662022777000
      1,Ken,0,5,1662022777000
    3. Press Ctrl+C to exit from the Kafka producer console.
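
If you prefer to generate the test records programmatically instead of typing them into the console producer, the following sketch uses the Kafka Java client to write the same five records. The class name is hypothetical, and the sketch assumes that the kafka-clients dependency is available and that core-1-1:9092 is reachable from the machine on which you run it.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class TestDataProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "core-1-1:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 1; i <= 5; i++) {
                    // Same format as the records entered in the console producer.
                    String value = "1,Ken,0," + i + ",1662022777000";
                    producer.send(new ProducerRecord<>("kafka-test-topic", value));
                }
                producer.flush();
            }
        }
    }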

Step 4: Run a Flink job

  1. Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.
  2. Upload the dataflow-oss-demo-1.0-SNAPSHOT.jar package to the root directory of the Dataflow cluster.
    Note In this example, the dataflow-oss-demo-1.0-SNAPSHOT.jar package is uploaded to the root directory. You can also specify an upload directory based on your business requirements.
  3. Run the following command to submit the Flink job.
    In this example, the Flink job is submitted in Per-Job mode. For information about how to submit the Flink job in other modes, see Basic usage.
    flink run -t yarn-per-job -d -c com.alibaba.ververica.dataflow.demo.oss.OssDemoJob \
        dataflow-oss-demo-1.0-SNAPSHOT.jar  \
        --outputOssDir oss://xung****-flink-dlf-test/oss_kafka_test \
        --kafkaBrokers core-1-1:9092 \
        --inputTopic kafka-test-topic \
        --inputTopicGroup my-group
    Parameter descriptions:
    • outputOssDir: the directory of the OSS bucket to which you want to write data.
    • kafkaBrokers: the address of the Kafka broker in the Dataflow cluster. Set the value to core-1-1:9092.
    • inputTopic: the Kafka topic from which you want to read data. Set the value to kafka-test-topic, which you created in Step 3.
    • inputTopicGroup: the Kafka consumer group that you want to use. Set the value to my-group.
    For a hypothetical sketch of how these flags are read inside the job, see the example at the end of this step.
    Run the following command to view the status of the job:
    flink list -t yarn-per-job -Dyarn.application.id=<appId>
    Note In the command, <appId> specifies the YARN application ID that is returned after the job is submitted. In this example, the application ID is application_1670236019397_0003.
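
The --outputOssDir, --kafkaBrokers, --inputTopic, and --inputTopicGroup flags correspond to the parameters that the elided part of OssDemoJob reads. The following is a hypothetical sketch of that parsing, assuming that Flink's ParameterTool is used and that the constant values match the flag names; the actual code in dataflow-demo may differ.

    import org.apache.flink.api.java.utils.ParameterTool;

    public class ArgsSketch {

        // Hypothetical constants that mirror the flags passed to flink run.
        static final String OUTPUT_OSS_DIR = "outputOssDir";
        static final String KAFKA_BROKERS_ARG = "kafkaBrokers";
        static final String INPUT_TOPIC_ARG = "inputTopic";
        static final String INPUT_TOPIC_GROUP_ARG = "inputTopicGroup";

        public static void main(String[] args) {
            // ParameterTool turns "--key value" pairs into a lookup table, so
            // params.get(OUTPUT_OSS_DIR) returns the value passed to --outputOssDir.
            ParameterTool params = ParameterTool.fromArgs(args);

            System.out.println("outputOssDir    = " + params.get(OUTPUT_OSS_DIR));
            System.out.println("kafkaBrokers    = " + params.get(KAFKA_BROKERS_ARG));
            System.out.println("inputTopic      = " + params.get(INPUT_TOPIC_ARG));
            System.out.println("inputTopicGroup = " + params.get(INPUT_TOPIC_GROUP_ARG));
        }
    }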

Step 5: View the output

  • You can view the output in the OSS console after the job runs successfully.
    1. Log on to the OSS console.
    2. In the left-side navigation pane, click Buckets. On the Buckets page, click the name of the created bucket.
    3. On the Objects page, view the output in the specified output directory.
      Important The job is a streaming job that continuously runs and can generate a large number of output files. After you view the output, we recommend that you run the yarn application -kill <appId> command in the CLI to terminate the job at the earliest opportunity.
  • You can also run the hdfs dfs -cat oss://<YOUR_TARGET_BUCKET>/oss_kafka_test/<DATE_DIR>/part-0-0 command in the CLI of the Dataflow cluster to view the data that is written to the OSS bucket. A programmatic alternative that reads the output through the Hadoop FileSystem API is sketched after the notes below.
    Important
    • To ensure exactly-once semantics, data files are committed to the OSS bucket each time the Flink job completes a checkpoint. In this example, the checkpoint interval is 30 seconds.
    • The job is a streaming job that continuously runs and can generate a large number of output files. After you view the output, we recommend that you run the yarn application -kill <appId> command in the CLI to terminate the job at the earliest opportunity.
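
As a programmatic alternative to hdfs dfs -cat, the following sketch reads the output through the Hadoop FileSystem API. It assumes that it runs on the Dataflow cluster, where the built-in JindoFS-related dependencies already resolve the oss:// scheme, and that you replace the placeholder bucket name with your own bucket.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.nio.charset.StandardCharsets;

    public class ReadOssOutput {

        public static void main(String[] args) throws Exception {
            // Replace <YOUR_TARGET_BUCKET> with the name of your OSS bucket.
            String outputDir = "oss://<YOUR_TARGET_BUCKET>/oss_kafka_test";

            // On the Dataflow cluster, the Hadoop configuration maps the oss:// scheme
            // to the built-in JindoFS implementation.
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(outputDir), conf);

            // StreamingFileSink writes part files into date-based subdirectories.
            for (FileStatus dateDir : fs.listStatus(new Path(outputDir))) {
                for (FileStatus part : fs.listStatus(dateDir.getPath())) {
                    try (BufferedReader reader = new BufferedReader(
                            new InputStreamReader(fs.open(part.getPath()), StandardCharsets.UTF_8))) {
                        reader.lines().forEach(System.out::println);
                    }
                }
            }
        }
    }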