全部产品
Search
文档中心

Realtime Compute for Apache Flink:Kembangkan pekerjaan ingesti data Flink CDC (pratinjau publik)

更新时间:Jan 27, 2026

Realtime Compute for Apache Flink menggunakan Flink CDC untuk membantu Anda mengembangkan pekerjaan YAML yang menyinkronkan data dari sumber ke tujuan. Topik ini menjelaskan cara mengembangkan pekerjaan ingesti data Flink CDC.

Informasi latar belakang

Ingesti data Flink CDC memanfaatkan Flink CDC untuk integrasi data. Dengan konfigurasi YAML, Anda dapat dengan mudah mendefinisikan pipa ekstrak, transformasi, dan muat (ETL) yang kompleks, yang secara otomatis dikonversi menjadi logika komputasi Flink. Fitur ini mendukung secara efisien sinkronisasi database penuh, sinkronisasi tabel tunggal, sinkronisasi database dan tabel terpartisi, sinkronisasi tabel yang baru ditambahkan, evolusi skema, serta sinkronisasi kolom terhitung kustom. Selain itu, fitur ini juga mendukung pemrosesan ETL, penyaringan data berbasis klausa WHERE, dan pemangkasan kolom, sehingga sangat menyederhanakan proses integrasi data serta meningkatkan efisiensi dan keandalannya.

Keunggulan Flink CDC

Di Realtime Compute for Apache Flink, Anda dapat mengembangkan pekerjaan ingesti data Flink CDC, pekerjaan SQL, atau pekerjaan DataStream untuk menyinkronkan data. Bagian berikut menjelaskan keunggulan pekerjaan ingesti data Flink CDC dibandingkan dua jenis pekerjaan lainnya.

Flink CDC vs. Flink SQL

Pekerjaan ingesti data Flink CDC dan pekerjaan SQL menggunakan tipe data yang berbeda untuk mentransfer data:

  • Pekerjaan SQL mentransfer RowData, sedangkan pekerjaan Flink CDC mentransfer DataChangeEvent dan SchemaChangeEvent. Setiap RowData dalam pekerjaan SQL memiliki tipe perubahan sendiri, yaitu insert (+I), update_before (-U), update_after (+U), dan delete (-D).

  • Flink CDC menggunakan SchemaChangeEvent untuk mentransfer informasi perubahan skema, seperti pembuatan tabel, penambahan kolom, dan pemotongan tabel. DataChangeEvent digunakan untuk mentransfer perubahan data, seperti insert, update, dan delete. Pesan update berisi konten sebelum dan sesudah update, sehingga memungkinkan Anda menulis data perubahan mentah ke tujuan.

Tabel berikut membandingkan keunggulan pekerjaan ingesti data Flink CDC dengan pekerjaan SQL.

Flink CDC data ingestion

Flink SQL

Mendeteksi skema secara otomatis dan mendukung sinkronisasi database penuh.

Mengharuskan Anda menulis pernyataan CREATE TABLE dan INSERT secara manual.

Mendukung berbagai kebijakan evolusi skema.

Tidak mendukung evolusi skema.

Mendukung sinkronisasi changelog mentah.

Mengubah struktur changelog mentah.

Mendukung pembacaan dan penulisan data ke beberapa tabel.

Membaca dan menulis data hanya ke satu tabel.

Dibandingkan dengan pernyataan CTAS atau CDAS, pekerjaan Flink CDC menyediakan fitur yang lebih kuat, termasuk:

  • Segera menyinkronkan evolusi skema di tabel leluhur tanpa menunggu data baru ditulis.

  • Mendukung sinkronisasi changelog mentah tanpa memisahkan pesan update.

  • Menyinkronkan lebih banyak jenis evolusi skema, seperti TRUNCATE TABLE dan DROP TABLE.

  • Mendukung pemetaan tabel untuk mendefinisikan nama tabel tujuan secara fleksibel.

  • Mendukung perilaku evolusi skema yang fleksibel dan dapat dikonfigurasi.

  • Mendukung penyaringan data menggunakan klausa WHERE.

  • Mendukung pemangkasan kolom.

Flink CDC vs. Flink DataStream

Tabel berikut membandingkan keunggulan pekerjaan ingesti data Flink CDC dengan pekerjaan DataStream.

Flink CDC data ingestion

Flink DataStream

Dirancang untuk pengguna di semua tingkat, bukan hanya ahli.

