全部产品
Search
文档中心

Realtime Compute for Apache Flink:Studi kasus: Bangun pekerjaan ingesti data untuk skenario bisnis kompleks menggunakan Flink CDC

更新时间:Feb 05, 2026

Topik ini menjelaskan praktik terbaik penggunaan pekerjaan ingesti data Flink CDC dalam skenario bisnis kompleks, mencakup penanganan evolusi skema tabel sumber, peningkatan logika data melalui penyisipan metadata, penambahan kolom terhitung, penerapan soft delete, implementasi routing heterogen untuk menggabungkan tabel yang di-shard dan menyinkronkan seluruh database, serta penerapan kontrol presisi melalui pemfilteran tabel dan memulai pekerjaan dari timestamp tertentu.

Sinkronkan tabel yang baru ditambahkan

Pekerjaan ingesti data Flink CDC mendukung sinkronisasi tabel yang baru ditambahkan dengan dua cara:

  • Sinkronisasi panas (hot synchronization) tabel kosong baru: Metode ini berlaku untuk tabel baru yang tidak memiliki data historis dan hanya akan berisi perubahan selanjutnya. Pekerjaan dapat secara dinamis menangkap tabel-tabel tersebut tanpa perlu restart.

  • Sinkronisasi tabel dengan data historis: Metode ini berlaku untuk tabel baru yang sudah berisi data historis. Proses ini memerlukan sinkronisasi penuh dan inkremental, serta pekerjaan harus di-restart agar perubahan diterapkan.

Sinkronisasi panas tabel kosong baru tanpa data historis

Anda dapat mengaktifkan parameter scan.binlog.newly-added-table.enabled agar pekerjaan Flink CDC Anda dapat menyinkronkan tabel kosong baru secara real time selama fase inkremental. Metode ini tidak memerlukan restart pekerjaan dan merupakan pendekatan yang direkomendasikan.

Sebagai contoh, jika pekerjaan ingesti data Flink CDC sedang menyinkronkan semua tabel di database MySQL `dlf_test` dan sebuah tabel kosong baru bernama products dibuat di database sumber, Anda dapat menyinkronkan tabel baru ini tanpa me-restart pekerjaan. Untuk melakukannya, atur parameter scan.binlog.newly-added-table.enabled ke `true` dalam konfigurasi pekerjaan Anda. Berikut adalah contoh konfigurasinya:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

Setelah Anda menjalankan pekerjaan CDC YAML dengan konfigurasi ini, pekerjaan tersebut secara otomatis membuat semua tabel baru dari database `dlf_test` di destinasi.

Penting

Parameter scan.newly-added-table.enabled hanya berlaku ketika scan.startup.mode diatur ke initial (nilai default).

Sinkronkan tabel dengan data historis

Asumsikan sebuah database MySQL berisi tabel `customers` dan `products`, tetapi Anda hanya ingin menyinkronkan tabel `customers` saat startup. Konfigurasi pekerjaan awal adalah sebagai berikut:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.customers
  server-id: 8601-8604
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

Setelah pekerjaan berjalan selama beberapa waktu, jika Anda ingin menyinkronkan semua tabel beserta data historisnya dari database tersebut, Anda harus me-restart pekerjaan. Untuk melakukannya, ikuti langkah-langkah berikut:

  1. Hentikan pekerjaan dan buat titik simpan (savepoint).

  2. Ubah konfigurasi `tables` pada sumber data MySQL agar mencakup semua tabel yang ingin Anda sinkronkan. Selain itu, hapus parameter scan.binlog.newly-added-table.enabled dan aktifkan parameter scan.newly-added-table.enabled.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data penuh dan inkremental untuk tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true
  1. Jalankan ulang pekerjaan dari titik simpan tersebut.

Penting

Anda tidak dapat mengaktifkan scan.binlog.newly-added-table.enabled dan scan.newly-added-table.enabled secara bersamaan.

Kecualikan tabel tertentu

Dalam pekerjaan ingesti data Flink CDC, Anda dapat mengecualikan tabel tertentu agar tidak dibuat dan disinkronkan ke destinasi downstream.

Sebagai contoh, jika database `dlf_test` di MySQL berisi beberapa tabel seperti `customers` dan `products`, dan Anda ingin mengecualikan tabel `products_tmp`, Anda dapat mengonfigurasi pekerjaan sebagai berikut:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Opsional) Kecualikan tabel yang tidak ingin Anda sinkronkan.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

Pekerjaan ingesti data Flink CDC dengan konfigurasi ini secara otomatis membuat semua tabel dari database dlf_test di destinasi, kecuali tabel products_tmp. Pekerjaan ini juga menjaga skema tabel dan data tetap tersinkronisasi secara real time.

Catatan

Parameter `tables.exclude` mendukung penggunaan ekspresi reguler untuk mencocokkan beberapa tabel. Jika terdapat tumpang tindih antara tabel yang ditentukan di `tables.exclude` dan `tables`, tabel yang tumpang tindih tersebut akan dikecualikan dan tidak disinkronkan. Artinya, pengecualian memiliki prioritas lebih tinggi daripada inklusi.

Tingkatkan dengan metadata dan kolom terhitung

Tambahkan kolom metadata

