Fitur penulisan yang dapat dilanjutkan memungkinkan data ditulis ke media penyimpanan menggunakan semantik EXACTLY_ONCE. Topik ini menjelaskan cara menggunakan Apache Flink pada kluster E-MapReduce (EMR) untuk menulis data ke OSS-HDFS secara dapat dilanjutkan.
Prasyarat
OSS-HDFS diaktifkan untuk sebuah bucket dan izin diberikan kepada peran RAM untuk mengakses OSS-HDFS. Untuk informasi lebih lanjut, lihat Aktifkan OSS-HDFS dan Berikan Izin Akses.
Secara default, akun Alibaba Cloud memiliki izin untuk menghubungkan kluster EMR ke OSS-HDFS dan melakukan operasi umum terkait OSS-HDFS. Pengguna RAM dengan izin yang diperlukan telah dibuat. Jika Anda ingin menggunakan pengguna RAM untuk menghubungkan kluster EMR ke OSS-HDFS, pengguna RAM tersebut harus memiliki izin yang sesuai. Untuk informasi lebih lanjut, lihat Berikan Izin kepada Pengguna RAM untuk Menghubungkan Kluster EMR ke OSS-HDFS.
Contoh
Konfigurasi Umum
Untuk menulis data ke layanan OSS-HDFS menggunakan semantik EXACTLY_ONCE, Anda harus menyelesaikan konfigurasi berikut:
Aktifkan checkpoint untuk Flink.
Contoh:
Buat objek StreamExecutionEnvironment sebagai berikut.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Jalankan perintah berikut untuk mengaktifkan checkpoint.
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
Gunakan sumber data yang dapat diputar ulang, seperti Kafka.
Penggunaan Sederhana
Tidak diperlukan dependensi tambahan. Untuk menulis data, gunakan jalur yang dimulai dengan awalan oss:// dan tentukan bucket serta Endpoint dari layanan OSS-HDFS.
Tambahkan sink.
Contoh berikut menunjukkan cara menulis objek DataStream<String> ke layanan OSS-HDFS.
String outputPath = "oss://{user-defined-oss-hdfs-bucket.oss-hdfs-endpoint}/{user-defined-dir}"; StreamingFileSink<String> sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder<String>("UTF-8") ).build(); outputStream.addSink(sink);PentingBagian
.{oss-hdfs-endpoint}dari jalur bersifat opsional. Jika Anda menghilangkan bagian ini, Anda harus mengonfigurasi Endpoint layanan OSS-HDFS dengan benar di komponen Flink atau Hadoop.Gunakan
env.execute()untuk mengeksekusi pekerjaan Flink.
(Opsional) Konfigurasi kustom
Saat Anda mengirimkan pekerjaan Flink, Anda dapat menyesuaikan parameter untuk mengaktifkan atau mengontrol fitur tertentu.
Contoh berikut menunjukkan cara menggunakan parameter -yD untuk mengirimkan pekerjaan Flink dalam mode yarn-cluster:
<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...Anda dapat mengaktifkan fitur Entropy Injection. Injeksi entropi mencocokkan string tertentu di jalur tulis dan menggantinya dengan string acak. Ini mengurangi hotspot data dan meningkatkan kinerja tulis.
Saat menulis ke OSS-HDFS, terapkan konfigurasi berikut.
oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>Saat Anda menulis file baru, string apa pun di jalur yang cocok dengan <user-defined-key> akan diganti dengan string acak. Panjang string acak ditentukan oleh <user-defined-length>. Nilai dari <user-defined-length> harus lebih besar dari 0.