Realtime Compute for Apache Flink menggunakan Flink CDC untuk membantu Anda mengembangkan pekerjaan YAML yang menyinkronkan data dari sumber ke tujuan. Topik ini menjelaskan cara mengembangkan pekerjaan ingesti data Flink CDC.
Informasi latar belakang
Ingesti data Flink CDC memanfaatkan Flink CDC untuk integrasi data. Dengan konfigurasi YAML, Anda dapat dengan mudah mendefinisikan pipa ekstrak, transformasi, dan muat (ETL) yang kompleks, yang secara otomatis dikonversi menjadi logika komputasi Flink. Fitur ini mendukung secara efisien sinkronisasi database penuh, sinkronisasi tabel tunggal, sinkronisasi database dan tabel terpartisi, sinkronisasi tabel yang baru ditambahkan, evolusi skema, serta sinkronisasi kolom terhitung kustom. Selain itu, fitur ini juga mendukung pemrosesan ETL, penyaringan data berbasis klausa WHERE, dan pemangkasan kolom, sehingga sangat menyederhanakan proses integrasi data serta meningkatkan efisiensi dan keandalannya.
Keunggulan Flink CDC
Di Realtime Compute for Apache Flink, Anda dapat mengembangkan pekerjaan ingesti data Flink CDC, pekerjaan SQL, atau pekerjaan DataStream untuk menyinkronkan data. Bagian berikut menjelaskan keunggulan pekerjaan ingesti data Flink CDC dibandingkan dua jenis pekerjaan lainnya.
Flink CDC vs. Flink SQL
Pekerjaan ingesti data Flink CDC dan pekerjaan SQL menggunakan tipe data yang berbeda untuk mentransfer data:
Pekerjaan SQL mentransfer RowData, sedangkan pekerjaan Flink CDC mentransfer DataChangeEvent dan SchemaChangeEvent. Setiap RowData dalam pekerjaan SQL memiliki tipe perubahan sendiri, yaitu insert (+I), update_before (-U), update_after (+U), dan delete (-D).
Flink CDC menggunakan SchemaChangeEvent untuk mentransfer informasi perubahan skema, seperti pembuatan tabel, penambahan kolom, dan pemotongan tabel. DataChangeEvent digunakan untuk mentransfer perubahan data, seperti insert, update, dan delete. Pesan update berisi konten sebelum dan sesudah update, sehingga memungkinkan Anda menulis data perubahan mentah ke tujuan.
Tabel berikut membandingkan keunggulan pekerjaan ingesti data Flink CDC dengan pekerjaan SQL.
Flink CDC data ingestion | Flink SQL |
Mendeteksi skema secara otomatis dan mendukung sinkronisasi database penuh. | Mengharuskan Anda menulis pernyataan CREATE TABLE dan INSERT secara manual. |
Mendukung berbagai kebijakan evolusi skema. | Tidak mendukung evolusi skema. |
Mendukung sinkronisasi changelog mentah. | Mengubah struktur changelog mentah. |
Mendukung pembacaan dan penulisan data ke beberapa tabel. | Membaca dan menulis data hanya ke satu tabel. |
Dibandingkan dengan pernyataan CTAS atau CDAS, pekerjaan Flink CDC menyediakan fitur yang lebih kuat, termasuk:
Segera menyinkronkan evolusi skema di tabel leluhur tanpa menunggu data baru ditulis.
Mendukung sinkronisasi changelog mentah tanpa memisahkan pesan update.
Menyinkronkan lebih banyak jenis evolusi skema, seperti TRUNCATE TABLE dan DROP TABLE.
Mendukung pemetaan tabel untuk mendefinisikan nama tabel tujuan secara fleksibel.
Mendukung perilaku evolusi skema yang fleksibel dan dapat dikonfigurasi.
Mendukung penyaringan data menggunakan klausa WHERE.
Mendukung pemangkasan kolom.
Flink CDC vs. Flink DataStream
Tabel berikut membandingkan keunggulan pekerjaan ingesti data Flink CDC dengan pekerjaan DataStream.
Flink CDC data ingestion | Flink DataStream |
Dirancang untuk pengguna di semua tingkat, bukan hanya ahli. | Memerlukan keakraban dengan Java dan sistem terdistribusi. |
Menyembunyikan detail dasar untuk menyederhanakan pengembangan. | Memerlukan keakraban dengan kerangka kerja Flink. |
Menggunakan format YAML yang mudah dipahami dan dipelajari. | Memerlukan pengetahuan tentang alat seperti Maven untuk mengelola dependensi. |
Memungkinkan penggunaan ulang pekerjaan yang sudah ada dengan mudah. | Menyulitkan penggunaan ulang kode yang sudah ada. |
Batasan
Anda harus menggunakan Ververica Runtime (VVR) 11.1 untuk mengembangkan pekerjaan ingesti data Flink CDC. Untuk VVR 8.x, gunakan VVR 8.0.11.
Anda hanya dapat menyinkronkan data dari satu sumber ke satu tujuan. Untuk membaca data dari beberapa sumber atau menulis ke beberapa tujuan, Anda harus membuat beberapa pekerjaan Flink CDC.
Anda tidak dapat men-deploy pekerjaan Flink CDC ke session cluster.
Penyetelan otomatis tidak didukung untuk pekerjaan Flink CDC.
Konektor ingesti data Flink CDC
Tabel berikut mencantumkan konektor yang didukung sebagai sumber dan tujuan untuk pekerjaan ingesti data Flink CDC.
Anda dapat memberikan masukan mengenai penyimpanan hulu dan hilir yang Anda minati melalui saluran seperti ticket atau DingTalk. Kami berencana mendukung lebih banyak opsi penyimpanan di masa depan untuk memenuhi kebutuhan Anda dengan lebih baik.
Konektor yang didukung
Konektor | Tipe yang didukung | |
Source | Sink | |
Catatan Terhubung ke ApsaraDB RDS for MySQL, PolarDB for MySQL, dan MySQL yang dikelola sendiri. | √ | × |
× | √ | |
√ Catatan Memerlukan Ververica Runtime (VVR) 8.0.10 atau versi yang lebih baru. | √ | |
× | √ | |
× | √ | |
× | √ | |
√ Catatan Memerlukan Ververica Runtime (VVR) 11.1 atau versi yang lebih baru. | × | |
√ Catatan Memerlukan Ververica Runtime (VVR) 11.2 atau versi yang lebih baru. | × | |
× | √ Catatan Memerlukan Ververica Runtime (VVR) 11.1 atau versi yang lebih baru. | |
× | √ Catatan Memerlukan Ververica Runtime (VVR) 11.1 atau versi yang lebih baru. | |
√ Catatan Memerlukan Ververica Runtime (VVR) 11.4 atau versi yang lebih baru. | × | |
× | √ | |
Gunakan kembali informasi koneksi dari Catalog yang sudah ada
Mulai dari Ververica Runtime (VVR) 11.5, Anda dapat menggunakan kembali informasi koneksi dari Catalog yang sudah ada dalam pekerjaan ingesti data Flink CDC Anda. Anda dapat mereferensikan Catalog bawaan yang dibuat pada halaman Data Management untuk mengambil properti koneksi secara otomatis, seperti URL, username, dan password. Hal ini mengurangi upaya konfigurasi manual.
Sintaks
source:
type: mysql
using.built-in-catalog: mysql_rds_catalog
sink:
type: paimon
using.built-in-catalog: paimon_dlf_catalogDalam modul source dan sink, gunakan sintaks using.built-in-catalog untuk mereferensikan Catalog bawaan Anda.
Sebagai contoh, dalam kode di atas, metadata untuk Catalog mysql_rds_catalog sudah mencakup parameter yang diperlukan seperti hostname, username, dan password. Anda tidak perlu menentukan parameter-parameter ini lagi dalam pekerjaan YAML.
Batasan
Konektor berikut mendukung penggunaan kembali informasi koneksi dari Catalog:
MySQL (source)
Kafka (source)
Upsert Kafka (sink)
StarRocks (sink)
Hologres (sink)
Paimon (sink)
SLS (source)
Parameter Catalog yang tidak kompatibel dengan sintaks YAML CDC tidak akan berlaku. Untuk informasi selengkapnya, lihat daftar parameter untuk setiap konektor.
Buat pekerjaan ingesti data Flink CDC
Hasilkan dari templat pekerjaan sinkronisasi
Masuk ke Konsol Realtime Compute for Apache Flink.
Klik Console di kolom Actions ruang kerja target.
Di panel navigasi sebelah kiri, pilih .
Klik
, lalu klik Create From Template.Pilih templat sinkronisasi data.
Saat ini, hanya templat MySQL ke StarRocks, MySQL ke Paimon, dan MySQL ke Hologres yang didukung.

