全部產品
Search
文件中心

Realtime Compute for Apache Flink:PolarDB-X CDC(公測中)

更新時間:Feb 06, 2026

本文為您介紹如何使用PolarDB-X連接器。

背景資訊

PolarDB 分布式版(PolarDB for Xscale,簡稱“PolarDB-X”)是阿里雲自主設計研發的高效能雲原生分散式資料庫產品,為使用者提供高吞吐、大儲存、低延時、易擴充和超高可用的雲時代資料庫服務。

重要

僅支援VVR 11.5及以上版本,配合PolarDB-X 2.0及更高版本使用。

目前,PolarDB-X CDC連接器只支援作為源表使用。如您需要對PolarDB-X執行個體進行維表查詢、或作為結果表寫入,請使用MySQL連接器(公測中)。

類別

詳情

支援類型

源表

運行模式

僅支援流模式

資料格式

暫不適用

特有監控指標

  • currentFetchEventTimeLag:資料產生到拉取到Source Operator的間隔。

    該指標僅在Binlog階段有效,Snapshot階段該值恒為0。

  • currentEmitEventTimeLag:資料產生到離開Source Operator的間隔。

    該指標僅在Binlog階段有效,Snapshot階段該值恒為0。

  • sourceIdleTime:源表至今有多久未產生新資料。

API種類

SQL

是否支援更新或刪除結果表資料

特色功能

PolarDB-X CDC連接器針對Binlog解析階段進行效能最佳化,支援PolarDB-X伺服器側對無關Binlog進行過濾和裁剪,從而提升輸送量、節省網路頻寬。

Binlog按需訂閱樣本

此版本支援在服務端對Binlog進行過濾、只發送所需的變更日誌給用戶端,從而起到降低網路流量壓力、提升日誌消費吞吐的最佳化作用。

例如,如果您只需訂閱PolarDB-X伺服器中db.table1db.table2表的變更資料,可以像這樣配置Flink SQL作業:

CREATE TABLE polardbx_table_foo (
  ... -- 在這裡定義表結構
) WITH (
  'connector' = 'polardbx-cdc',
  'database-name' = 'db',
  'table-name' = '.*',
  ..., -- 其他參數
  'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- 只訂閱對應表的資料
);

相較於MySQL CDC連接器會將整個執行個體中全量的變更Binlog日誌載入到本地、並在用戶端進行過濾,PolarDB-X CDC連接器具備服務端過濾Binlog、用戶端按需訂閱Binlog的能力,能夠大幅減少網路IO開銷。

使用限制

如您需要使用服務端Binlog服務端過濾、按表訂閱功能,需要保證PolarDB-X服務端版本為2.5.0及以上,且Log Service組件為5.4.20或更高版本。

SQL

文法結構

CREATE TABLE polardbx_customer_table(
  `id` STRING,
  [columnName dataType,]*
  PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
  'username' = 'pdx_user',
  'password' = 'pdx_password',
  'database' = 'full_db',
  'collection' = 'customers'
)

WITH參數

參數

說明

資料類型

是否必填

預設值

備忘

connector

連接器名稱。

STRING

固定值為polardbx-cdc。

hostname

PolarDB-X資料庫的IP地址或者Hostname。

STRING

建議填寫執行個體串連資訊中的叢集地址。

port

PolarDB-X資料庫的服務連接埠號碼。

INTEGER

3306

無。

username

PolarDB-X資料庫服務的使用者名稱。

STRING

無。

password

PolarDB-X資料庫服務的密碼。

STRING

無。

database-name

PolarDB-X資料庫名稱。

STRING

支援使用Regex匹配讀取多個資料庫的資料。

說明

使用Regex時,不要使用^$符號匹配開頭和結尾。

table-name

PolarDB-X表名。

STRING

支援使用Regex匹配讀取多張表的資料。

說明

使用Regex時,不要使用^$符號匹配開頭和結尾。

server-time-zone

資料庫在使用的會話時區。

STRING

作業運行環境可用性區域時區。

指定 IANA 時區標識符,例如 Asia/Shanghai。該參數控制了源表中的TIMESTAMP類型如何轉換為STRING類型。

scan.incremental.snapshot.chunk.size

增量快照分塊讀取時,每個chunk的大小(包含的行數)。

INTEGER

