Topik ini menjelaskan sintaksis dan fungsi bawaan yang didukung oleh modul transform dalam pekerjaan ingesti data Flink CDC.
Parameter Aturan Transformasi
Modul transform memungkinkan Anda menambah, menghapus, atau memodifikasi kolom serta menyaring data selama sinkronisasi. Gunakan parameter berikut untuk mendefinisikan aturan transformasi:
Parameter | Deskripsi | Wajib | Catatan |
| Menentukan tabel sumber yang akan ditransformasi. | Ya | Ekspresi reguler didukung. |
| Menentukan aturan proyeksi untuk tabel sumber. Ini mendefinisikan semua kolom output setelah transformasi. | Tidak | Sintaksisnya mirip dengan pernyataan SELECT SQL. Jika parameter ini dikosongkan, tidak ada kolom yang ditambahkan atau dihapus. Untuk informasi lebih lanjut, lihat Definisikan aturan proyeksi. Untuk fungsi bawaan yang tersedia, lihat Fungsi bawaan Flink CDC. |
| Menentukan aturan untuk menyaring baris data. | Tidak | Sintaksisnya mirip dengan klausa WHERE SQL. Jika parameter ini dikosongkan, tidak ada baris yang dikecualikan. |
| Menentukan kolom kunci primer untuk tabel sink. | Tidak | Jika parameter ini dikosongkan, definisi kunci primer diwariskan dari skema tabel sumber. Masukkan kolom kunci primer sebagai daftar yang dipisahkan koma ( Penting Secara default, Anda tidak dapat menghapus batasan kunci primer dari tabel hulu. Untuk mengurangi kolom kunci primer hulu, tambahkan item konfigurasi Jika Anda mendefinisikan kolom kunci primer kustom, pastikan semua data yang diingesti mematuhi batasan kunci primer. Untuk menghindari ketidakteraturan data selama penulisan lintas partisi, sertakan kolom kunci primer tabel sumber dalam definisi kustom Anda. |
| Menentukan kolom kunci partisi untuk tabel sink. | Tidak | Jika parameter ini dikosongkan, definisi kunci partisi diwariskan dari skema tabel sumber. Masukkan kolom kunci partisi sebagai daftar yang dipisahkan koma ( Penting Saat mendefinisikan kolom kunci partisi kustom, pastikan semua data yang diingesti mematuhi batasan kunci primer. Hal ini mencegah ketidakteraturan data selama penulisan lintas partisi. |
| Menentukan item konfigurasi tambahan yang akan diteruskan ke sink. | Tidak | Properti opsional, seperti jumlah bucket atau komentar untuk sink Paimon. Gunakan koma ( Contoh:
|
| Deskripsi aturan transformasi. | Tidak | Tidak berlaku. |
| Konverter yang melakukan pemrosesan tambahan setelah transformasi. | Tidak |
Catatan penggunaan
Setelah memodifikasi pernyataan dalam modul transform, lakukan restart tanpa status untuk pekerjaan tersebut.
Umumnya, Anda tidak perlu membungkus pernyataan proyeksi dan filter dalam tanda kutip.
transform: - projection: a, b, c # Baris di atas setara dengan baris berikut. - projection: "a, b, c"Namun, jika ekspresi proyeksi diawali dengan karakter khusus (seperti
*atau'), bungkus seluruh ekspresi dalam tanda kutip tunggal (') atau ganda ("), atau escape karakter khusus tersebut dengan backslash (\):transform: - projection: *, 42 # Tidak valid - projection: '*, 42' # Valid - projection: \*, 42 # Valid
Penyaringan bidang
Modul transform menggunakan sintaksis mirip SQL untuk mendefinisikan aturan proyeksi. Anda dapat menggunakan aturan ini untuk menyinkronkan kolom tertentu, menambahkan kolom terhitung, atau mereferensikan kolom metadata.
Lakukan pemangkasan kolom
Untuk menyinkronkan hanya kolom tertentu dari tabel sumber ke hilir, tentukan kolom tersebut dalam aturan proyeksi. Kolom yang tidak disebutkan tidak akan dikirim ke hilir:
transform:
- source-table: db.tbl
projection: col_1, col_3, col_4 # col_2 akan dikecualikanPemangkasan kolom dapat mencegah pembaruan skema sumber mencapai sink. Karena pemangkasan kolom bergantung pada daftar eksplisit kolom yang akan disinkronkan, perubahan pada skema sumber (seperti penambahan atau penghapusan kolom) tidak akan secara otomatis tercermin di hilir.
Gunakan karakter wildcard
Jika Anda ingin mengirim semua kolom dari tabel sumber beserta kolom baru apa pun yang ditambahkan nanti ke hilir secara langsung, gunakan karakter wildcard asterisk (*) dalam aturan proyeksi.
Jika aturan proyeksi tidak menggunakan wildcard (*), skema hasilnya bersifat statis. Skema tersebut tetap mengacu pada versi yang didefinisikan dalam aturan, dan perubahan skema tidak akan diteruskan dengan benar.
Sebagai contoh, *, 'extras' AS extras menambahkan kolom tambahan di akhir skema sumber dan terus-menerus meneruskan perubahan skema dari sumber ke sink.
transform:
- source-table: db.tbl
projection: \*, 'extras' AS extrasKolom terhitung
Anda dapat menambahkan kolom terhitung menggunakan sintaksis <Expression> AS <ColName> dalam aturan proyeksi. Ekspresi ini dievaluasi untuk setiap baris data, dan hasilnya mengisi kolom terhitung tersebut.
Ekspresi kolom terhitung tidak dapat mereferensikan kolom terhitung lainnya. Pembatasan ini berlaku terlepas dari urutan pendefinisian. Sebagai contoh, a, b AS c, c AS d tidak valid.
Sebagai contoh, saat Anda menerima catatan data [+I, id = 1] dari tabel hulu db.tbl, ubah menjadi baris data [+I, id = 1, inc_id = 2] dan kirim ke hilir.
transform:
- source-table: db.tbl
projection: id, id + 1 AS inc_idKolom Metadata
Anda dapat menggunakan kolom metadata pradefinisi berikut dalam aturan proyeksi seolah-olah merupakan kolom data biasa:
Jangan mendefinisikan kolom data biasa dengan nama yang identik dengan kolom metadata.
Semua konektor mendukung kolom metadata berikut.
Nama kolom metadata | Tipe data | Deskripsi |
| String | Nama namespace tabel sumber. |
| String | Nama skema tabel sumber. |
| String | Nama tabel sumber. |
| String | Jenis operasi event perubahan data ( Penting Karena event CDC selalu menggabungkan status Update Before dan Update After dari satu pembaruan menjadi satu event, bidang |
Contoh: Tambahkan kolom terhitung yang menyimpan nama lengkap tabel sumber.
transform:
- source-table: \.*.\.*
projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifierPadanan namespace, skema, dan tabel di berbagai sistem database:
Jenis Database | Nama namespace | Nama Skema | Nama Tabel |
MySQL | - | Database | Tabel |
Kafka | - | - | Topik |
SLS | - | Proyek | LogStore |
MongoDB | - | Database | Koleksi |
Paimon | - | Database | Tabel |
Hologres | - | Skema | Tabel |
StarRocks | - | Database | Tabel |
Doris | - | Database | Tabel |
Postgres | Database Catatan Ini hanya berlaku ketika parameter | Skema | Tabel |
Penyaringan data
Modul transform menggunakan sintaksis mirip SQL untuk mendefinisikan aturan penyaringan data.
Aturan filter adalah ekspresi yang dievaluasi menjadi Nilai Boolean. Ekspresi ini dapat mereferensikan kolom tabel sumber atau kolom terhitung apa pun.
Event perubahan data yang ekspresi filternya dievaluasi sebagai FALSE tidak akan dikirim ke hilir.
Jika Anda menimpa kolom tabel sumber dengan kolom terhitung dalam aturan proyeksi, referensi apa pun ke kolom tersebut dalam ekspresi filter mengacu pada nilai kolom terhitung.
Berikut adalah contoh aturan Transform:
transform:
- source-table: db.tbl
projection: CAST(id AS VARCHAR) AS id
filter: CHAR_LENGTH(id) > 5id yang direferensikan dalam ekspresi filter adalah kolom terhitung, yang telah di-cast ke tipe VARCHAR.
Aturan konfigurasi lanjutan
Referensikan kolom yang tidak dipangkas dan metadata
Contoh 1: Filter berdasarkan kolom yang dipangkas
Tabel sumber memiliki struktur [INT a, INT b, BOOLEAN c]. Untuk menghasilkan kolom a dan b, tetapi hanya menyimpan baris di mana c bernilai true, gunakan konfigurasi berikut:
transform:
- source-table: ...
projection: a, b
filter: cContoh 2: Filter berdasarkan kolom metadata
Anda dapat menggunakan kolom metadata langsung sebagai kondisi filter tanpa harus mendefinisikannya secara eksplisit dalam projection. Misalnya, kecualikan event perubahan bertipe DELETE.
transform:
- source-table: db.tbl
projection: a, b
filter: __data_event_type__ = '+I'Timpa kolom yang sudah ada
Definisikan bidang dalam projection dengan nama yang sama seperti kolom hulu untuk menimpa nilai atau tipenya. Hal ini memastikan evolusi skema berfungsi dengan benar.
Contoh: Terapkan konversi tipe
Tabel sumber memiliki struktur [INT a, INT b, BOOLEAN c]. Untuk mempertahankan nama kolom tetapi memaksa konversi kolom a ke tipe string, gunakan konfigurasi berikut:
transform:
- source-table: db.tbl
projection: \*, CAST(a AS STRING)Struktur tabel sink menjadi [STRING a, INT b, BOOLEAN c]. Kolom a asli digantikan oleh definisi tipe baru.
Gunakan kembali kolom terhitung dalam kondisi filter
Aturan filter dapat langsung mereferensikan alias kolom terhitung yang didefinisikan dalam proyeksi.
Contoh: Referensikan hasil terhitung
Definisikan kolom baru d dalam projection, dan gunakan langsung dalam filter:
transform:
- source-table: db.tbl
projection: a, b, c, a + b + c AS d
filter: d > 100Konfigurasi ini menghasilkan output yang sama seperti berikut, tetapi lebih mudah dibaca:
transform:
- source-table: ...
projection: a, b, c, a + b + c AS d
filter: a + b + c > 100Konverter Pasca-Transformasi
Gunakan parameter converter-after-transform untuk memproses perubahan data setelah semua aturan transformasi diterapkan. Tentukan beberapa konverter dalam daftar yang dipisahkan koma (,). Flink menerapkan konverter secara berurutan sesuai definisi Anda. Nilai yang valid:
Nama konverter | Fitur | Versi yang didukung |
SOFT_DELETE | Mengonversi operasi penghapusan menjadi penyisipan. | Ververica Runtime (VVR) 8.0.11 dan versi lebih baru |
FIELD_NAME_LOWER_CASE | Mengonversi semua nama kolom tabel menjadi huruf kecil. | VVR 11.1 dan versi lebih baru |
Lakukan soft delete
Untuk melakukan penghapusan logis, gabungkan kolom metadata __data_event_type__ dengan konverter SOFT_DELETE. Sebagai contoh, konfigurasi transform berikut menerapkan penghapusan logis: data yang dihapus tidak dihapus secara fisik dari sink. Sebaliknya, Flink mengonversi operasi penghapusan menjadi operasi penyisipan dan menandainya dengan nilai op_type -D untuk menunjukkan penghapusan.
transform:
- source-table: db.tbl
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE