全部产品
Search
文档中心

实时计算Flink版:PolarDB-X CDC(公测中)

更新时间:Feb 05, 2026

本文为您介绍如何使用PolarDB-X连接器。

背景信息

PolarDB 分布式版(PolarDB for Xscale,简称“PolarDB-X”)是阿里云自主设计研发的高性能云原生分布式数据库产品,为用户提供高吞吐、大存储、低延时、易扩展和超高可用的云时代数据库服务。

重要

仅支持VVR 11.5及以上版本,配合PolarDB-X 2.0及更高版本使用。

目前,PolarDB-X CDC连接器只支持作为源表使用。如您需要对PolarDB-X实例进行维表查询、或作为结果表写入,请使用MySQL连接器(公测中)。

类别

详情

支持类型

源表

运行模式

仅支持流模式

数据格式

暂不适用

特有监控指标

  • currentFetchEventTimeLag:数据产生到拉取到Source Operator的间隔。

    该指标仅在Binlog阶段有效,Snapshot阶段该值恒为0。

  • currentEmitEventTimeLag:数据产生到离开Source Operator的间隔。

    该指标仅在Binlog阶段有效,Snapshot阶段该值恒为0。

  • sourceIdleTime:源表至今有多久未产生新数据。

API种类

SQL

是否支持更新或删除结果表数据

特色功能

PolarDB-X CDC连接器针对Binlog解析阶段进行性能优化,支持PolarDB-X服务器侧对无关Binlog进行过滤和裁剪,从而提升吞吐量、节省网络带宽。

Binlog按需订阅示例

此版本支持在服务端对Binlog进行过滤、只发送所需的变更日志给客户端,从而起到降低网络流量压力、提升日志消费吞吐的优化作用。

例如,如果您只需订阅PolarDB-X服务器中db.table1db.table2表的变更数据,可以像这样配置Flink SQL作业:

CREATE TABLE polardbx_table_foo (
  ... -- 在这里定义表结构
) WITH (
  'connector' = 'polardbx-cdc',
  'database-name' = 'db',
  'table-name' = '.*',
  ..., -- 其他参数
  'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- 只订阅对应表的数据
);

相较于MySQL CDC连接器会将整个实例中全量的变更Binlog日志加载到本地、并在客户端进行过滤,PolarDB-X CDC连接器具备服务端过滤Binlog、客户端按需订阅Binlog的能力,能够大幅减少网络IO开销。

使用限制

如您需要使用服务端Binlog服务端过滤、按表订阅功能,需要保证PolarDB-X服务端版本为2.5.0及以上,且日志服务组件为5.4.20或更高版本。

SQL

语法结构

CREATE TABLE polardbx_customer_table(
  `id` STRING,
  [columnName dataType,]*
  PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
  'username' = 'pdx_user',
  'password' = 'pdx_password',
  'database' = 'full_db',
  'collection' = 'customers'
)

WITH参数

参数

说明

数据类型

是否必填

默认值

备注

connector

连接器名称。

STRING

固定值为polardbx-cdc。

hostname

PolarDB-X数据库的IP地址或者Hostname。

STRING

建议填写实例连接信息中的集群地址。

port

PolarDB-X数据库的服务端口号。

INTEGER

3306

无。

username

PolarDB-X数据库服务的用户名。

STRING

无。

password

PolarDB-X数据库服务的密码。

STRING

无。

database-name

PolarDB-X数据库名称。

STRING

支持使用正则表达式匹配读取多个数据库的数据。

说明

使用正则表达式时,不要使用^$符号匹配开头和结尾。

table-name

PolarDB-X表名。

STRING

支持使用正则表达式匹配读取多张表的数据。

说明

使用正则表达式时,不要使用^$符号匹配开头和结尾。

server-time-zone

数据库在使用的会话时区。

STRING

作业运行环境可用区时区。

指定 IANA 时区标识符,例如 Asia/Shanghai。该参数控制了源表中的TIMESTAMP类型如何转换为STRING类型。

scan.incremental.snapshot.chunk.size

