全部产品
Search
文档中心

Object Storage Service:Gunakan Flink pada kluster EMR untuk menulis data ke layanan OSS-HDFS

更新时间:Nov 09, 2025

Fitur penulisan yang dapat dilanjutkan memungkinkan data ditulis ke media penyimpanan menggunakan semantik EXACTLY_ONCE. Topik ini menjelaskan cara menggunakan Apache Flink pada kluster E-MapReduce (EMR) untuk menulis data ke OSS-HDFS secara dapat dilanjutkan.

Prasyarat

  • OSS-HDFS diaktifkan untuk sebuah bucket dan izin diberikan kepada peran RAM untuk mengakses OSS-HDFS. Untuk informasi lebih lanjut, lihat Aktifkan OSS-HDFS dan Berikan Izin Akses.

  • Secara default, akun Alibaba Cloud memiliki izin untuk menghubungkan kluster EMR ke OSS-HDFS dan melakukan operasi umum terkait OSS-HDFS. Pengguna RAM dengan izin yang diperlukan telah dibuat. Jika Anda ingin menggunakan pengguna RAM untuk menghubungkan kluster EMR ke OSS-HDFS, pengguna RAM tersebut harus memiliki izin yang sesuai. Untuk informasi lebih lanjut, lihat Berikan Izin kepada Pengguna RAM untuk Menghubungkan Kluster EMR ke OSS-HDFS.

Contoh

  1. Konfigurasi Umum

    Untuk menulis data ke layanan OSS-HDFS menggunakan semantik EXACTLY_ONCE, Anda harus menyelesaikan konfigurasi berikut:

    1. Aktifkan checkpoint untuk Flink.

      Contoh:

      1. Buat objek StreamExecutionEnvironment sebagai berikut.

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      2. Jalankan perintah berikut untuk mengaktifkan checkpoint.

        env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
    2. Gunakan sumber data yang dapat diputar ulang, seperti Kafka.

  2. Penggunaan Sederhana

    Tidak diperlukan dependensi tambahan. Untuk menulis data, gunakan jalur yang dimulai dengan awalan oss:// dan tentukan bucket serta Endpoint dari layanan OSS-HDFS.

    1. Tambahkan sink.

      Contoh berikut menunjukkan cara menulis objek DataStream<String> ke layanan OSS-HDFS.

      String outputPath = "oss://{user-defined-oss-hdfs-bucket.oss-hdfs-endpoint}/{user-defined-dir}";
      StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
              new Path(outputPath),
              new SimpleStringEncoder<String>("UTF-8")
      ).build();
      outputStream.addSink(sink);
      Penting

      Bagian .{oss-hdfs-endpoint} dari jalur bersifat opsional. Jika Anda menghilangkan bagian ini, Anda harus mengonfigurasi Endpoint layanan OSS-HDFS dengan benar di komponen Flink atau Hadoop.

    2. Gunakan env.execute() untuk mengeksekusi pekerjaan Flink.

(Opsional) Konfigurasi kustom

Saat Anda mengirimkan pekerjaan Flink, Anda dapat menyesuaikan parameter untuk mengaktifkan atau mengontrol fitur tertentu.

Contoh berikut menunjukkan cara menggunakan parameter -yD untuk mengirimkan pekerjaan Flink dalam mode yarn-cluster:

<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...

Anda dapat mengaktifkan fitur Entropy Injection. Injeksi entropi mencocokkan string tertentu di jalur tulis dan menggantinya dengan string acak. Ini mengurangi hotspot data dan meningkatkan kinerja tulis.

Saat menulis ke OSS-HDFS, terapkan konfigurasi berikut.

oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>

Saat Anda menulis file baru, string apa pun di jalur yang cocok dengan <user-defined-key> akan diganti dengan string acak. Panjang string acak ditentukan oleh <user-defined-length>. Nilai dari <user-defined-length> harus lebih besar dari 0.