全部产品
Search
文档中心

Realtime Compute for Apache Flink:Mengembangkan pekerjaan ingesti data Flink CDC (Pratinjau Publik)

更新时间:Nov 10, 2025

Realtime Compute for Apache Flink menggunakan Flink CDC untuk mengingesti data. Anda dapat mengembangkan pekerjaan YAML guna menyinkronkan data dari sumber ke sink. Topik ini menjelaskan cara mengembangkan pekerjaan ingesti data Flink CDC.

Informasi latar belakang

Ingesti data Flink CDC memanfaatkan Flink CDC untuk integrasi data. Anda dapat menggunakan konfigurasi YAML untuk mendefinisikan proses ekstrak, transformasi, dan muat (ETL) yang kompleks, yang secara otomatis dikonversi menjadi logika komputasi Flink. Fitur ini mendukung sinkronisasi basis data penuh, sinkronisasi tabel tunggal, sinkronisasi sharding, sinkronisasi tabel baru, evolusi skema, serta sinkronisasi kolom terhitung kustom. Selain itu, fitur ini juga mendukung pemrosesan ETL, penyaringan data dengan klausa WHERE, pemangkasan kolom, dan kolom terhitung. Hal ini menyederhanakan proses integrasi data sekaligus meningkatkan efisiensi dan keandalannya.

Keunggulan Flink CDC

Di Realtime Compute for Apache Flink, Anda dapat mengembangkan pekerjaan ingesti data Flink CDC, pekerjaan SQL, atau pekerjaan DataStream sendiri untuk menyinkronkan data. Bagian berikut menjelaskan keunggulan pekerjaan ingesti data Flink CDC dibandingkan dua metode pengembangan lainnya.

Flink CDC vs. Flink SQL

Pekerjaan ingesti data Flink CDC dan pekerjaan SQL menggunakan tipe data yang berbeda untuk transmisi data:

  • Pekerjaan SQL mentransmisikan RowData, sedangkan pekerjaan Flink CDC mentransmisikan DataChangeEvent dan SchemaChangeEvent. Setiap RowData dalam pekerjaan SQL memiliki tipe perubahan tersendiri, yaitu insert (+I), update before (-U), update after (+U), dan delete (-D).

  • Flink CDC menggunakan SchemaChangeEvent untuk mentransmisikan informasi perubahan skema, seperti membuat tabel, menambah kolom, atau membersihkan tabel. DataChangeEvent digunakan untuk mentransmisikan perubahan data, seperti operasi insert, update, dan delete. Pesan update mencakup konten update before dan update after, sehingga memungkinkan Anda menulis data perubahan asli ke sink.

Tabel berikut merangkum keunggulan pekerjaan ingesti data Flink CDC dibandingkan pekerjaan SQL.

Ingesti data Flink CDC

Flink SQL

Mendeteksi skema secara otomatis dan mendukung sinkronisasi basis data penuh.

Memerlukan penulisan manual pernyataan CREATE TABLE dan INSERT.

Mendukung berbagai kebijakan evolusi skema.

Tidak mendukung evolusi skema.

Mendukung sinkronisasi changelog asli.

Mengganggu struktur changelog asli.

Mendukung pembacaan dari dan penulisan ke beberapa tabel.

Hanya membaca dari dan menulis ke satu tabel.

Dibandingkan dengan pernyataan CTAS atau CDAS, pekerjaan Flink CDC lebih kuat dan mendukung fitur-fitur berikut:

  • Evolusi skema pada tabel leluhur disinkronkan segera tanpa perlu menunggu penulisan data baru untuk memicu sinkronisasi.

  • Mendukung sinkronisasi changelog asli tanpa memisahkan pesan update.

  • Menyinkronkan lebih banyak jenis perubahan skema, seperti TRUNCATE TABLE dan DROP TABLE.

  • Mendukung pemetaan tabel untuk menentukan nama tabel sink secara fleksibel.

  • Mendukung perilaku evolusi skema yang fleksibel dan dapat dikonfigurasi.

  • Mendukung penyaringan data dengan klausa WHERE.

  • Mendukung pemangkasan bidang.