增量快照分块读取时,每个chunk的大小(包含的行数)。

INTEGER

8096

PolarDB-X 将表切分为多个分片(Chunk)进行读取,并在内存中缓存分片数据。减少单分片行数会增加分片总量。这虽然细化了故障恢复粒度,但也增加了内存溢出(OOM)风险并降低吞吐量。请配置合理的分片大小以平衡性能。

scan.snapshot.fetch.size

当读取表的全量数据时,每次最多拉取的记录数。

INTEGER

1024

无。

connect.timeout

连接PolarDB-X数据库服务器超时时,重试连接之前等待超时的最长时间。

DURATION

30s

无。

connection.pool.size

数据库连接池大小。

INTEGER

20

数据库连接池用于复用连接,可以降低数据库连接数量。

connect.max-retries

连接MySQL数据库服务时,连接失败后重试的最大次数。

INTEGER

3

无。

scan.startup.mode

消费数据时的启动模式。

STRING

initial

参数取值如下:

  • initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。

  • latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该连接器启动以后的最新变更。

  • earliest-offset:不扫描历史全量数据,直接从可读取的最早Binlog开始读取。

  • specific-offset:不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过同时配置scan.startup.specific-offset.filescan.startup.specific-offset.pos参数来指定从特定Binlog文件名和偏移量启动,也可以只配置scan.startup.specific-offset.gtid-set来指定从某个GTID集合启动。

  • timestamp:不扫描历史全量数据,从指定的时间戳开始读取Binlog。时间戳通过scan.startup.timestamp-millis指定,单位为毫秒。

重要

对于earliest-offsetspecific-offsettimestamp启动模式,启动时的表结构必须与指定位点一致。结构不匹配将导致作业报错。请确保在指定 Binlog 位点至作业启动期间,表结构未发生变更。

scan.startup.specific-offset.file

使用指定位点模式启动时,启动位点的Binlog文件名。

STRING

使用该配置时,scan.startup.mode必须配置为specific-offset。文件名格式例如mysql-bin.000003

scan.startup.specific-offset.pos

使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。

INTEGER

使用该配置时,scan.startup.mode必须配置为specific-offset

scan.startup.specific-offset.gtid-set

使用指定位点模式启动时,启动位点的GTID集合。

STRING

使用该配置时,scan.startup.mode必须配置为specific-offset。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

使用指定时间模式启动时,启动位点的毫秒时间戳。

LONG

使用该配置时,scan.startup.mode必须配置为timestamp。时间戳单位为毫秒。

scan.startup.specific-offset.skip-events

从指定的位点读取时,跳过多少Binlog事件。

INTEGER

使用该配置时,scan.startup.mode必须配置为specific-offset

scan.startup.specific-offset.skip-rows

从指定的位点读取时,跳过多少行变更(一个Binlog事件可能对应多行变更)。

INTEGER

使用该配置时,scan.startup.mode必须配置为specific-offset

heartbeat.interval

Source通过心跳事件推动Binlog位点前进的时间间隔。

DURATION

心跳事件强制推进 Source 端的 Binlog 位点。此机制防止低频更新导致 Binlog 过期。Binlog 过期将引发作业失败,且仅能通过无状态重启恢复。

chunk-meta.group.size

chunk元信息的大小。

INTEGER

1000

如果元信息大于该值,元信息会分为多份传递。

chunk-key.even-distribution.factor.upper-bound

是否可以均匀分片的chunk分布因子的上限。

DOUBLE

1000.0

分布因子大于该值会使用非均匀分片。

chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。

chunk-key.even-distribution.factor.lower-bound

是否可以均匀分片的chunk分布因子的下限。

DOUBLE

0.05

分布因子小于该值会使用非均匀分片。

chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。

scan.newly-added-table.enabled

从Checkpoint重启时,是否扫描新增的捕获表。

BOOLEAN

false

启用后,系统将同步此前未匹配的新增表,并从状态中移除不再匹配的表。从Checkpoint或Savepoint重启时生效。

