本文為您介紹使用資料攝入CDC YAML作業將即時資料寫入常用資料倉儲的最佳實務。
Flink CDC即時同步至Hologres數倉
使用資料攝入CDC YAML同步資料到Hologres搭建即時數倉,可以充分利用Flink強大的即時處理能力和Hologres提供的Binlog、行列共存和資源強隔離等能力,實現高效、可擴充的即時資料處理和分析。
MySQL即時同步至Hologres數倉
最基本的MySQL整庫同步Hologres的資料攝入CDC YAML作業如下所示:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN使用更寬容的類型映射
Hologres連接器無法處理列類型變更事件,但支援了多種類型映射關係。為了更好地支援資料來源的變更,您可以通過將多個MySQL資料類型映射到更寬的Hologres類型,跳過不必要的類型變更事件,從而讓作業正常運行。您可以通過配置項sink.type-normalize-strategy變更,預設值為STANDARD,詳情請見資料攝入YAML作業Hologres連接器類型映射。
例如,可以使用ONLY_BIGINT_OR_TEXT讓類型只對應到Hologres的int8和text類型。此時如果MySQL某個列的類型從INT改為BIGINT,Hologres將這兩種MySQL類型對應到int8類型,作業不會因為無法處理類型轉換而報錯。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
#(可選)同步表注釋和欄位注釋
include-comments.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)優先分發無界的分區以避免可能出現的TaskManager OutOfMemory問題
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可選)開啟解析過濾,加速讀取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT分區表寫入
資料攝入YAML在使用Hologres連接器作為目標端時支援寫入分區表,詳情請參見分區表寫入。
Kafka即時同步至Hologres數倉
實現即時分發為您提供了在Kafka中儲存MySQL資料的方案。進一步的,資料攝入YAML支援同步Kafka資料到Hologres搭建即時數倉。
假設Kafka中名為inventory的topic中存有debezium-json格式的兩張表customers和products的資料,下面的作業可以將兩張表的資料分別同步到Hologres對應的兩張表中。
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idKafka資料來源讀取的格式支援json、canal-json和debezium-json(預設)。
當資料格式為debezium-json時,需要通過transform規則手動為表添加主鍵:
transform: - source-table: \.*.\.* projection: \* primary-keys: id當單表的資料分布在多個分區中,或資料位元於不同分區中的表需要進行分庫分表合并時,需要將配置項debezium-json.distributed-tables或canal-json.distributed-tables設為true。
kafka資料來源支援多種Schema推導策略,可以通過配置項schema.inference.strategy設定,Schema推導和變更同步策略詳情請參見訊息佇列Kafka。