Masukkan nama pekerjaan, lokasi penyimpanan, dan versi engine, lalu klik OK.
Konfigurasikan informasi source dan sink untuk pekerjaan Flink CDC.
Untuk informasi selengkapnya tentang pengaturan parameter, lihat dokumentasi untuk konektor yang sesuai.
Hasilkan dari pekerjaan CTAS/CDAS
Jika suatu pekerjaan berisi beberapa pernyataan CXAS, Flink hanya mendeteksi dan mengubah pernyataan pertama.
Karena perbedaan dukungan fungsi bawaan antara Flink SQL dan Flink CDC, aturan transformasi yang dihasilkan mungkin belum siap digunakan. Anda harus memverifikasi dan menyesuaikan aturan tersebut secara manual.
Jika sumbernya adalah MySQL dan pekerjaan CTAS/CDAS asli sedang berjalan, Anda harus menyesuaikan server ID sumber untuk pekerjaan ingesti data Flink CDC agar tidak terjadi konflik dengan pekerjaan asli.
Masuk ke Konsol Realtime Compute for Apache Flink.
Klik Console di kolom Actions ruang kerja target.
Di panel navigasi sebelah kiri, pilih .
Klik
, klik New Draft From CTAS/CDAS, pilih pekerjaan CTAS atau CDAS target, lalu klik OK.Di halaman pemilihan, sistem hanya menampilkan pekerjaan CTAS dan CDAS yang valid. Pekerjaan ETL biasa atau draf pekerjaan yang memiliki kesalahan sintaks tidak ditampilkan.
Masukkan nama pekerjaan, lokasi penyimpanan, dan versi engine, lalu klik OK.
Migrasi Pekerjaan dari Komunitas Open Source
Masuk ke Konsol Realtime Compute for Apache Flink.
Klik Console di kolom Actions ruang kerja target.
Di panel navigasi sebelah kiri, pilih .
Klik
dan pilih New Data Ingestion Draft. Atur File Name dan Engine Version, lalu klik Create.Salin pekerjaan Flink CDC dari komunitas open source.
(Opsional) Klik Depth Check.
Anda dapat memeriksa sintaks, konektivitas jaringan, dan izin akses.
Buat pekerjaan ingesti data Flink CDC dari awal
Masuk ke Konsol Realtime Compute for Apache Flink.
Klik Console di kolom Actions ruang kerja target.
Di panel navigasi sebelah kiri, pilih .
Klik
, pilih New Data Ingestion Draft, masukkan File Name dan Engine Version, lalu klik Create.Konfigurasikan pekerjaan Flink CDC.
# Required source: # Tipe sumber data type: <Ganti dengan tipe konektor sumber Anda> # Konfigurasi sumber data. Untuk informasi selengkapnya tentang item konfigurasi, lihat dokumentasi untuk konektor yang sesuai. ... # Required sink: # Tipe tujuan type: <Ganti dengan tipe konektor tujuan Anda> # Konfigurasi tujuan. Untuk informasi selengkapnya tentang item konfigurasi, lihat dokumentasi untuk konektor yang sesuai. ... # Optional transform: # Aturan transformasi untuk tabel flink_test.customers - source-table: flink_test.customers # Konfigurasi proyeksi. Menentukan kolom yang akan disinkronkan dan melakukan transformasi data. projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name # Kondisi filter. Hanya menyinkronkan data dengan id lebih besar dari 10. filter: id > 10 # Deskripsi aturan transformasi description: tambahkan kolom terhitung berdasarkan tabel sumber # Optional route: # Aturan routing yang menentukan pemetaan antara tabel sumber dan tujuan. - source-table: flink_test.customers sink-table: db.customers_o # Deskripsi aturan routing description: sinkronisasi tabel customers - source-table: flink_test.customers_suffix sink-table: db.customers_s # Deskripsi aturan routing description: sinkronisasi tabel customers_suffix #Optional pipeline: # Nama pekerjaan name: MySQL to Hologres PipelineCatatanDalam pekerjaan Flink CDC, kunci dan nilai harus dipisahkan oleh spasi. Formatnya adalah
Key: Value.Blok kode dijelaskan sebagai berikut.
Wajib
Modul kode
Deskripsi
Required
source (data source)
Awal dari pipa data. Flink CDC menangkap data perubahan dari sumber data.
CatatanSaat ini, hanya MySQL yang didukung sebagai sumber data. Untuk informasi selengkapnya mengenai item konfigurasi, lihat MySQL.
Anda dapat menggunakan variabel untuk mengatur informasi sensitif. Untuk informasi selengkapnya, lihat Variable Management.
sink (destination)
Akhir dari pipa data. Flink CDC mengirimkan perubahan data yang ditangkap ke sistem tujuan ini.
CatatanUntuk informasi tentang sistem tujuan yang didukung, lihat Konektor ingesti data Flink CDC. Untuk informasi selengkapnya tentang item konfigurasi tujuan, lihat dokumentasi untuk konektor yang sesuai.
Anda dapat menggunakan variabel untuk mengatur informasi sensitif. Untuk informasi selengkapnya, lihat Variable Management.
Optional
pipeline
(data pipeline)
Menentukan konfigurasi dasar untuk seluruh pekerjaan pipa data, seperti nama pipa.
transform (data transformation)
Menentukan aturan transformasi data. Transformasi adalah operasi pada data yang mengalir melalui pipa Flink. Modul ini mendukung pemrosesan ETL, penyaringan klausa WHERE, pemangkasan kolom, dan kolom terhitung.
Anda dapat menggunakan transform ketika data perubahan mentah yang ditangkap oleh Flink CDC perlu ditransformasi agar sesuai dengan sistem hilir tertentu.
route (routing)
Jika modul ini tidak dikonfigurasi, berarti menunjukkan sinkronisasi database penuh atau tabel target.
Dalam beberapa kasus, data perubahan yang ditangkap perlu dikirim ke tujuan berbeda berdasarkan aturan tertentu. Mekanisme routing memungkinkan Anda menentukan pemetaan antara sistem hulu dan hilir secara fleksibel serta mengirim data ke tujuan data yang berbeda.
Untuk informasi selengkapnya tentang struktur sintaks dan item konfigurasi setiap modul, lihat Referensi Pengembangan untuk pekerjaan ingesti data Flink CDC.
Kode berikut memberikan contoh cara menyinkronkan semua tabel dari database app_db di MySQL ke database di Hologres.
source: type: mysql hostname: <hostname> port: 3306 username: ${secret_values.mysqlusername} password: ${secret_values.mysqlpassword} tables: app_db.\.* server-id: 5400-5404 # (Optional) Sinkronkan data dari tabel yang baru dibuat pada fase inkremental. scan.binlog.newly-added-table.enabled: true # (Optional) Sinkronkan komentar tabel dan bidang. include-comments.enabled: true # (Optional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory pada TaskManager. scan.incremental.snapshot.unbounded-chunk-first.enabled: true # (Optional) Aktifkan filter parsing untuk mempercepat pembacaan. scan.only.deserialize.captured.tables.changelog.enabled: true sink: type: hologres name: Hologres Sink endpoint: <endpoint> dbname: <database-name> username: ${secret_values.holousername} password: ${secret_values.holopassword} pipeline: name: Sync MySQL Database to Hologres(Opsional) Klik Depth Check.
Anda dapat memeriksa sintaks, konektivitas jaringan, dan izin akses.
Referensi
Setelah mengembangkan pekerjaan Flink CDC, Anda perlu men-deploy-nya. Untuk informasi selengkapnya, lihat Deploy a job.
Untuk membangun pekerjaan Flink CDC yang menyinkronkan data dari database MySQL ke StarRocks secara cepat, lihat Quick Start for Flink CDC data ingestion jobs.