Topik ini menjelaskan praktik terbaik untuk menulis data real-time ke antrian pesan umum menggunakan pekerjaan YAML ingesti data.
Tulis data database lengkap dan inkremental ke Kafka secara real-time
Anda dapat mengimpor data dari MySQL ke Kafka menggunakan pekerjaan YAML ingesti data, lalu mendistribusikannya ke berbagai sistem downstream sesuai kebutuhan. Pendekatan ini menghindari koneksi langsung dari beberapa pekerjaan ke database bisnis dan mengurangi beban pemrosesan.
Sinkronkan data binary logging MySQL ke Kafka
Dalam beberapa skenario, Anda mungkin perlu menyimpan data binary logging mentah untuk tugas seperti audit atau replay data. Pekerjaan YAML ingesti data mendukung sinkronisasi data binary logging MySQL mentah ke Kafka, memungkinkan pembacaan data secara terdistribusi dan membantu mengatasi masalah hot spot data.
Misalnya, database kafka_test berisi dua tabel, customers dan products. Pekerjaan berikut menyinkronkan data dari kedua tabel tersebut ke dua topik dengan nama yang sama: customers dan products.
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: kafka_test.\.*
server-id: 8601-8604
# (Opsional) Sinkronkan data lengkap dan inkremental dari tabel yang baru ditambahkan.
scan.newly-added-table.enabled: true
# (Opsional) Sinkronkan komentar tabel dan bidang.
include-comments.enabled: true
# (Opsional) Utamakan mendistribusikan chunk tak terbatas untuk menghindari potensi kesalahan OutOfMemory pada TaskManager.
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan.
scan.only.deserialize.captured.tables.changelog.enabled: true
# Kirim waktu perubahan data sebagai metadata.
metadata-column.include-list: op_ts
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
# Alibaba Cloud Kafka tidak mendukung penulisan idempoten atau transaksional. Nonaktifkan fitur idempotensi.
properties.enable.idempotence: false
# (Opsional) Tetapkan pemetaan antara tabel hulu dan topik Kafka.
sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:productsKode berikut menunjukkan format isi pesan Kafka yang dihasilkan oleh pernyataan UPDATE pada tabel customers:
// debezium-json
{
"before": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
},
"after": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
},
"op": "u",
"source": {
"db": "kafka_test",
"table": "customers",
"ts_ms": 1728528674000
}
}
// canal-json
{
"old": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
}
],
"data": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
}
],
"type": "UPDATE",
"database": "kafka_test",
"table": "customers",
"pkNames": [
"id"
],
"ts": 1728528674000,
"es": 0
}Log biner dapat ditulis dalam format json, canal-json, atau debezium-json (default). Untuk informasi selengkapnya, lihat Message Queue for Apache Kafka.
Jika Anda tidak menggunakan parameter sink.tableId-to-topic.mapping, topik dibuat di Kafka menggunakan format database.table. Misalnya, untuk tabel MySQL
kafka_test.customers, nama topik yang sesuai di Kafka adalahkafka_test.customers. Anda dapat menggunakan sink.tableId-to-topic.mapping untuk mengonfigurasi pemetaan antara tabel hulu dan topik tujuan. Hal ini memungkinkan Anda mengubah nama topik tujuan sambil tetap mempertahankan nama tabel sumber dalam pesan Kafka. Untuk informasi selengkapnya, lihat Message Queue for Apache Kafka.Secara default, semua data ditulis ke partisi 0 topik. Anda dapat menyesuaikan perilaku ini melalui konfigurasi
partition.strategy. Untuk informasi selengkapnya, lihat Message Queue for Apache Kafka. Sebagai contoh, pengaturan partition.strategy: hash-by-key menulis data dari setiap tabel ke beberapa partisi berdasarkan nilai hash kunci primer, sehingga data dengan kunci primer yang sama dikirim ke partisi yang sama dan tetap terurut.Alibaba Cloud Message Queue for Apache Kafka tidak mendukung penulisan idempoten atau transaksional. Saat menggunakannya sebagai tujuan ingesti data, Anda harus menambahkan item konfigurasi
properties.enable.idempotence: falseke konfigurasi sink untuk menonaktifkan fitur idempotensi.
Ingesti real-time dari Kafka ke DLF
Menyinkronkan data binary logging MySQL ke Kafka memberikan solusi untuk menyimpan data MySQL di Kafka. Selanjutnya, Anda dapat mengonfigurasi pekerjaan YAML ingesti data untuk menyinkronkan data tersebut ke penyimpanan DLF.
Misalnya, topik Kafka bernama inventory berisi data dari dua tabel, customers dan products, dalam format debezium-json. Pekerjaan berikut menyinkronkan data dari kedua tabel tersebut ke tabel yang sesuai di DLF:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (Opsional) Nama pengguna untuk mengirimkan pekerjaan. Tetapkan nama pengguna unik untuk setiap pekerjaan guna menghindari konflik.
commit.user: your_job_name
# (Opsional) Aktifkan deletion vectors untuk meningkatkan kinerja baca.
table.properties.deletion-vectors.enabled: true
# debezium-json tidak menyertakan informasi kunci primer. Anda harus menambahkan kunci primer ke tabel.
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idSumber data Kafka mendukung pembacaan data dalam format canal-json dan debezium-json (default).
Jika format datanya adalah debezium-json, Anda harus menambahkan kunci primer ke tabel secara manual menggunakan aturan transformasi:
transform: - source-table: \.*.\.* projection: \* primary-keys: idJika data satu tabel didistribusikan ke beberapa partisi, atau jika tabel-tabel dari partisi berbeda perlu digabung setelah sharding, tetapkan item konfigurasi debezium-json.distributed-tables atau canal-json.distributed-tables menjadi true.
Sumber data Kafka mendukung beberapa kebijakan inferensi skema. Anda dapat menetapkan kebijakan tersebut melalui item konfigurasi schema.inference.strategy. Untuk informasi selengkapnya tentang kebijakan inferensi skema dan sinkronisasi perubahan, lihat Message Queue for Apache Kafka.