Memerlukan keakraban dengan Java dan sistem terdistribusi.

Menyembunyikan detail dasar untuk menyederhanakan pengembangan.

Memerlukan keakraban dengan kerangka kerja Flink.

Menggunakan format YAML yang mudah dipahami dan dipelajari.

Memerlukan pengetahuan tentang alat seperti Maven untuk mengelola dependensi.

Memungkinkan penggunaan ulang pekerjaan yang sudah ada dengan mudah.

Menyulitkan penggunaan ulang kode yang sudah ada.

Batasan

  • Anda harus menggunakan Ververica Runtime (VVR) 11.1 untuk mengembangkan pekerjaan ingesti data Flink CDC. Untuk VVR 8.x, gunakan VVR 8.0.11.

  • Anda hanya dapat menyinkronkan data dari satu sumber ke satu tujuan. Untuk membaca data dari beberapa sumber atau menulis ke beberapa tujuan, Anda harus membuat beberapa pekerjaan Flink CDC.

  • Anda tidak dapat men-deploy pekerjaan Flink CDC ke session cluster.

  • Penyetelan otomatis tidak didukung untuk pekerjaan Flink CDC.

Konektor ingesti data Flink CDC

Tabel berikut mencantumkan konektor yang didukung sebagai sumber dan tujuan untuk pekerjaan ingesti data Flink CDC.

Catatan

Anda dapat memberikan masukan mengenai penyimpanan hulu dan hilir yang Anda minati melalui saluran seperti ticket atau DingTalk. Kami berencana mendukung lebih banyak opsi penyimpanan di masa depan untuk memenuhi kebutuhan Anda dengan lebih baik.

Konektor yang didukung

Konektor

Tipe yang didukung

Source

Sink

MySQL

Catatan

Terhubung ke ApsaraDB RDS for MySQL, PolarDB for MySQL, dan MySQL yang dikelola sendiri.

×

Streaming Lakehouse Paimon

×

Message Queue for Kafka

Catatan

Memerlukan Ververica Runtime (VVR) 8.0.10 atau versi yang lebih baru.

Upsert Kafka

×

StarRocks

×

Hologres

×

Simple Log Service (SLS)

Catatan

Memerlukan Ververica Runtime (VVR) 11.1 atau versi yang lebih baru.

×

MongoDB

Catatan

Memerlukan Ververica Runtime (VVR) 11.2 atau versi yang lebih baru.

×

MaxCompute

×

Catatan

Memerlukan Ververica Runtime (VVR) 11.1 atau versi yang lebih baru.

SelectDB

×

Catatan

Memerlukan Ververica Runtime (VVR) 11.1 atau versi yang lebih baru.

Postgres CDC (public preview)

Catatan

Memerlukan Ververica Runtime (VVR) 11.4 atau versi yang lebih baru.

×

Print

×

Gunakan kembali informasi koneksi dari Catalog yang sudah ada

Mulai dari Ververica Runtime (VVR) 11.5, Anda dapat menggunakan kembali informasi koneksi dari Catalog yang sudah ada dalam pekerjaan ingesti data Flink CDC Anda. Anda dapat mereferensikan Catalog bawaan yang dibuat pada halaman Data Management untuk mengambil properti koneksi secara otomatis, seperti URL, username, dan password. Hal ini mengurangi upaya konfigurasi manual.

Sintaks
source:
  type: mysql
  using.built-in-catalog: mysql_rds_catalog
  
sink:
  type: paimon
  using.built-in-catalog: paimon_dlf_catalog

Dalam modul source dan sink, gunakan sintaks using.built-in-catalog untuk mereferensikan Catalog bawaan Anda.

Sebagai contoh, dalam kode di atas, metadata untuk Catalog mysql_rds_catalog sudah mencakup parameter yang diperlukan seperti hostname, username, dan password. Anda tidak perlu menentukan parameter-parameter ini lagi dalam pekerjaan YAML.

Batasan

Konektor berikut mendukung penggunaan kembali informasi koneksi dari Catalog:

  • MySQL (source)

  • Kafka (source)

  • Upsert Kafka (sink)

  • StarRocks (sink)

  • Hologres (sink)

  • Paimon (sink)

  • SLS (source)

Catatan

Parameter Catalog yang tidak kompatibel dengan sintaks YAML CDC tidak akan berlaku. Untuk informasi selengkapnya, lihat daftar parameter untuk setiap konektor.