scan.incremental.snapshot.chunk.key-column

指定快照阶段用于数据分片的列。

STRING

见备注

  • 无主键表必填,选择的列必须是非空类型(NOT NULL)。

  • 有主键的表为选填,仅支持从主键中选择一列。

scan.incremental.close-idle-reader.enabled

是否在快照结束后关闭空闲的 Reader。

BOOLEAN

false

该配置生效需要同时设置execution.checkpointing.checkpoints-after-tasks-finish.enabled为true。

scan.incremental.snapshot.backfill.skip

是否在快照读取阶段跳过backfill。

BOOLEAN

false

参数取值如下:

  • true:快照读取阶段跳过backfill。

  • false(默认):快照读取阶段不跳过backfill。

如果跳过backfill,快照阶段表的更改将在稍后的增量阶段读取,而不是合并到快照中。

重要

跳过backfill可能导致数据不一致,因为快照阶段发生的变更可能会被重放,仅保证at-least-once语义。

scan.parse.online.schema.changes.enabled

在增量阶段,是否尝试解析 RDS 无锁变更 DDL 事件。

BOOLEAN

false

参数取值如下:

  • true:解析 RDS 无锁变更 DDL 事件。

  • false(默认):不解析 RDS 无锁变更 DDL 事件。

实验性功能。建议在执行线上无锁变更前,先对Flink作业创建一个Savepoint以便恢复。

scan.only.deserialize.captured.tables.changelog.enabled

在增量阶段,是否仅对指定表的变更事件进行反序列化。

BOOLEAN

true

参数取值如下:

  • true:仅对目标表的变更数据进行反序列化,加快Binlog读取速度。

  • false(默认):对所有表的变更数据进行反序列化。

scan.read-changelog-as-append-only.enabled

是否将changelog数据流转换为append-only数据流。

BOOLEAN

false

参数取值如下:

  • true:所有类型的消息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都会转换成INSERT类型的消息。仅在需要保存上游表删除消息等特殊场景下开启使用。

  • false(默认):所有类型的消息都保持原样下发。

scan.parallel-deserialize-changelog.enabled

在增量阶段,是否使用多线程对变更事件进行解析。

BOOLEAN

false

参数取值如下:

  • true:在变更事件的反序列化阶段采用多线程处理,同时保证Binlog事件顺序不变,从而加快读取速度。

  • false(默认):在事件的反序列化阶段使用单线程处理。

scan.parallel-deserialize-changelog.handler.size

多线程对变更事件进行解析时,事件处理器的数量。

INTEGER

2

无。

scan.incremental.snapshot.unbounded-chunk-first.enabled

快照读取阶段是否先分发无界的分片。

BOOLEAN

false

参数取值如下:

  • true:快照读取阶段优先分发无界的分片。

  • false(默认):快照读取阶段不优先分发无界的分片。

实验性功能。开启后能够降低TaskManager在快照阶段同步最后一个分片时遇到内存溢出 (OOM) 的风险,建议在作业第一次启动前添加。

polardbx.binlog.ignore.archive-events.enabled

是否忽略 PolarDB-X Binlog 中的归档事件(主要是 `DELETE` 事件)。

BOOLEAN

false

polardbx.binlog.ignore.query-events.enabled

是否忽略 PolarDB-X Binlog 中的ROWS_QUERY_LOG_EVENT事件。

BOOLEAN

false

polardbx.binlog.include.tables

仅订阅这些表的Binlog日志。多个表名之间用逗号(`,`)分隔。

STRING

polardbx.binlog.exclude.tables

不订阅这些表的Binlog日志。多个表名之间用逗号(`,`)分隔。

STRING

类型映射

PolarDB-X字段类型

Flink字段类型

TINYINT

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

INT

INT

MEDIUMINT

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

BOOLEAN

BOOLEAN

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)] [WITHOUT TIME ZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIME ZONE]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)] WITH LOCAL TIME ZONE

CHAR(n)

STRING

VARCHAR(n)

TEXT

BINARY

BYTES

VARBINARY

BLOB