Flink CDC vs. Flink DataStream

Tabel berikut merangkum keunggulan pekerjaan ingesti data Flink CDC dibandingkan pekerjaan DataStream.

Ingesti data Flink CDC

Flink DataStream

Dirancang untuk pengguna dari semua tingkat keahlian, bukan hanya ahli.

Memerlukan keakraban dengan Java dan sistem terdistribusi.

Menyembunyikan detail implementasi dasar untuk menyederhanakan pengembangan.

Memerlukan keakraban dengan kerangka kerja Flink.

Format YAML mudah dipahami dan dipelajari.

Memerlukan pengetahuan tentang alat seperti Maven untuk mengelola dependensi.

Pekerjaan yang sudah ada mudah digunakan kembali.

Kode yang sudah ada sulit digunakan kembali.

Batasan

  • Anda dapat menggunakan Ververica Runtime (VVR) 11.1 untuk mengembangkan pekerjaan ingesti data Flink CDC. Untuk menggunakan VVR 8.x, Anda harus menggunakan VVR 8.0.11.

  • Hanya satu sumber dan satu sink yang didukung. Untuk membaca dari beberapa sumber data atau menulis ke beberapa sink, Anda harus membuat beberapa pekerjaan Flink CDC.

  • Anda tidak dapat menerapkan pekerjaan Flink CDC ke kluster sesi.

  • Penyetelan otomatis tidak didukung untuk pekerjaan Flink CDC.

Konektor ingesti data Flink CDC

Tabel berikut mencantumkan konektor yang didukung sebagai sumber dan sink untuk ingesti data Flink CDC.

Catatan

Anda dapat memberikan masukan mengenai penyimpanan hulu dan hilir yang Anda minati melalui saluran seperti Tiket atau DingTalk. Kami berencana untuk mendukung lebih banyak opsi penyimpanan ini di masa depan guna memenuhi kebutuhan Anda dengan lebih baik.

Konektor yang didukung

Konektor

Tipe yang didukung

Sumber

Sink

MySQL

Catatan

Terhubung ke ApsaraDB RDS for MySQL, PolarDB for MySQL, dan MySQL yang dikelola sendiri.

×

Paimon

×

Kafka

Catatan

Memerlukan Ververica Runtime (VVR) 8.0.10 atau versi lebih baru.

Upsert Kafka

×

StarRocks

×

Hologres

×

SLS

Catatan

Memerlukan VVR 11.1 atau versi lebih baru.

×

MongoDB

Catatan

Memerlukan VVR 11.2 atau versi lebih baru.

×

MaxCompute

×

Catatan

Memerlukan VVR 11.1 atau versi lebih baru.

SelectDB

×

Catatan

Memerlukan VVR 11.1 atau versi lebih baru.

Print

×

Membuat pekerjaan ingesti data Flink CDC

Hasilkan pekerjaan dari templat pekerjaan sinkronisasi

  1. Masuk ke Konsol Realtime Compute for Apache Flink.

  2. Pada kolom Actions ruang kerja target, klik Console.

  3. Di panel navigasi sebelah kiri, pilih Data Development > Data Ingestion.

  4. Klik image, lalu klik Create From Template.

  5. Pilih templat sinkronisasi data.

    Saat ini hanya templat MySQL ke StarRocks, MySQL ke Paimon, dan MySQL ke Hologres yang didukung.

    image

  6. Masukkan pengaturan pekerjaan, seperti Job Name, Storage Location, dan Engine Version, lalu klik OK.

  7. Konfigurasikan informasi sumber dan sink untuk pekerjaan Flink CDC.

    Untuk informasi selengkapnya mengenai parameter, lihat dokumentasi untuk konektor yang sesuai.

