全部产品
Search
文档中心

Realtime Compute for Apache Flink:Modul Transform Flink CDC

更新时间:Feb 06, 2026

Topik ini menjelaskan aturan sintaks dan fungsi bawaan yang didukung oleh modul Transform untuk pekerjaan data ingestion Flink CDC.

Parameter aturan transformasi

Modul Transform memungkinkan Anda memanipulasi kolom data secara langsung, termasuk menghapus atau menambahkan kolom yang sudah ada serta menyaring data yang tidak diinginkan selama sinkronisasi. Gunakan parameter berikut untuk mendefinisikan aturan transformasi:

Parameter

Deskripsi

Diperlukan

Catatan

source-table

Menentukan tabel hulu yang akan ditransformasikan.

Ya

Mendukung ekspresi reguler.

projection

Menentukan aturan proyeksi untuk tabel hulu. Ini menentukan semua kolom data setelah transformasi.

Tidak

Sintaksisnya mirip dengan pernyataan SELECT SQL.

Jika Anda biarkan kosong, tidak ada kolom yang ditambahkan atau dihapus.

Untuk informasi lebih lanjut, lihat Penyaringan data. Untuk informasi tentang fungsi bawaan yang tersedia, lihat dokumen Fungsi bawaan Flink CDC.

filter

Aturan penyaringan baris.

Tidak

Sintaksnya mirip dengan pernyataan WHERE SQL.

Jika Anda biarkan kosong, tidak ada baris yang disaring.

primary-keys

Menentukan kolom kunci primer setelah transformasi.

Tidak

Jika Anda biarkan kosong, definisi kunci primer dari skema asli dipertahankan. Pisahkan kunci primer dengan koma (,).

Penting

Saat Anda menentukan kolom kunci primer kustom, pastikan data hulu yang masuk memenuhi kendala KUNCI UTAMA (PRIMARY KEY). Kami menyarankan agar Anda menyertakan kolom kunci primer dari tabel hulu dalam kolom kunci primer kustom Anda. Hal ini membantu mencegah masalah ketidakteraturan data selama penulisan lintas partisi.

partition-keys

Menentukan daftar kolom kunci partisi setelah transformasi.

Tidak

Jika Anda biarkan kosong, definisi kunci partisi dari skema asli dipertahankan. Pisahkan kunci partisi dengan koma (,).

Penting

Saat Anda menentukan kolom kunci partisi kustom, pastikan data hulu yang masuk memenuhi kendala KUNCI UTAMA (PRIMARY KEY). Hal ini membantu mencegah masalah ketidakteraturan data selama penulisan lintas partisi.

table-options

Informasi konfigurasi tambahan yang diteruskan ke sink.

Tidak

Daftar atribut opsional, seperti jumlah bucket dan komentar untuk sink Paimon.

Pisahkan item konfigurasi berbeda dengan koma (,). Pisahkan kunci dan nilai suatu item konfigurasi dengan tanda sama dengan (=).

Contoh konfigurasi:

key1=value1,key2=value2

description

Deskripsi aturan transformasi.

Tidak

Tidak ada.

converter-after-transform

Konverter yang melakukan pemrosesan tambahan terhadap data setelah transformasi selesai.

Tidak

Untuk informasi lebih lanjut, lihat Konverter setelah Transform.

