Realtime Compute for Apache Flink menggunakan Flink CDC untuk mengingesti data. Anda dapat mengembangkan pekerjaan YAML guna menyinkronkan data dari sumber ke sink. Topik ini menjelaskan cara mengembangkan pekerjaan ingesti data Flink CDC.
Informasi latar belakang
Ingesti data Flink CDC memanfaatkan Flink CDC untuk integrasi data. Anda dapat menggunakan konfigurasi YAML untuk mendefinisikan proses ekstrak, transformasi, dan muat (ETL) yang kompleks, yang secara otomatis dikonversi menjadi logika komputasi Flink. Fitur ini mendukung sinkronisasi basis data penuh, sinkronisasi tabel tunggal, sinkronisasi sharding, sinkronisasi tabel baru, evolusi skema, serta sinkronisasi kolom terhitung kustom. Selain itu, fitur ini juga mendukung pemrosesan ETL, penyaringan data dengan klausa WHERE, pemangkasan kolom, dan kolom terhitung. Hal ini menyederhanakan proses integrasi data sekaligus meningkatkan efisiensi dan keandalannya.
Keunggulan Flink CDC
Di Realtime Compute for Apache Flink, Anda dapat mengembangkan pekerjaan ingesti data Flink CDC, pekerjaan SQL, atau pekerjaan DataStream sendiri untuk menyinkronkan data. Bagian berikut menjelaskan keunggulan pekerjaan ingesti data Flink CDC dibandingkan dua metode pengembangan lainnya.
Flink CDC vs. Flink SQL
Pekerjaan ingesti data Flink CDC dan pekerjaan SQL menggunakan tipe data yang berbeda untuk transmisi data:
Pekerjaan SQL mentransmisikan RowData, sedangkan pekerjaan Flink CDC mentransmisikan DataChangeEvent dan SchemaChangeEvent. Setiap RowData dalam pekerjaan SQL memiliki tipe perubahan tersendiri, yaitu insert (+I), update before (-U), update after (+U), dan delete (-D).
Flink CDC menggunakan SchemaChangeEvent untuk mentransmisikan informasi perubahan skema, seperti membuat tabel, menambah kolom, atau membersihkan tabel. DataChangeEvent digunakan untuk mentransmisikan perubahan data, seperti operasi insert, update, dan delete. Pesan update mencakup konten update before dan update after, sehingga memungkinkan Anda menulis data perubahan asli ke sink.
Tabel berikut merangkum keunggulan pekerjaan ingesti data Flink CDC dibandingkan pekerjaan SQL.
Ingesti data Flink CDC | Flink SQL |
Mendeteksi skema secara otomatis dan mendukung sinkronisasi basis data penuh. | Memerlukan penulisan manual pernyataan CREATE TABLE dan INSERT. |
Mendukung berbagai kebijakan evolusi skema. | Tidak mendukung evolusi skema. |
Mendukung sinkronisasi changelog asli. | Mengganggu struktur changelog asli. |
Mendukung pembacaan dari dan penulisan ke beberapa tabel. | Hanya membaca dari dan menulis ke satu tabel. |
Dibandingkan dengan pernyataan CTAS atau CDAS, pekerjaan Flink CDC lebih kuat dan mendukung fitur-fitur berikut:
Evolusi skema pada tabel leluhur disinkronkan segera tanpa perlu menunggu penulisan data baru untuk memicu sinkronisasi.
Mendukung sinkronisasi changelog asli tanpa memisahkan pesan update.
Menyinkronkan lebih banyak jenis perubahan skema, seperti TRUNCATE TABLE dan DROP TABLE.
Mendukung pemetaan tabel untuk menentukan nama tabel sink secara fleksibel.
Mendukung perilaku evolusi skema yang fleksibel dan dapat dikonfigurasi.
Mendukung penyaringan data dengan klausa WHERE.
Mendukung pemangkasan bidang.
Flink CDC vs. Flink DataStream
Tabel berikut merangkum keunggulan pekerjaan ingesti data Flink CDC dibandingkan pekerjaan DataStream.
Ingesti data Flink CDC | Flink DataStream |
Dirancang untuk pengguna dari semua tingkat keahlian, bukan hanya ahli. | Memerlukan keakraban dengan Java dan sistem terdistribusi. |
Menyembunyikan detail implementasi dasar untuk menyederhanakan pengembangan. | Memerlukan keakraban dengan kerangka kerja Flink. |
Format YAML mudah dipahami dan dipelajari. | Memerlukan pengetahuan tentang alat seperti Maven untuk mengelola dependensi. |
Pekerjaan yang sudah ada mudah digunakan kembali. | Kode yang sudah ada sulit digunakan kembali. |
Batasan
Anda dapat menggunakan Ververica Runtime (VVR) 11.1 untuk mengembangkan pekerjaan ingesti data Flink CDC. Untuk menggunakan VVR 8.x, Anda harus menggunakan VVR 8.0.11.
Hanya satu sumber dan satu sink yang didukung. Untuk membaca dari beberapa sumber data atau menulis ke beberapa sink, Anda harus membuat beberapa pekerjaan Flink CDC.
Anda tidak dapat menerapkan pekerjaan Flink CDC ke kluster sesi.
Penyetelan otomatis tidak didukung untuk pekerjaan Flink CDC.
Konektor ingesti data Flink CDC
Tabel berikut mencantumkan konektor yang didukung sebagai sumber dan sink untuk ingesti data Flink CDC.
Anda dapat memberikan masukan mengenai penyimpanan hulu dan hilir yang Anda minati melalui saluran seperti Tiket atau DingTalk. Kami berencana untuk mendukung lebih banyak opsi penyimpanan ini di masa depan guna memenuhi kebutuhan Anda dengan lebih baik.
Konektor yang didukung
Konektor | Tipe yang didukung | |
Sumber | Sink | |
Catatan Terhubung ke ApsaraDB RDS for MySQL, PolarDB for MySQL, dan MySQL yang dikelola sendiri. | √ | × |
× | √ | |
√ Catatan Memerlukan Ververica Runtime (VVR) 8.0.10 atau versi lebih baru. | √ | |
× | √ | |
× | √ | |
× | √ | |
√ Catatan Memerlukan VVR 11.1 atau versi lebih baru. | × | |
√ Catatan Memerlukan VVR 11.2 atau versi lebih baru. | × | |
× | √ Catatan Memerlukan VVR 11.1 atau versi lebih baru. | |
× | √ Catatan Memerlukan VVR 11.1 atau versi lebih baru. | |
× | √ | |
Membuat pekerjaan ingesti data Flink CDC
Hasilkan pekerjaan dari templat pekerjaan sinkronisasi
Masuk ke Konsol Realtime Compute for Apache Flink.
Pada kolom Actions ruang kerja target, klik Console.
Di panel navigasi sebelah kiri, pilih .
Klik
, lalu klik Create From Template.Pilih templat sinkronisasi data.
Saat ini hanya templat MySQL ke StarRocks, MySQL ke Paimon, dan MySQL ke Hologres yang didukung.

