Topik ini menjelaskan praktik terbaik untuk menggunakan pekerjaan ingesti data CDC YAML dalam menulis data real-time ke gudang data.
Sinkronisasi real-time dari Flink CDC ke gudang data Hologres
Menggunakan pekerjaan ingesti data CDC YAML untuk menyinkronkan data ke Hologres dalam membangun gudang data real-time memungkinkan Anda memanfaatkan sepenuhnya kemampuan pemrosesan real-time yang kuat dari Flink dan fitur-fitur Hologres, seperti binary logging, penyimpanan hibrid baris-kolom, serta isolasi sumber daya yang kuat. Kombinasi ini mendukung pemrosesan dan analisis data real-time yang efisien dan skalabel.
Sinkronisasi real-time dari MySQL ke gudang data Hologres
Kode berikut menunjukkan pekerjaan ingesti data CDC YAML paling dasar untuk menyinkronkan seluruh database MySQL ke Hologres:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan komentar tabel dan komentar bidang.
include-comments.enabled: true
# (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi kesalahan TaskManager OutOfMemory.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi kesalahan TaskManager OutOfMemory.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan data.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADENDisarankan untuk mengatur include-comments.enabled di MySQL Source agar komentar tabel dan bidang disinkronkan saat pekerjaan dimulai untuk pertama kalinya. Untuk informasi lebih lanjut, lihat MySQL.
Disarankan untuk mengatur scan.incremental.snapshot.unbounded-chunk-first.enabled di MySQL Source guna mencegah potensi kesalahan TaskManager OutOfMemory saat pekerjaan dimulai untuk pertama kalinya. Untuk informasi lebih lanjut, lihat MySQL.
Gunakan pemetaan tipe yang lebih toleran
Konektor Hologres tidak dapat memproses peristiwa perubahan tipe kolom, tetapi mendukung beberapa hubungan pemetaan tipe. Untuk mendukung perubahan pada sumber data dengan lebih baik, Anda dapat memetakan beberapa tipe data MySQL ke tipe Hologres yang lebih luas. Hal ini memungkinkan Anda melewati peristiwa perubahan tipe yang tidak perlu dan memastikan pekerjaan berjalan sesuai harapan. Pemetaan dapat diubah menggunakan item konfigurasi sink.type-normalize-strategy, dengan nilai default STANDARD. Untuk informasi lebih lanjut, lihat Pemetaan tipe konektor Hologres untuk pekerjaan YAML ingesti data.
Sebagai contoh, Anda dapat menggunakan ONLY_BIGINT_OR_TEXT untuk memetakan tipe hanya ke tipe int8 dan text di Hologres. Dalam hal ini, jika tipe kolom di MySQL berubah dari INT menjadi BIGINT, Hologres akan memetakan kedua tipe tersebut ke tipe int8. Akibatnya, pekerjaan tidak melaporkan kesalahan untuk konversi tipe yang tidak didukung.
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan komentar tabel dan komentar bidang.
include-comments.enabled: true
# (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi kesalahan TaskManager OutOfMemory.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi kesalahan TaskManager OutOfMemory.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan data.
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXTTulis data ke tabel partisi
Saat menggunakan konektor Hologres sebagai sink dalam pekerjaan ingesti data CDC YAML, Anda dapat menulis data ke tabel partisi. Untuk informasi lebih lanjut, lihat Tulis Data ke Tabel Partisi.
Sinkronisasi real-time dari Kafka ke gudang data Hologres
Implementasikan Distribusi Data Real-time Menggunakan Flink CDC menjelaskan solusi untuk menyimpan data MySQL di Kafka. Anda juga dapat menggunakan pekerjaan ingesti data CDC YAML untuk menyinkronkan data dari Kafka ke Hologres dalam membangun gudang data real-time.
Asumsikan bahwa topik Kafka bernama inventory berisi data dari dua tabel, customers dan products, dalam format debezium-json. Pekerjaan berikut dapat menyinkronkan data dari kedua tabel tersebut ke tabel yang sesuai di Hologres.
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idSumber data Kafka mendukung membaca data dalam format berikut: json, canal-json, dan debezium-json (default).
Jika format datanya adalah debezium-json, Anda harus secara manual menambahkan kunci utama ke tabel menggunakan aturan transformasi:
transform: - source-table: \.*.\.* projection: \* primary-keys: idJika data dari satu tabel didistribusikan di beberapa partisi, atau jika data dari tabel bersharding di partisi berbeda perlu digabungkan, Anda harus mengatur item konfigurasi debezium-json.distributed-tables atau canal-json.distributed-tables menjadi true.
Sumber data Kafka mendukung beberapa kebijakan inferensi skema. Anda dapat mengatur kebijakan tersebut menggunakan item konfigurasi schema.inference.strategy. Untuk informasi lebih lanjut tentang inferensi skema dan kebijakan sinkronisasi perubahan, lihat Message Queue for Kafka.