Saat menulis data, Anda dapat menggunakan modul transform untuk menambahkan kolom metadata. Sebagai contoh, konfigurasi pekerjaan berikut menambahkan nama tabel, waktu operasi, dan jenis operasi ke tabel downstream. Untuk informasi lebih lanjut, lihat Modul Transform.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data penuh dan inkremental untuk tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Gunakan waktu operasi sebagai metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__  as identifier, op_ts, __data_event_type__ as op, *
    # (Opsional) Ubah primary key.
    primary-keys: id,identifier
    description: tambahkan identifier, op_ts, dan op
    
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true
Catatan

Saat menggunakan MySQL sebagai sumber, Anda harus mengatur `metadata-column.include-list: op_ts` untuk mengirim waktu operasi sebagai metadata ke sink downstream. Untuk informasi lebih lanjut, lihat MySQL.

Data sumber untuk pekerjaan ingesti berisi semua jenis event change data capture (CDC), termasuk operasi delete. Untuk mengonversi operasi `DELETE` menjadi operasi `INSERT` di tabel downstream dan menerapkan soft delete, tambahkan konfigurasi `converter-after-transform: SOFT_DELETE` dalam modul transform.

Tambahkan kolom terhitung

Saat menulis data, Anda dapat menggunakan modul transform untuk menambahkan kolom terhitung. Sebagai contoh, konfigurasi pekerjaan berikut mengubah bidang `created_at` untuk menghasilkan bidang `dt`, lalu menggunakan bidang `dt` baru tersebut sebagai bidang partisi untuk tabel downstream.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data penuh dan inkremental untuk tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Gunakan waktu operasi sebagai metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Opsional) Atur bidang partisi.
    partition-keys: dt
    description: tambahkan dt
    
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true
Catatan

Saat menggunakan MySQL sebagai sumber, Anda harus mengatur `metadata-column.include-list: op_ts` untuk mengirim waktu operasi sebagai metadata ke sink downstream. Untuk informasi lebih lanjut, lihat MySQL.

Pemetaan nama tabel

Saat menyinkronkan tabel leluhur ke tabel turunan, Anda mungkin perlu mengganti nama tabel menggunakan modul route. Bagian berikut menyediakan contoh konfigurasi untuk skenario umum di mana Anda mengganti nama tabel dalam pekerjaan ingesti data Flink CDC.

Gabungkan database dan tabel yang di-shard

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data penuh dan inkremental untuk tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true
 
route:
  # Gabungkan semua tabel di database dlf_test yang namanya dimulai dengan product_ dan diakhiri angka ke tabel dlf.products.
  - source-table: dlf_test.product_[0-9]+
    sink-table: dlf.products

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

Sinkronkan seluruh database

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data penuh dan inkremental untuk tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  
route:
  # Ubah nama tabel secara seragam. Sinkronkan semua tabel dari database dlf_test ke database dlf, dengan memberi awalan ods_ pada setiap nama tabel destinasi.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
        
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

Studi kasus komprehensif untuk skenario bisnis kompleks

Konfigurasi pekerjaan ingesti data Flink CDC berikut merupakan contoh komprehensif untuk skenario bisnis kompleks yang menggabungkan fitur-fitur yang dijelaskan pada bagian sebelumnya. Anda dapat menyesuaikan kode ini sesuai kebutuhan bisnis spesifik Anda.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Opsional) Kecualikan tabel yang tidak ingin Anda sinkronkan.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Opsional) Sinkronkan data penuh dan inkremental untuk tabel yang baru ditambahkan.
  scan.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Gunakan waktu operasi sebagai metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Opsional) Ubah primary key.
    primary-keys: id,identifier
    # (Opsional) Atur bidang partisi.
    partition-keys: dt
    # (Opsional) Konversi data yang dihapus menjadi insert.
    converter-after-transform: SOFT_DELETE
    
route:
  # Sinkronkan semua tabel dari database dlf_test ke database dlf, dengan memberi awalan ods_ pada setiap nama tabel destinasi.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
    
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

Mulai dari timestamp tertentu

Saat melakukan startup tanpa status (stateless) pada pekerjaan ingesti data Flink CDC, Anda dapat menentukan waktu mulai untuk sumber data. Hal ini memungkinkan Anda melanjutkan pembacaan data dari posisi binary logging (binlog) tertentu.

Konfigurasi di halaman O&M

Di halaman Operations and Maintenance (O&M) pekerjaan, Anda dapat menentukan waktu mulai untuk tabel sumber saat melakukan startup tanpa status.

image

Konfigurasi parameter pekerjaan

Dalam draf pekerjaan, Anda dapat menentukan waktu mulai untuk tabel sumber dengan mengonfigurasi parameter.

Untuk sumber MySQL, Anda dapat mengatur scan.startup.mode ke `timestamp` dalam konfigurasi pekerjaan untuk menentukan waktu mulai. Berikut adalah contoh konfigurasinya:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Opsional) Mulai dalam mode yang menentukan waktu mulai untuk tabel sumber.
  scan.startup.mode: timestamp
  # Tentukan timestamp startup dalam mode startup berbasis timestamp.
  scan.startup.timestamp-millis: 1667232000000
  # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Tentukan username commit. Kami merekomendasikan menetapkan username berbeda untuk setiap pekerjaan guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true
Catatan

Jika Anda menentukan waktu mulai baik di halaman O&M maupun parameter pekerjaan, konfigurasi di halaman O&M memiliki prioritas lebih tinggi.