全部产品
Search
文档中心

Data Lake Formation:Mengakses DLF menggunakan Flink CDC

更新时间:Nov 11, 2025

Topik ini menjelaskan cara menggunakan Flink CDC di Realtime Compute for Apache Flink untuk mengakses Katalog DLF melalui API REST Paimon.

Prasyarat

Persyaratan mesin

Pekerjaan Realtime Compute for Apache Flink Anda menjalankan Ververica Runtime (VVR) versi 11.1.0 atau lebih baru.

Buat katalog DLF

Lihat Mulai dengan DLF.

Parameter Flink CDC untuk menghubungkan ke katalog

Untuk membuat pekerjaan ingesti data, lihat Kembangkan pekerjaan ingesti data Flink CDC (Pratinjau Publik).

Gunakan konfigurasi berikut untuk sink dalam pekerjaan ingesti data Flink:

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk mengirimkan pekerjaan. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

Tabel berikut menjelaskan parameter-parameter tersebut.

Parameter

Deskripsi

Wajib

Contoh

catalog.properties.metastore

Jenis metastore. Tetapkan ke `rest`.

Ya

rest

catalog.properties.token.provider

Penyedia token. Tetapkan ke `dlf`.

Ya

dlf

catalog.properties.uri

URI untuk mengakses Server Katalog REST DLF. Formatnya adalah http://[region-id]-vpc.dlf.aliyuncs.com. Untuk informasi lebih lanjut tentang ID Wilayah, lihat Titik akhir layanan.

Ya

http://cn-hangzhou-vpc.dlf.aliyuncs.com

catalog.properties.warehouse

Nama Katalog DLF.

Ya

dlf_test

Contoh konfigurasi

Bagian berikut menyediakan contoh konfigurasi khas untuk menyinkronkan data ke data lake DLF menggunakan pekerjaan YAML Flink CDC.

Menyinkronkan seluruh database MySQL ke data lake DLF

Kode YAML berikut menunjukkan pekerjaan CDC yang menyinkronkan seluruh database MySQL ke DLF:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Opsional) Menyinkronkan data dari tabel yang baru dibuat selama fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Menyinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Mendahulukan distribusi shard tak terbatas untuk mencegah potensi kesalahan OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Mengaktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk mengirimkan pekerjaan. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true
Catatan

[Konfigurasi Sumber MySQL] Tetapkan parameter berikut. Untuk informasi lebih lanjut, lihat MySQL.

  1. Parameter: scan.binlog.newly-added-table.enabled

    Fungsi: Menyinkronkan data dari tabel yang baru dibuat selama fase inkremental.

  2. Parameter: include-comments.enabled

    Fungsi: Menyinkronkan komentar tabel dan bidang.

  3. Parameter: scan.incremental.snapshot.unbounded-chunk-first.enabled

    Fungsi: Mencegah potensi kesalahan OutOfMemory Pengelola Tugas.

  4. Parameter: scan.only.deserialize.captured.tables.changelog.enabled: true

    Fungsi: Mengurai data hanya dari tabel yang sesuai dengan pekerjaan untuk mempercepat pembacaan.

Catatan

[Konfigurasi Sink Paimon]

  1. Parameter koneksi katalog

  • Awalan parameter: catalog.properties

  • Fungsi: Menentukan informasi koneksi katalog.

  1. Parameter pembuatan tabel

  • Awalan parameter: table.properties

  • Fungsi: Menentukan informasi pembuatan tabel.

  • Konfigurasi yang direkomendasikan: Tambahkan konfigurasi `deletion-vectors.enabled` ke parameter pembuatan tabel. Ini sangat meningkatkan performa baca dengan dampak minimal pada performa tulis dan pembaruan, mencapai pembaruan hampir real-time dan kueri berkecepatan tinggi.

  • Informasi tambahan: DLF menyediakan fitur penggabungan file otomatis. Jangan tambahkan parameter terkait penggabungan file dan bucket, seperti `bucket` dan `num-sorted-run.compaction-trigger`, ke parameter pembuatan tabel.

  1. Pengirim

  • Nama parameter: commit.user

  • Fungsi: Pengguna yang melakukan commit file yang ditulis ke Paimon.

  • Konfigurasi yang direkomendasikan: Tetapkan pengguna commit berbeda untuk pekerjaan berbeda. Anda dapat menggunakan nama pekerjaan.

  • Informasi tambahan: Pengguna commit default adalah `admin`. Menggunakan pengguna default dapat menyebabkan konflik commit dan inkonsistensi ketika beberapa pekerjaan menulis ke tabel yang sama.