8096

PolarDB-X 將表切分為多個分區(Chunk)進行讀取,並在記憶體中緩衝分區資料。減少單分區行數會增加分區總量。這雖然細化了故障恢複粒度,但也增加了記憶體溢出(OOM)風險並降低輸送量。請配置合理的分區大小以平衡效能。

scan.snapshot.fetch.size

當讀取表的全量資料時,每次最多拉取的記錄數。

INTEGER

1024

無。

connect.timeout

串連PolarDB-X資料庫伺服器逾時時,重試串連之前等待逾時的最長時間。

DURATION

30s

無。

connection.pool.size

資料庫連接池大小。

INTEGER

20

資料庫連接池用於複用串連,可以降低資料庫連接數量。

connect.max-retries

串連MySQL資料庫服務時,串連失敗後重試的最大次數。

INTEGER

3

無。

scan.startup.mode

消費資料時的啟動模式。

STRING

initial

參數取值如下:

  • initial(預設):在第一次啟動時,會先掃描歷史全量資料,然後讀取最新的Binlog資料。

  • latest-offset:在第一次啟動時,不會掃描歷史全量資料,直接從Binlog的末尾(最新的Binlog處)開始讀取,即唯讀取該連接器啟動以後的最新變更。

  • earliest-offset:不掃描歷史全量資料,直接從可讀取的最早Binlog開始讀取。

  • specific-offset:不掃描歷史全量資料,從您指定的Binlog位點啟動,位點可通過同時配置scan.startup.specific-offset.filescan.startup.specific-offset.pos參數來指定從特定Binlog檔案名稱和位移量啟動,也可以只配置scan.startup.specific-offset.gtid-set來指定從某個GTID集合啟動。

  • timestamp:不掃描歷史全量資料,從指定的時間戳記開始讀取Binlog。時間戳記通過scan.startup.timestamp-millis指定,單位為毫秒。

重要

對於earliest-offsetspecific-offsettimestamp啟動模式,啟動時的表結構必須與指錨點一致。結構不匹配將導致作業報錯。請確保在指定 Binlog 位點至作業啟動期間,表結構未發生變更。

scan.startup.specific-offset.file

使用指錨點模式啟動時,啟動位點的Binlog檔案名稱。

STRING

使用該配置時,scan.startup.mode必須配置為specific-offset。檔案名稱格式例如mysql-bin.000003

scan.startup.specific-offset.pos

使用指錨點模式啟動時,啟動位點在指定Binlog檔案中的位移量。

INTEGER

使用該配置時,scan.startup.mode必須配置為specific-offset

scan.startup.specific-offset.gtid-set

使用指錨點模式啟動時,啟動位點的GTID集合。

STRING

使用該配置時,scan.startup.mode必須配置為specific-offset。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

使用指定時間模式啟動時,啟動位點的毫秒時間戳記。

LONG

使用該配置時,scan.startup.mode必須配置為timestamp。時間戳記單位為毫秒。

scan.startup.specific-offset.skip-events

從指定的位點讀取時,跳過多少Binlog事件。

INTEGER

使用該配置時,scan.startup.mode必須配置為specific-offset

scan.startup.specific-offset.skip-rows

從指定的位點讀取時,跳過多少行變更(一個Binlog事件可能對應多行變更)。

INTEGER

使用該配置時,scan.startup.mode必須配置為specific-offset

heartbeat.interval

Source通過心跳事件推動Binlog位點前進的時間間隔。

DURATION

心跳事件強制推進 Source 端的 Binlog 位點。此機制防止低頻更新導致 Binlog 到期。Binlog 到期將引發作業失敗,且僅能通過無狀態重啟恢複。

chunk-meta.group.size

chunk元資訊的大小。

INTEGER

1000

如果元資訊大於該值,元資訊會分為多份傳遞。

chunk-key.even-distribution.factor.upper-bound

是否可以均勻分區的chunk分布因子的上限。

DOUBLE

1000.0

分布因子大於該值會使用非均勻分區。

chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 總資料行數。

chunk-key.even-distribution.factor.lower-bound

是否可以均勻分區的chunk分布因子的下限。

DOUBLE

0.05

分布因子小於該值會使用非均勻分區。

chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 總資料行數。

scan.newly-added-table.enabled

