全部产品
Search
文档中心

Realtime Compute for Apache Flink:Ingesti data database Real-time ke data lake

更新时间:Feb 06, 2026

Topik ini menjelaskan praktik terbaik untuk menggunakan Pekerjaan YAML ingesti data Change Data Capture (CDC) guna menulis data real-time ke Alibaba Cloud Data Lake Formation (DLF).

Alibaba Cloud Data Lake Formation (DLF) adalah platform terkelola penuh yang menyediakan metadata terpadu, penyimpanan data, dan manajemen data. DLF menawarkan fitur-fitur seperti pengelolaan metadata, pengelolaan izin, dan optimasi penyimpanan. Untuk informasi selengkapnya, lihat Apa itu Data Lake Formation?.

Pekerjaan ingesti data dapat menggunakan Paimon Catalog DLF sebagai tujuan. Anda dapat menggunakan pekerjaan ingesti data untuk mengingesti seluruh database skala besar ke dalam data lake.

Sinkronkan seluruh database MySQL ke DLF

Pekerjaan CDC YAML berikut menyinkronkan seluruh database MySQL ke DLF:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data dari tabel yang baru dibuat pada fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan distribusi chunk tak terbatas untuk mencegah potensi masalah OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  # Jenis Metastore. Atur ke rest.
  catalog.properties.metastore: rest
  # Penyedia token. Atur ke dlf.
  catalog.properties.token.provider: dlf
  # URI untuk mengakses DLF Rest Catalog Server. Formatnya adalah http://[region-id]-vpc.dlf.aliyuncs.com, contohnya http://cn-hangzhou-vpc.dlf.aliyuncs.com.
  catalog.properties.uri: dlf_uri
  # Nama DLF Catalog.
  catalog.properties.warehouse: your_warehouse
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true
Catatan
  • Untuk informasi selengkapnya tentang parameter sumber MySQL, lihat MySQL.

  • Tambahkan konfigurasi deletion-vectors.enabled ke parameter pembuatan tabel. Ini secara signifikan meningkatkan performa baca dengan dampak minimal pada performa tulis dan pembaruan, memungkinkan pembaruan hampir real-time dan kueri berkecepatan tinggi.

  • Karena DLF menyediakan fitur penggabungan file otomatis, jangan tambahkan parameter terkait penggabungan file atau bucket, seperti bucket dan num-sorted-run.compaction-trigger, ke parameter pembuatan tabel.

Tulis ke tabel partisi di DLF

Tabel sumber untuk pekerjaan ingesti data biasanya tidak mencakup informasi bidang partisi. Untuk menulis ke tabel turunan partisi, Anda harus mengonfigurasi bidang partisi menggunakan pengaturan partition-keys dalam Referensi Pengembangan Pekerjaan Ingesti Data Flink CDC, sebagaimana dijelaskan dalam Referensi Pengembangan Pekerjaan Ingesti Data Flink CDC. Contoh berikut menunjukkan cara mengonfigurasi pengaturan ini:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data dari tabel yang baru dibuat pada fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan distribusi chunk tak terbatas untuk mencegah potensi masalah OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    # (Opsional) Tetapkan bidang partisi.  
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

Tulis ke tabel Append-Only di DLF

Tabel sumber dalam pekerjaan ingesti data berisi data perubahan lengkap. Untuk menerapkan soft delete dengan mengonversi operasi hapus menjadi operasi sisip di tabel tujuan, lihat referensi pengembangan pekerjaan ingesti data Flink CDC. Contoh berikut menunjukkan konfigurasinya:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan data dari tabel yang baru dibuat pada fase inkremental.
  scan.binlog.newly-added-table.enabled: true
  # (Opsional) Sinkronkan komentar tabel dan bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan distribusi chunk tak terbatas untuk mencegah potensi masalah OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true
  
transform:
  - source-table: mysql_test.tbl1
    # (Opsional) Tetapkan bidang partisi.
    partition-keys: id,pt
    # (Opsional) Terapkan soft delete.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    # (Opsional) Tetapkan bidang partisi.
    partition-keys: id,pt
    # (Opsional) Terapkan soft delete.
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
Catatan
  • Tambahkan __data_event_type ke projection untuk menulis jenis perubahan sebagai bidang baru ke tabel tujuan. Atur converter-after-transform ke SOFT_DELETE untuk mengonversi operasi hapus menjadi operasi sisip. Hal ini memastikan bahwa semua operasi perubahan direkam sepenuhnya di tabel hilir. Untuk informasi selengkapnya, lihat referensi pengembangan pekerjaan ingesti data Flink CDC.

Sinkronkan data CDC Kafka ke DLF secara real-time

Anda dapat menyinkronkan data MySQL ke Kafka secara real-time menggunakan distribusi real-time. Kemudian, Anda dapat mengonfigurasi pekerjaan CDC YAML untuk menyinkronkan data Kafka tersebut ke DLF.

Asumsikan topik inventory di Kafka menyimpan data untuk dua tabel, customers dan products, dengan format data Debezium JSON. Pekerjaan contoh berikut menyinkronkan data dari kedua tabel tersebut ke tabel tujuan yang sesuai di DLF:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
  table.properties.deletion-vectors.enabled: true

# Debezium JSON tidak menyertakan informasi primary key. Tambahkan primary key ke tabel.  
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
Catatan
  • Sumber data Kafka mendukung pembacaan data dalam format canal-json, debezium-json (default), dan json.

  • Jika format pesan Kafka berubah—misalnya dengan penambahan bidang baru—perubahan tersebut secara otomatis disinkronkan ke skema tabel Paimon, sehingga meningkatkan fleksibilitas sinkronisasi data.

  • Jika format datanya debezium-json, Anda harus menambahkan primary key secara manual ke tabel menggunakan aturan transform karena pesan debezium-json tidak mencatat informasi primary key:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • Jika data satu tabel didistribusikan di beberapa partisi, atau untuk menggabungkan tabel dari partisi berbeda menggunakan sharding, atur item konfigurasi debezium-json.distributed-tables atau canal-json.distributed-tables ke true.

  • Sumber data Kafka mendukung beberapa kebijakan inferensi skema. Anda dapat mengatur kebijakan tersebut menggunakan item konfigurasi schema.inference.strategy. Untuk informasi selengkapnya tentang kebijakan inferensi skema dan sinkronisasi perubahan, lihat Message Queue for Kafka.