本文为您介绍如何使用PolarDB-X连接器。
背景信息
PolarDB 分布式版(PolarDB for Xscale,简称“PolarDB-X”)是阿里云自主设计研发的高性能云原生分布式数据库产品,为用户提供高吞吐、大存储、低延时、易扩展和超高可用的云时代数据库服务。
仅支持VVR 11.5及以上版本,配合PolarDB-X 2.0及更高版本使用。
目前,PolarDB-X CDC连接器只支持作为源表使用。如您需要对PolarDB-X实例进行维表查询、或作为结果表写入,请使用MySQL连接器(公测中)。
类别 | 详情 |
支持类型 | 源表 |
运行模式 | 仅支持流模式 |
数据格式 | 暂不适用 |
特有监控指标 |
|
API种类 | SQL |
是否支持更新或删除结果表数据 | 否 |
特色功能
PolarDB-X CDC连接器针对Binlog解析阶段进行性能优化,支持PolarDB-X服务器侧对无关Binlog进行过滤和裁剪,从而提升吞吐量、节省网络带宽。
Binlog按需订阅示例
此版本支持在服务端对Binlog进行过滤、只发送所需的变更日志给客户端,从而起到降低网络流量压力、提升日志消费吞吐的优化作用。
例如,如果您只需订阅PolarDB-X服务器中db.table1和db.table2表的变更数据,可以像这样配置Flink SQL作业:
CREATE TABLE polardbx_table_foo (
... -- 在这里定义表结构
) WITH (
'connector' = 'polardbx-cdc',
'database-name' = 'db',
'table-name' = '.*',
..., -- 其他参数
'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- 只订阅对应表的数据
);相较于MySQL CDC连接器会将整个实例中全量的变更Binlog日志加载到本地、并在客户端进行过滤,PolarDB-X CDC连接器具备服务端过滤Binlog、客户端按需订阅Binlog的能力,能够大幅减少网络IO开销。
使用限制
如您需要使用服务端Binlog服务端过滤、按表订阅功能,需要保证PolarDB-X服务端版本为2.5.0及以上,且日志服务组件为5.4.20或更高版本。
SQL
语法结构
CREATE TABLE polardbx_customer_table(
`id` STRING,
[columnName dataType,]*
PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
'connector' = 'polardbx-cdc',
'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
'username' = 'pdx_user',
'password' = 'pdx_password',
'database' = 'full_db',
'collection' = 'customers'
)WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 连接器名称。 | STRING | 是 | 无 | 固定值为polardbx-cdc。 |
hostname | PolarDB-X数据库的IP地址或者Hostname。 | STRING | 是 | 无 | 建议填写实例连接信息中的集群地址。 |
port | PolarDB-X数据库的服务端口号。 | INTEGER | 否 | 3306 | 无。 |
username | PolarDB-X数据库服务的用户名。 | STRING | 是 | 无 | 无。 |
password | PolarDB-X数据库服务的密码。 | STRING | 是 | 无 | 无。 |
database-name | PolarDB-X数据库名称。 | STRING | 是 | 无 | 支持使用正则表达式匹配读取多个数据库的数据。 说明 使用正则表达式时,不要使用^和$符号匹配开头和结尾。 |
table-name | PolarDB-X表名。 | STRING | 是 | 无 | 支持使用正则表达式匹配读取多张表的数据。 说明 使用正则表达式时,不要使用^和$符号匹配开头和结尾。 |
server-time-zone | 数据库在使用的会话时区。 | STRING | 否 | 作业运行环境可用区时区。 | 指定 IANA 时区标识符,例如 Asia/Shanghai。该参数控制了源表中的TIMESTAMP类型如何转换为STRING类型。 |
scan.incremental.snapshot.chunk.size | 增量快照分块读取时,每个chunk的大小(包含的行数)。 | INTEGER | 否 | 8096 | PolarDB-X 将表切分为多个分片(Chunk)进行读取,并在内存中缓存分片数据。减少单分片行数会增加分片总量。这虽然细化了故障恢复粒度,但也增加了内存溢出(OOM)风险并降低吞吐量。请配置合理的分片大小以平衡性能。 |
scan.snapshot.fetch.size | 当读取表的全量数据时,每次最多拉取的记录数。 | INTEGER | 否 | 1024 | 无。 |
connect.timeout | 连接PolarDB-X数据库服务器超时时,重试连接之前等待超时的最长时间。 | DURATION | 否 | 30s | 无。 |
connection.pool.size | 数据库连接池大小。 | INTEGER | 否 | 20 | 数据库连接池用于复用连接,可以降低数据库连接数量。 |
connect.max-retries | 连接MySQL数据库服务时,连接失败后重试的最大次数。 | INTEGER | 否 | 3 | 无。 |
scan.startup.mode | 消费数据时的启动模式。 | STRING | 否 | initial | 参数取值如下:
重要 对于earliest-offset,specific-offset和timestamp启动模式,启动时的表结构必须与指定位点一致。结构不匹配将导致作业报错。请确保在指定 Binlog 位点至作业启动期间,表结构未发生变更。 |
scan.startup.specific-offset.file | 使用指定位点模式启动时,启动位点的Binlog文件名。 | STRING | 否 | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。文件名格式例如 |
scan.startup.specific-offset.pos | 使用指定位点模式启动时,启动位点在指定Binlog文件中的偏移量。 | INTEGER | 否 | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。 |
scan.startup.specific-offset.gtid-set | 使用指定位点模式启动时,启动位点的GTID集合。 | STRING | 否 | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。GTID集合格式例如 |
scan.startup.timestamp-millis | 使用指定时间模式启动时,启动位点的毫秒时间戳。 | LONG | 否 | 无 | 使用该配置时,scan.startup.mode必须配置为timestamp。时间戳单位为毫秒。 |
scan.startup.specific-offset.skip-events | 从指定的位点读取时,跳过多少Binlog事件。 | INTEGER | 否 | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。 |
scan.startup.specific-offset.skip-rows | 从指定的位点读取时,跳过多少行变更(一个Binlog事件可能对应多行变更)。 | INTEGER | 否 | 无 | 使用该配置时,scan.startup.mode必须配置为specific-offset。 |
heartbeat.interval | Source通过心跳事件推动Binlog位点前进的时间间隔。 | DURATION | 否 | 无 | 心跳事件强制推进 Source 端的 Binlog 位点。此机制防止低频更新导致 Binlog 过期。Binlog 过期将引发作业失败,且仅能通过无状态重启恢复。 |
chunk-meta.group.size | chunk元信息的大小。 | INTEGER | 否 | 1000 | 如果元信息大于该值,元信息会分为多份传递。 |
chunk-key.even-distribution.factor.upper-bound | 是否可以均匀分片的chunk分布因子的上限。 | DOUBLE | 否 | 1000.0 | 分布因子大于该值会使用非均匀分片。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。 |
chunk-key.even-distribution.factor.lower-bound | 是否可以均匀分片的chunk分布因子的下限。 | DOUBLE | 否 | 0.05 | 分布因子小于该值会使用非均匀分片。 chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 总数据行数。 |
scan.newly-added-table.enabled | 从Checkpoint重启时,是否扫描新增的捕获表。 | BOOLEAN | 否 | false | 启用后,系统将同步此前未匹配的新增表,并从状态中移除不再匹配的表。从Checkpoint或Savepoint重启时生效。 |
scan.incremental.snapshot.chunk.key-column | 指定快照阶段用于数据分片的列。 | STRING | 见备注 | 无 |
|
scan.incremental.close-idle-reader.enabled | 是否在快照结束后关闭空闲的 Reader。 | BOOLEAN | 否 | false | 该配置生效需要同时设置execution.checkpointing.checkpoints-after-tasks-finish.enabled为true。 |
scan.incremental.snapshot.backfill.skip | 是否在快照读取阶段跳过backfill。 | BOOLEAN | 否 | false | 参数取值如下:
如果跳过backfill,快照阶段表的更改将在稍后的增量阶段读取,而不是合并到快照中。 重要 跳过backfill可能导致数据不一致,因为快照阶段发生的变更可能会被重放,仅保证at-least-once语义。 |
scan.parse.online.schema.changes.enabled | 在增量阶段,是否尝试解析 RDS 无锁变更 DDL 事件。 | BOOLEAN | 否 | false | 参数取值如下:
实验性功能。建议在执行线上无锁变更前,先对Flink作业创建一个Savepoint以便恢复。 |
scan.only.deserialize.captured.tables.changelog.enabled | 在增量阶段,是否仅对指定表的变更事件进行反序列化。 | BOOLEAN | 否 | true | 参数取值如下:
|
scan.read-changelog-as-append-only.enabled | 是否将changelog数据流转换为append-only数据流。 | BOOLEAN | 否 | false | 参数取值如下:
|
scan.parallel-deserialize-changelog.enabled | 在增量阶段,是否使用多线程对变更事件进行解析。 | BOOLEAN | 否 | false | 参数取值如下:
|
scan.parallel-deserialize-changelog.handler.size | 多线程对变更事件进行解析时,事件处理器的数量。 | INTEGER | 否 | 2 | 无。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | 快照读取阶段是否先分发无界的分片。 | BOOLEAN | 否 | false | 参数取值如下:
实验性功能。开启后能够降低TaskManager在快照阶段同步最后一个分片时遇到内存溢出 (OOM) 的风险,建议在作业第一次启动前添加。 |
polardbx.binlog.ignore.archive-events.enabled | 是否忽略 PolarDB-X Binlog 中的归档事件(主要是 `DELETE` 事件)。 | BOOLEAN | 否 | false | |
polardbx.binlog.ignore.query-events.enabled | 是否忽略 PolarDB-X Binlog 中的ROWS_QUERY_LOG_EVENT事件。 | BOOLEAN | 否 | false | |
polardbx.binlog.include.tables | 仅订阅这些表的Binlog日志。多个表名之间用逗号(`,`)分隔。 | STRING | 否 | 无 | |
polardbx.binlog.exclude.tables | 不订阅这些表的Binlog日志。多个表名之间用逗号(`,`)分隔。 | STRING | 否 | 无 |
类型映射
PolarDB-X字段类型 | Flink字段类型 |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] WITH LOCAL TIME ZONE | |
CHAR(n) | STRING |
VARCHAR(n) | |
TEXT | |
BINARY | BYTES |
VARBINARY | |
BLOB |