All Products
Search
Document Center

Realtime Compute for Apache Flink:Transform

Last Updated:Feb 27, 2026

Topik ini menjelaskan sintaksis dan fungsi bawaan yang didukung oleh modul transform dalam pekerjaan ingesti data Flink CDC.

Parameter Aturan Transformasi

Modul transform memungkinkan Anda menambah, menghapus, atau memodifikasi kolom serta menyaring data selama sinkronisasi. Gunakan parameter berikut untuk mendefinisikan aturan transformasi:

Parameter

Deskripsi

Wajib

Catatan

source-table

Menentukan tabel sumber yang akan ditransformasi.

Ya

Ekspresi reguler didukung.

projection

Menentukan aturan proyeksi untuk tabel sumber. Ini mendefinisikan semua kolom output setelah transformasi.

Tidak

Sintaksisnya mirip dengan pernyataan SELECT SQL.

Jika parameter ini dikosongkan, tidak ada kolom yang ditambahkan atau dihapus.

Untuk informasi lebih lanjut, lihat Definisikan aturan proyeksi. Untuk fungsi bawaan yang tersedia, lihat Fungsi bawaan Flink CDC.

filter

Menentukan aturan untuk menyaring baris data.

Tidak

Sintaksisnya mirip dengan klausa WHERE SQL.

Jika parameter ini dikosongkan, tidak ada baris yang dikecualikan.

primary-keys

Menentukan kolom kunci primer untuk tabel sink.

Tidak

Jika parameter ini dikosongkan, definisi kunci primer diwariskan dari skema tabel sumber. Masukkan kolom kunci primer sebagai daftar yang dipisahkan koma (,).

Penting

Secara default, Anda tidak dapat menghapus batasan kunci primer dari tabel hulu. Untuk mengurangi kolom kunci primer hulu, tambahkan item konfigurasi pipeline: transform.allow.trimming.pk-columns.

Jika Anda mendefinisikan kolom kunci primer kustom, pastikan semua data yang diingesti mematuhi batasan kunci primer. Untuk menghindari ketidakteraturan data selama penulisan lintas partisi, sertakan kolom kunci primer tabel sumber dalam definisi kustom Anda.

partition-keys

Menentukan kolom kunci partisi untuk tabel sink.

Tidak

Jika parameter ini dikosongkan, definisi kunci partisi diwariskan dari skema tabel sumber. Masukkan kolom kunci partisi sebagai daftar yang dipisahkan koma (,).

Penting

Saat mendefinisikan kolom kunci partisi kustom, pastikan semua data yang diingesti mematuhi batasan kunci primer. Hal ini mencegah ketidakteraturan data selama penulisan lintas partisi.

table-options

Menentukan item konfigurasi tambahan yang akan diteruskan ke sink.

Tidak

Properti opsional, seperti jumlah bucket atau komentar untuk sink Paimon.

Gunakan koma (,) untuk memisahkan item konfigurasi dan tanda sama dengan (=) untuk memisahkan kunci dan nilai.

Contoh:

key1=value1,key2=value2

description

Deskripsi aturan transformasi.

Tidak

Tidak berlaku.

converter-after-transform

Konverter yang melakukan pemrosesan tambahan setelah transformasi.

Tidak

Lihat Gunakan konverter setelah transformasi.

