The PolarDB-X CDC connector reads change data capture (CDC) events from a PolarDB-X instance in real time. It runs in streaming mode only and can be used as a source table.
This connector requires Ververica Runtime (VVR) 11.5 or later and PolarDB-X 2.0 or later.
To query a dimension table or write to a sink table in PolarDB-X, use the MySQL connector (public preview) instead.
Connector overview
| Category | Details |
|---|---|
| Supported type | Source table |
| Runtime mode | Streaming mode only |
| Data format | Not applicable |
| API type | SQL |
| Supports updates or deletions to sink tables | No |
Server-side binary log filtering
The PolarDB-X CDC connector filters binary logs on the server before sending them to the client. Only the change events you subscribe to are transmitted, which reduces network I/O and improves throughput.
This differs from connectors that pull all binary logs from an instance and filter them on the client side.
To subscribe to specific tables, set polardbx.binlog.include.tables in your CREATE TABLE statement:
```sql
CREATE TABLE polardbx_table_foo (
  -- Define the table schema here
) WITH (
  'connector' = 'polardbx-cdc',
  'database-name' = 'db',
  'table-name' = '.*',
  -- Other required parameters
  'polardbx.binlog.include.tables' = 'db.table1,db.table2'
);
```
Server-side binary log filtering requires PolarDB-X server version 2.5.0 or later and Simple Log Service component version 5.4.20 or later.
Limitations
- The connector can only be used as a source table. For sink or dimension table access, use the MySQL connector.
- Server-side binary log filtering requires PolarDB-X 2.5.0 or later and Simple Log Service 5.4.20 or later.
SQL syntax
```sql
CREATE TABLE polardbx_customer_table (
  `id` STRING,
  [columnName dataType,]*
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = '<cluster-endpoint>',
  'username' = '<username>',
  'password' = '<password>',
  'database-name' = '<database-name>',
  'table-name' = '<table-name>'
);
```
Replace the placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <cluster-endpoint> | Cluster endpoint of the PolarDB-X instance | pxc-xxxx-pub.polarx.rds.aliyuncs.com |
| <username> | Database username | pdx_user |
| <password> | Database password | — |
| <database-name> | Database name or regular expression | my_db |
| <table-name> | Table name or regular expression | orders |
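With the placeholders filled in, the statement might look like the following sketch. The endpoint, credentials, schema columns, and table name are illustrative values only, not real ones.

```sql
CREATE TABLE polardbx_customer_table (
  `id` STRING,
  `customer_name` STRING,
  `order_amount` DECIMAL(10, 2),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = 'pxc-xxxx-pub.polarx.rds.aliyuncs.com',
  'username' = 'pdx_user',
  'password' = '<password>',
  'database-name' = 'my_db',
  'table-name' = 'orders'
);
```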
WITH parameters
Required parameters
| Parameter | Type | Description |
|---|---|---|
| connector | STRING | Must be polardbx-cdc. |
| hostname | STRING | IP address or hostname of the PolarDB-X instance. Specify the cluster endpoint. |
| username | STRING | Database username. |
| password | STRING | Database password. |
| database-name | STRING | Database name. Accepts regular expressions to match multiple databases. Do not use ^ or $ anchors. |
| table-name | STRING | Table name. Accepts regular expressions to match multiple tables. Do not use ^ or $ anchors. |
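Because database-name and table-name accept unanchored regular expressions, a single source table can capture an entire set of sharded tables. The following is a sketch; the shard naming pattern and columns are hypothetical.

```sql
CREATE TABLE user_shards_source (
  `id` BIGINT,
  `user_name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = '<cluster-endpoint>',
  'username' = '<username>',
  'password' = '<password>',
  -- Matches user_db_00, user_db_01, ... (no ^ or $ anchors)
  'database-name' = 'user_db_[0-9]+',
  -- Matches user_00, user_01, ...
  'table-name' = 'user_[0-9]+'
);
```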
Optional parameters
Connection
| Parameter | Type | Default | Description |
|---|---|---|---|
| port | INTEGER | 3306 | Service port of the PolarDB-X database. |
| connect.timeout | DURATION | 30s | Maximum time to wait for a connection to be established before timing out and retrying. |
| connect.max-retries | INTEGER | 3 | Maximum number of retries after a failed connection attempt. |
| connection.pool.size | INTEGER | 20 | Size of the database connection pool. The pool reuses connections to reduce the total number of open database connections. |
| server-time-zone | STRING | Job time zone | Session time zone for the database, specified as an IANA time zone identifier (for example, Asia/Shanghai). Controls how TIMESTAMP columns are converted to STRING. |
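Connection options go in the WITH clause alongside the required parameters. The values below are a sketch for illustration, not tuning recommendations.

```sql
-- In the WITH clause, alongside the required parameters:
'port' = '3306',
'connect.timeout' = '60s',
'connect.max-retries' = '5',
'connection.pool.size' = '30',
'server-time-zone' = 'Asia/Shanghai'
```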
Startup mode
| Parameter | Type | Default | Description |
|---|---|---|---|
| scan.startup.mode | STRING | initial | Controls where the connector starts reading. See Startup modes. |
| scan.startup.specific-offset.file | STRING | None | Binary log filename for the specific-offset startup mode. Example: mysql-bin.000003. |
| scan.startup.specific-offset.pos | INTEGER | None | Position in the binary log file for the specific-offset startup mode. |
| scan.startup.specific-offset.gtid-set | STRING | None | GTID set for the specific-offset startup mode. Example: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19. |
| scan.startup.timestamp-millis | LONG | None | Start timestamp in milliseconds for the timestamp startup mode. |
| scan.startup.specific-offset.skip-events | INTEGER | None | Number of binary log events to skip when starting from a specific offset. Requires scan.startup.mode = specific-offset. |
| scan.startup.specific-offset.skip-rows | INTEGER | None | Number of row changes to skip when starting from a specific offset. A single binary log event can correspond to multiple row changes. Requires scan.startup.mode = specific-offset. |
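For instance, to resume from a known binary log position while skipping changes that were already processed, the startup options can be combined as follows. This is a sketch; the filename and offset values are hypothetical.

```sql
-- In the WITH clause:
'scan.startup.mode' = 'specific-offset',
'scan.startup.specific-offset.file' = 'mysql-bin.000003',
'scan.startup.specific-offset.pos' = '4',
'scan.startup.specific-offset.skip-events' = '10',
'scan.startup.specific-offset.skip-rows' = '100'
```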
Snapshot reading
| Parameter | Type | Default | Description |
|---|---|---|---|
| scan.incremental.snapshot.chunk.size | INTEGER | 8096 | Number of rows per chunk during snapshot reading. Smaller chunks give finer-grained fault recovery but increase the total chunk count, raising the risk of out-of-memory (OOM) errors and reducing throughput. |
| scan.snapshot.fetch.size | INTEGER | 1024 | Maximum number of records pulled per batch during a full-table snapshot. |
| scan.incremental.snapshot.chunk.key-column | STRING | None | Column used for data sharding during the snapshot phase. Required for tables without a primary key (the column must be NOT NULL). For tables with a primary key, you can optionally specify one column from the primary key. |
| scan.incremental.snapshot.backfill.skip | BOOLEAN | false | When true, skips backfill during snapshot reading. Changes to the table during the snapshot phase are read in the later incremental phase instead of being merged into the snapshot. Only at-least-once semantics are guaranteed when backfill is skipped. |
| scan.incremental.snapshot.unbounded-chunk-first.enabled | BOOLEAN | false | When true, distributes unbounded chunks first during snapshot reading. This is an experimental feature that can reduce OOM risk when a TaskManager synchronizes the last chunk. Add this parameter before the job starts for the first time. |
| scan.newly-added-table.enabled | BOOLEAN | false | When true, scans newly added captured tables when the job restarts from a checkpoint or savepoint. Newly matched tables are synchronized, and tables that no longer match are removed from state. |
| scan.incremental.close-idle-reader.enabled | BOOLEAN | false | When true, shuts down idle readers after the snapshot phase ends. Also requires execution.checkpointing.checkpoints-after-tasks-finish.enabled = true. |
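As a sketch of how these options combine: sharding a table that has no primary key on a NOT NULL business column, and closing idle readers once the snapshot finishes. The column name and chunk size are hypothetical.

```sql
-- In the WITH clause (illustrative values):
'scan.incremental.snapshot.chunk.key-column' = 'order_no',
'scan.incremental.snapshot.chunk.size' = '4096',
'scan.incremental.close-idle-reader.enabled' = 'true'

-- Closing idle readers also requires, before the job starts:
-- SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';
```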
Incremental (binary log) reading
| Parameter | Type | Default | Description |
|---|---|---|---|
| heartbeat.interval | DURATION | None | Interval at which the source sends heartbeat events to advance the binary log offset. Heartbeats prevent the binary log from expiring when updates are infrequent. An expired binary log causes the job to fail; recovery requires a stateless restart. |
| scan.only.deserialize.captured.tables.changelog.enabled | BOOLEAN | true | When true, deserializes change events only for the target tables during the incremental phase. Set to false to deserialize change events for all tables. |
| scan.parallel-deserialize-changelog.enabled | BOOLEAN | false | When true, uses multiple threads to parse change events during the incremental phase while preserving binary log event order. |
| scan.parallel-deserialize-changelog.handler.size | INTEGER | 2 | Number of event handler threads when parallel deserialization is enabled. |
| scan.read-changelog-as-append-only.enabled | BOOLEAN | false | When true, converts all change events (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER) to INSERT messages, producing an append-only stream. Enable only in specific scenarios, such as when downstream systems need to persist DELETE messages from an upstream table. |
| scan.parse.online.schema.changes.enabled | BOOLEAN | false | When true, parses RDS lockless DDL change events during the incremental phase. This is an experimental feature. Create a savepoint before performing an online schema change. |
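For example, a source that parses the changelog with multiple threads and flattens it into an append-only stream, so a downstream log store can keep DELETE records as plain rows. This is a sketch; the thread count is illustrative.

```sql
-- In the source table's WITH clause:
'scan.parallel-deserialize-changelog.enabled' = 'true',
'scan.parallel-deserialize-changelog.handler.size' = '4',
'scan.read-changelog-as-append-only.enabled' = 'true'
```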
Chunk distribution
| Parameter | Type | Default | Description |
|---|---|---|---|
| chunk-meta.group.size | INTEGER | 1000 | Maximum size of a chunk metadata group. Metadata larger than this value is split for transmission. |
| chunk-key.even-distribution.factor.upper-bound | DOUBLE | 1000.0 | Upper bound of the distribution factor for even sharding. If the distribution factor exceeds this value, uneven sharding is used. Distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / total rows. |
| chunk-key.even-distribution.factor.lower-bound | DOUBLE | 0.05 | Lower bound of the distribution factor for even sharding. If the distribution factor falls below this value, uneven sharding is used. |
PolarDB-X binary log subscription
| Parameter | Type | Default | Description |
|---|---|---|---|
| polardbx.binlog.include.tables | STRING | None | Subscribe to binary logs from only these tables. Specify as a comma-separated list of database.table pairs. Requires PolarDB-X 2.5.0 or later. |
| polardbx.binlog.exclude.tables | STRING | None | Exclude these tables from binary log subscription. Specify as a comma-separated list of database.table pairs. |
| polardbx.binlog.ignore.archive-events.enabled | BOOLEAN | false | When true, ignores archive events (primarily DELETE events) in the binary log. |
| polardbx.binlog.ignore.query-events.enabled | BOOLEAN | false | When true, ignores ROWS_QUERY_LOG_EVENT events in the binary log. |
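Exclusion works the same way as inclusion: list database.table pairs that should never be transmitted. A sketch, with hypothetical table names for high-churn tables that a job typically does not need:

```sql
-- In the WITH clause (hypothetical table names):
'polardbx.binlog.exclude.tables' = 'db.audit_log,db.tmp_import'
```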
Startup modes
The scan.startup.mode parameter controls where the connector begins reading when the job starts for the first time.
| Mode | Reads historical data | Starts from |
|---|---|---|
| initial (default) | Yes | Scans all existing rows, then reads from the current binary log position |
| latest-offset | No | The end of the current binary log (changes after the connector starts) |
| earliest-offset | No | The earliest available binary log position |
| specific-offset | No | A binary log file and position, or a GTID set |
| timestamp | No | The binary log position at the specified millisecond timestamp |
For earliest-offset, specific-offset, and timestamp modes, the table schema at job startup must match the schema at the specified binary log position. A schema mismatch causes the job to fail. Make sure the schema does not change between the specified position and job startup.
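For example, a timestamp start looks like the following sketch. The epoch value is illustrative; it must fall within the retained binary log range, and the table schema must not have changed since that point.

```sql
-- In the WITH clause:
'scan.startup.mode' = 'timestamp',
-- 2024-01-01T00:00:00Z expressed in epoch milliseconds
'scan.startup.timestamp-millis' = '1704067200000'
```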
Monitoring metrics
The following metrics are available for the PolarDB-X CDC connector. Metrics that depend on binary log position return 0 during the snapshot phase.
| Metric | Phase | Description |
|---|---|---|
| currentFetchEventTimeLag | Binary log phase only | Lag between when data is generated and when it is pulled by the Source Operator. Returns 0 during the snapshot phase. |
| currentEmitEventTimeLag | Binary log phase only | Lag between when data is generated and when it leaves the Source Operator. Returns 0 during the snapshot phase. |
| sourceIdleTime | Both | Duration for which the source has been idle. |
Type mapping
| PolarDB-X type | Flink type | Note |
|---|---|---|
| TINYINT | TINYINT | |
| SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL | SMALLINT | |
| INT, MEDIUMINT, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL | INT | |
| BIGINT, INT UNSIGNED, INT UNSIGNED ZEROFILL, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL | BIGINT | |
| BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL | DECIMAL(20, 0) | Mapped to DECIMAL to avoid overflow, since BIGINT in Flink is signed and cannot represent the full unsigned 64-bit range. |
| FLOAT [UNSIGNED] [ZEROFILL] | FLOAT | |
| DOUBLE [UNSIGNED] [ZEROFILL], DOUBLE PRECISION [UNSIGNED] [ZEROFILL], REAL [UNSIGNED] [ZEROFILL] | DOUBLE | |
| NUMERIC(p, s) [UNSIGNED] [ZEROFILL], DECIMAL(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) | |
| BOOLEAN, TINYINT(1) | BOOLEAN | |
| DATE | DATE | |
| TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] | |
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] | |
| TIMESTAMP [(p)] | TIMESTAMP [(p)] WITH LOCAL TIME ZONE | |
| CHAR(n), VARCHAR(n), TEXT | STRING | |
| BINARY, VARBINARY, BLOB | BYTES | |