Topik ini menjelaskan praktik terbaik penggunaan pekerjaan ingesti data Flink CDC dalam skenario bisnis kompleks, mencakup penanganan evolusi skema tabel sumber, peningkatan logika data melalui penyisipan metadata, penambahan kolom terhitung, penerapan soft delete, implementasi routing heterogen untuk menggabungkan tabel yang di-shard dan menyinkronkan seluruh database, serta penerapan kontrol presisi melalui pemfilteran tabel dan memulai pekerjaan dari timestamp tertentu.
Sinkronkan tabel yang baru ditambahkan
Pekerjaan ingesti data Flink CDC mendukung sinkronisasi tabel yang baru ditambahkan dengan dua cara:
Sinkronisasi panas (hot synchronization) tabel kosong baru: Metode ini berlaku untuk tabel baru yang tidak memiliki data historis dan hanya akan berisi perubahan selanjutnya. Pekerjaan dapat secara dinamis menangkap tabel-tabel tersebut tanpa perlu restart.
Sinkronisasi tabel dengan data historis: Metode ini berlaku untuk tabel baru yang sudah berisi data historis. Proses ini memerlukan sinkronisasi penuh dan inkremental, serta pekerjaan harus di-restart agar perubahan diterapkan.
Sinkronisasi panas tabel kosong baru tanpa data historis
Anda dapat mengaktifkan parameter scan.binlog.newly-added-table.enabled agar pekerjaan Flink CDC Anda dapat menyinkronkan tabel kosong baru secara real time selama fase inkremental. Metode ini tidak memerlukan restart pekerjaan dan merupakan pendekatan yang direkomendasikan.
Sebagai contoh, jika pekerjaan ingesti data Flink CDC sedang menyinkronkan semua tabel di database MySQL `dlf_test` dan sebuah tabel kosong baru bernama products dibuat di database sumber, Anda dapat menyinkronkan tabel baru ini tanpa me-restart pekerjaan. Untuk melakukannya, atur parameter scan.binlog.newly-added-table.enabled ke `true` dalam konfigurasi pekerjaan Anda. Berikut adalah contoh konfigurasinya:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental.
scan.binlog.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: trueSetelah Anda menjalankan pekerjaan CDC YAML dengan konfigurasi ini, pekerjaan tersebut secara otomatis membuat semua tabel baru dari database `dlf_test` di destinasi.
Parameter scan.newly-added-table.enabled hanya berlaku ketika scan.startup.mode diatur ke initial (nilai default).
Sinkronkan tabel dengan data historis
Asumsikan sebuah database MySQL berisi tabel `customers` dan `products`, tetapi Anda hanya ingin menyinkronkan tabel `customers` saat startup. Konfigurasi pekerjaan awal adalah sebagai berikut:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.customers
server-id: 8601-8604
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: trueSetelah pekerjaan berjalan selama beberapa waktu, jika Anda ingin menyinkronkan semua tabel beserta data historisnya dari database tersebut, Anda harus me-restart pekerjaan. Untuk melakukannya, ikuti langkah-langkah berikut:
Hentikan pekerjaan dan buat titik simpan (savepoint).
Ubah konfigurasi `tables` pada sumber data MySQL agar mencakup semua tabel yang ingin Anda sinkronkan. Selain itu, hapus parameter
scan.binlog.newly-added-table.enableddan aktifkan parameterscan.newly-added-table.enabled.
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan data penuh dan inkremental untuk tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: trueJalankan ulang pekerjaan dari titik simpan tersebut.
Anda tidak dapat mengaktifkan scan.binlog.newly-added-table.enabled dan scan.newly-added-table.enabled secara bersamaan.
Kecualikan tabel tertentu
Dalam pekerjaan ingesti data Flink CDC, Anda dapat mengecualikan tabel tertentu agar tidak dibuat dan disinkronkan ke destinasi downstream.
Sebagai contoh, jika database `dlf_test` di MySQL berisi beberapa tabel seperti `customers` dan `products`, dan Anda ingin mengecualikan tabel `products_tmp`, Anda dapat mengonfigurasi pekerjaan sebagai berikut:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
# (Opsional) Kecualikan tabel yang tidak ingin Anda sinkronkan.
tables.exclude: dlf_test.products_tmp
server-id: 8601-8604
# (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental.
scan.binlog.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: truePekerjaan ingesti data Flink CDC dengan konfigurasi ini secara otomatis membuat semua tabel dari database dlf_test di destinasi, kecuali tabel products_tmp. Pekerjaan ini juga menjaga skema tabel dan data tetap tersinkronisasi secara real time.
Parameter `tables.exclude` mendukung penggunaan ekspresi reguler untuk mencocokkan beberapa tabel. Jika terdapat tumpang tindih antara tabel yang ditentukan di `tables.exclude` dan `tables`, tabel yang tumpang tindih tersebut akan dikecualikan dan tidak disinkronkan. Artinya, pengecualian memiliki prioritas lebih tinggi daripada inklusi.
Tingkatkan dengan metadata dan kolom terhitung
Tambahkan kolom metadata
Saat menulis data, Anda dapat menggunakan modul transform untuk menambahkan kolom metadata. Sebagai contoh, konfigurasi pekerjaan berikut menambahkan nama tabel, waktu operasi, dan jenis operasi ke tabel downstream. Untuk informasi lebih lanjut, lihat Modul Transform.
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan data penuh dan inkremental untuk tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
# Gunakan waktu operasi sebagai metadata.
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
# (Opsional) Ubah primary key.
primary-keys: id,identifier
description: tambahkan identifier, op_ts, dan op
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: trueSaat menggunakan MySQL sebagai sumber, Anda harus mengatur `metadata-column.include-list: op_ts` untuk mengirim waktu operasi sebagai metadata ke sink downstream. Untuk informasi lebih lanjut, lihat MySQL.
Data sumber untuk pekerjaan ingesti berisi semua jenis event change data capture (CDC), termasuk operasi delete. Untuk mengonversi operasi `DELETE` menjadi operasi `INSERT` di tabel downstream dan menerapkan soft delete, tambahkan konfigurasi `converter-after-transform: SOFT_DELETE` dalam modul transform.
Tambahkan kolom terhitung
Saat menulis data, Anda dapat menggunakan modul transform untuk menambahkan kolom terhitung. Sebagai contoh, konfigurasi pekerjaan berikut mengubah bidang `created_at` untuk menghasilkan bidang `dt`, lalu menggunakan bidang `dt` baru tersebut sebagai bidang partisi untuk tabel downstream.
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan data penuh dan inkremental untuk tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
# Gunakan waktu operasi sebagai metadata.
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
# (Opsional) Atur bidang partisi.
partition-keys: dt
description: tambahkan dt
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: trueSaat menggunakan MySQL sebagai sumber, Anda harus mengatur `metadata-column.include-list: op_ts` untuk mengirim waktu operasi sebagai metadata ke sink downstream. Untuk informasi lebih lanjut, lihat MySQL.
Pemetaan nama tabel
Saat menyinkronkan tabel leluhur ke tabel turunan, Anda mungkin perlu mengganti nama tabel menggunakan modul route. Bagian berikut menyediakan contoh konfigurasi untuk skenario umum di mana Anda mengganti nama tabel dalam pekerjaan ingesti data Flink CDC.
Gabungkan database dan tabel yang di-shard
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan data penuh dan inkremental untuk tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
route:
# Gabungkan semua tabel di database dlf_test yang namanya dimulai dengan product_ dan diakhiri angka ke tabel dlf.products.
- source-table: dlf_test.product_[0-9]+
sink-table: dlf.products
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: trueSinkronkan seluruh database
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan data penuh dan inkremental untuk tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
route:
# Ubah nama tabel secara seragam. Sinkronkan semua tabel dari database dlf_test ke database dlf, dengan memberi awalan ods_ pada setiap nama tabel destinasi.
- source-table: dlf_test.\.*
sink-table: dlf.ods_<>
replace-symbol: <>
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: trueStudi kasus komprehensif untuk skenario bisnis kompleks
Konfigurasi pekerjaan ingesti data Flink CDC berikut merupakan contoh komprehensif untuk skenario bisnis kompleks yang menggabungkan fitur-fitur yang dijelaskan pada bagian sebelumnya. Anda dapat menyesuaikan kode ini sesuai kebutuhan bisnis spesifik Anda.
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
# (Opsional) Kecualikan tabel yang tidak ingin Anda sinkronkan.
tables.exclude: dlf_test.products_tmp
server-id: 8601-8604
# (Opsional) Sinkronkan data penuh dan inkremental untuk tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
# Gunakan waktu operasi sebagai metadata.
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
# (Opsional) Ubah primary key.
primary-keys: id,identifier
# (Opsional) Atur bidang partisi.
partition-keys: dt
# (Opsional) Konversi data yang dihapus menjadi insert.
converter-after-transform: SOFT_DELETE
route:
# Sinkronkan semua tabel dari database dlf_test ke database dlf, dengan memberi awalan ods_ pada setiap nama tabel destinasi.
- source-table: dlf_test.\.*
sink-table: dlf.ods_<>
replace-symbol: <>
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: trueMulai dari timestamp tertentu
Saat melakukan startup tanpa status (stateless) pada pekerjaan ingesti data Flink CDC, Anda dapat menentukan waktu mulai untuk sumber data. Hal ini memungkinkan Anda melanjutkan pembacaan data dari posisi binary logging (binlog) tertentu.
Konfigurasi di halaman O&M
Di halaman Operations and Maintenance (O&M) pekerjaan, Anda dapat menentukan waktu mulai untuk tabel sumber saat melakukan startup tanpa status.

Konfigurasi parameter pekerjaan
Dalam draf pekerjaan, Anda dapat menentukan waktu mulai untuk tabel sumber dengan mengonfigurasi parameter.
Untuk sumber MySQL, Anda dapat mengatur scan.startup.mode ke `timestamp` dalam konfigurasi pekerjaan untuk menentukan waktu mulai. Berikut adalah contoh konfigurasinya:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (Opsional) Mulai dalam mode yang menentukan waktu mulai untuk tabel sumber.
scan.startup.mode: timestamp
# Tentukan timestamp startup dalam mode startup berbasis timestamp.
scan.startup.timestamp-millis: 1667232000000
# (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental.
scan.binlog.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: trueJika Anda menentukan waktu mulai baik di halaman O&M maupun parameter pekerjaan, konfigurasi di halaman O&M memiliki prioritas lebih tinggi.