Catatan penggunaan

  • Setelah memodifikasi pernyataan dalam modul transform, lakukan restart tanpa status untuk pekerjaan tersebut.

  • Umumnya, Anda tidak perlu membungkus pernyataan proyeksi dan filter dalam tanda kutip.

    transform:
      - projection: a, b, c
        # Baris di atas setara dengan baris berikut.
      - projection: "a, b, c"

    Namun, jika ekspresi proyeksi diawali dengan karakter khusus (seperti * atau '), bungkus seluruh ekspresi dalam tanda kutip tunggal (') atau ganda ("), atau escape karakter khusus tersebut dengan backslash (\):

    transform:
      - projection: *, 42      # Tidak valid
      - projection: '*, 42'    # Valid
      - projection: \*, 42     # Valid  

Penyaringan bidang

Modul transform menggunakan sintaksis mirip SQL untuk mendefinisikan aturan proyeksi. Anda dapat menggunakan aturan ini untuk menyinkronkan kolom tertentu, menambahkan kolom terhitung, atau mereferensikan kolom metadata.

Lakukan pemangkasan kolom

Untuk menyinkronkan hanya kolom tertentu dari tabel sumber ke hilir, tentukan kolom tersebut dalam aturan proyeksi. Kolom yang tidak disebutkan tidak akan dikirim ke hilir:

transform:
  - source-table: db.tbl
    projection: col_1, col_3, col_4 # col_2 akan dikecualikan
Penting

Pemangkasan kolom dapat mencegah pembaruan skema sumber mencapai sink. Karena pemangkasan kolom bergantung pada daftar eksplisit kolom yang akan disinkronkan, perubahan pada skema sumber (seperti penambahan atau penghapusan kolom) tidak akan secara otomatis tercermin di hilir.

Gunakan karakter wildcard

Jika Anda ingin mengirim semua kolom dari tabel sumber beserta kolom baru apa pun yang ditambahkan nanti ke hilir secara langsung, gunakan karakter wildcard asterisk (*) dalam aturan proyeksi.

Catatan

Jika aturan proyeksi tidak menggunakan wildcard (*), skema hasilnya bersifat statis. Skema tersebut tetap mengacu pada versi yang didefinisikan dalam aturan, dan perubahan skema tidak akan diteruskan dengan benar.

Sebagai contoh, *, 'extras' AS extras menambahkan kolom tambahan di akhir skema sumber dan terus-menerus meneruskan perubahan skema dari sumber ke sink.

transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras

Kolom terhitung

Anda dapat menambahkan kolom terhitung menggunakan sintaksis <Expression> AS <ColName> dalam aturan proyeksi. Ekspresi ini dievaluasi untuk setiap baris data, dan hasilnya mengisi kolom terhitung tersebut.

Catatan

Ekspresi kolom terhitung tidak dapat mereferensikan kolom terhitung lainnya. Pembatasan ini berlaku terlepas dari urutan pendefinisian. Sebagai contoh, a, b AS c, c AS d tidak valid.

Sebagai contoh, saat Anda menerima catatan data [+I, id = 1] dari tabel hulu db.tbl, ubah menjadi baris data [+I, id = 1, inc_id = 2] dan kirim ke hilir.

transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id

Kolom Metadata

Anda dapat menggunakan kolom metadata pradefinisi berikut dalam aturan proyeksi seolah-olah merupakan kolom data biasa:

Penting

Jangan mendefinisikan kolom data biasa dengan nama yang identik dengan kolom metadata.

Semua konektor mendukung kolom metadata berikut.

Nama kolom metadata

Tipe data

Deskripsi

__namespace_name__

String

Nama namespace tabel sumber.

__schema_name__

String

Nama skema tabel sumber.

__table_name__

String

Nama tabel sumber.

__data_event_type__

String

Jenis operasi event perubahan data (+I, -U, +U, -D).

Penting

Karena event CDC selalu menggabungkan status Update Before dan Update After dari satu pembaruan menjadi satu event, bidang __data_event_type__ berisi kedua nilai -U dan +U dalam satu event pembaruan yang sama. Jangan gunakan bidang ini sebagai kunci primer.

Contoh: Tambahkan kolom terhitung yang menyimpan nama lengkap tabel sumber.

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier

Padanan namespace, skema, dan tabel di berbagai sistem database:

Jenis Database

Nama namespace

Nama Skema

Nama Tabel

MySQL

-

Database

Tabel

Kafka

-

-

Topik

SLS

-

Proyek

LogStore

MongoDB

-

Database

Koleksi

Paimon

-

Database

Tabel

Hologres

-

Skema

Tabel

StarRocks

-

Database

Tabel

Doris

-

Database

Tabel

Postgres

Database

Catatan

Ini hanya berlaku ketika parameter table-id.include-database diaktifkan.

Skema

Tabel

Penyaringan data

Modul transform menggunakan sintaksis mirip SQL untuk mendefinisikan aturan penyaringan data.

Aturan filter adalah ekspresi yang dievaluasi menjadi Nilai Boolean. Ekspresi ini dapat mereferensikan kolom tabel sumber atau kolom terhitung apa pun.

Event perubahan data yang ekspresi filternya dievaluasi sebagai FALSE tidak akan dikirim ke hilir.

Catatan

Jika Anda menimpa kolom tabel sumber dengan kolom terhitung dalam aturan proyeksi, referensi apa pun ke kolom tersebut dalam ekspresi filter mengacu pada nilai kolom terhitung.

Berikut adalah contoh aturan Transform:

transform:
  - source-table: db.tbl
    projection: CAST(id AS VARCHAR) AS id
    filter: CHAR_LENGTH(id) > 5

id yang direferensikan dalam ekspresi filter adalah kolom terhitung, yang telah di-cast ke tipe VARCHAR.

Aturan konfigurasi lanjutan

Referensikan kolom yang tidak dipangkas dan metadata

Contoh 1: Filter berdasarkan kolom yang dipangkas

Tabel sumber memiliki struktur [INT a, INT b, BOOLEAN c]. Untuk menghasilkan kolom a dan b, tetapi hanya menyimpan baris di mana c bernilai true, gunakan konfigurasi berikut:

transform:
  - source-table: ...
    projection: a, b
    filter: c

Contoh 2: Filter berdasarkan kolom metadata

Anda dapat menggunakan kolom metadata langsung sebagai kondisi filter tanpa harus mendefinisikannya secara eksplisit dalam projection. Misalnya, kecualikan event perubahan bertipe DELETE.

transform:
  - source-table: db.tbl
    projection: a, b
    filter: __data_event_type__ = '+I'

Timpa kolom yang sudah ada

Definisikan bidang dalam projection dengan nama yang sama seperti kolom hulu untuk menimpa nilai atau tipenya. Hal ini memastikan evolusi skema berfungsi dengan benar.

Contoh: Terapkan konversi tipe

Tabel sumber memiliki struktur [INT a, INT b, BOOLEAN c]. Untuk mempertahankan nama kolom tetapi memaksa konversi kolom a ke tipe string, gunakan konfigurasi berikut:

transform:
  - source-table: db.tbl
    projection: \*, CAST(a AS STRING)

Struktur tabel sink menjadi [STRING a, INT b, BOOLEAN c]. Kolom a asli digantikan oleh definisi tipe baru.

Gunakan kembali kolom terhitung dalam kondisi filter

Aturan filter dapat langsung mereferensikan alias kolom terhitung yang didefinisikan dalam proyeksi.

Contoh: Referensikan hasil terhitung

Definisikan kolom baru d dalam projection, dan gunakan langsung dalam filter:

transform:
  - source-table: db.tbl
    projection: a, b, c, a + b + c AS d
    filter: d > 100

Konfigurasi ini menghasilkan output yang sama seperti berikut, tetapi lebih mudah dibaca:

transform:
  - source-table: ...
    projection: a, b, c, a + b + c AS d
    filter: a + b + c > 100

Konverter Pasca-Transformasi

Gunakan parameter converter-after-transform untuk memproses perubahan data setelah semua aturan transformasi diterapkan. Tentukan beberapa konverter dalam daftar yang dipisahkan koma (,). Flink menerapkan konverter secara berurutan sesuai definisi Anda. Nilai yang valid:

Nama konverter

Fitur

Versi yang didukung

SOFT_DELETE

Mengonversi operasi penghapusan menjadi penyisipan.

Ververica Runtime (VVR) 8.0.11 dan versi lebih baru

FIELD_NAME_LOWER_CASE

Mengonversi semua nama kolom tabel menjadi huruf kecil.

VVR 11.1 dan versi lebih baru

Lakukan soft delete

Untuk melakukan penghapusan logis, gabungkan kolom metadata __data_event_type__ dengan konverter SOFT_DELETE. Sebagai contoh, konfigurasi transform berikut menerapkan penghapusan logis: data yang dihapus tidak dihapus secara fisik dari sink. Sebaliknya, Flink mengonversi operasi penghapusan menjadi operasi penyisipan dan menandainya dengan nilai op_type -D untuk menunjukkan penghapusan.

transform: 
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE