全部产品
Search
文档中心

Object Storage Service:Menulis data ke OSS-HDFS menggunakan Flink open source

更新时间:Nov 09, 2025

Flink open source tidak mendukung penulisan streaming ke layanan OSS-HDFS dengan semantik EXACTLY_ONCE. Anda harus menggunakan JindoSDK untuk mengaktifkan kemampuan ini.

Catatan

Jika Anda tidak ingin menerapkan JindoSDK saat menggunakan Flink untuk men-stream data ke layanan OSS-HDFS, Anda dapat menggunakan Alibaba Cloud Realtime Compute for Flink untuk membaca dari dan menulis ke layanan OSS-HDFS. Untuk informasi lebih lanjut, lihat Membaca Data dari atau Menulis Data ke OSS atau OSS-HDFS Menggunakan Realtime Compute for Flink.

Prasyarat

  • Sebuah Instance ECS telah dibuat. Untuk informasi lebih lanjut, lihat Buat Sebuah Instance ECS.

  • Layanan OSS-HDFS telah diaktifkan, dan akses telah diberikan. Untuk petunjuk, lihat Aktifkan Layanan OSS-HDFS.

  • Flink open source versi 1.10.1 atau yang lebih baru telah diunduh dan diinstal. Kompatibilitas dengan Flink 1.16.0 dan yang lebih baru belum diverifikasi. Untuk informasi tentang paket instalasi Apache Flink dan versinya, lihat Apache Flink.

Konfigurasikan JindoSDK

  1. Masuk ke instance ECS yang telah Anda buat. Untuk informasi lebih lanjut, lihat Hubungkan ke Instance ECS.

  2. Unduh dan ekstrak paket JAR JindoSDK terbaru. Untuk mengunduh paket, lihat GitHub.

  3. Pindahkan file plugins/flink/jindo-flink-${version}-full.jar dari paket JindoSDK yang diekstraksi ke folder `lib` di direktori root Flink.

    mv plugins/flink/jindo-flink-${version}-full.jar lib/
Penting
  • Jika Flink OSS Connector dari Apache Flink ada, Anda harus menghapusnya. Untuk melakukannya, hapus file flink-oss-fs-hadoop-${flink-version}.jar dari folder lib Flink atau jalur plugins/oss-fs-hadoop.

  • Setelah Anda mengonfigurasi JindoSDK, tidak diperlukan konfigurasi tambahan untuk menggunakannya pada pekerjaan streaming Flink. Gunakan awalan oss:// untuk menulis ke layanan OSS-HDFS dan layanan OSS. JindoSDK secara otomatis mendeteksi tujuan penulisan.

Contoh

  1. Konfigurasi Umum

    Untuk menulis ke OSS-HDFS dengan semantik EXACTLY_ONCE, terapkan konfigurasi berikut:

    1. Aktifkan checkpoint Flink.

      Contoh:

      1. Buat StreamExecutionEnvironment sebagai berikut.

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      2. Jalankan perintah berikut untuk mengaktifkan checkpoint.

        env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
    2. Gunakan sumber data yang dapat diputar ulang, seperti Kafka.

  2. Penggunaan Sederhana

    Tidak diperlukan dependensi tambahan. Untuk menggunakan Flink, tentukan jalur yang mencakup awalan oss://, bucket, dan Endpoint dari layanan OSS-HDFS.

    1. Tambahkan sink.

      Contoh berikut menunjukkan cara menulis objek `DataStream<String>` bernama `outputStream` ke OSS-HDFS.

      String outputPath = "oss://{user-defined-oss-hdfs-bucket.oss-hdfs-endpoint}/{user-defined-dir}";
      StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
              new Path(outputPath),
              new SimpleStringEncoder<String>("UTF-8")
      ).build();
      outputStream.addSink(sink);
      Penting

      Bidang .{oss-hdfs-endpoint} dalam nama bucket OSS-HDFS bersifat opsional. Jika Anda menghilangkan bidang ini, pastikan bahwa Endpoint OSS-HDFS dikonfigurasi dengan benar di komponen Flink atau Hadoop Anda.

    2. Gunakan env.execute() untuk menjalankan pekerjaan Flink.

(Opsional) Konfigurasi kustom

Saat Anda mengirimkan pekerjaan Flink, Anda dapat menyesuaikan parameter untuk mengaktifkan atau mengontrol fitur tertentu.

Contoh berikut menunjukkan cara menggunakan parameter -yD untuk mengirimkan pekerjaan Flink dalam mode yarn-cluster:

<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...

Anda dapat mengaktifkan fitur Entropy Injection. Entropy injection mencocokkan string tertentu di jalur penulisan dan menggantinya dengan string acak. Ini mengurangi hotspotting data dan meningkatkan kinerja penulisan.

Saat menulis ke OSS-HDFS, terapkan konfigurasi berikut.

oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>

Saat Anda menulis file baru, string apa pun di jalur yang cocok dengan <user-defined-key> akan diganti dengan string acak. Panjang string acak ditentukan oleh <user-defined-length>. Nilai dari <user-defined-length> harus lebih besar dari 0.