全部产品
Search
文档中心

Realtime Compute for Apache Flink:Kasus penggunaan untuk membangun pekerjaan ingesti data untuk skenario bisnis kompleks dengan Flink CDC

更新时间:Nov 10, 2025

Topik ini menjelaskan praktik terbaik penggunaan pekerjaan ingesti data Flink Change Data Capture (CDC) dalam skenario bisnis kompleks, mencakup evolusi skema tabel sumber, peningkatan logika data (seperti menyisipkan metadata, menambahkan kolom terhitung, dan melakukan soft delete), perutean heterogen (seperti menggabungkan tabel yang di-shard dan menyinkronkan seluruh database), serta kontrol presisi (seperti memfilter tabel dan memulai dari stempel waktu tertentu).

Sinkronisasi tabel yang baru ditambahkan

Pekerjaan ingesti data Flink CDC mendukung sinkronisasi tabel yang baru ditambahkan dalam dua skenario:

  • Sinkronisasi panas tabel kosong baru: Anda dapat secara dinamis menangkap tabel baru tanpa data historis. Pekerjaan tersebut menangkap perubahan berikutnya pada tabel-tabel ini tanpa perlu restart.

  • Sinkronisasi tabel baru dengan data historis: Tabel baru yang telah berisi data historis memerlukan sinkronisasi penuh dan inkremental. Anda harus me-restart pekerjaan untuk menyertakan tabel-tabel tersebut.

Sinkronisasi panas tabel kosong baru tanpa data historis

Anda dapat mengaktifkan parameter scan.binlog.newly-added-table.enabled dalam pekerjaan Flink CDC untuk menyinkronisasi tabel kosong baru secara real time selama fase inkremental. Tabel-tabel ini dibuat tanpa data historis dan tidak memerlukan restart pekerjaan. Konfigurasi ini direkomendasikan.

Sebagai contoh, sebuah Pekerjaan Ingesti Data Flink CDC sedang menyinkronkan semua tabel di database MySQL `dlf_test`. Sebuah tabel kosong baru bernama products tanpa data historis dibuat di database sumber. Untuk menyinkronkan tabel baru ini tanpa memulai ulang pekerjaan, aktifkan parameter scan.binlog.newly-added-table.enabled: true. Konfigurasinya adalah sebagai berikut:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronisasi data dari tabel yang baru dibuat selama fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Sinkronisasi komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
  table.properties.deletion-vectors.enabled: true

Setelah pekerjaan CDC YAML dengan konfigurasi ini berjalan, pekerjaan tersebut secara otomatis menyinkronisasi semua tabel kosong baru dari database dlf_test ke tujuan.

Sinkronisasi tabel baru dengan data historis

Asumsikan bahwa tabel customers dan products sudah ada di database MySQL, tetapi Anda hanya ingin menyinkronisasi tabel customers saat startup. Konfigurasi pekerjaan adalah sebagai berikut:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.customers
  server-id: 8601-8604
  # (Opsional) Sinkronisasi komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
  table.properties.deletion-vectors.enabled: true

Setelah pekerjaan berjalan selama beberapa waktu, jika Anda ingin menyinkronisasi semua tabel beserta data historisnya dari database, Anda harus me-restart pekerjaan tersebut. Ikuti langkah-langkah berikut:

  1. Hentikan pekerjaan dan buat titik simpan.

  2. Dalam konfigurasi sumber data MySQL, ubah parameter tables agar sesuai dengan tabel yang diinginkan. Hapus parameter scan.binlog.newly-added-table.enabled dan aktifkan scan.newly-added-table.enabled.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronisasi data penuh dan inkremental untuk tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronisasi komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
  table.properties.deletion-vectors.enabled: true
  1. Jalankan ulang pekerjaan dari titik simpan.

Penting

Anda tidak dapat mengaktifkan scan.binlog.newly-added-table.enabled dan scan.newly-added-table.enabled secara bersamaan.

Mengecualikan tabel tertentu

Dalam pekerjaan ingesti data Flink CDC, Anda dapat memfilter tabel tertentu agar tidak dibuat dan disinkronisasi ke hilir.

Sebagai contoh, database MySQL dlf_test berisi beberapa tabel, seperti customers dan products. Jika Anda ingin mengecualikan tabel bernama products_tmp, konfigurasikan pekerjaan sebagai berikut:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Opsional) Kecualikan tabel yang tidak ingin Anda sinkronkan.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Opsional) Sinkronisasi data dari tabel yang baru dibuat selama fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Sinkronisasi komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
  table.properties.deletion-vectors.enabled: true

Pekerjaan ingesti data Flink CDC dengan konfigurasi ini secara otomatis menyinkronisasi semua tabel dari database dlf_test ke tujuan, kecuali tabel products_tmp. Pekerjaan ini juga menjaga skema tabel dan data tetap tersinkronisasi secara real time.

Catatan

Parameter tables.exclude mendukung ekspresi reguler untuk mencocokkan beberapa tabel. Jika suatu tabel ditentukan baik di parameter tables maupun tables.exclude, aturan pengecualian akan didahulukan, sehingga tabel tersebut tidak akan disinkronkan.

Tingkatkan dengan metadata dan kolom terhitung

Tambahkan kolom metadata

