本文為您介紹使用資料攝入YAML作業將即時資料寫入常用訊息佇列的最佳實務。
資料庫全增量即時寫入Kafka
通過資料攝入YAML作業將來自MySQL的資料匯入到Kafka中後,您可以按照需要再將資料分發到下遊的不同系統中,避免多個作業直接連接業務庫,降低業務庫處理壓力。
MySQL Binlog資料同步到Kafka
在某些情境下,您希望儲存原始的Binlog資料,方便後續的資料審計、資料重放等工作。資料攝入YAML支援同步MySQL原始Binlog資料到Kafka,方便您分布式讀取Binlog資料,解決資料熱點問題。
假設資料庫kafka_test中有兩張表customers和products,下面的作業可以分別將表資料同步到customers和products兩個topic中。
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: kafka_test.\.*
server-id: 8601-8604
#(可選)同步新增表的全量和增量資料
scan.newly-added-table.enabled: true
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
# 將資料的變更時間作為中繼資料下發
metadata-column.include-list: op_ts
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
# 阿里雲Kafka不支援等冪和事務寫入,關閉等冪功能
properties.enable.idempotence: false
#(可選)設定上遊表與Kafka topic的映射關係
sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:productscustomers表的一條Update語句產生Kafka訊息的訊息體格式如下:
// debezium-json
{
"before": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
},
"after": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
},
"op": "u",
"source": {
"db": "kafka_test",
"table": "customers",
"ts_ms": 1728528674000
}
}
// canal-json
{
"old": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
}
],
"data": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
}
],
"type": "UPDATE",
"database": "kafka_test",
"table": "customers",
"pkNames": [
"id"
],
"ts": 1728528674000,
"es": 0
}寫入的Binlog格式支援json、canal-json和debezium-json(預設),詳情請參見訊息佇列Kafka。
如果不使用sink.tableId-to-topic.mapping參數,在Kafka中會使用database.table的格式建立topic。例如MySQL表
kafka_test.customers在Kafka中對應的topic名稱為kafka_test.customers。使用sink.tableId-to-topic.mapping可以配置上遊表與寫入topic的映射關係,能夠在寫出Kafka訊息保留源表表名的同時修改寫出的topic。詳情請參見訊息佇列Kafka。預設所有資料寫入Topic的0號分區,可以使用
partition.strategy配置進行調整,詳情請參見訊息佇列Kafka。例如可使用如下配置:partition.strategy: hash-by-key,每個表的資料會根據主鍵的雜湊值將資料寫到多個分區,並保證同一個主鍵的資料在同一個分區並且有序。阿里雲訊息佇列Kafka版不支援等冪和事務寫入,作為資料攝入目標端時,需要在資料攝入目標端添加配置項
properties.enable.idempotence: false以關閉等冪寫入功能。
Kafka即時攝入到資料湖DLF
MySQL Binlog資料同步到Kafka為您提供了在Kafka中儲存MySQL資料的方案,進一步的,您可以配置資料攝入YAML同步Kafka資料到DLF儲存。
假設Kafka中名為inventory的topic中存有debezium-json格式的兩張表customers和products的資料,下面的作業可以將兩張表的資料分別同步到DLF對應的兩張表中:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可選)提交使用者名稱,建議為不同作業設定不同的提交使用者以避免衝突
commit.user: your_job_name
#(可選)開啟刪除向量,提升讀取效能
table.properties.deletion-vectors.enabled: true
# debezium-json不包含主鍵資訊,需要另外為表添加主鍵
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idKafka資料來源讀取的格式支援canal-json和debezium-json(預設)。
當資料格式為debezium-json時,需要通過transform規則手動為表添加主鍵:
transform: - source-table: \.*.\.* projection: \* primary-keys: id當單表的資料分布在多個分區中,或資料位元於不同分區中的表需要進行分庫分表合并時,需要將配置項debezium-json.distributed-tables或canal-json.distributed-tables設為true。
kafka資料來源支援多種Schema推導策略,可以通過配置項schema.inference.strategy設定,Schema推導和變更同步策略詳情請參見訊息佇列Kafka。