Pekerjaan ingesti data Flink CDC didefinisikan dalam file YAML yang terdiri dari hingga lima modul. Modul Source dan Sink wajib ada, sedangkan modul Transform, Route, dan Pipeline bersifat opsional.
Ikhtisar struktur pekerjaan
File pekerjaan lengkap mengikuti struktur berikut:
source: # Wajib: mendefinisikan sumber data
...
sink: # Wajib: mendefinisikan tujuan
...
transform: # Opsional: memfilter dan membentuk ulang data sebelum ditulis
- ...
route: # Opsional: memetakan tabel sumber ke tabel sink yang berbeda
- ...
pipeline: # Opsional: mengatur parameter global pekerjaan
...
Konfigurasi minimal
Contoh berikut menyinkronkan semua tabel yang cocok dari MySQL ke Paimon hanya dengan menggunakan modul wajib:
# Modul source
source:
type: mysql
name: MySQL Source
host: localhost
port: 3306
username: admin
password: <yourPassword>
tables: adb.*
# Modul sink
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse
# Modul pipeline
pipeline:
name: source-database-sync-pipe
schema.change.behavior: evolve
Konfigurasi lengkap
Contoh berikut menambahkan modul Transform dan Route untuk memfilter bidang, menerapkan ekspresi, dan memetakan ulang tabel:
# Modul source
source:
type: mysql
name: MySQL Source
host: localhost
port: 3306
username: admin
password: <yourPassword>
tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*, mydb.\\.*
# Modul sink
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse
# Modul transform
transform:
- source-table: mydb.app_order_.*
projection: id, order_id, TO_UPPER(product_name)
filter: id > 10 AND order_id > 100
primary-keys: id
partition-keys: product_name
table-options: comment=app order
description: project fields from source table
converter-after-transform: SOFT_DELETE
- source-table: mydb.web_order_.*
projection: CONCAT(id, order_id) as uniq_id, *
filter: uniq_id > 10
description: add new uniq_id for each row
# Modul route
route:
- source-table: mydb.default.app_order_.*
sink-table: odsdb.default.app_order
description: sync all table shards to one
- source-table: mydb.default.web_order
sink-table: odsdb.default.ods_web_order
description: sync table with given prefix ods_
# Modul pipeline
pipeline:
name: source-database-sync-pipe
schema.change.behavior: evolve
Modul Source
Modul Source mendefinisikan sumber data yang dibaca oleh Flink CDC. Konektor yang didukung:
source:
type: mysql # Jenis konektor
name: mysql source
# Parameter spesifik konektor mengikuti
xxx: ...
Untuk semua parameter yang didukung, lihat dokumentasi konektor terkait.
Modul Sink
Modul Sink mendefinisikan tujuan penulisan data oleh Flink CDC. Konektor yang didukung:
sink:
type: hologres # Jenis konektor
name: hologres sink
# Parameter spesifik konektor mengikuti
xxx: ...
Untuk semua parameter yang didukung, lihat dokumentasi konektor terkait.
Modul Transform
Modul Transform menerapkan aturan proyeksi, perhitungan, dan pemfilteran pada data tabel sumber sebelum mencapai sink. Definisikan satu aturan untuk setiap pola tabel sumber.
transform:
- source-table: db.tbl1
projection: ...
filter: ...
- source-table: db.tbl2
projection: ...
filter: ...
| Parameter | Deskripsi | Wajib |
|---|---|---|
source-table |
Identifier tabel sumber; mendukung ekspresi reguler | Wajib |
projection |
Pemilihan kolom dan ekspresi, mirip dengan klausa SELECT SQL |
Opsional |
filter |
Kondisi filter baris, mirip dengan klausa WHERE SQL |
Opsional |
primary-keys |
Kunci primer tabel sink, dipisahkan koma | Opsional |
partition-keys |
Kunci partisi tabel sink, dipisahkan koma | Opsional |
table-options |
Opsi pembuatan tabel yang diterapkan saat tabel sink dibuat otomatis | Opsional |
converter-after-transform |
Konverter yang diterapkan pada event perubahan setelah aturan transform dijalankan | Opsional |
description |
Deskripsi aturan | Opsional |
Untuk detail sintaks ekspresi, lihat Modul Transform Flink CDC.
Modul Route
Modul Route memetakan tabel sumber ke tabel sink. Gunakan modul ini untuk menggabungkan tabel yang di-shard, menambahkan awalan nama tabel, atau mengarahkan ulang tabel ke tujuan berbeda.
route:
- source-table: db.tbl1
sink-table: sinkdb.tbl1
- source-table: db.tbl2
sink-table: sinkdb.tbl2
| Parameter | Deskripsi | Wajib |
|---|---|---|
source-table |
Identifier tabel sumber; mendukung ekspresi reguler | Wajib |
sink-table |
Identifier tabel sink target | Wajib |
description |
Deskripsi aturan rute | Opsional |
Untuk detail sintaks ekspresi, lihat Modul Route Flink CDC.
Modul Pipeline
Modul Pipeline mengatur parameter global yang berlaku untuk seluruh pekerjaan.
pipeline:
name: CDC YAML job
schema.change.behavior: LENIENT
| Parameter | Deskripsi | Wajib |
|---|---|---|
name |
Nama pekerjaan yang ditampilkan di Dasbor Flink | Opsional |
schema.change.behavior |
Cara pekerjaan menangani perubahan skema hulu. Nilai yang diterima: evolve, LENIENT, dan lainnya. |
Opsional |
Untuk semua parameter yang dapat dikonfigurasi, lihat Modul Pipeline Flink CDC.