Hasilkan pekerjaan dari pekerjaan CTAS/CDAS

Penting
  • Jika suatu pekerjaan berisi beberapa pernyataan CXAS, Flink hanya mendeteksi dan mengonversi pernyataan pertama.

  • Karena fungsi bawaan yang didukung oleh Flink SQL dan Flink CDC berbeda, aturan transformasi yang dihasilkan mungkin belum siap digunakan. Anda harus memeriksa dan menyesuaikan aturan tersebut secara manual.

  • Jika sumbernya adalah MySQL dan pekerjaan CTAS/CDAS asli sedang berjalan, Anda harus menyesuaikan server ID sumber dalam pekerjaan ingesti data Flink CDC untuk menghindari konflik dengan pekerjaan asli.

  1. Masuk ke Konsol Realtime Compute for Apache Flink.

  2. Klik Console di kolom Actions ruang kerja target.

  3. Di panel navigasi sebelah kiri, pilih Data Development > Data Ingestion.

  4. Klik image, lalu klik Generate From Existing CTAS/CDAS Job. Pilih pekerjaan CTAS atau CDAS target dan klik OK.

    Di halaman pemilihan, sistem hanya menampilkan pekerjaan CTAS dan CDAS yang valid. Pekerjaan ETL biasa dan draf pekerjaan dengan kesalahan sintaks tidak ditampilkan.

  5. Masukkan informasi pekerjaan, seperti Job Name, Storage Location, dan Engine Version, lalu klik OK.

Migrasikan pekerjaan dari komunitas open source

  1. Masuk ke Konsol Realtime Compute for Apache Flink.

  2. Untuk ruang kerja target, klik Console di kolom Actions.

  3. Di panel navigasi sebelah kiri, buka Data Development > Data Ingestion.

  4. Klik image, pilih Create Data Ingestion Draft, masukkan File Name dan Engine Version, lalu klik Create.

  5. Salin pekerjaan Flink CDC dari komunitas open source.

  6. (Opsional) Klik Deep Check.

    Anda dapat memeriksa sintaks, konektivitas jaringan, dan izin akses.

Buat pekerjaan ingesti data Flink CDC dari awal

  1. Masuk ke Konsol Realtime Compute for Apache Flink.

  2. Pada kolom Actions untuk ruang kerja target, klik Console.

  3. Di panel navigasi sebelah kiri, pilih Data Development > Data Ingestion.

  4. Klik image dan pilih Create Data Ingestion Draft. Masukkan File Name dan Engine Version, lalu klik Create.

  5. Konfigurasikan pekerjaan Flink CDC.

    # Wajib
    source:
      # Tipe sumber data
      type: <Ganti dengan tipe konektor sumber Anda>
      # Konfigurasi sumber data. Untuk informasi selengkapnya mengenai item konfigurasi, lihat dokumentasi untuk konektor yang sesuai.
      ...
    
    # Wajib
    sink:
      # Tipe sink
      type: <Ganti dengan tipe konektor sink Anda>
      # Konfigurasi sink. Untuk informasi selengkapnya mengenai item konfigurasi, lihat dokumentasi untuk konektor yang sesuai.
      ...
    
    # Opsional
    transform:
      # Aturan transformasi untuk tabel flink_test.customers
      - source-table: flink_test.customers
        # Konfigurasi proyeksi. Menentukan kolom yang akan disinkronkan dan melakukan transformasi data.
        projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
        # Kondisi filter. Hanya menyinkronkan data di mana id lebih besar dari 10.
        filter: id > 10
        # Deskripsi yang digunakan untuk menjelaskan aturan transformasi
        description: tambahkan kolom terhitung berdasarkan tabel sumber
    
    # Opsional
    route:
      # Aturan routing. Menentukan pemetaan antara tabel sumber dan tabel sink.
      - source-table: flink_test.customers
        sink-table: db.customers_o
        # Deskripsi yang digunakan untuk menjelaskan aturan routing
        description: sinkronisasi tabel customers
      - source-table: flink_test.customers_suffix
        sink-table: db.customers_s
        # Deskripsi yang digunakan untuk menjelaskan aturan routing
        description: sinkronisasi tabel customers_suffix
    
    #Opsional
    pipeline:
      # Nama pekerjaan
      name: MySQL to Hologres Pipeline
    Catatan

    Dalam pekerjaan Flink CDC, kunci dan nilai harus dipisahkan oleh spasi. Formatnya adalah Kunci: Nilai.

    Tabel berikut menjelaskan blok kode tersebut.

    Wajib

    Blok kode

    Deskripsi

    Wajib

    source

    Awal dari pipa data. Flink CDC menangkap data perubahan dari sumber data.

    Catatan
    • Saat ini, hanya MySQL yang didukung sebagai sumber data. Untuk informasi selengkapnya mengenai item konfigurasi, lihat MySQL.

    • Anda dapat menggunakan variabel untuk mengatur informasi sensitif. Untuk informasi selengkapnya, lihat Manajemen Variabel.

    sink

    Akhir dari pipa data. Flink CDC mentransmisikan perubahan data yang ditangkap ke sistem sink ini.

    Catatan
    • Untuk informasi mengenai sistem sink yang saat ini didukung, lihat Konektor ingesti data Flink CDC. Untuk informasi selengkapnya mengenai item konfigurasi sink, lihat dokumentasi untuk konektor yang sesuai.

    • Anda dapat menggunakan variabel untuk mengatur informasi sensitif. Untuk informasi selengkapnya, lihat Manajemen Variabel.

    Opsional

    pipeline

    pipeline

    Menentukan konfigurasi dasar untuk seluruh pekerjaan pipa data, seperti nama pipeline.

    transform

    Menentukan aturan transformasi data. Transformasi adalah proses operasi pada data yang mengalir melalui pipeline Flink. Mendukung pemrosesan ETL, penyaringan klausa WHERE, pemangkasan kolom, dan kolom terhitung.

    Ketika data perubahan asli yang ditangkap oleh Flink CDC perlu ditransformasi agar sesuai dengan sistem hilir tertentu, Anda dapat menggunakan blok transform.

    route

    Jika blok ini tidak dikonfigurasi, berarti menunjukkan sinkronisasi basis data penuh atau tabel target.

    Dalam beberapa kasus, data perubahan yang ditangkap mungkin perlu dikirim ke tujuan berbeda berdasarkan aturan tertentu. Mekanisme routing memungkinkan Anda secara fleksibel menentukan pemetaan antara sistem hulu dan hilir untuk mengirim data ke sink yang berbeda.

    Untuk informasi selengkapnya mengenai sintaks dan item konfigurasi setiap blok, lihat Referensi pengembangan pekerjaan ingesti data Flink CDC.

    Kode berikut memberikan contoh cara menyinkronkan semua tabel dari basis data app_db di MySQL ke basis data di Hologres.

    source:
      type: mysql
      hostname: <hostname>
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: app_db.\.*
      server-id: 5400-5404
    
    sink:
      type: hologres
      name: Hologres Sink
      endpoint: <endpoint>
      dbname: <database-name>
      username: ${secret_values.holousername}
      password: ${secret_values.holopassword}
    
    pipeline:
      name: Sync MySQL Database to Hologres
  6. (Opsional) Anda dapat mengklik Deep Check.

    Anda dapat memeriksa sintaks, konektivitas jaringan, dan izin akses.

Referensi

  • Setelah mengembangkan pekerjaan Flink CDC, Anda harus menerapkan dan mempublikasikannya. Untuk informasi selengkapnya, lihat Deploy a job.

  • Untuk membangun pekerjaan Flink CDC yang menyinkronkan data dari basis data MySQL ke StarRocks secara cepat, lihat Quick start for Flink CDC data ingestion jobs.