Saat menulis data, Anda dapat menggunakan modul transform untuk menambahkan kolom metadata. Sebagai contoh, konfigurasi pekerjaan berikut menulis nama tabel, waktu operasi, dan jenis operasi ke tabel hilir. Untuk informasi lebih lanjut, lihat Modul Transform.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronisasi data penuh dan inkremental untuk tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronisasi komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Gunakan waktu operasi sebagai metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__  as identifier, op_ts, __data_event_type__ as op, *
    # (Opsional) Ubah kunci primer.
    primary-keys: id,identifier
    description: tambahkan identifier, op_ts, dan op
    
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
  table.properties.deletion-vectors.enabled: true
Catatan

Saat menggunakan MySQL sebagai sumber, Anda harus menambahkan metadata-column.include-list: op_ts untuk mengirim waktu operasi sebagai metadata ke hilir. Untuk informasi lebih lanjut, lihat MySQL.

Jika Anda ingin melakukan soft delete di tabel hilir, Anda dapat mengubah operasi delete menjadi operasi insert. Untuk melakukannya, tambahkan konfigurasi converter-after-transform: SOFT_DELETE dalam modul transform. Hal ini berguna karena data sumber berisi semua jenis perubahan, termasuk penghapusan.

Tambahkan kolom terhitung

Saat menulis data, Anda dapat menggunakan modul transform untuk menambahkan kolom terhitung. Sebagai contoh, konfigurasi pekerjaan berikut mengubah bidang created_at untuk menghasilkan bidang dt, yang kemudian digunakan sebagai bidang partisi untuk tabel hilir.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronisasi data penuh dan inkremental untuk tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronisasi komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Gunakan waktu operasi sebagai metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Opsional) Tetapkan bidang partisi.
    partition-keys: dt
    description: tambahkan dt
    
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
  table.properties.deletion-vectors.enabled: true
Catatan

Saat menggunakan MySQL sebagai sumber, Anda harus menambahkan metadata-column.include-list: op_ts untuk mengirim waktu operasi sebagai metadata ke hilir. Untuk informasi lebih lanjut, lihat MySQL.

Pemetaan nama tabel

Saat menyinkronisasi data dari tabel sumber ke tabel tujuan, Anda mungkin perlu mengubah nama tabel. Anda dapat melakukannya menggunakan modul routing. Bagian-bagian berikut menjelaskan skenario khas dan memberikan contoh konfigurasi pekerjaan ingesti data Flink CDC.

Gabungkan tabel yang di-shard

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronisasi data penuh dan inkremental untuk tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronisasi komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true
 
route:
  # Gabungkan semua tabel di database dlf_test yang namanya diawali dengan 'product_' dan diakhiri angka menjadi dlf.products.
  - source-table: dlf_test.product_[0-9]+
    sink-table: dlf.products

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
  table.properties.deletion-vectors.enabled: true

Sinkronisasi seluruh database

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronisasi data penuh dan inkremental untuk tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronisasi komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  
route:
  # Ubah nama tabel secara seragam. Sinkronisasi semua tabel dari database dlf_test ke database dlf. Nama tabel baru diberi awalan 'ods_'.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
        
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
  table.properties.deletion-vectors.enabled: true

Kasus penggunaan komprehensif untuk skenario bisnis kompleks

Pekerjaan ingesti data Flink CDC berikut merupakan contoh komprehensif yang menggabungkan fitur-fitur yang disebutkan dalam topik ini untuk menangani skenario bisnis kompleks. Anda dapat menyesuaikan konfigurasi ini sesuai kebutuhan bisnis spesifik Anda.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Opsional) Kecualikan tabel yang tidak ingin Anda sinkronkan.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Opsional) Sinkronisasi data penuh dan inkremental untuk tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronisasi komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Gunakan waktu operasi sebagai metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Opsional) Ubah kunci primer.
    primary-keys: id,identifier
    # (Opsional) Tetapkan bidang partisi.
    partition-keys: dt
    # (Opsional) Ubah data yang dihapus menjadi insert.
    converter-after-transform: SOFT_DELETE
    
route:
  # Sinkronisasi semua tabel dari database dlf_test ke database dlf. Nama tabel baru diberi awalan 'ods_'.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
    
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
  table.properties.deletion-vectors.enabled: true

Mulai dari stempel waktu tertentu

Saat melakukan start tanpa status dari pekerjaan ingesti data Flink CDC, Anda dapat menentukan waktu mulai untuk sumber data. Hal ini membantu Anda melanjutkan pembacaan data dari posisi binary logging (binlog) tertentu.

Konfigurasi di halaman O&M

Di halaman O&M pekerjaan, Anda dapat menentukan waktu mulai untuk tabel sumber saat memilih start tanpa status untuk pekerjaan tersebut.

image

Konfigurasi parameter pekerjaan

Dalam draf pekerjaan, Anda dapat menentukan waktu mulai untuk tabel sumber dengan mengonfigurasi parameter.

Untuk sumber MySQL, Anda dapat mengatur scan.startup.mode: timestamp dalam konfigurasi pekerjaan untuk menentukan waktu mulai. Berikut adalah contoh konfigurasinya:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Mulai dalam mode yang menentukan waktu mulai tabel sumber.
  scan.startup.mode: timestamp
  # Tentukan stempel waktu startup dalam mode startup berbasis stempel waktu.
  scan.startup.timestamp-millis: 1667232000000
  # (Opsional) Sinkronisasi data dari tabel yang baru dibuat selama fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Sinkronisasi komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk commit. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan vektor penghapusan untuk meningkatkan kinerja baca.
  table.properties.deletion-vectors.enabled: true
Catatan

Waktu mulai yang ditentukan di halaman O&M didahulukan daripada waktu mulai yang ditentukan dalam parameter pekerjaan.