Buat pekerjaan ingesti data Flink CDC

Hasilkan dari templat pekerjaan sinkronisasi

  1. Masuk ke Konsol Realtime Compute for Apache Flink.

  2. Klik Console di kolom Actions ruang kerja target.

  3. Di panel navigasi sebelah kiri, pilih Development > Data Ingestion.

  4. Klik image, lalu klik Create From Template.

  5. Pilih templat sinkronisasi data.

    Saat ini, hanya templat MySQL ke StarRocks, MySQL ke Paimon, dan MySQL ke Hologres yang didukung.

    image

  6. Masukkan nama pekerjaan, lokasi penyimpanan, dan versi engine, lalu klik OK.

  7. Konfigurasikan informasi source dan sink untuk pekerjaan Flink CDC.

    Untuk informasi selengkapnya tentang pengaturan parameter, lihat dokumentasi untuk konektor yang sesuai.

Hasilkan dari pekerjaan CTAS/CDAS

Penting
  • Jika suatu pekerjaan berisi beberapa pernyataan CXAS, Flink hanya mendeteksi dan mengubah pernyataan pertama.

  • Karena perbedaan dukungan fungsi bawaan antara Flink SQL dan Flink CDC, aturan transformasi yang dihasilkan mungkin belum siap digunakan. Anda harus memverifikasi dan menyesuaikan aturan tersebut secara manual.

  • Jika sumbernya adalah MySQL dan pekerjaan CTAS/CDAS asli sedang berjalan, Anda harus menyesuaikan server ID sumber untuk pekerjaan ingesti data Flink CDC agar tidak terjadi konflik dengan pekerjaan asli.

  1. Masuk ke Konsol Realtime Compute for Apache Flink.

  2. Klik Console di kolom Actions ruang kerja target.

  3. Di panel navigasi sebelah kiri, pilih Data Studio > Data Ingestion.

  4. Klik image, klik New Draft From CTAS/CDAS, pilih pekerjaan CTAS atau CDAS target, lalu klik OK.

    Di halaman pemilihan, sistem hanya menampilkan pekerjaan CTAS dan CDAS yang valid. Pekerjaan ETL biasa atau draf pekerjaan yang memiliki kesalahan sintaks tidak ditampilkan.

  5. Masukkan nama pekerjaan, lokasi penyimpanan, dan versi engine, lalu klik OK.

Migrasi Pekerjaan dari Komunitas Open Source

  1. Masuk ke Konsol Realtime Compute for Apache Flink.

  2. Klik Console di kolom Actions ruang kerja target.

  3. Di panel navigasi sebelah kiri, pilih Development > Data Ingestion.

  4. Klik image dan pilih New Data Ingestion Draft. Atur File Name dan Engine Version, lalu klik Create.

  5. Salin pekerjaan Flink CDC dari komunitas open source.

  6. (Opsional) Klik Depth Check.

    Anda dapat memeriksa sintaks, konektivitas jaringan, dan izin akses.

Buat pekerjaan ingesti data Flink CDC dari awal

  1. Masuk ke Konsol Realtime Compute for Apache Flink.

  2. Klik Console di kolom Actions ruang kerja target.

  3. Di panel navigasi sebelah kiri, pilih Data Development > Data Ingestion.

  4. Klik image, pilih New Data Ingestion Draft, masukkan File Name dan Engine Version, lalu klik Create.

  5. Konfigurasikan pekerjaan Flink CDC.

    # Required
    source:
      # Tipe sumber data
      type: <Ganti dengan tipe konektor sumber Anda>
      # Konfigurasi sumber data. Untuk informasi selengkapnya tentang item konfigurasi, lihat dokumentasi untuk konektor yang sesuai.
      ...
    
    # Required
    sink:
      # Tipe tujuan
      type: <Ganti dengan tipe konektor tujuan Anda>
      # Konfigurasi tujuan. Untuk informasi selengkapnya tentang item konfigurasi, lihat dokumentasi untuk konektor yang sesuai.
      ...
    
    # Optional
    transform:
      # Aturan transformasi untuk tabel flink_test.customers
      - source-table: flink_test.customers
        # Konfigurasi proyeksi. Menentukan kolom yang akan disinkronkan dan melakukan transformasi data.
        projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
        # Kondisi filter. Hanya menyinkronkan data dengan id lebih besar dari 10.
        filter: id > 10
        # Deskripsi aturan transformasi
        description: tambahkan kolom terhitung berdasarkan tabel sumber
    
    # Optional
    route:
      # Aturan routing yang menentukan pemetaan antara tabel sumber dan tujuan.
      - source-table: flink_test.customers
        sink-table: db.customers_o
        # Deskripsi aturan routing
        description: sinkronisasi tabel customers
      - source-table: flink_test.customers_suffix
        sink-table: db.customers_s
        # Deskripsi aturan routing
        description: sinkronisasi tabel customers_suffix
    
    #Optional
    pipeline:
      # Nama pekerjaan
      name: MySQL to Hologres Pipeline
    Catatan

    Dalam pekerjaan Flink CDC, kunci dan nilai harus dipisahkan oleh spasi. Formatnya adalah Key: Value.

    Blok kode dijelaskan sebagai berikut.

    Wajib

    Modul kode

    Deskripsi

    Required

    source (data source)

    Awal dari pipa data. Flink CDC menangkap data perubahan dari sumber data.

    Catatan
    • Saat ini, hanya MySQL yang didukung sebagai sumber data. Untuk informasi selengkapnya mengenai item konfigurasi, lihat MySQL.

    • Anda dapat menggunakan variabel untuk mengatur informasi sensitif. Untuk informasi selengkapnya, lihat Variable Management.

    sink (destination)

    Akhir dari pipa data. Flink CDC mengirimkan perubahan data yang ditangkap ke sistem tujuan ini.

    Catatan
    • Untuk informasi tentang sistem tujuan yang didukung, lihat Konektor ingesti data Flink CDC. Untuk informasi selengkapnya tentang item konfigurasi tujuan, lihat dokumentasi untuk konektor yang sesuai.

    • Anda dapat menggunakan variabel untuk mengatur informasi sensitif. Untuk informasi selengkapnya, lihat Variable Management.

    Optional

    pipeline

    (data pipeline)

    Menentukan konfigurasi dasar untuk seluruh pekerjaan pipa data, seperti nama pipa.

    transform (data transformation)

    Menentukan aturan transformasi data. Transformasi adalah operasi pada data yang mengalir melalui pipa Flink. Modul ini mendukung pemrosesan ETL, penyaringan klausa WHERE, pemangkasan kolom, dan kolom terhitung.

    Anda dapat menggunakan transform ketika data perubahan mentah yang ditangkap oleh Flink CDC perlu ditransformasi agar sesuai dengan sistem hilir tertentu.

    route (routing)

    Jika modul ini tidak dikonfigurasi, berarti menunjukkan sinkronisasi database penuh atau tabel target.

    Dalam beberapa kasus, data perubahan yang ditangkap perlu dikirim ke tujuan berbeda berdasarkan aturan tertentu. Mekanisme routing memungkinkan Anda menentukan pemetaan antara sistem hulu dan hilir secara fleksibel serta mengirim data ke tujuan data yang berbeda.

    Untuk informasi selengkapnya tentang struktur sintaks dan item konfigurasi setiap modul, lihat Referensi Pengembangan untuk pekerjaan ingesti data Flink CDC.

    Kode berikut memberikan contoh cara menyinkronkan semua tabel dari database app_db di MySQL ke database di Hologres.

    source:
      type: mysql
      hostname: <hostname>
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: app_db.\.*
      server-id: 5400-5404
      # (Optional) Sinkronkan data dari tabel yang baru dibuat pada fase inkremental.
      scan.binlog.newly-added-table.enabled: true
      # (Optional) Sinkronkan komentar tabel dan bidang.
      include-comments.enabled: true
      # (Optional) Utamakan pengiriman chunk tak terbatas untuk mencegah potensi masalah OutOfMemory pada TaskManager.
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # (Optional) Aktifkan filter parsing untuk mempercepat pembacaan.
      scan.only.deserialize.captured.tables.changelog.enabled: true
    
    sink:
      type: hologres
      name: Hologres Sink
      endpoint: <endpoint>
      dbname: <database-name>
      username: ${secret_values.holousername}
      password: ${secret_values.holopassword}
    
    pipeline:
      name: Sync MySQL Database to Hologres
  6. (Opsional) Klik Depth Check.

    Anda dapat memeriksa sintaks, konektivitas jaringan, dan izin akses.

Referensi