Catatan

  • Setelah mengubah pernyataan dalam modul transform, Anda harus melakukan restart tanpa status (stateless restart) pada pekerjaan tersebut.

  • Umumnya, Anda tidak perlu membungkus pernyataan projection dan filter dalam tanda kutip.

    transform:
      - projection: a, b, c
        # Setara dengan
      - projection: "a, b, c"

    Namun, jika karakter pertama dari ekspresi projection merupakan karakter khusus—seperti tanda bintang (*) atau tanda kutip tunggal (')—seluruh ekspresi mungkin tidak dapat diurai sebagai literal string YAML yang valid. Dalam kasus ini, Anda harus membungkus seluruh ekspresi secara manual dalam tanda kutip tunggal (') atau ganda ("), atau menggunakan garis miring terbalik (\) untuk meng-escape karakter tersebut:

    transform:
      - projection: *, 42      # Tidak valid dalam YAML
      - projection: '*, 42'    # OK
      - projection: \*, 42     # OK  

Penyaringan bidang

Modul Transform data ingestion menggunakan sintaks mirip SQL untuk menentukan aturan proyeksi. Anda dapat menggunakan aturan ini untuk memilih kolom tertentu, menambahkan kolom terhitung, dan menambahkan kolom metadata.

Pemangkasan kolom

Jika ingin mengekstrak kolom tertentu dari tabel sumber dan menyinkronkannya ke tujuan hilir, cantumkan kolom tersebut dalam aturan projection. Kolom yang tidak ditentukan tidak dikirim ke tujuan hilir:

transform:
  - source-table: db.tbl
    projection: col_1, col_3, col_4 # col_2 dipangkas
Penting

Pemangkasan kolom dapat menyebabkan skema tabel hulu dan hilir menjadi tidak selaras jika skema tabel hulu berubah.

Karakter wildcard

Jika ingin mengirim semua kolom dari tabel sumber beserta kolom baru apa pun yang ditambahkan selanjutnya ke hilir secara apa adanya, Anda dapat menggunakan karakter wildcard tanda bintang (*) dalam aturan projection.

Catatan

Jika aturan projection tidak menggunakan karakter wildcard tanda bintang (*), skema yang dihasilkan bersifat tetap dan selalu sesuai dengan versi yang ditentukan dalam aturan projection.

Sebagai contoh, *, 'extras' AS extras menambahkan kolom tambahan di akhir kolom skema hulu. Kolom ini juga terus-menerus mengirim evolusi skema hulu ke tujuan hilir.

transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras

Kolom terhitung

Anda dapat menggunakan sintaks <Ekspresi> AS <NamaKolom> dalam aturan projection untuk menambahkan kolom terhitung. Ekspresi dievaluasi untuk setiap catatan data hulu, dan hasilnya dimasukkan ke kolom yang sesuai.

Catatan

Ekspresi untuk kolom terhitung tidak dapat mereferensikan nilai kolom terhitung lain, bahkan jika kolom yang direferensikan muncul sebelumnya. Misalnya, a, b AS c, c AS d bukanlah ekspresi yang valid.

Sebagai contoh, ketika catatan data [+I, id = 1] diterima dari tabel hulu db.tbl, catatan tersebut ditransformasi menjadi baris data [+I, id = 1, inc_id = 2] dan dikirim ke tujuan hilir.

transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id

Kolom metadata

Saat menulis aturan projection, Anda dapat menggunakan kolom metadata pradidefinisi berikut sebagai kolom data biasa:

Penting

Jangan mendefinisikan kolom data biasa dengan nama yang sama seperti kolom metadata.

Kolom metadata berikut berlaku untuk semua konektor.

Nama kolom metadata

Tipe data

Deskripsi

__namespace_name__

String

Nama namespace dari tabel sumber yang sesuai dengan catatan perubahan data ini.

__schema_name__

String

Nama skema dari tabel sumber yang sesuai dengan catatan perubahan data ini.

__table_name__

String

Nama tabel dari tabel sumber yang sesuai dengan catatan perubahan data ini.

__data_event_type__

String

Jenis operasi catatan perubahan data ini (+I, -U, +U, atau -D).

Penting

Event CDC selalu mengemas operasi Update Before dan Update After untuk satu pembaruan tunggal ke dalam satu event. Oleh karena itu, konten __data_event_type__ untuk event pembaruan yang sama adalah -U dan +U. Jangan gunakan kolom ini sebagai kunci primer.

Sebagai contoh, tulis nama lengkap tabel hulu ke kolom terhitung dan kirimkan ke tujuan hilir.

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier

Tabel berikut menunjukkan pemetaan antara nama namespace, skema, dan tabel untuk konektor database yang berbeda.

Jenis database

Nama namespace

Nama skema

Nama tabel

MySQL

-

Database

Tabel

Kafka

-

-

Topik

SLS

-

Proyek

LogStore

MongoDB

-

Database

Koleksi

Paimon

-

Database

Tabel

Hologres

-

Skema

Tabel

StarRocks

-

Database

Tabel

Doris

-

Database

Tabel

Postgres

Database

Catatan

Hanya berlaku saat parameter table-id.include-database diaktifkan.

Skema

Tabel

Penyaringan data

Modul Transform ingesti data menggunakan sintaksis mirip SQL untuk mendefinisikan aturan penyaringan baris.

Aturan filter harus berupa ekspresi yang dapat dievaluasi menjadi tipe BOOLEAN. Ekspresi tersebut dapat mereferensikan kolom apa pun dari tabel sumber dan kolom terhitung apa pun.

Jika catatan perubahan data cocok dengan aturan transform yang memiliki filter tidak kosong, dan ekspresi filter mengevaluasi ke FALSE, baris data tersebut tidak dikirim ke tujuan hilir.

Catatan

Jika Anda menggunakan kolom terhitung dalam aturan projection untuk menimpa kolom hulu yang sudah ada, ekspresi filter akan mereferensikan kolom terhitung tersebut.

Sebagai contoh, pertimbangkan aturan transform berikut:

transform:
  - source-table: db.tbl
    projection: CAST(id AS VARCHAR) AS id
    filter: CHAR_LENGTH(id) > 5

Ini valid. Kolom id yang direferensikan dalam ekspresi filter adalah kolom terhitung yang telah dikonversi ke tipe VARCHAR.

Aturan konfigurasi lanjutan

Referensi kolom yang tidak dipangkas dan metadata

Contoh 1: Filter berdasarkan kolom yang dipangkas

Skema tabel hulu adalah [INT a, INT b, BOOLEAN c]. Untuk mengeluarkan kolom a dan b tetapi hanya menyimpan baris di mana c bernilai true, gunakan aturan konfigurasi berikut:

transform:
  - source-table: ...
    projection: a, b
    filter: c

Contoh 2: Filter berdasarkan kolom metadata

Gunakan langsung kolom metadata sebagai kondisi filter tanpa mendefinisikannya secara eksplisit dalam projection. Sebagai contoh, untuk menyaring perubahan data bertipe DELETE:

transform:
  - source-table: db.tbl
    projection: a, b
    filter: __data_event_type__ = '+I'

Timpa kolom yang sudah ada

Definisikan bidang dalam projection dengan nama yang sama seperti kolom hulu untuk menimpa nilai atau tipe kolom tersebut.

Contoh: Paksa konversi tipe

Skema tabel hulu adalah [INT a, INT b, BOOLEAN c]. Untuk mempertahankan nama kolom tetapi memaksa konversi kolom a ke tipe STRING:

transform:
  - source-table: db.tbl
    projection: \*, CAST(a AS STRING)

Skema tabel hilir menjadi [STRING a, INT b, BOOLEAN c]. Kolom asli a ditimpa oleh tipe yang baru didefinisikan.

Gunakan kembali kolom terhitung dalam kondisi filter

Filter dapat langsung mereferensikan alias kolom terhitung yang didefinisikan dalam projection.

Contoh: Merujuk Hasil Komputasi

Definisikan kolom baru d dalam projection dan gunakan langsung dalam filter.

transform:
  - source-table: db.tbl
    projection: a, b, c, a + b + c AS d
    filter: d > 100

Konfigurasi ini memiliki efek yang sama dengan konfigurasi berikut, tetapi lebih mudah dibaca:

transform:
  - source-table: ...
    projection: a, b, c, a + b + c AS d
    filter: a + b + c > 100

Konverter setelah Transform

Parameter converter-after-transform digunakan untuk memproses perubahan data setelah semua aturan transform diterapkan. Anda dapat menghubungkan beberapa konverter menggunakan koma (,). Pesan dimodifikasi sesuai urutan konverter. Nilai konfigurasi berikut saat ini didukung.

Nama konverter

Fitur

Versi yang didukung

SOFT_DELETE

Mengonversi perubahan penghapusan menjadi penyisipan (insert).

Ververica Runtime (VVR) 8.0.11 dan yang lebih baru.

FIELD_NAME_LOWER_CASE

Mengonversi semua nama bidang dalam tabel menjadi huruf kecil.

VVR 11.1 dan yang lebih baru.

Penghapusan logis

Konverter SOFT_DELETE, dikombinasikan dengan kolom metadata __data_event_type__, dapat mengimplementasikan penghapusan logis. Sebagai contoh, konfigurasi transform berikut mengimplementasikan penghapusan logis. Data yang dihapus tidak benar-benar dihapus di tujuan hilir. Sebaliknya, operasi penghapusan dikonversi menjadi penyisipan, dan op_type untuk data yang sesuai diperbarui menjadi -D untuk menandakan penghapusan.

transform: 
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE