All Products
Search
Document Center

Data Lake Formation:Dokumen

Last Updated:Mar 26, 2026

Flink Change Data Capture (CDC) adalah alat ingesti data yang disediakan oleh Realtime Compute for Apache Flink. Alat ini mendukung sinkronisasi seluruh database dari sumber ke danau data terpadu Anda. Topik ini memandu Anda dalam menggunakan Flink CDC untuk mengingesti data ke katalog DLF secara real time melalui Paimon REST.

Setelah menyelesaikan topik ini, Anda akan:

  • Membuat katalog DLF sebagai tujuan ingesti

  • Mengonfigurasi pekerjaan YAML Flink CDC dengan sink Paimon yang mengarah ke DLF

  • Menjalankan contoh untuk skenario ingesti umum: sinkronisasi database penuh, tabel partisi, tabel append-only, dan sumber Kafka

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Ruang kerja Realtime Compute for Apache Flink. Lihat Buat ruang kerja.

  • Ruang kerja Realtime Compute for Apache Flink dan katalog DLF berada di Wilayah yang sama.

  • VPC ruang kerja Realtime Compute for Apache Flink Anda telah ditambahkan ke daftar putih VPC DLF. Lihat Konfigurasi daftar putih VPC.

Persyaratan engine

Pekerjaan Realtime Compute for Apache Flink Anda harus menjalankan Ververica Runtime (VVR) versi 11.1.0 atau lebih baru.

Buat katalog DLF

Lihat Memulai dengan DLF.

Buat dan konfigurasi pekerjaan ingesti data

  1. Buat draf YAML untuk mengingesti data menggunakan Flink CDC. Untuk informasi selengkapnya, lihat Kembangkan pekerjaan Flink CDC untuk ingesti data (Beta).

  2. Konfigurasi modul sink:

    sink:
      type: paimon
      catalog.properties.metastore: rest
      catalog.properties.uri: dlf_uri
      catalog.properties.warehouse: your_warehouse
      catalog.properties.token.provider: dlf
      # (Opsional) Pengguna commit. Tetapkan pengguna commit berbeda untuk pekerjaan berbeda guna menghindari konflik.
      commit.user: your_job_name
      # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
      table.properties.deletion-vectors.enabled: true

    Ganti nilai placeholder dengan nilai aktual Anda:

    Opsi konfigurasiDeskripsiWajibBawaanContoh nilai
    catalog.properties.metastoreJenis metastore. Tetapkan ke rest.Yarest
    catalog.properties.token.providerPenyedia token. Tetapkan ke dlf.Yadlf
    catalog.properties.uriURI untuk mengakses server katalog REST DLF. Format: http://[region-id]-vpc.dlf.aliyuncs.com. Untuk ID wilayah, lihat Wilayah dan titik akhir.Yahttp://ap-southeast-1-vpc.dlf.aliyuncs.com
    catalog.properties.warehouseNama katalog Paimon.Yadlf_test
    commit.userPengguna commit untuk penulisan data. Tetapkan pengguna commit unik untuk pekerjaan berbeda guna menghindari konflik.Tidakadminyour_job_name
    table.properties.deletion-vectors.enabledMengaktifkan deletion vectors untuk meningkatkan performa baca dengan dampak minimal pada penulisan.Tidaktrue

Catatan penggunaan

Sebelum menjalankan pekerjaan, perhatikan batasan berikut:

  • Konflik pengguna commit: Pengguna commit bawaan adalah admin. Menjalankan beberapa pekerjaan penulisan data secara bersamaan ke tabel yang sama dengan pengguna commit yang sama menyebabkan konflik commit dan inkonsistensi data. Tetapkan commit.user unik untuk setiap pekerjaan.

  • Tidak ada opsi kompaksi atau bucket: DLF menyediakan kompaksi file otomatis. Jangan mengonfigurasi opsi kompaksi atau bucket seperti bucket dan num-sorted-run.compaction-trigger.

  • Deletion vectors: Tetapkan table.properties.deletion-vectors.enabled: true untuk mempercepat pembacaan secara signifikan dengan dampak minimal pada penulisan.

Contoh

Mengimpor data dari seluruh database MySQL ke DLF

Pekerjaan berikut menyinkronkan semua tabel dalam database MySQL ke DLF:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data dari tabel yang dibuat pada fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan shard tak terbatas untuk mencegah potensi error kehabisan memori (OOM) pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Deserialisasi data hanya dari tabel yang cocok untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Pengguna commit. Tetapkan pengguna commit berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

Opsi sumber MySQL yang direkomendasikan adalah:

OpsiDeskripsi
scan.binlog.newly-added-table.enabledMenyinkronkan data dari tabel yang dibuat selama fase inkremental.
include-comments.enabledMenyinkronkan komentar tabel dan bidang.
scan.incremental.snapshot.unbounded-chunk-first.enabledMencegah potensi error OOM pada TaskManager dengan mengutamakan shard tak terbatas.
scan.only.deserialize.captured.tables.changelog.enabledMendeserialisasi data hanya dari tabel yang cocok untuk mempercepat pembacaan.

Memasukkan data ke tabel partisi

Untuk mengingesti dari tabel sumber non-partisi ke tabel partisi di DLF, tambahkan opsi partition-keys ke modul transform. Untuk informasi selengkapnya, lihat Ingesti data dengan Flink CDC.

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data dari tabel yang dibuat pada fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan shard tak terbatas untuk mencegah potensi error kehabisan memori (OOM) pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Deserialisasi data hanya dari tabel yang cocok untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Pengguna commit. Tetapkan pengguna commit berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

Ingesti data ke tabel append-only

Untuk menerapkan penghapusan logis (soft delete) selama ingesti, gunakan converter-after-transform: SOFT_DELETE dalam modul transform. Ini mengubah operasi hapus menjadi operasi sisip, sehingga tabel downstream mencatat semua operasi perubahan secara lengkap. Bidang __data_event_type__ dalam projection menulis jenis perubahan sebagai kolom baru ke tabel downstream.

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data dari tabel yang dibuat pada fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan shard tak terbatas untuk mencegah potensi error kehabisan memori (OOM) pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Deserialisasi data hanya dari tabel yang cocok untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Pengguna commit. Tetapkan pengguna commit berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    partition-keys: id,pt
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    partition-keys: id,pt
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE

Untuk informasi selengkapnya, lihat Ingesti data dengan Flink CDC.

Sinkronkan data dari Kafka ke DLF secara real time

Pekerjaan berikut membaca data CDC dari topik Kafka inventory (tabel customers dan products dalam format Debezium JSON) dan menyinkronkannya ke tabel tujuan yang sesuai di DLF. Karena pesan Debezium JSON tidak berisi informasi primary key, primary key ditentukan secara eksplisit dalam modul transform.

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Pengguna commit. Tetapkan pengguna commit berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

# Debezium JSON tidak berisi info primary key, jadi tentukan primary key secara eksplisit.
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id

Catatan tambahan untuk sumber Kafka:

  • Sumber Kafka mendukung tiga format data: canal-json, debezium-json (bawaan), dan json.

  • Saat mengingesti dari beberapa partisi Kafka ke satu tabel di DLF, tetapkan debezium-json.distributed-tables atau canal-json.distributed-tables ke true.

  • Sumber Kafka mendukung beberapa kebijakan inferensi skema melalui opsi schema.inference.strategy. Untuk informasi selengkapnya, lihat Message Queue for Apache Kafka.

Untuk informasi selengkapnya, lihat Ingesti data dengan Flink CDC.