Flink Change Data Capture (CDC) adalah alat ingesti data yang disediakan oleh Realtime Compute for Apache Flink. Alat ini mendukung sinkronisasi seluruh database dari sumber ke danau data terpadu Anda. Topik ini memandu Anda dalam menggunakan Flink CDC untuk mengingesti data ke katalog DLF secara real time melalui Paimon REST.
Setelah menyelesaikan topik ini, Anda akan:
Membuat katalog DLF sebagai tujuan ingesti
Mengonfigurasi pekerjaan YAML Flink CDC dengan sink Paimon yang mengarah ke DLF
Menjalankan contoh untuk skenario ingesti umum: sinkronisasi database penuh, tabel partisi, tabel append-only, dan sumber Kafka
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Ruang kerja Realtime Compute for Apache Flink. Lihat Buat ruang kerja.
Ruang kerja Realtime Compute for Apache Flink dan katalog DLF berada di Wilayah yang sama.
VPC ruang kerja Realtime Compute for Apache Flink Anda telah ditambahkan ke daftar putih VPC DLF. Lihat Konfigurasi daftar putih VPC.
Persyaratan engine
Pekerjaan Realtime Compute for Apache Flink Anda harus menjalankan Ververica Runtime (VVR) versi 11.1.0 atau lebih baru.
Buat katalog DLF
Lihat Memulai dengan DLF.
Buat dan konfigurasi pekerjaan ingesti data
Buat draf YAML untuk mengingesti data menggunakan Flink CDC. Untuk informasi selengkapnya, lihat Kembangkan pekerjaan Flink CDC untuk ingesti data (Beta).
Konfigurasi modul sink:
sink: type: paimon catalog.properties.metastore: rest catalog.properties.uri: dlf_uri catalog.properties.warehouse: your_warehouse catalog.properties.token.provider: dlf # (Opsional) Pengguna commit. Tetapkan pengguna commit berbeda untuk pekerjaan berbeda guna menghindari konflik. commit.user: your_job_name # (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca. table.properties.deletion-vectors.enabled: trueGanti nilai placeholder dengan nilai aktual Anda:
Opsi konfigurasi Deskripsi Wajib Bawaan Contoh nilai catalog.properties.metastoreJenis metastore. Tetapkan ke rest.Ya — restcatalog.properties.token.providerPenyedia token. Tetapkan ke dlf.Ya — dlfcatalog.properties.uriURI untuk mengakses server katalog REST DLF. Format: http://[region-id]-vpc.dlf.aliyuncs.com. Untuk ID wilayah, lihat Wilayah dan titik akhir.Ya — http://ap-southeast-1-vpc.dlf.aliyuncs.comcatalog.properties.warehouseNama katalog Paimon. Ya — dlf_testcommit.userPengguna commit untuk penulisan data. Tetapkan pengguna commit unik untuk pekerjaan berbeda guna menghindari konflik. Tidak adminyour_job_nametable.properties.deletion-vectors.enabledMengaktifkan deletion vectors untuk meningkatkan performa baca dengan dampak minimal pada penulisan. Tidak — true
Catatan penggunaan
Sebelum menjalankan pekerjaan, perhatikan batasan berikut:
Konflik pengguna commit: Pengguna commit bawaan adalah
admin. Menjalankan beberapa pekerjaan penulisan data secara bersamaan ke tabel yang sama dengan pengguna commit yang sama menyebabkan konflik commit dan inkonsistensi data. Tetapkancommit.userunik untuk setiap pekerjaan.Tidak ada opsi kompaksi atau bucket: DLF menyediakan kompaksi file otomatis. Jangan mengonfigurasi opsi kompaksi atau bucket seperti
bucketdannum-sorted-run.compaction-trigger.Deletion vectors: Tetapkan
table.properties.deletion-vectors.enabled: trueuntuk mempercepat pembacaan secara signifikan dengan dampak minimal pada penulisan.
Contoh
Mengimpor data dari seluruh database MySQL ke DLF
Pekerjaan berikut menyinkronkan semua tabel dalam database MySQL ke DLF:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan data dari tabel yang dibuat pada fase inkremental.
scan.binlog.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan shard tak terbatas untuk mencegah potensi error kehabisan memori (OOM) pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Deserialisasi data hanya dari tabel yang cocok untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Pengguna commit. Tetapkan pengguna commit berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: trueOpsi sumber MySQL yang direkomendasikan adalah:
| Opsi | Deskripsi |
|---|---|
scan.binlog.newly-added-table.enabled | Menyinkronkan data dari tabel yang dibuat selama fase inkremental. |
include-comments.enabled | Menyinkronkan komentar tabel dan bidang. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Mencegah potensi error OOM pada TaskManager dengan mengutamakan shard tak terbatas. |
scan.only.deserialize.captured.tables.changelog.enabled | Mendeserialisasi data hanya dari tabel yang cocok untuk mempercepat pembacaan. |
Memasukkan data ke tabel partisi
Untuk mengingesti dari tabel sumber non-partisi ke tabel partisi di DLF, tambahkan opsi partition-keys ke modul transform. Untuk informasi selengkapnya, lihat Ingesti data dengan Flink CDC.
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan data dari tabel yang dibuat pada fase inkremental.
scan.binlog.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan shard tak terbatas untuk mencegah potensi error kehabisan memori (OOM) pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Deserialisasi data hanya dari tabel yang cocok untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Pengguna commit. Tetapkan pengguna commit berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
partition-keys: id,pt
- source-table: mysql_test.tbl2
partition-keys: id,ptIngesti data ke tabel append-only
Untuk menerapkan penghapusan logis (soft delete) selama ingesti, gunakan converter-after-transform: SOFT_DELETE dalam modul transform. Ini mengubah operasi hapus menjadi operasi sisip, sehingga tabel downstream mencatat semua operasi perubahan secara lengkap. Bidang __data_event_type__ dalam projection menulis jenis perubahan sebagai kolom baru ke tabel downstream.
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan data dari tabel yang dibuat pada fase inkremental.
scan.binlog.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan shard tak terbatas untuk mencegah potensi error kehabisan memori (OOM) pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Deserialisasi data hanya dari tabel yang cocok untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Pengguna commit. Tetapkan pengguna commit berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
partition-keys: id,pt
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
- source-table: mysql_test.tbl2
partition-keys: id,pt
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETEUntuk informasi selengkapnya, lihat Ingesti data dengan Flink CDC.
Sinkronkan data dari Kafka ke DLF secara real time
Pekerjaan berikut membaca data CDC dari topik Kafka inventory (tabel customers dan products dalam format Debezium JSON) dan menyinkronkannya ke tabel tujuan yang sesuai di DLF. Karena pesan Debezium JSON tidak berisi informasi primary key, primary key ditentukan secara eksplisit dalam modul transform.
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Pengguna commit. Tetapkan pengguna commit berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: true
# Debezium JSON tidak berisi info primary key, jadi tentukan primary key secara eksplisit.
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idCatatan tambahan untuk sumber Kafka:
Sumber Kafka mendukung tiga format data:
canal-json,debezium-json(bawaan), danjson.Saat mengingesti dari beberapa partisi Kafka ke satu tabel di DLF, tetapkan
debezium-json.distributed-tablesataucanal-json.distributed-tablesketrue.Sumber Kafka mendukung beberapa kebijakan inferensi skema melalui opsi
schema.inference.strategy. Untuk informasi selengkapnya, lihat Message Queue for Apache Kafka.
Untuk informasi selengkapnya, lihat Ingesti data dengan Flink CDC.