Object Storage Service:Use Apache Flink on an EMR cluster to write data to OSS-HDFS

Last Updated: Dec 06, 2023

The resumable writing feature allows data to be written to storage media with exactly-once semantics. This topic describes how to use Apache Flink on an E-MapReduce (EMR) cluster to write data to OSS-HDFS in a resumable manner.

Prerequisites

An EMR cluster is created, and OSS-HDFS is enabled for the bucket to which you want to write data.

Use Flink jobs to write data to OSS-HDFS

  1. Configure general settings.

    To write data to OSS-HDFS by using exactly-once semantics, you must perform the following operations:

    1. Enable the checkpoint mechanism of Apache Flink.

      Add the following code to create a StreamExecutionEnvironment object:

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    2. Add the following code to enable the checkpoint mechanism. Replace <userDefinedCheckpointInterval> with the checkpoint interval in milliseconds:
      env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
    3. Use a data source that supports data retransmission (replay), such as Kafka, as shown in the sketch below.
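
      Kafka is one such replayable source. The following is a minimal sketch, assuming that the flink-connector-kafka dependency is on the classpath; the broker address, topic name, and consumer group are placeholder values:

      Properties kafkaProps = new Properties();
      kafkaProps.setProperty("bootstrap.servers", "<kafka-broker>:9092"); // placeholder
      kafkaProps.setProperty("group.id", "<consumer-group>");             // placeholder
      // Kafka can replay records from committed offsets, which lets the job
      // recover consistently from Flink checkpoints.
      DataStream<String> outputStream = env.addSource(
              new FlinkKafkaConsumer<>("<topic>", new SimpleStringSchema(), kafkaProps));
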
  2. Configure quick settings to use Flink.

    You can write data to OSS-HDFS by specifying a path that starts with the oss:// prefix and contains the OSS-HDFS bucket and endpoint. This method does not require additional dependencies.

    1. Add a sink.

      The following sample code provides an example on how to write a DataStream<String> object named outputStream to OSS-HDFS by using StreamingFileSink:

      String outputPath = "oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>";
      StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
              new Path(outputPath),
              new SimpleStringEncoder<String>("UTF-8")
      ).build();
      outputStream.addSink(sink);
      Important: In the preceding example, the .<oss-hdfs-endpoint> element is optional. If you omit it, make sure that the correct OSS-HDFS endpoint is configured in the Flink or Hadoop component.
    2. Call env.execute() to submit and run the Flink job. A complete end-to-end sketch follows this list.
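
The following is a minimal end-to-end sketch that combines the preceding steps, assuming the flink-connector-kafka dependency. The class name, Kafka connection settings, bucket, endpoint, and directory are placeholder values:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OssHdfsSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 30 seconds with exactly-once guarantees.
        env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);

        // Replayable source; connection settings are placeholders.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "<kafka-broker>:9092");
        kafkaProps.setProperty("group.id", "<consumer-group>");
        DataStream<String> outputStream = env.addSource(
                new FlinkKafkaConsumer<>("<topic>", new SimpleStringSchema(), kafkaProps));

        // Row-format sink that writes each record as a UTF-8 line to OSS-HDFS.
        String outputPath = "oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>";
        StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
                new Path(outputPath),
                new SimpleStringEncoder<String>("UTF-8")
        ).build();
        outputStream.addSink(sink);

        env.execute("Write to OSS-HDFS");
    }
}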

(Optional) Configure custom settings

When you submit Flink jobs, you can configure custom parameters to enable or manage specific features.

The following sample code shows how to use -yD to configure settings when you submit a Flink job to a YARN cluster:

<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...

You can also enable entropy injection. The entropy injection feature replaces a specific string in the destination path with a random string. This way, written data is distributed across different partitions, which improves write performance.

To enable entropy injection for data that is written to OSS-HDFS, complete the following configurations:

oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>
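
For example, assuming a job packaged as my-flink-job.jar (a placeholder name) and an entropy length of 4, you can pass these parameters with -yD when you submit the job:

<flink_home>/bin/flink run -m yarn-cluster \
    -yD oss.entropy.key=<user-defined-key> \
    -yD oss.entropy.length=4 \
    my-flink-job.jar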

When you write data to an object, the <user-defined-key> string in the write path is replaced with a random string whose length equals the value of <user-defined-length>. The value of <user-defined-length> must be greater than 0.
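
As a hypothetical illustration, with oss.entropy.key=<user-defined-key> and oss.entropy.length=4, a sink path that contains the key, such as:

oss://<user-defined-oss-hdfs-bucket>/<user-defined-key>/output/

is written with the key replaced by a four-character random string, for example:

oss://<user-defined-oss-hdfs-bucket>/a1b2/output/

Here a1b2 is an arbitrary example of the injected random string.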