本文為您介紹如何使用PolarDB-X連接器。
背景資訊
PolarDB 分布式版(PolarDB for Xscale,簡稱“PolarDB-X”)是阿里雲自主設計研發的高效能雲原生分散式資料庫產品,為使用者提供高吞吐、大儲存、低延時、易擴充和超高可用的雲時代資料庫服務。
僅支援VVR 11.5及以上版本,配合PolarDB-X 2.0及更高版本使用。
目前,PolarDB-X CDC連接器只支援作為源表使用。如您需要對PolarDB-X執行個體進行維表查詢、或作為結果表寫入,請使用MySQL連接器(公測中)。
類別 | 詳情 |
支援類型 | 源表 |
運行模式 | 僅支援流模式 |
資料格式 | 暫不適用 |
特有監控指標 |
|
API種類 | SQL |
是否支援更新或刪除結果表資料 | 否 |
特色功能
PolarDB-X CDC連接器針對Binlog解析階段進行效能最佳化,支援PolarDB-X伺服器側對無關Binlog進行過濾和裁剪,從而提升輸送量、節省網路頻寬。
Binlog按需訂閱樣本
此版本支援在服務端對Binlog進行過濾、只發送所需的變更日誌給用戶端,從而起到降低網路流量壓力、提升日誌消費吞吐的最佳化作用。
例如,如果您只需訂閱PolarDB-X伺服器中db.table1和db.table2表的變更資料,可以像這樣配置Flink SQL作業:
CREATE TABLE polardbx_table_foo (
... -- 在這裡定義表結構
) WITH (
'connector' = 'polardbx-cdc',
'database-name' = 'db',
'table-name' = '.*',
..., -- 其他參數
'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- 只訂閱對應表的資料
);相較於MySQL CDC連接器會將整個執行個體中全量的變更Binlog日誌載入到本地、並在用戶端進行過濾,PolarDB-X CDC連接器具備服務端過濾Binlog、用戶端按需訂閱Binlog的能力,能夠大幅減少網路IO開銷。
使用限制
如您需要使用服務端Binlog服務端過濾、按表訂閱功能,需要保證PolarDB-X服務端版本為2.5.0及以上,且Log Service組件為5.4.20或更高版本。
SQL
文法結構
CREATE TABLE polardbx_customer_table(
`id` STRING,
[columnName dataType,]*
PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
'connector' = 'polardbx-cdc',
'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
'username' = 'pdx_user',
'password' = 'pdx_password',
'database' = 'full_db',
'collection' = 'customers'
)WITH參數
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 連接器名稱。 | STRING | 是 | 無 | 固定值為polardbx-cdc。 |
hostname | PolarDB-X資料庫的IP地址或者Hostname。 | STRING | 是 | 無 | 建議填寫執行個體串連資訊中的叢集地址。 |
port | PolarDB-X資料庫的服務連接埠號碼。 | INTEGER | 否 | 3306 | 無。 |
username | PolarDB-X資料庫服務的使用者名稱。 | STRING | 是 | 無 | 無。 |
password | PolarDB-X資料庫服務的密碼。 | STRING | 是 | 無 | 無。 |
database-name | PolarDB-X資料庫名稱。 | STRING | 是 | 無 | 支援使用Regex匹配讀取多個資料庫的資料。 說明 使用Regex時,不要使用^和$符號匹配開頭和結尾。 |
table-name | PolarDB-X表名。 | STRING | 是 | 無 | 支援使用Regex匹配讀取多張表的資料。 說明 使用Regex時,不要使用^和$符號匹配開頭和結尾。 |
server-time-zone | 資料庫在使用的會話時區。 | STRING | 否 | 作業運行環境可用性區域時區。 | 指定 IANA 時區標識符,例如 Asia/Shanghai。該參數控制了源表中的TIMESTAMP類型如何轉換為STRING類型。 |
scan.incremental.snapshot.chunk.size | 增量快照分塊讀取時,每個chunk的大小(包含的行數)。 | INTEGER | 否 | 8096 | PolarDB-X 將表切分為多個分區(Chunk)進行讀取,並在記憶體中緩衝分區資料。減少單分區行數會增加分區總量。這雖然細化了故障恢複粒度,但也增加了記憶體溢出(OOM)風險並降低輸送量。請配置合理的分區大小以平衡效能。 |
scan.snapshot.fetch.size | 當讀取表的全量資料時,每次最多拉取的記錄數。 | INTEGER | 否 | 1024 | 無。 |
connect.timeout | 串連PolarDB-X資料庫伺服器逾時時,重試串連之前等待逾時的最長時間。 | DURATION | 否 | 30s | 無。 |
connection.pool.size | 資料庫連接池大小。 | INTEGER | 否 | 20 | 資料庫連接池用於複用串連,可以降低資料庫連接數量。 |
connect.max-retries | 串連MySQL資料庫服務時,串連失敗後重試的最大次數。 | INTEGER | 否 | 3 | 無。 |
scan.startup.mode | 消費資料時的啟動模式。 | STRING | 否 | initial | 參數取值如下:
重要 對於earliest-offset,specific-offset和timestamp啟動模式,啟動時的表結構必須與指錨點一致。結構不匹配將導致作業報錯。請確保在指定 Binlog 位點至作業啟動期間,表結構未發生變更。 |
scan.startup.specific-offset.file | 使用指錨點模式啟動時,啟動位點的Binlog檔案名稱。 | STRING | 否 | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。檔案名稱格式例如 |
scan.startup.specific-offset.pos | 使用指錨點模式啟動時,啟動位點在指定Binlog檔案中的位移量。 | INTEGER | 否 | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。 |
scan.startup.specific-offset.gtid-set | 使用指錨點模式啟動時,啟動位點的GTID集合。 | STRING | 否 | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。GTID集合格式例如 |
scan.startup.timestamp-millis | 使用指定時間模式啟動時,啟動位點的毫秒時間戳記。 | LONG | 否 | 無 | 使用該配置時,scan.startup.mode必須配置為timestamp。時間戳記單位為毫秒。 |
scan.startup.specific-offset.skip-events | 從指定的位點讀取時,跳過多少Binlog事件。 | INTEGER | 否 | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。 |
scan.startup.specific-offset.skip-rows | 從指定的位點讀取時,跳過多少行變更(一個Binlog事件可能對應多行變更)。 | INTEGER | 否 | 無 | 使用該配置時,scan.startup.mode必須配置為specific-offset。 |
heartbeat.interval | Source通過心跳事件推動Binlog位點前進的時間間隔。 | DURATION | 否 | 無 | 心跳事件強制推進 Source 端的 Binlog 位點。此機制防止低頻更新導致 Binlog 到期。Binlog 到期將引發作業失敗,且僅能通過無狀態重啟恢複。 |
chunk-meta.group.size | chunk元資訊的大小。 | INTEGER | 否 | 1000 | 如果元資訊大於該值,元資訊會分為多份傳遞。 |
chunk-key.even-distribution.factor.upper-bound | 是否可以均勻分區的chunk分布因子的上限。 | DOUBLE | 否 | 1000.0 | 分布因子大於該值會使用非均勻分區。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 總資料行數。 |
chunk-key.even-distribution.factor.lower-bound | 是否可以均勻分區的chunk分布因子的下限。 | DOUBLE | 否 | 0.05 | 分布因子小於該值會使用非均勻分區。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 總資料行數。 |
scan.newly-added-table.enabled | 從Checkpoint重啟時,是否掃描新增的捕獲表。 | BOOLEAN | 否 | false | 啟用後,系統將同步此前未匹配的新增表,並從狀態中移除不再匹配的表。從Checkpoint或Savepoint重啟時生效。 |
scan.incremental.snapshot.chunk.key-column | 指定快照階段用於資料分區的列。 | STRING | 見備忘 | 無 |
|
scan.incremental.close-idle-reader.enabled | 是否在快照結束後關閉閒置 Reader。 | BOOLEAN | 否 | false | 該配置生效需要同時設定execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。 |
scan.incremental.snapshot.backfill.skip | 是否在快照讀取階段跳過backfill。 | BOOLEAN | 否 | false | 參數取值如下:
如果跳過backfill,快照階段表的更改將在稍後的增量階段讀取,而不是合并到快照中。 重要 跳過backfill可能導致資料不一致,因為快照階段發生的變更可能會被重放,僅保證at-least-once語義。 |
scan.parse.online.schema.changes.enabled | 在增量階段,是否嘗試解析 RDS 無鎖變更 DDL 事件。 | BOOLEAN | 否 | false | 參數取值如下:
實驗性功能。建議在執行線上無鎖變更前,先對Flink作業建立一個Savepoint以便恢複。 |
scan.only.deserialize.captured.tables.changelog.enabled | 在增量階段,是否僅對指定表的變更事件進行還原序列化。 | BOOLEAN | 否 | true | 參數取值如下:
|
scan.read-changelog-as-append-only.enabled | 是否將changelog資料流轉換為append-only資料流。 | BOOLEAN | 否 | false | 參數取值如下:
|
scan.parallel-deserialize-changelog.enabled | 在增量階段,是否使用多線程對變更事件進行解析。 | BOOLEAN | 否 | false | 參數取值如下:
|
scan.parallel-deserialize-changelog.handler.size | 多線程對變更事件進行解析時,事件處理器的數量。 | INTEGER | 否 | 2 | 無。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | 快照讀取階段是否先分發無界的分區。 | BOOLEAN | 否 | false | 參數取值如下:
實驗性功能。開啟後能夠降低TaskManager在快照階段同步最後一個分區時遇到記憶體溢出 (OOM) 的風險,建議在作業第一次啟動前添加。 |
polardbx.binlog.ignore.archive-events.enabled | 是否忽略 PolarDB-X Binlog 中的歸檔事件(主要是 `DELETE` 事件)。 | BOOLEAN | 否 | false | |
polardbx.binlog.ignore.query-events.enabled | 是否忽略 PolarDB-X Binlog 中的ROWS_QUERY_LOG_EVENT事件。 | BOOLEAN | 否 | false | |
polardbx.binlog.include.tables | 僅訂閱這些表的Binlog日誌。多個表名之間用逗號(`,`)分隔。 | STRING | 否 | 無 | |
polardbx.binlog.exclude.tables | 不訂閱這些表的Binlog日誌。多個表名之間用逗號(`,`)分隔。 | STRING | 否 | 無 |
類型映射
PolarDB-X欄位類型 | Flink欄位類型 |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] WITH LOCAL TIME ZONE | |
CHAR(n) | STRING |
VARCHAR(n) | |
TEXT | |
BINARY | BYTES |
VARBINARY | |
BLOB |