Menulis data ke tabel partisi di data lake DLF

Tabel sumber dalam pekerjaan ingesti data biasanya tidak berisi bidang partisi. Untuk menulis data ke tabel turunan partisi, Anda dapat menetapkan bidang partisi menggunakan `partition-keys`. Untuk informasi lebih lanjut, lihat Referensi Pengembangan Pekerjaan Ingesti Data Flink CDC. Kode berikut memberikan contoh konfigurasi:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Opsional) Menyinkronkan data dari tabel yang baru dibuat selama fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Menyinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Mendahulukan distribusi shard tak terbatas untuk mencegah potensi kesalahan OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Mengaktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk mengirimkan pekerjaan. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    # (Opsional) Tetapkan bidang partisi.  
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

Menulis data ke tabel append-only di data lake DLF

Tabel sumber dalam pekerjaan ingesti data berisi jenis perubahan lengkap. Untuk mengonfigurasi tabel turunan agar mengubah operasi hapus menjadi operasi sisip untuk penghapusan logis, lihat Referensi Pengembangan Pekerjaan Ingesti Data Flink CDC. Kode berikut memberikan contoh konfigurasi:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Opsional) Menyinkronkan data dari tabel yang baru dibuat selama fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Menyinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Mendahulukan distribusi shard tak terbatas untuk mencegah potensi kesalahan OutOfMemory Pengelola Tugas.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Mengaktifkan filter penguraian untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk mengirimkan pekerjaan. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true
  
transform:
  - source-table: mysql_test.tbl1
    # (Opsional) Tetapkan bidang partisi.
    partition-keys: id,pt
    # (Opsional) Implementasikan soft delete.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    # (Opsional) Tetapkan bidang partisi.
    partition-keys: id,pt
    # (Opsional) Implementasikan soft delete.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
Catatan
  • Anda dapat menambahkan `__data_event_type` ke proyeksi untuk menulis jenis perubahan ke tabel turunan sebagai bidang baru, dan menetapkan `converter-after-transform` ke `SOFT_DELETE` untuk mengubah operasi hapus menjadi operasi sisip. Hal ini memungkinkan sistem downstream untuk merekam secara lengkap semua operasi perubahan. Untuk informasi lebih lanjut, lihat Referensi Pengembangan Pekerjaan Ingesti Data Flink CDC.

Menyinkronkan data dari Kafka ke data lake DLF secara real time

Asumsikan bahwa topik `inventory` di Kafka menyimpan data untuk dua tabel, `customers` dan `products`, dan format datanya adalah Debezium JSON. Pekerjaan contoh berikut menyinkronkan data dari kedua tabel tersebut ke tabel tujuan yang sesuai di DLF:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Nama pengguna untuk mengirimkan pekerjaan. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
  commit.user: your_job_name
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

# Debezium JSON tidak berisi informasi kunci primer. Anda harus menambahkan kunci primer ke tabel secara terpisah.  
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
Catatan
  • Sumber data Kafka mendukung pembacaan data dalam format `canal-json`, `debezium-json` (default), dan `json`.

  • Ketika format data adalah `debezium-json`, Anda harus menambahkan kunci primer ke tabel secara manual menggunakan aturan transform karena pesan Debezium JSON tidak mencatat informasi kunci primer:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • Ketika data satu tabel didistribusikan di beberapa partisi, atau ketika tabel di partisi berbeda harus digabung setelah sharding, Anda dapat menetapkan parameter debezium-json.distributed-tables atau canal-json.distributed-tables ke `true`.

  • Sumber data Kafka mendukung beberapa kebijakan inferensi skema. Anda dapat menetapkan kebijakan tersebut menggunakan parameter schema.inference.strategy. Untuk informasi lebih lanjut tentang kebijakan inferensi skema dan sinkronisasi perubahan, lihat Antrian Pesan Kafka.

Untuk konfigurasi pekerjaan yang lebih rinci, lihat Referensi Pengembangan Pekerjaan Ingesti Data Flink CDC.