
Realtime Compute for Apache Flink:PolarDB-X CDC

Last Updated: Mar 26, 2026

The PolarDB-X CDC connector reads change data capture (CDC) events from a PolarDB-X instance in real time. It runs in streaming mode only and can be used as a source table.

Important

This connector requires Ververica Runtime (VVR) 11.5 or later and PolarDB-X 2.0 or later.

To query a dimension table or write to a sink table in PolarDB-X, use the MySQL connector (public preview) instead.

Connector overview

Category Details
Supported type Source table
Runtime mode Streaming mode only
Data format Not applicable
API type SQL
Supports updates or deletions to sink tables No

Server-side binary log filtering

The PolarDB-X CDC connector filters binary logs on the server before sending them to the client. Only the change events you subscribe to are transmitted, which reduces network I/O and improves throughput.

This differs from connectors that pull all binary logs from an instance and filter them on the client side.

To subscribe to specific tables, set polardbx.binlog.include.tables in your CREATE TABLE statement:

CREATE TABLE polardbx_table_foo (
  -- Define the table schema here
) WITH (
  'connector' = 'polardbx-cdc',
  'database-name' = 'db',
  'table-name' = '.*',
  -- Other required parameters
  'polardbx.binlog.include.tables' = 'db.table1,db.table2'
);

Server-side binary log filtering requires PolarDB-X server version 2.5.0 or later and Simple Log Service component version 5.4.20 or later.

Limitations

  • The connector can only be used as a source table. For sink or dimension table access, use the MySQL connector.

  • Server-side binary log filtering requires PolarDB-X 2.5.0 or later and Simple Log Service 5.4.20 or later.

SQL syntax

CREATE TABLE polardbx_customer_table (
  `id` STRING,
  [columnName dataType,]*
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = '<cluster-endpoint>',
  'username' = '<username>',
  'password' = '<password>',
  'database-name' = '<database-name>',
  'table-name' = '<table-name>'
);

Replace the placeholders with your actual values:

Placeholder Description Example
<cluster-endpoint> Cluster endpoint of the PolarDB-X instance pxc-xxxx-pub.polarx.rds.aliyuncs.com
<username> Database username pdx_user
<password> Database password
<database-name> Database name or regular expression my_db
<table-name> Table name or regular expression orders

WITH parameters

Required parameters

Parameter Type Description
connector STRING Must be polardbx-cdc.
hostname STRING IP address or hostname of the PolarDB-X instance. Specify the cluster endpoint.
username STRING Database username.
password STRING Database password.
database-name STRING Database name. Accepts regular expressions to match multiple databases. Do not use ^ or $ anchors.
table-name STRING Table name. Accepts regular expressions to match multiple tables. Do not use ^ or $ anchors.

Optional parameters

Connection

Parameter Type Default Description
port INTEGER 3306 Service port of the PolarDB-X database.
connect.timeout DURATION 30s Maximum time to wait when establishing a connection to the database server before the attempt times out.
connect.max-retries INTEGER 3 Maximum number of retries after a failed connection.
connection.pool.size INTEGER 20 Size of the database connection pool. The pool reuses connections to reduce the total number of open database connections.
server-time-zone STRING The job's time zone Session time zone of the database, specified as an IANA time zone identifier (for example, Asia/Shanghai). Controls how TIMESTAMP columns are converted to STRING. If not set, the job's time zone is used.
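As a sketch, the connection options above are set alongside the required parameters in the WITH clause. The endpoint, credentials, and table names below are placeholders, and the tuning values are illustrative, not recommendations:

```sql
CREATE TABLE polardbx_orders (
  `id` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = '<cluster-endpoint>',
  'port' = '3306',
  'username' = '<username>',
  'password' = '<password>',
  'database-name' = 'my_db',
  'table-name' = 'orders',
  -- Fail faster on an unreachable endpoint and allow more retries.
  'connect.timeout' = '10s',
  'connect.max-retries' = '5',
  -- Keep fewer pooled connections open against the database.
  'connection.pool.size' = '10',
  -- Convert TIMESTAMP columns using an explicit session time zone.
  'server-time-zone' = 'Asia/Shanghai'
);
```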

Startup mode

Parameter Type Default Description
scan.startup.mode STRING initial Controls where the connector starts reading. See Startup modes.
scan.startup.specific-offset.file STRING None Binary log filename for the specific-offset startup mode. Example: mysql-bin.000003.
scan.startup.specific-offset.pos INTEGER None Position in the binary log file for the specific-offset startup mode.
scan.startup.specific-offset.gtid-set STRING None GTID set for the specific-offset startup mode. Example: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.
scan.startup.timestamp-millis LONG None Start timestamp in milliseconds for the timestamp startup mode.
scan.startup.specific-offset.skip-events INTEGER None Number of binary log events to skip when starting from a specific offset. Requires scan.startup.mode = specific-offset.
scan.startup.specific-offset.skip-rows INTEGER None Number of row changes to skip when starting from a specific offset. A single binary log event can correspond to multiple row changes. Requires scan.startup.mode = specific-offset.
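Putting the specific-offset options together, a job that resumes from a known binary log position might be declared as follows. The filename, position, and skip counts are illustrative values, not real offsets:

```sql
CREATE TABLE polardbx_orders (
  `id` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = '<cluster-endpoint>',
  'username' = '<username>',
  'password' = '<password>',
  'database-name' = 'my_db',
  'table-name' = 'orders',
  -- Resume from a known binary log position instead of taking a snapshot.
  'scan.startup.mode' = 'specific-offset',
  'scan.startup.specific-offset.file' = 'mysql-bin.000003',
  'scan.startup.specific-offset.pos' = '4',
  -- Optionally skip events and row changes already processed at that position.
  'scan.startup.specific-offset.skip-events' = '0',
  'scan.startup.specific-offset.skip-rows' = '0'
);
```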

Snapshot reading

Parameter Type Default Description
scan.incremental.snapshot.chunk.size INTEGER 8096 Number of rows per chunk during snapshot reading. Smaller chunks give finer-grained fault recovery but increase the total chunk count, raising the risk of out-of-memory (OOM) errors and reducing throughput.
scan.snapshot.fetch.size INTEGER 1024 Maximum number of records pulled per batch during a full-table snapshot.
scan.incremental.snapshot.chunk.key-column STRING None Column used for data sharding during the snapshot phase. Required for tables without a primary key (the column must be NOT NULL). For tables with a primary key, you can optionally specify one column from the primary key.
scan.incremental.snapshot.backfill.skip BOOLEAN false When true, skips backfill during snapshot reading. Changes to the table during the snapshot phase are read in the later incremental phase instead of being merged into the snapshot. Only at-least-once semantics are guaranteed when backfill is skipped.
scan.incremental.snapshot.unbounded-chunk-first.enabled BOOLEAN false When true, distributes unbounded chunks first during snapshot reading. This is an experimental feature that can reduce OOM risk when a TaskManager synchronizes the last chunk. Add this parameter before the job starts for the first time.
scan.newly-added-table.enabled BOOLEAN false When true, scans newly added captured tables when the job restarts from a checkpoint or savepoint. Newly matched tables are synchronized, and tables that no longer match are removed from state.
scan.incremental.close-idle-reader.enabled BOOLEAN false When true, shuts down idle readers after the snapshot phase ends. Also requires execution.checkpointing.checkpoints-after-tasks-finish.enabled = true.
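For example, a table without a primary key must name a NOT NULL chunk key column. The sketch below combines that with smaller chunks and idle-reader shutdown; the column and table names are hypothetical, and the chunk size is illustrative:

```sql
CREATE TABLE polardbx_events (
  `event_id` STRING NOT NULL,
  `payload` STRING
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = '<cluster-endpoint>',
  'username' = '<username>',
  'password' = '<password>',
  'database-name' = 'my_db',
  'table-name' = 'events',
  -- Required here because the table has no primary key; must be NOT NULL.
  'scan.incremental.snapshot.chunk.key-column' = 'event_id',
  -- Smaller chunks for finer-grained recovery (raises total chunk count).
  'scan.incremental.snapshot.chunk.size' = '4096',
  -- Free idle readers after the snapshot phase; also requires
  -- execution.checkpointing.checkpoints-after-tasks-finish.enabled = true.
  'scan.incremental.close-idle-reader.enabled' = 'true'
);
```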

Incremental (binary log) reading

Parameter Type Default Description
heartbeat.interval DURATION None Interval at which the source sends heartbeat events to advance the binary log offset. Heartbeats prevent the binary log from expiring when updates are infrequent. An expired binary log causes the job to fail; recovery requires a stateless restart.
scan.only.deserialize.captured.tables.changelog.enabled BOOLEAN true When true, deserializes change events only for the target tables during the incremental phase. Set to false to deserialize change events for all tables.
scan.parallel-deserialize-changelog.enabled BOOLEAN false When true, uses multiple threads to parse change events during the incremental phase while preserving binary log event order.
scan.parallel-deserialize-changelog.handler.size INTEGER 2 Number of event handler threads when parallel deserialization is enabled.
scan.read-changelog-as-append-only.enabled BOOLEAN false When true, converts all change events (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER) to INSERT messages, producing an append-only stream. Enable only in specific scenarios, such as when downstream systems need to persist DELETE messages from an upstream table.
scan.parse.online.schema.changes.enabled BOOLEAN false When true, parses RDS lockless DDL change events during the incremental phase. This is an experimental feature. Create a savepoint before performing an online schema change.
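A sketch of the incremental-phase options for a low-traffic table: heartbeats keep the binary log offset advancing, and parallel deserialization speeds up parsing. The interval and thread count are illustrative:

```sql
CREATE TABLE polardbx_orders (
  `id` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = '<cluster-endpoint>',
  'username' = '<username>',
  'password' = '<password>',
  'database-name' = 'my_db',
  'table-name' = 'orders',
  -- Send heartbeats so the binary log offset advances even when the
  -- table receives few updates, preventing log expiration.
  'heartbeat.interval' = '30s',
  -- Parse change events with multiple threads while preserving order.
  'scan.parallel-deserialize-changelog.enabled' = 'true',
  'scan.parallel-deserialize-changelog.handler.size' = '4'
);
```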

Chunk distribution

Parameter Type Default Description
chunk-meta.group.size INTEGER 1000 Maximum size of a chunk metadata group. Metadata larger than this value is split for transmission.
chunk-key.even-distribution.factor.upper-bound DOUBLE 1000.0 Upper bound of the distribution factor for even sharding. If the distribution factor exceeds this value, uneven sharding is used. Distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / total rows.
chunk-key.even-distribution.factor.lower-bound DOUBLE 0.05 Lower bound of the distribution factor for even sharding. If the distribution factor falls below this value, uneven sharding is used.

PolarDB-X binary log subscription

Parameter Type Default Description
polardbx.binlog.include.tables STRING None Subscribe to binary logs from only these tables. Specify as a comma-separated list of database.table pairs. Requires PolarDB-X 2.5.0 or later.
polardbx.binlog.exclude.tables STRING None Exclude these tables from binary log subscription. Specify as a comma-separated list of database.table pairs.
polardbx.binlog.ignore.archive-events.enabled BOOLEAN false When true, ignores archive events (primarily DELETE events) in the binary log.
polardbx.binlog.ignore.query-events.enabled BOOLEAN false When true, ignores ROWS_QUERY_LOG_EVENT events in the binary log.
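Mirroring the include-tables example earlier, tables can instead be excluded from the server-side subscription, optionally together with ignoring archive events. The table names are placeholders:

```sql
CREATE TABLE polardbx_table_bar (
  -- Define the table schema here
) WITH (
  'connector' = 'polardbx-cdc',
  'database-name' = 'db',
  'table-name' = '.*',
  -- Other required parameters
  -- Drop these tables from the server-side binary log subscription.
  'polardbx.binlog.exclude.tables' = 'db.archive_table,db.tmp_table',
  -- Skip archive events (primarily DELETE events) entirely.
  'polardbx.binlog.ignore.archive-events.enabled' = 'true'
);
```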

Startup modes

The scan.startup.mode parameter controls where the connector begins reading when the job starts for the first time.

Mode Reads historical data Starts from
initial (default) Yes Scans all existing rows, then reads from the current binary log position
latest-offset No The end of the current binary log (changes after the connector starts)
earliest-offset No The earliest available binary log position
specific-offset No A binary log file and position, or a GTID set
timestamp No The binary log position at the specified millisecond timestamp
Important

For earliest-offset, specific-offset, and timestamp modes, the table schema at job startup must match the schema at the specified binary log position. A schema mismatch causes the job to fail. Make sure the schema does not change between the specified position and job startup.
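For instance, a job that skips historical data and starts from a point in time would use the timestamp mode. The millisecond value below is illustrative; remember the schema must not have changed between that position and job startup:

```sql
CREATE TABLE polardbx_orders (
  `id` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = '<cluster-endpoint>',
  'username' = '<username>',
  'password' = '<password>',
  'database-name' = 'my_db',
  'table-name' = 'orders',
  -- Start from the binary log position at this epoch-millisecond timestamp.
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1700000000000'
);
```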

Monitoring metrics

The following metrics are available for the PolarDB-X CDC connector. Metrics that depend on binary log position return 0 during the snapshot phase.

Metric Phase Description
currentFetchEventTimeLag Binary logging only Lag between when data is generated and when it is pulled by the Source Operator. Returns 0 during the snapshot phase.
currentEmitEventTimeLag Binary logging only Lag between when data is generated and when it leaves the Source Operator. Returns 0 during the snapshot phase.
sourceIdleTime Both Duration for which the source has been idle.

Type mapping

PolarDB-X type Flink type Note
TINYINT TINYINT
SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL SMALLINT
INT, MEDIUMINT, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL INT
BIGINT, INT UNSIGNED, INT UNSIGNED ZEROFILL, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL BIGINT
BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL DECIMAL(20, 0) Mapped to DECIMAL to avoid overflow, since BIGINT in Flink is signed and cannot represent the full unsigned 64-bit range.
FLOAT [UNSIGNED] [ZEROFILL] FLOAT
DOUBLE [UNSIGNED] [ZEROFILL], DOUBLE PRECISION [UNSIGNED] [ZEROFILL], REAL [UNSIGNED] [ZEROFILL] DOUBLE
NUMERIC(p, s) [UNSIGNED] [ZEROFILL], DECIMAL(p, s) [UNSIGNED] [ZEROFILL] DECIMAL(p, s)
BOOLEAN, TINYINT(1) BOOLEAN
DATE DATE
TIME [(p)] TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)], TIMESTAMP [(p)] WITH LOCAL TIME ZONE TIMESTAMP [(p)]
CHAR(n), VARCHAR(n), TEXT STRING
BINARY, VARBINARY, BLOB BYTES
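Applying the mapping above, a hypothetical PolarDB-X table with a BIGINT UNSIGNED key, a DECIMAL amount, a DATETIME column, and a TEXT column would be declared in Flink SQL like this (column and table names are invented for illustration):

```sql
CREATE TABLE polardbx_payments (
  `payment_id` DECIMAL(20, 0),  -- BIGINT UNSIGNED maps to DECIMAL(20, 0)
  `amount` DECIMAL(10, 2),      -- DECIMAL(10, 2) maps one-to-one
  `paid_at` TIMESTAMP(3),       -- DATETIME(3) maps to TIMESTAMP(3)
  `memo` STRING,                -- TEXT maps to STRING
  PRIMARY KEY (`payment_id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = '<cluster-endpoint>',
  'username' = '<username>',
  'password' = '<password>',
  'database-name' = 'my_db',
  'table-name' = 'payments'
);
```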