Topik ini menjelaskan praktik terbaik penggunaan pekerjaan ingesti data Flink Change Data Capture (CDC) dalam skenario bisnis kompleks, mencakup evolusi skema tabel sumber, peningkatan logika data (seperti menyisipkan metadata, menambahkan kolom terhitung, dan melakukan soft delete), perutean heterogen (seperti menggabungkan tabel yang di-shard dan menyinkronkan seluruh database), serta kontrol presisi (seperti memfilter tabel dan memulai dari stempel waktu tertentu).
Sinkronisasi tabel yang baru ditambahkan
Pekerjaan ingesti data Flink CDC mendukung sinkronisasi tabel yang baru ditambahkan dalam dua skenario:
Sinkronisasi panas tabel kosong baru: Anda dapat secara dinamis menangkap tabel baru tanpa data historis. Pekerjaan tersebut menangkap perubahan berikutnya pada tabel-tabel ini tanpa perlu restart.
Sinkronisasi tabel baru dengan data historis: Tabel baru yang telah berisi data historis memerlukan sinkronisasi penuh dan inkremental. Anda harus me-restart pekerjaan untuk menyertakan tabel-tabel tersebut.
Sinkronisasi panas tabel kosong baru tanpa data historis
Anda dapat mengaktifkan parameter scan.binlog.newly-added-table.enabled dalam pekerjaan Flink CDC untuk menyinkronisasi tabel kosong baru secara real time selama fase inkremental. Tabel-tabel ini dibuat tanpa data historis dan tidak memerlukan restart pekerjaan. Konfigurasi ini direkomendasikan.
Sebagai contoh, sebuah Pekerjaan Ingesti Data Flink CDC sedang menyinkronkan semua tabel di database MySQL `dlf_test`. Sebuah tabel kosong baru bernama products tanpa data historis dibuat di database sumber. Untuk menyinkronkan tabel baru ini tanpa memulai ulang pekerjaan, aktifkan parameter scan.binlog.newly-added-table.enabled: true. Konfigurasinya adalah sebagai berikut:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronisasi data dari tabel yang baru dibuat selama fase inkremental.
scan.binlog.newly-added-table.enabled: true
# (Opsional) Sinkronisasi komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
table.properties.deletion-vectors.enabled: trueSetelah pekerjaan CDC YAML dengan konfigurasi ini berjalan, pekerjaan tersebut secara otomatis menyinkronisasi semua tabel kosong baru dari database dlf_test ke tujuan.
Sinkronisasi tabel baru dengan data historis
Asumsikan bahwa tabel customers dan products sudah ada di database MySQL, tetapi Anda hanya ingin menyinkronisasi tabel customers saat startup. Konfigurasi pekerjaan adalah sebagai berikut:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.customers
server-id: 8601-8604
# (Opsional) Sinkronisasi komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
table.properties.deletion-vectors.enabled: trueSetelah pekerjaan berjalan selama beberapa waktu, jika Anda ingin menyinkronisasi semua tabel beserta data historisnya dari database, Anda harus me-restart pekerjaan tersebut. Ikuti langkah-langkah berikut:
Hentikan pekerjaan dan buat titik simpan.
Dalam konfigurasi sumber data MySQL, ubah parameter tables agar sesuai dengan tabel yang diinginkan. Hapus parameter
scan.binlog.newly-added-table.enableddan aktifkanscan.newly-added-table.enabled.
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronisasi data penuh dan inkremental untuk tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronisasi komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
table.properties.deletion-vectors.enabled: trueJalankan ulang pekerjaan dari titik simpan.
Anda tidak dapat mengaktifkan scan.binlog.newly-added-table.enabled dan scan.newly-added-table.enabled secara bersamaan.
Mengecualikan tabel tertentu
Dalam pekerjaan ingesti data Flink CDC, Anda dapat memfilter tabel tertentu agar tidak dibuat dan disinkronisasi ke hilir.
Sebagai contoh, database MySQL dlf_test berisi beberapa tabel, seperti customers dan products. Jika Anda ingin mengecualikan tabel bernama products_tmp, konfigurasikan pekerjaan sebagai berikut:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
# (Opsional) Kecualikan tabel yang tidak ingin Anda sinkronkan.
tables.exclude: dlf_test.products_tmp
server-id: 8601-8604
# (Opsional) Sinkronisasi data dari tabel yang baru dibuat selama fase inkremental.
scan.binlog.newly-added-table.enabled: true
# (Opsional) Sinkronisasi komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
table.properties.deletion-vectors.enabled: truePekerjaan ingesti data Flink CDC dengan konfigurasi ini secara otomatis menyinkronisasi semua tabel dari database dlf_test ke tujuan, kecuali tabel products_tmp. Pekerjaan ini juga menjaga skema tabel dan data tetap tersinkronisasi secara real time.
Parameter tables.exclude mendukung ekspresi reguler untuk mencocokkan beberapa tabel. Jika suatu tabel ditentukan baik di parameter tables maupun tables.exclude, aturan pengecualian akan didahulukan, sehingga tabel tersebut tidak akan disinkronkan.
Tingkatkan dengan metadata dan kolom terhitung
Tambahkan kolom metadata
Saat menulis data, Anda dapat menggunakan modul transform untuk menambahkan kolom metadata. Sebagai contoh, konfigurasi pekerjaan berikut menulis nama tabel, waktu operasi, dan jenis operasi ke tabel hilir. Untuk informasi lebih lanjut, lihat Modul Transform.
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronisasi data penuh dan inkremental untuk tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronisasi komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
# Gunakan waktu operasi sebagai metadata.
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
# (Opsional) Ubah kunci primer.
primary-keys: id,identifier
description: tambahkan identifier, op_ts, dan op
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
table.properties.deletion-vectors.enabled: trueSaat menggunakan MySQL sebagai sumber, Anda harus menambahkan metadata-column.include-list: op_ts untuk mengirim waktu operasi sebagai metadata ke hilir. Untuk informasi lebih lanjut, lihat MySQL.
Jika Anda ingin melakukan soft delete di tabel hilir, Anda dapat mengubah operasi delete menjadi operasi insert. Untuk melakukannya, tambahkan konfigurasi converter-after-transform: SOFT_DELETE dalam modul transform. Hal ini berguna karena data sumber berisi semua jenis perubahan, termasuk penghapusan.
Tambahkan kolom terhitung
Saat menulis data, Anda dapat menggunakan modul transform untuk menambahkan kolom terhitung. Sebagai contoh, konfigurasi pekerjaan berikut mengubah bidang created_at untuk menghasilkan bidang dt, yang kemudian digunakan sebagai bidang partisi untuk tabel hilir.
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronisasi data penuh dan inkremental untuk tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronisasi komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
# Gunakan waktu operasi sebagai metadata.
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
# (Opsional) Tetapkan bidang partisi.
partition-keys: dt
description: tambahkan dt
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
table.properties.deletion-vectors.enabled: trueSaat menggunakan MySQL sebagai sumber, Anda harus menambahkan metadata-column.include-list: op_ts untuk mengirim waktu operasi sebagai metadata ke hilir. Untuk informasi lebih lanjut, lihat MySQL.
Pemetaan nama tabel
Saat menyinkronisasi data dari tabel sumber ke tabel tujuan, Anda mungkin perlu mengubah nama tabel. Anda dapat melakukannya menggunakan modul routing. Bagian-bagian berikut menjelaskan skenario khas dan memberikan contoh konfigurasi pekerjaan ingesti data Flink CDC.
Gabungkan tabel yang di-shard
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronisasi data penuh dan inkremental untuk tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronisasi komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
route:
# Gabungkan semua tabel di database dlf_test yang namanya diawali dengan 'product_' dan diakhiri angka menjadi dlf.products.
- source-table: dlf_test.product_[0-9]+
sink-table: dlf.products
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
table.properties.deletion-vectors.enabled: trueSinkronisasi seluruh database
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronisasi data penuh dan inkremental untuk tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronisasi komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
route:
# Ubah nama tabel secara seragam. Sinkronisasi semua tabel dari database dlf_test ke database dlf. Nama tabel baru diberi awalan 'ods_'.
- source-table: dlf_test.\.*
sink-table: dlf.ods_<>
replace-symbol: <>
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
table.properties.deletion-vectors.enabled: trueKasus penggunaan komprehensif untuk skenario bisnis kompleks
Pekerjaan ingesti data Flink CDC berikut merupakan contoh komprehensif yang menggabungkan fitur-fitur yang disebutkan dalam topik ini untuk menangani skenario bisnis kompleks. Anda dapat menyesuaikan konfigurasi ini sesuai kebutuhan bisnis spesifik Anda.
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
# (Opsional) Kecualikan tabel yang tidak ingin Anda sinkronkan.
tables.exclude: dlf_test.products_tmp
server-id: 8601-8604
# (Opsional) Sinkronisasi data penuh dan inkremental untuk tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronisasi komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
# Gunakan waktu operasi sebagai metadata.
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
# (Opsional) Ubah kunci primer.
primary-keys: id,identifier
# (Opsional) Tetapkan bidang partisi.
partition-keys: dt
# (Opsional) Ubah data yang dihapus menjadi insert.
converter-after-transform: SOFT_DELETE
route:
# Sinkronisasi semua tabel dari database dlf_test ke database dlf. Nama tabel baru diberi awalan 'ods_'.
- source-table: dlf_test.\.*
sink-table: dlf.ods_<>
replace-symbol: <>
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
table.properties.deletion-vectors.enabled: trueMulai dari stempel waktu tertentu
Saat melakukan start tanpa status dari pekerjaan ingesti data Flink CDC, Anda dapat menentukan waktu mulai untuk sumber data. Hal ini membantu Anda melanjutkan pembacaan data dari posisi binary logging (binlog) tertentu.
Konfigurasi di halaman O&M
Di halaman O&M pekerjaan, Anda dapat menentukan waktu mulai untuk tabel sumber saat memilih start tanpa status untuk pekerjaan tersebut.

Konfigurasi parameter pekerjaan
Dalam draf pekerjaan, Anda dapat menentukan waktu mulai untuk tabel sumber dengan mengonfigurasi parameter.
Untuk sumber MySQL, Anda dapat mengatur scan.startup.mode: timestamp dalam konfigurasi pekerjaan untuk menentukan waktu mulai. Berikut adalah contoh konfigurasinya:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Mulai dalam mode yang menentukan waktu mulai tabel sumber.
scan.startup.mode: timestamp
# Tentukan stempel waktu startup dalam mode startup berbasis stempel waktu.
scan.startup.timestamp-millis: 1667232000000
# (Opsional) Sinkronisasi data dari tabel yang baru dibuat selama fase inkremental.
scan.binlog.newly-added-table.enabled: true
# (Opsional) Sinkronisasi komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
table.properties.deletion-vectors.enabled: trueWaktu mulai yang ditentukan di halaman O&M didahulukan daripada waktu mulai yang ditentukan dalam parameter pekerjaan.