本文將向您介紹基於Flink CDC的資料攝入作業的基本結構及重要參數。
作業結構樣本
一個典型的Flink CDC資料攝入作業由以下模組組成:
例如,一個從MySQL寫入Paimon的作業結構如下所示:
# MySQL Source 模組
source:
type: mysql
name: MySQL Source
host: localhost
port: 3306
username: admin
password: <yourPassword>
tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*, mydb.\\.*
# Paimon Sink 模組
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse
# Transform 模組
transform:
- source-table: mydb.app_order_.*
projection: id, order_id, TO_UPPER(product_name)
filter: id > 10 AND order_id > 100
primary-keys: id
partition-keys: product_name
table-options: comment=app order
description: project fields from source table
converter-after-transform: SOFT_DELETE
- source-table: mydb.web_order_.*
projection: CONCAT(id, order_id) as uniq_id, *
filter: uniq_id > 10
description: add new uniq_id for each row
# Route 模組
route:
- source-table: mydb.default.app_order_.*
sink-table: odsdb.default.app_order
description: sync all table shards to one
- source-table: mydb.default.web_order
sink-table: odsdb.default.ods_web_order
description: sync table to with given prefix ods_
# Pipeline 模組
pipeline:
name: source-database-sync-pipe
schema.change.behavior: evolveSource(源端)模組
Source模組定義Flink CDC資料攝入作業的資料來源端,目前支援的連接器包括訊息佇列Kafka、MySQL、MongoDB、Log ServiceSLS。
文法結構
source:
type: mysql
name: mysql source
xxx: ...各連接器支援的具體配置請查看對應連接器的文檔。
Sink(目標端)模組
Sink模組定義Flink CDC資料攝入作業的目標端,目前支援的連接器包括訊息佇列Kafka、Upsert Kafka、即時數倉Hologres、流式資料湖倉Paimon、StarRocks、MaxCompute和Print。
文法結構
sink:
type: hologres
name: hologres sink
xxx: ...各連接器支援的具體配置請查看對應連接器的文檔。
Transform模組
您可以在Flink CDC資料攝入作業的Transform模組中定義若干規則,從而實現源表中資料的投影、計算和過濾等功能。
文法結構
transform:
- source-table: db.tbl1
projection: ...
filter: ...
- source-table: db.tbl2
projection: ...
filter: ...具體運算式文法請參考Transform模組文檔。
Route模組
您可以在Flink CDC資料攝入作業的Route模組中定義若干規則,從而實現上遊分庫分表的合并、廣播等。
文法結構
route:
- source-table: db.tbl1
sink-table: sinkdb.tbl1
- source-table: db.tbl2
sink-table: sinkdb.tbl2具體運算式文法請參考Route模組文檔。
Pipeline模組
您可以在Flink CDC資料攝入作業的Pipeline模組中配置一些全域參數。
文法結構
pipeline:
name: CDC YAML job
schema.change.behavior: LENIENT可配置的選項請參考Pipeline模組文檔。