從Checkpoint重啟時,是否掃描新增的捕獲表。

BOOLEAN

false

啟用後,系統將同步此前未匹配的新增表,並從狀態中移除不再匹配的表。從Checkpoint或Savepoint重啟時生效。

scan.incremental.snapshot.chunk.key-column

指定快照階段用於資料分區的列。

STRING

見備忘

  • 無主鍵表必填,選擇的列必須是非空類型(NOT NULL)。

  • 有主鍵的表為選填,僅支援從主鍵中選擇一列。

scan.incremental.close-idle-reader.enabled

是否在快照結束後關閉閒置 Reader。

BOOLEAN

false

該配置生效需要同時設定execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。

scan.incremental.snapshot.backfill.skip

是否在快照讀取階段跳過backfill。

BOOLEAN

false

參數取值如下:

  • true:快照讀取階段跳過backfill。

  • false(預設):快照讀取階段不跳過backfill。

如果跳過backfill,快照階段表的更改將在稍後的增量階段讀取,而不是合并到快照中。

重要

跳過backfill可能導致資料不一致,因為快照階段發生的變更可能會被重放,僅保證at-least-once語義。

scan.parse.online.schema.changes.enabled

在增量階段,是否嘗試解析 RDS 無鎖變更 DDL 事件。

BOOLEAN

false

參數取值如下:

  • true:解析 RDS 無鎖變更 DDL 事件。

  • false(預設):不解析 RDS 無鎖變更 DDL 事件。

實驗性功能。建議在執行線上無鎖變更前,先對Flink作業建立一個Savepoint以便恢複。

scan.only.deserialize.captured.tables.changelog.enabled

在增量階段,是否僅對指定表的變更事件進行還原序列化。

BOOLEAN

true

參數取值如下:

  • true:僅對目標表的變更資料進行還原序列化,加快Binlog讀取速度。

  • false(預設):對所有表的變更資料進行還原序列化。

scan.read-changelog-as-append-only.enabled

是否將changelog資料流轉換為append-only資料流。

BOOLEAN

false

參數取值如下:

  • true:所有類型的訊息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都會轉換成INSERT類型的訊息。僅在需要儲存上遊表刪除訊息等特殊情境下開啟使用。

  • false(預設):所有類型的訊息都保持原樣下發。

scan.parallel-deserialize-changelog.enabled

在增量階段,是否使用多線程對變更事件進行解析。

BOOLEAN

false

參數取值如下:

  • true:在變更事件的還原序列化階段採用多執行緒,同時保證Binlog事件順序不變,從而加快讀取速度。

  • false(預設):在事件的還原序列化階段使用單線程處理。

scan.parallel-deserialize-changelog.handler.size

多線程對變更事件進行解析時,事件處理器的數量。

INTEGER

2

無。

scan.incremental.snapshot.unbounded-chunk-first.enabled

快照讀取階段是否先分發無界的分區。

BOOLEAN

false

參數取值如下:

  • true:快照讀取階段優先分發無界的分區。

  • false(預設):快照讀取階段不優先分發無界的分區。

實驗性功能。開啟後能夠降低TaskManager在快照階段同步最後一個分區時遇到記憶體溢出 (OOM) 的風險,建議在作業第一次啟動前添加。

polardbx.binlog.ignore.archive-events.enabled

是否忽略 PolarDB-X Binlog 中的歸檔事件(主要是 `DELETE` 事件)。

BOOLEAN

false

polardbx.binlog.ignore.query-events.enabled

是否忽略 PolarDB-X Binlog 中的ROWS_QUERY_LOG_EVENT事件。

BOOLEAN

false

polardbx.binlog.include.tables

僅訂閱這些表的Binlog日誌。多個表名之間用逗號(`,`)分隔。

STRING

polardbx.binlog.exclude.tables

不訂閱這些表的Binlog日誌。多個表名之間用逗號(`,`)分隔。

STRING

類型映射

PolarDB-X欄位類型

Flink欄位類型

TINYINT

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

INT

INT

MEDIUMINT

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

BOOLEAN

BOOLEAN

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)] [WITHOUT TIME ZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIME ZONE]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)] WITH LOCAL TIME ZONE

CHAR(n)

STRING

VARCHAR(n)

TEXT

BINARY

BYTES

VARBINARY

BLOB