Masukkan pengaturan pekerjaan, seperti Job Name, Storage Location, dan Engine Version, lalu klik OK.
Konfigurasikan informasi sumber dan sink untuk pekerjaan Flink CDC.
Untuk informasi selengkapnya mengenai parameter, lihat dokumentasi untuk konektor yang sesuai.
Hasilkan pekerjaan dari pekerjaan CTAS/CDAS
Jika suatu pekerjaan berisi beberapa pernyataan CXAS, Flink hanya mendeteksi dan mengonversi pernyataan pertama.
Karena fungsi bawaan yang didukung oleh Flink SQL dan Flink CDC berbeda, aturan transformasi yang dihasilkan mungkin belum siap digunakan. Anda harus memeriksa dan menyesuaikan aturan tersebut secara manual.
Jika sumbernya adalah MySQL dan pekerjaan CTAS/CDAS asli sedang berjalan, Anda harus menyesuaikan server ID sumber dalam pekerjaan ingesti data Flink CDC untuk menghindari konflik dengan pekerjaan asli.
Masuk ke Konsol Realtime Compute for Apache Flink.
Klik Console di kolom Actions ruang kerja target.
Di panel navigasi sebelah kiri, pilih .
Klik
, lalu klik Generate From Existing CTAS/CDAS Job. Pilih pekerjaan CTAS atau CDAS target dan klik OK.Di halaman pemilihan, sistem hanya menampilkan pekerjaan CTAS dan CDAS yang valid. Pekerjaan ETL biasa dan draf pekerjaan dengan kesalahan sintaks tidak ditampilkan.
Masukkan informasi pekerjaan, seperti Job Name, Storage Location, dan Engine Version, lalu klik OK.
Migrasikan pekerjaan dari komunitas open source
Masuk ke Konsol Realtime Compute for Apache Flink.
Untuk ruang kerja target, klik Console di kolom Actions.
Di panel navigasi sebelah kiri, buka .
Klik
, pilih Create Data Ingestion Draft, masukkan File Name dan Engine Version, lalu klik Create.Salin pekerjaan Flink CDC dari komunitas open source.
(Opsional) Klik Deep Check.
Anda dapat memeriksa sintaks, konektivitas jaringan, dan izin akses.
Buat pekerjaan ingesti data Flink CDC dari awal
Masuk ke Konsol Realtime Compute for Apache Flink.
Pada kolom Actions untuk ruang kerja target, klik Console.
Di panel navigasi sebelah kiri, pilih .
Klik
dan pilih Create Data Ingestion Draft. Masukkan File Name dan Engine Version, lalu klik Create.Konfigurasikan pekerjaan Flink CDC.
# Wajib source: # Tipe sumber data type: <Ganti dengan tipe konektor sumber Anda> # Konfigurasi sumber data. Untuk informasi selengkapnya mengenai item konfigurasi, lihat dokumentasi untuk konektor yang sesuai. ... # Wajib sink: # Tipe sink type: <Ganti dengan tipe konektor sink Anda> # Konfigurasi sink. Untuk informasi selengkapnya mengenai item konfigurasi, lihat dokumentasi untuk konektor yang sesuai. ... # Opsional transform: # Aturan transformasi untuk tabel flink_test.customers - source-table: flink_test.customers # Konfigurasi proyeksi. Menentukan kolom yang akan disinkronkan dan melakukan transformasi data. projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name # Kondisi filter. Hanya menyinkronkan data di mana id lebih besar dari 10. filter: id > 10 # Deskripsi yang digunakan untuk menjelaskan aturan transformasi description: tambahkan kolom terhitung berdasarkan tabel sumber # Opsional route: # Aturan routing. Menentukan pemetaan antara tabel sumber dan tabel sink. - source-table: flink_test.customers sink-table: db.customers_o # Deskripsi yang digunakan untuk menjelaskan aturan routing description: sinkronisasi tabel customers - source-table: flink_test.customers_suffix sink-table: db.customers_s # Deskripsi yang digunakan untuk menjelaskan aturan routing description: sinkronisasi tabel customers_suffix #Opsional pipeline: # Nama pekerjaan name: MySQL to Hologres PipelineCatatanDalam pekerjaan Flink CDC, kunci dan nilai harus dipisahkan oleh spasi. Formatnya adalah
Kunci: Nilai.Tabel berikut menjelaskan blok kode tersebut.
Wajib
Blok kode
Deskripsi
Wajib
source
Awal dari pipa data. Flink CDC menangkap data perubahan dari sumber data.
CatatanSaat ini, hanya MySQL yang didukung sebagai sumber data. Untuk informasi selengkapnya mengenai item konfigurasi, lihat MySQL.
Anda dapat menggunakan variabel untuk mengatur informasi sensitif. Untuk informasi selengkapnya, lihat Manajemen Variabel.
sink
Akhir dari pipa data. Flink CDC mentransmisikan perubahan data yang ditangkap ke sistem sink ini.
CatatanUntuk informasi mengenai sistem sink yang saat ini didukung, lihat Konektor ingesti data Flink CDC. Untuk informasi selengkapnya mengenai item konfigurasi sink, lihat dokumentasi untuk konektor yang sesuai.
Anda dapat menggunakan variabel untuk mengatur informasi sensitif. Untuk informasi selengkapnya, lihat Manajemen Variabel.
Opsional
pipeline
pipeline
Menentukan konfigurasi dasar untuk seluruh pekerjaan pipa data, seperti nama pipeline.
transform
Menentukan aturan transformasi data. Transformasi adalah proses operasi pada data yang mengalir melalui pipeline Flink. Mendukung pemrosesan ETL, penyaringan klausa WHERE, pemangkasan kolom, dan kolom terhitung.
Ketika data perubahan asli yang ditangkap oleh Flink CDC perlu ditransformasi agar sesuai dengan sistem hilir tertentu, Anda dapat menggunakan blok transform.
route
Jika blok ini tidak dikonfigurasi, berarti menunjukkan sinkronisasi basis data penuh atau tabel target.
Dalam beberapa kasus, data perubahan yang ditangkap mungkin perlu dikirim ke tujuan berbeda berdasarkan aturan tertentu. Mekanisme routing memungkinkan Anda secara fleksibel menentukan pemetaan antara sistem hulu dan hilir untuk mengirim data ke sink yang berbeda.
Untuk informasi selengkapnya mengenai sintaks dan item konfigurasi setiap blok, lihat Referensi pengembangan pekerjaan ingesti data Flink CDC.
Kode berikut memberikan contoh cara menyinkronkan semua tabel dari basis data app_db di MySQL ke basis data di Hologres.
source: type: mysql hostname: <hostname> port: 3306 username: ${secret_values.mysqlusername} password: ${secret_values.mysqlpassword} tables: app_db.\.* server-id: 5400-5404 sink: type: hologres name: Hologres Sink endpoint: <endpoint> dbname: <database-name> username: ${secret_values.holousername} password: ${secret_values.holopassword} pipeline: name: Sync MySQL Database to Hologres(Opsional) Anda dapat mengklik Deep Check.
Anda dapat memeriksa sintaks, konektivitas jaringan, dan izin akses.
Referensi
Setelah mengembangkan pekerjaan Flink CDC, Anda harus menerapkan dan mempublikasikannya. Untuk informasi selengkapnya, lihat Deploy a job.
Untuk membangun pekerjaan Flink CDC yang menyinkronkan data dari basis data MySQL ke StarRocks secara cepat, lihat Quick start for Flink CDC data ingestion jobs.