Topik ini menjelaskan cara menggunakan Flink CDC di Realtime Compute for Apache Flink untuk mengakses Katalog DLF melalui API REST Paimon.
Prasyarat
Anda telah membuat ruang kerja Realtime Compute for Apache Flink. Lihat Aktifkan Realtime Compute for Apache Flink.
Ruang kerja Realtime Compute for Apache Flink dan katalog DLF berada di Wilayah yang sama.
Anda telah menambahkan VPC dari ruang kerja Realtime Compute for Apache Flink ke daftar putih VPC DLF. Lihat Konfigurasi daftar putih VPC.
Persyaratan mesin
Pekerjaan Realtime Compute for Apache Flink Anda menjalankan Ververica Runtime (VVR) versi 11.1.0 atau lebih baru.
Buat katalog DLF
Lihat Mulai dengan DLF.
Parameter Flink CDC untuk menghubungkan ke katalog
Untuk membuat pekerjaan ingesti data, lihat Kembangkan pekerjaan ingesti data Flink CDC (Pratinjau Publik).
Gunakan konfigurasi berikut untuk sink dalam pekerjaan ingesti data Flink:
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk mengirimkan pekerjaan. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: trueTabel berikut menjelaskan parameter-parameter tersebut.
Parameter | Deskripsi | Wajib | Contoh |
| Jenis metastore. Tetapkan ke `rest`. | Ya | rest |
| Penyedia token. Tetapkan ke `dlf`. | Ya | dlf |
| URI untuk mengakses Server Katalog REST DLF. Formatnya adalah | Ya | http://cn-hangzhou-vpc.dlf.aliyuncs.com |
| Nama Katalog DLF. | Ya | dlf_test |
Contoh konfigurasi
Bagian berikut menyediakan contoh konfigurasi khas untuk menyinkronkan data ke data lake DLF menggunakan pekerjaan YAML Flink CDC.
Menyinkronkan seluruh database MySQL ke data lake DLF
Kode YAML berikut menunjukkan pekerjaan CDC yang menyinkronkan seluruh database MySQL ke DLF:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (Opsional) Menyinkronkan data dari tabel yang baru dibuat selama fase inkremental.
scan.binlog.newly-added-table.enabled: true
# (Opsional) Menyinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Mendahulukan distribusi shard tak terbatas untuk mencegah potensi kesalahan OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Mengaktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk mengirimkan pekerjaan. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: true[Konfigurasi Sumber MySQL] Tetapkan parameter berikut. Untuk informasi lebih lanjut, lihat MySQL.
Parameter: scan.binlog.newly-added-table.enabled
Fungsi: Menyinkronkan data dari tabel yang baru dibuat selama fase inkremental.
Parameter: include-comments.enabled
Fungsi: Menyinkronkan komentar tabel dan bidang.
Parameter: scan.incremental.snapshot.unbounded-chunk-first.enabled
Fungsi: Mencegah potensi kesalahan OutOfMemory Pengelola Tugas.
Parameter: scan.only.deserialize.captured.tables.changelog.enabled: true
Fungsi: Mengurai data hanya dari tabel yang sesuai dengan pekerjaan untuk mempercepat pembacaan.
[Konfigurasi Sink Paimon]
Parameter koneksi katalog
Awalan parameter: catalog.properties
Fungsi: Menentukan informasi koneksi katalog.
Parameter pembuatan tabel
Awalan parameter: table.properties
Fungsi: Menentukan informasi pembuatan tabel.
Konfigurasi yang direkomendasikan: Tambahkan konfigurasi `deletion-vectors.enabled` ke parameter pembuatan tabel. Ini sangat meningkatkan performa baca dengan dampak minimal pada performa tulis dan pembaruan, mencapai pembaruan hampir real-time dan kueri berkecepatan tinggi.
Informasi tambahan: DLF menyediakan fitur penggabungan file otomatis. Jangan tambahkan parameter terkait penggabungan file dan bucket, seperti `bucket` dan `num-sorted-run.compaction-trigger`, ke parameter pembuatan tabel.
Pengirim
Nama parameter: commit.user
Fungsi: Pengguna yang melakukan commit file yang ditulis ke Paimon.
Konfigurasi yang direkomendasikan: Tetapkan pengguna commit berbeda untuk pekerjaan berbeda. Anda dapat menggunakan nama pekerjaan.
Informasi tambahan: Pengguna commit default adalah `admin`. Menggunakan pengguna default dapat menyebabkan konflik commit dan inkonsistensi ketika beberapa pekerjaan menulis ke tabel yang sama.
Menulis data ke tabel partisi di data lake DLF
Tabel sumber dalam pekerjaan ingesti data biasanya tidak berisi bidang partisi. Untuk menulis data ke tabel turunan partisi, Anda dapat menetapkan bidang partisi menggunakan `partition-keys`. Untuk informasi lebih lanjut, lihat Referensi Pengembangan Pekerjaan Ingesti Data Flink CDC. Kode berikut memberikan contoh konfigurasi:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (Opsional) Menyinkronkan data dari tabel yang baru dibuat selama fase inkremental.
scan.binlog.newly-added-table.enabled: true
# (Opsional) Menyinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Mendahulukan distribusi shard tak terbatas untuk mencegah potensi kesalahan OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Mengaktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk mengirimkan pekerjaan. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
# (Opsional) Tetapkan bidang partisi.
partition-keys: id,pt
- source-table: mysql_test.tbl2
partition-keys: id,ptMenulis data ke tabel append-only di data lake DLF
Tabel sumber dalam pekerjaan ingesti data berisi jenis perubahan lengkap. Untuk mengonfigurasi tabel turunan agar mengubah operasi hapus menjadi operasi sisip untuk penghapusan logis, lihat Referensi Pengembangan Pekerjaan Ingesti Data Flink CDC. Kode berikut memberikan contoh konfigurasi:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (Opsional) Menyinkronkan data dari tabel yang baru dibuat selama fase inkremental.
scan.binlog.newly-added-table.enabled: true
# (Opsional) Menyinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Mendahulukan distribusi shard tak terbatas untuk mencegah potensi kesalahan OutOfMemory Pengelola Tugas.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Mengaktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk mengirimkan pekerjaan. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
# (Opsional) Tetapkan bidang partisi.
partition-keys: id,pt
# (Opsional) Implementasikan soft delete.
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
- source-table: mysql_test.tbl2
# (Opsional) Tetapkan bidang partisi.
partition-keys: id,pt
# (Opsional) Implementasikan soft delete.
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETEAnda dapat menambahkan `__data_event_type` ke proyeksi untuk menulis jenis perubahan ke tabel turunan sebagai bidang baru, dan menetapkan `converter-after-transform` ke `SOFT_DELETE` untuk mengubah operasi hapus menjadi operasi sisip. Hal ini memungkinkan sistem downstream untuk merekam secara lengkap semua operasi perubahan. Untuk informasi lebih lanjut, lihat Referensi Pengembangan Pekerjaan Ingesti Data Flink CDC.
Menyinkronkan data dari Kafka ke data lake DLF secara real time
Asumsikan bahwa topik `inventory` di Kafka menyimpan data untuk dua tabel, `customers` dan `products`, dan format datanya adalah Debezium JSON. Pekerjaan contoh berikut menyinkronkan data dari kedua tabel tersebut ke tabel tujuan yang sesuai di DLF:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk mengirimkan pekerjaan. Tetapkan nama pengguna berbeda untuk pekerjaan berbeda guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan performa baca.
table.properties.deletion-vectors.enabled: true
# Debezium JSON tidak berisi informasi kunci primer. Anda harus menambahkan kunci primer ke tabel secara terpisah.
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idSumber data Kafka mendukung pembacaan data dalam format `canal-json`, `debezium-json` (default), dan `json`.
Ketika format data adalah `debezium-json`, Anda harus menambahkan kunci primer ke tabel secara manual menggunakan aturan transform karena pesan Debezium JSON tidak mencatat informasi kunci primer:
transform: - source-table: \.*.\.* projection: \* primary-keys: idKetika data satu tabel didistribusikan di beberapa partisi, atau ketika tabel di partisi berbeda harus digabung setelah sharding, Anda dapat menetapkan parameter debezium-json.distributed-tables atau canal-json.distributed-tables ke `true`.
Sumber data Kafka mendukung beberapa kebijakan inferensi skema. Anda dapat menetapkan kebijakan tersebut menggunakan parameter schema.inference.strategy. Untuk informasi lebih lanjut tentang kebijakan inferensi skema dan sinkronisasi perubahan, lihat Antrian Pesan Kafka.
Untuk konfigurasi pekerjaan yang lebih rinci, lihat Referensi Pengembangan Pekerjaan Ingesti Data Flink CDC.