Topik ini menjelaskan praktik terbaik untuk menggunakan pekerjaan ingesti data CDC (Change Data Capture) YAML guna menulis data real-time ke gudang data.
Sinkronisasi real-time dari Flink CDC ke gudang data Hologres
Penggunaan pekerjaan ingesti data CDC YAML untuk menyinkronkan data ke Hologres dalam membangun gudang data real-time memungkinkan Anda memanfaatkan sepenuhnya kemampuan pemrosesan real-time yang andal dari Flink serta fitur-fitur Hologres, seperti binary logging, penyimpanan hibrida baris-kolom, dan isolasi resource yang kuat. Kombinasi ini mendukung pemrosesan dan analisis data real-time yang efisien serta skalabel.
Sinkronisasi real-time dari MySQL ke gudang data Hologres
Kode berikut menunjukkan pekerjaan ingesti data CDC YAML paling dasar untuk menyinkronkan seluruh database MySQL ke Hologres:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan komentar tabel dan komentar bidang.
include-comments.enabled: true
# (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan data.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
-
Disarankan mengaktifkan include-comments.enabled di MySQL Source untuk menyinkronkan komentar tabel dan bidang saat pekerjaan pertama kali dijalankan. Untuk informasi selengkapnya, lihat MySQL.
-
Disarankan mengaktifkan scan.incremental.snapshot.unbounded-chunk-first.enabled di MySQL Source untuk mencegah potensi error OutOfMemory pada TaskManager saat pekerjaan pertama kali dijalankan. Untuk informasi selengkapnya, lihat MySQL.
Gunakan pemetaan tipe yang lebih toleran
Konektor Hologres tidak dapat memproses event perubahan tipe kolom, tetapi mendukung berbagai hubungan pemetaan tipe. Untuk mendukung perubahan pada sumber data secara lebih baik, Anda dapat memetakan beberapa tipe data MySQL ke tipe Hologres yang lebih luas. Pendekatan ini memungkinkan Anda melewati event perubahan tipe yang tidak perlu dan memastikan pekerjaan berjalan sesuai harapan. Pemetaan tersebut dapat diatur melalui item konfigurasi sink.type-normalize-strategy. Nilai default-nya adalah STANDARD. Untuk informasi selengkapnya, lihat Pemetaan tipe konektor Hologres untuk pekerjaan YAML ingesti data.
Sebagai contoh, Anda dapat menggunakan ONLY_BIGINT_OR_TEXT untuk memetakan tipe hanya ke tipe int8 dan text di Hologres. Dalam kasus ini, jika tipe kolom di MySQL berubah dari INT ke BIGINT, Hologres akan memetakan kedua tipe tersebut ke int8, sehingga pekerjaan tidak melaporkan error akibat konversi tipe yang tidak didukung.
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan komentar tabel dan komentar bidang.
include-comments.enabled: true
# (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan data.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
Tulis data ke tabel partisi
Saat menggunakan konektor Hologres sebagai sink dalam pekerjaan ingesti data CDC YAML, Anda dapat menulis data ke tabel partisi. Untuk informasi selengkapnya, lihat Tulis data ke tabel partisi.
Sinkronisasi real-time dari Kafka ke gudang data Hologres
Implementasi distribusi real-time menjelaskan solusi untuk menyimpan data MySQL di Kafka. Anda kemudian dapat menggunakan file YAML ingesti data untuk menyinkronkan data tersebut ke Hologres guna membangun gudang data real-time.
Misalnya, topik Kafka bernama inventory berisi data dari dua tabel, customers dan products, dalam format debezium-json. Pekerjaan berikut dapat menyinkronkan data dari kedua tabel tersebut ke tabel yang sesuai di Hologres.
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
-
Sumber data Kafka mendukung pembacaan data dalam format berikut: json, canal-json, dan debezium-json (default).
-
Jika format datanya adalah debezium-json, Anda harus menambahkan kunci primer ke tabel secara manual menggunakan aturan transform:
transform: - source-table: \.*.\.* projection: \* primary-keys: id -
Jika data satu tabel didistribusikan ke beberapa partisi, atau jika data dari tabel ter-shard di partisi berbeda perlu digabungkan, Anda harus mengatur item konfigurasi debezium-json.distributed-tables atau canal-json.distributed-tables ke true.
-
Sumber data Kafka mendukung beberapa kebijakan inferensi skema. Anda dapat mengatur kebijakan tersebut melalui item konfigurasi schema.inference.strategy. Untuk informasi selengkapnya tentang kebijakan inferensi skema dan sinkronisasi perubahan, lihat Message Queue for Kafka.