全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konfigurasi evolusi skema

更新时间:Feb 06, 2026

Topik ini menjelaskan cara mengonfigurasi evolusi skema untuk pekerjaan ingest data Flink CDC.

Konfigurasi evolusi skema

Pekerjaan Ingesti Data Flink CDC mendukung sinkronisasi perubahan skema dari sumber ke tabel sink. Event yang didukung meliputi pembuatan tabel, penambahan kolom, penggantian nama kolom, pengubahan tipe kolom, penghapusan kolom, dan penghapusan tabel.

Tabel turunan mungkin tidak mendukung semua perubahan skema. Anda dapat menambahkan konfigurasi schema.change.behavior ke modul pipeline untuk menyesuaikan cara tabel turunan menangani perubahan tersebut.

pipeline:
  schema.change.behavior: EVOLVE

Framework hanya mendukung jenis perubahan skema yang tercantum di bawah ini. Perubahan skema yang tidak didukung dapat menyebabkan exception pada pekerjaan dan memerlukan restart tanpa status untuk pemulihan.

  • Create Table (CREATE TABLE ...)

  • Add Column (ALTER TABLE ... ADD COLUMN ...)

  • Alter Column Type (ALTER TABLE ... MODIFY COLUMN ...)

  • Drop Column (ALTER TABLE ... DROP COLUMN ...)

  • Rename Column (ALTER TABLE ... RENAME COLUMN ...)

  • Truncate Table (TRUNCATE TABLE ...)

  • Drop Table (DROP TABLE ...)

Perilaku evolusi skema

pattern

Deskripsi

LENIENT (default)

Flink CDC mengubah perubahan skema yang tidak didukung menjadi tipe yang didukung oleh sink berdasarkan aturan berikut:

  • Tidak mengirim perubahan Drop table atau Truncate table.

  • Untuk event rename.column, kirim event alter.column.type dan add.column. Pertahankan kolom asli. Ubah tipe kolomnya menjadi nullable. Tambahkan kolom baru yang nullable dengan nama baru.

  • Untuk event drop.column, kirim event alter.column.type dan atur tipe kolom menjadi nullable.

  • Saat Anda menambahkan kolom baru, sistem tetap mengirim event (column), tetapi tipe bidangnya menjadi nullable.

Gunakan perilaku ini jika Anda ingin Pekerjaan Ingesti Data menyinkronkan perubahan skema secara otomatis semaksimal mungkin.

IGNORE

Pekerjaan Ingesti Data Flink CDC mengabaikan seluruh evolusi skema, artinya modifikasi skema tabel hulu tidak diterapkan ke tabel sink hilir.

Gunakan perilaku ini jika sink Anda tidak mendukung perubahan skema atau Anda tidak menginginkannya, namun tetap ingin menerima data dari kolom yang sudah ada.

EVOLVE

Pekerjaan Ingesti Data Flink CDC menerapkan semua perubahan skema ke tabel turunan.

Jika penerapan perubahan skema ke tabel turunan gagal, pekerjaan akan melempar exception dan memicu restart karena kegagalan.

Gunakan perilaku ini jika Anda menginginkan Sinkronisasi Skema yang ketat dan eksak dalam Pekerjaan Ingesti Data Anda.

Penting

Jika sink tidak dapat memproses semua event perubahan skema, pekerjaan mungkin mengalami failover dan tidak dapat pulih sendiri.

TRY_EVOLVE

Pekerjaan Ingesti Data Flink CDC mencoba menerapkan perubahan skema ke tabel turunan. Jika tabel turunan tidak dapat memproses perubahan skema tersebut, pekerjaan tidak gagal atau restart. Sebaliknya, pekerjaan mencoba menangani masalah tersebut dengan mentransformasi data berikutnya.

Gunakan perilaku ini jika Anda menginginkan Sinkronisasi Skema yang ketat dengan toleransi kesalahan tertentu.

Penting

Jika penerapan perubahan skema gagal, data selanjutnya mungkin kehilangan beberapa kolom atau dipotong agar sesuai dengan skema tabel sink.

EXCEPTION

Pekerjaan Ingesti Data Flink CDC tidak mengizinkan perubahan skema apa pun dan melempar exception saat menerima event perubahan skema.

Gunakan perilaku ini jika Anda harus memastikan bahwa tidak ada perubahan skema yang terjadi—hanya data yang disinkronkan.

Kontrol perubahan skema pada sink

Dalam skenario Sinkronisasi Data, Flink CDC menyediakan manajemen evolusi skema detail halus. Anda dapat mengonfigurasi aturan untuk mengontrol jenis perubahan skema mana yang mencapai sink guna mencegah kehilangan data atau gangguan layanan akibat perubahan tak terduga.

Anda dapat mengontrolnya dengan mengatur opsi include.schema.changes dan exclude.schema.changes di modul sink.

Opsi Konfigurasi

Deskripsi

Diperlukan

Tipe data

Nilai default

Catatan

include.schema.changes

Mendukung perubahan skema aplikasi.

Tidak

List<String>

Tidak ada nilai default

Semua perubahan didukung secara default.

exclude.schema.changes

Anda tidak dapat mengubah skema untuk aplikasi.

Tidak

List<String>

Tidak ada nilai default

Opsi ini memiliki prioritas lebih tinggi daripada include.schema.changes.

Berikut adalah daftar lengkap tipe event perubahan skema yang dapat dikonfigurasi:

Jenis peristiwa

Deskripsi

add.column

Menambahkan kolom.

alter.column.type

Mengubah tipe kolom.

create.table

Membuat tabel.

drop.column

Menghapus kolom.

drop.table

Menghapus tabel.

rename.column

Mengubah nama kolom.

truncate.table

Menghapus data.

Catatan

Jenis event ini mendukung pencocokan parsial. Misalnya, menentukan drop akan cocok dengan drop.column dan drop.table. Menentukan table akan cocok dengan create.table, truncate.table, dan drop.table.

Contoh

  • Contoh 1: Terapkan semua perubahan skema dari sumber ke sink dengan mengatur schema.change.behavior ke EVOLVE:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE
  • Contoh 2: Terapkan hanya event pembuatan tabel dan yang terkait kolom ke sink, sambil mengecualikan event drop.column:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  include.schema.changes: [create.table, column] # Wildcard `column` cocok dengan `add.column`, `alter.column.type`, `rename.column`, dan `drop.column`.
  exclude.schema.changes: [drop.column] # Mengecualikan event `drop.column`.
  
pipeline:
  name: mysql to print job
  schema.change.behavior: EVOLVE