Dalam skenario umum, data Kafka diimpor ke layanan penyimpanan danau data seperti Alibaba Cloud Object Storage Service (OSS) secara real-time untuk mengurangi biaya penyimpanan atau melakukan kueri dan analisis. Di E-MapReduce (EMR) V3.37.1 dan versi lebih baru, kluster Dataflow menyediakan dependensi bawaan terkait JindoFS. Anda dapat menjalankan pekerjaan Flink di kluster Dataflow untuk menulis data Kafka ke OSS dalam mode streaming menggunakan semantik exactly-once. Topik ini menjelaskan cara menulis kode pekerjaan Flink dan menjalankan pekerjaan tersebut di kluster Dataflow untuk memenuhi persyaratan bisnis dalam skenario yang disebutkan.
Prasyarat
- EMR dan OSS telah diaktifkan.
- Akun Alibaba Cloud atau pengguna RAM yang ingin digunakan diberikan izin yang diperlukan. Untuk informasi lebih lanjut, lihat Tetapkan peran.
Prosedur
Langkah 1: Siapkan lingkungan
Buat kluster Dataflow yang berisi layanan Flink dan Kafka. Untuk informasi lebih lanjut, lihat Buat kluster.
CatatanDalam topik ini, EMR V3.43.1 digunakan.
Buat Bucket OSS yang berada di wilayah yang sama dengan kluster Dataflow. Untuk informasi lebih lanjut, lihat Buat bucket.
Langkah 2: Siapkan paket JAR
Unduh kode demo. Untuk informasi tentang kode demo, lihat dataflow-demo.
Anda dapat menjalankan pekerjaan Flink untuk menulis data ke OSS dalam mode streaming berdasarkan JindoFS. Prosedur penulisan data serupa dengan saat Anda menulis data ke Hadoop Distributed File System (HDFS). Nama jalur tempat Anda menulis data harus dimulai dengan oss://. Dalam contoh berikut, Flink menggunakan metode StreamingFileSink dan semantik exactly-once untuk menulis data Kafka ke OSS setelah checkpoint diaktifkan di Flink.
Potongan kode sampel berikut memberikan contoh cara membuat sumber Kafka dan sink OSS. Anda dapat mengunduh kode lengkap dari GitHub. Untuk informasi lebih lanjut, lihat dataflow-demo.
PentingJika Bucket OSS dan kluster Dataflow tempat JindoFS diterapkan dibuat menggunakan akun Alibaba Cloud yang sama, JindoFS memungkinkan Anda membaca data dari dan menulis data ke Bucket OSS dalam mode tanpa kata sandi.
public class OssDemoJob { public static void main(String[] args) throws Exception { ... // Periksa direktori keluaran oss Preconditions.checkArgument( params.get(OUTPUT_OSS_DIR).startsWith("oss://"), "outputOssDir harus dimulai dengan 'oss://'."); // Atur lingkungan eksekusi streaming final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Checkpoint diperlukan env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE); String outputPath = params.get(OUTPUT_OSS_DIR); // Bangun sumber Kafka dengan API Sumber baru berdasarkan FLIP-27 KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder() .setBootstrapServers(params.get(KAFKA_BROKERS_ARG)) .setTopics(params.get(INPUT_TOPIC_ARG)) .setStartingOffsets(OffsetsInitializer.latest()) .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG)) .setDeserializer(new EventDeSerializationSchema()) .build(); // Sumber DataStream DataStreamSource<Event> source = env.fromSource( kafkaSource, WatermarkStrategy.<Event>forMonotonousTimestamps() .withTimestampAssigner((event, ts) -> event.getEventTime()), "Sumber Kafka"); StreamingFileSink<Event> sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder<Event>("UTF-8")) .withRollingPolicy(OnCheckpointRollingPolicy.build()) .build(); source.addSink(sink); // Kompilasi dan kirimkan pekerjaan env.execute(); } }CatatanPotongan kode sampel menyediakan program sampel utama. Anda dapat memodifikasi program sampel berdasarkan kebutuhan bisnis Anda dan melanjutkan ke kompilasi. Misalnya, Anda dapat menambahkan nama paket atau mengubah interval checkpoint dalam program sampel utama. Untuk informasi tentang cara membangun paket JAR untuk pekerjaan Flink, lihat Dokumentasi resmi Flink. Jika Anda tidak perlu memodifikasi program sampel, Anda dapat langsung menggunakan paket dataflow-oss-demo-1.0-SNAPSHOT.jar untuk melakukan operasi selanjutnya.
Di CLI, masuk ke direktori root file proyek yang diunduh dan jalankan perintah berikut untuk mengemas file:
mvn clean packagePaket dataflow-oss-demo-1.0-SNAPSHOT.jar ditampilkan di direktori dataflow-demo/dataflow-oss-demo/target proyek berdasarkan informasi artifactId di file pom.xml.
Langkah 3: Buat topik Kafka dan hasilkan data
Masuk ke kluster Dataflow dalam mode SSH. Untuk informasi tentang cara masuk ke kluster EMR, lihat Masuk ke kluster.
Jalankan perintah berikut untuk membuat topik Kafka untuk pengujian:
kafka-topics.sh --create --bootstrap-server core-1-1:9092 \ --replication-factor 2 \ --partitions 3 \ --topic kafka-test-topicSetelah topik Kafka dibuat, informasi berikut dikembalikan di CLI:
Created topic kafka-test-topic.Tulis data ke topik Kafka.
Jalankan perintah berikut di CLI untuk masuk ke konsol produser Kafka:
kafka-console-producer.sh --broker-list core-1-1:9092 --topic kafka-test-topicMasukkan lima potong data uji.
1,Ken,0,1,1662022777000 1,Ken,0,2,1662022777000 1,Ken,0,3,1662022777000 1,Ken,0,4,1662022777000 1,Ken,0,5,1662022777000Tekan Ctrl+C untuk keluar dari konsol produser Kafka.
Langkah 4: Jalankan pekerjaan Flink
Masuk ke kluster Dataflow dalam mode SSH. Untuk informasi tentang cara masuk ke kluster EMR, lihat Masuk ke kluster.
Unggah paket dataflow-oss-demo-1.0-SNAPSHOT.jar ke direktori root kluster Dataflow.
CatatanDalam contoh ini, paket dataflow-oss-demo-1.0-SNAPSHOT.jar diunggah ke direktori root. Anda juga dapat menentukan direktori unggah berdasarkan kebutuhan bisnis Anda.
Jalankan perintah berikut untuk mengirimkan pekerjaan Flink:
Dalam contoh ini, pekerjaan Flink dikirimkan dalam mode Per-Job. Untuk informasi tentang cara mengirimkan pekerjaan Flink dalam mode lain, lihat Penggunaan dasar.
flink run -t yarn-per-job -d -c com.alibaba.ververica.dataflow.demo.oss.OssDemoJob \ /dataflow-oss-demo-1.0-SNAPSHOT.jar \ --outputOssDir oss://xung****-flink-dlf-test/oss_kafka_test \ --kafkaBrokers core-1-1:9092 \ --inputTopic kafka-test-topic \ --inputTopicGroup my-groupDeskripsi parameter:
outputOssDir: direktori Bucket OSS tempat Anda ingin menulis data.
kafkaBrokers: broker Kafka yang ditentukan dalam kluster Kafka. Setel nilai menjadi
core-1-1:9092.inputTopic: topik Kafka tempat Anda ingin membaca data. Setel nilai menjadi
kafka-test-topicyang Anda buat di Langkah 3.inputTopicGroup: grup konsumen Kafka yang ingin Anda gunakan. Setel nilai menjadi
my-group.

Jalankan perintah berikut untuk melihat status pekerjaan:
flink list -t yarn-per-job -Dyarn.application.id=<appId>CatatanDalam perintah,
<appId>menentukan ID aplikasi yang dikembalikan setelah pekerjaan berhasil dijalankan. Dalam contoh ini, ID aplikasi adalah application_1670236019397_0003.
Langkah 5: Lihat output
Anda dapat melihat output di konsol OSS setelah pekerjaan berhasil dijalankan.
Masuk ke OSS console.
Di panel navigasi kiri, klik Bucket. Di halaman Bucket, klik nama bucket yang telah dibuat.
Di halaman Objek, lihat output di direktori keluaran yang ditentukan.
PentingPekerjaan ini adalah pekerjaan streaming yang terus berjalan dan dapat menghasilkan sejumlah besar file keluaran. Setelah Anda melihat output, kami sarankan Anda menjalankan perintah
yarn application -kill <appId>di CLI untuk menghentikan pekerjaan sesegera mungkin.
Anda juga dapat menjalankan perintah
hdfs dfs -cat oss://<YOUR_TARGET_BUCKET>/oss_kafka_test/<DATE_DIR>/part-0-0di CLI kluster Dataflow untuk menampilkan data yang disimpan di Bucket OSS. Gambar berikut menunjukkan contohnya.
PentingUntuk memastikan semantik exactly-once, file data disimpan di Bucket OSS setiap kali pekerjaan Flink menyelesaikan checkpoint. Dalam contoh ini, interval checkpoint adalah 30 detik.
Pekerjaan ini adalah pekerjaan streaming yang terus berjalan dan dapat menghasilkan sejumlah besar file keluaran. Setelah Anda melihat output, kami sarankan Anda menjalankan perintah yarn application -kill <appId> di CLI untuk menghentikan pekerjaan sesegera mungkin.