Realtime Compute for Apache Flink:PolarDB-X CDC (Public Preview)

Last Updated: Jan 09, 2026

This topic describes how to use the PolarDB-X CDC connector.

Background information

PolarDB for Xscale (PolarDB-X) is a high-performance, cloud-native distributed database service from Alibaba Cloud. It provides high throughput, large storage capacity, low latency, easy scalability, and high availability.

Important

This connector supports Ververica Runtime (VVR) 11.5 or later and must be used with PolarDB-X 2.0 or later.

The PolarDB-X CDC connector can be used only as a source table. To query a dimension table or write to a sink table in a PolarDB-X instance, use the MySQL connector (public preview).

Supported type: Source table

Runtime mode: Streaming mode only

Data format: Not applicable

Specific monitoring metrics:

  • currentFetchEventTimeLag: The interval between when data is generated and when it is pulled by the Source Operator. This metric is valid only during the binary log reading phase. During the snapshot phase, its value is always 0.

  • currentEmitEventTimeLag: The interval between when data is generated and when it leaves the Source Operator. This metric is valid only during the binary log reading phase. During the snapshot phase, its value is always 0.

  • sourceIdleTime: The duration for which the source table has been idle.

API type: SQL

Supports updates or deletions to sink tables: No

Features

The PolarDB-X CDC connector optimizes performance during the binary log parsing phase by filtering out irrelevant binary log events on the server. This improves throughput and saves network bandwidth.

Example of on-demand binary log subscription

This version supports server-side filtering of binary logs, sending only the required change logs to the client. This reduces network traffic and improves log consumption throughput.

For example, to subscribe only to the change data of the db.table1 and db.table2 tables on the PolarDB-X server, configure the Flink SQL job as follows:

CREATE TABLE polardbx_table_foo (
  ... -- Define the table schema here
) WITH (
  'connector' = 'polardbx-cdc',
  'database-name' = 'db',
  'table-name' = '.*',
  ..., -- Other parameters
  'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- Subscribe to data from only the corresponding tables
);

Unlike the MySQL CDC connector, which loads all change logs from an entire instance for client-side filtering, the PolarDB-X CDC connector can filter binary logs on the server. This allows the client to subscribe to binary logs as needed and significantly reduces network I/O overhead.
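The same mechanism can also work as a denylist. For instance, a hypothetical job that subscribes to every table in db except a high-churn audit table could use polardbx.binlog.exclude.tables instead; the table names here are illustrative:

CREATE TABLE polardbx_table_bar (
  ... -- Define the table schema here
) WITH (
  'connector' = 'polardbx-cdc',
  'database-name' = 'db',
  'table-name' = '.*',
  ..., -- Other parameters
  'polardbx.binlog.exclude.tables' = 'db.audit_log' -- Do not subscribe to changes from this table
);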

Limits

Server-side binary log filtering and subscription to specific tables require PolarDB-X server version 2.5.0 or later and Simple Log Service component version 5.4.20 or later.

SQL

Syntax

CREATE TABLE polardbx_customer_table (
  `id` STRING,
  [columnName dataType,]*
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'pdx_user',
  'password' = 'pdx_password',
  'database-name' = 'full_db',
  'table-name' = 'customers'
);
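For reference, the following is a minimal end-to-end sketch that prints change data to the console. The schema, endpoint, and credentials are placeholders, and the print connector is assumed to be available in the deployment.

CREATE TEMPORARY TABLE polardbx_source (
  `id` BIGINT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE print_sink (
  `id` BIGINT,
  `name` STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink SELECT `id`, `name` FROM polardbx_source;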

WITH parameters

Each parameter is listed below with its description, data type, whether it is required, its default value, and any notes.

connector
  Description: The name of the connector.
  Data type: STRING. Required: Yes. Default value: None.
  Notes: The value must be polardbx-cdc.

hostname
  Description: The IP address or hostname of the PolarDB-X database.
  Data type: STRING. Required: Yes. Default value: None.
  Notes: Specify the cluster endpoint of the instance.

port
  Description: The service port number of the PolarDB-X database.
  Data type: INTEGER. Required: No. Default value: 3306.

username
  Description: The username for the PolarDB-X database service.
  Data type: STRING. Required: Yes. Default value: None.

password
  Description: The password for the PolarDB-X database service.
  Data type: STRING. Required: Yes. Default value: None.

database-name
  Description: The name of the PolarDB-X database.
  Data type: STRING. Required: Yes. Default value: None.
  Notes: You can use a regular expression to read data from multiple databases. When you use a regular expression, do not use the ^ and $ symbols to match the start and end of the string.

table-name
  Description: The name of the PolarDB-X table.
  Data type: STRING. Required: Yes. Default value: None.
  Notes: You can use a regular expression to read data from multiple tables. When you use a regular expression, do not use the ^ and $ symbols to match the start and end of the string.
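For example, a hypothetical job that captures every table whose name starts with user_ in the databases app_db1 and app_db2 could be configured as follows. Note that the patterns use no ^ or $ anchors:

CREATE TABLE polardbx_multi_table (
  ... -- Define the table schema here
) WITH (
  'connector' = 'polardbx-cdc',
  ..., -- Connection parameters
  'database-name' = 'app_db[12]', -- Matches app_db1 and app_db2
  'table-name' = 'user_.*'        -- Matches every table whose name starts with user_
);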

server-time-zone
  Description: The session time zone used by the database.
  Data type: STRING. Required: No. Default value: The time zone of the region in which the job runs.
  Notes: Specify an IANA time zone identifier, such as Asia/Shanghai. This parameter controls how TIMESTAMP values in the source table are converted to STRING values.

scan.incremental.snapshot.chunk.size
  Description: The number of rows in each chunk when reading data in the incremental snapshot mode.
  Data type: INTEGER. Required: No. Default value: 8096.
  Notes: PolarDB-X splits a table into multiple chunks for reading and caches the data of each chunk in memory. Reducing the number of rows per chunk increases the total number of chunks. This provides finer-grained fault recovery but also increases the risk of out-of-memory (OOM) errors and reduces throughput. Configure a reasonable chunk size to balance performance.

scan.snapshot.fetch.size
  Description: The maximum number of records to pull at a time when reading full data from a table.
  Data type: INTEGER. Required: No. Default value: 1024.

connect.timeout
  Description: The maximum time to wait before retrying a connection after a connection to the PolarDB-X database server times out.
  Data type: DURATION. Required: No. Default value: 30s.

connection.pool.size
  Description: The size of the database connection pool.
  Data type: INTEGER. Required: No. Default value: 20.
  Notes: The connection pool reuses connections to reduce the number of database connections.

connect.max-retries
  Description: The maximum number of retries after a failed connection to the PolarDB-X database service.
  Data type: INTEGER. Required: No. Default value: 3.

scan.startup.mode
  Description: The startup mode for data consumption.
  Data type: STRING. Required: No. Default value: initial.
  Notes: Valid values:
    • initial (default): When the job starts for the first time, it scans all historical data and then reads the latest binary log data.
    • latest-offset: When the job starts for the first time, it does not scan historical data. It starts reading from the end of the binary log, which means it reads only the changes that occur after the connector starts.
    • earliest-offset: Does not scan historical data. It starts reading from the earliest available binary log.
    • specific-offset: Does not scan historical data. It starts from a specified binary log offset. You can specify the offset by configuring both scan.startup.specific-offset.file and scan.startup.specific-offset.pos to start from a specific binary log file and position, or by configuring only scan.startup.specific-offset.gtid-set to start from a specific GTID set.
    • timestamp: Does not scan historical data. It starts reading the binary log from a specified timestamp, which is set in milliseconds by scan.startup.timestamp-millis.
  Important: For the earliest-offset, specific-offset, and timestamp startup modes, the table schema at job startup must match the schema at the specified offset. A schema mismatch causes the job to fail. Ensure that the table schema does not change between the specified binary log offset and the job startup.
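As an illustration, a job that resumes from a known binary log position could be configured as follows; the file name and position are placeholders:

CREATE TABLE polardbx_from_offset (
  ... -- Define the table schema here
) WITH (
  'connector' = 'polardbx-cdc',
  ..., -- Connection parameters
  'scan.startup.mode' = 'specific-offset',
  'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Binary log file name
  'scan.startup.specific-offset.pos' = '4'                  -- Position within that file
);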

scan.startup.specific-offset.file
  Description: The name of the binary log file for the start offset when using the specific offset mode.
  Data type: STRING. Required: No. Default value: None.
  Notes: When you use this parameter, you must set scan.startup.mode to specific-offset. Example file name format: mysql-bin.000003.

scan.startup.specific-offset.pos
  Description: The position in the specified binary log file for the start offset when using the specific offset mode.
  Data type: INTEGER. Required: No. Default value: None.
  Notes: When you use this parameter, you must set scan.startup.mode to specific-offset.

scan.startup.specific-offset.gtid-set
  Description: The GTID set for the start offset when using the specific offset mode.
  Data type: STRING. Required: No. Default value: None.
  Notes: When you use this parameter, you must set scan.startup.mode to specific-offset. Example GTID set format: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

scan.startup.timestamp-millis
  Description: The timestamp for the start offset when using the timestamp mode, in milliseconds.
  Data type: LONG. Required: No. Default value: None.
  Notes: When you use this parameter, you must set scan.startup.mode to timestamp.

scan.startup.specific-offset.skip-events
  Description: The number of binary log events to skip when reading from a specified offset.
  Data type: INTEGER. Required: No. Default value: None.
  Notes: When you use this parameter, you must set scan.startup.mode to specific-offset.

scan.startup.specific-offset.skip-rows
  Description: The number of row changes to skip when reading from a specified offset. A single binary log event can correspond to multiple row changes.
  Data type: INTEGER. Required: No. Default value: None.
  Notes: When you use this parameter, you must set scan.startup.mode to specific-offset.

heartbeat.interval
  Description: The interval at which the source uses heartbeat events to advance the binary log offset.
  Data type: DURATION. Required: No. Default value: None.
  Notes: Heartbeat events force the binary log offset to advance on the source side. This mechanism prevents the binary log from expiring due to infrequent updates. An expired binary log causes the job to fail and can be recovered only by a stateless restart.

chunk-meta.group.size
  Description: The size of the chunk metadata.
  Data type: INTEGER. Required: No. Default value: 1000.
  Notes: If the metadata is larger than this value, it is split into multiple parts for transmission.

chunk-key.even-distribution.factor.upper-bound
  Description: The upper bound of the chunk distribution factor for even sharding.
  Data type: DOUBLE. Required: No. Default value: 1000.0.
  Notes: If the distribution factor is greater than this value, uneven sharding is used. Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows.

chunk-key.even-distribution.factor.lower-bound
  Description: The lower bound of the chunk distribution factor for even sharding.
  Data type: DOUBLE. Required: No. Default value: 0.05.
  Notes: If the distribution factor is less than this value, uneven sharding is used. Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows.
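As a worked example (the figures are hypothetical): for a table with 100,000 rows whose chunk key spans 1 to 120,000, the distribution factor is (120000 - 1 + 1) / 100000 = 1.2. Because 1.2 falls between the lower bound (0.05) and the upper bound (1000.0), the chunk key is considered evenly distributed and even sharding is used. If the same 100,000 rows spanned a key range of 1 to 200,000,000, the factor would be 2000, which exceeds the upper bound, so uneven sharding would be used instead.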

scan.newly-added-table.enabled
  Description: Specifies whether to scan newly added captured tables when the job restarts from a checkpoint.
  Data type: BOOLEAN. Required: No. Default value: false.
  Notes: If enabled, the system synchronizes newly added tables that did not match before and removes tables that no longer match from the state. This takes effect when the job restarts from a checkpoint or savepoint.

scan.incremental.snapshot.chunk.key-column
  Description: Specifies the column used for data sharding during the snapshot phase.
  Data type: STRING. Required: See Notes. Default value: None.
  Notes:
    • Required for tables without a primary key. The selected column must be of a NOT NULL type.
    • Optional for tables with a primary key. You can select only one column from the primary key.

scan.incremental.close-idle-reader.enabled
  Description: Specifies whether to shut down idle readers after the snapshot phase ends.
  Data type: BOOLEAN. Required: No. Default value: false.
  Notes: For this configuration to take effect, you must also set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true.

scan.incremental.snapshot.backfill.skip
  Description: Specifies whether to skip backfill during the snapshot reading phase.
  Data type: BOOLEAN. Required: No. Default value: false.
  Notes: Valid values:
    • true: Skips backfill during the snapshot reading phase.
    • false (default): Does not skip backfill during the snapshot reading phase.
  If backfill is skipped, changes to the table during the snapshot phase are read in the later incremental phase instead of being merged into the snapshot.
  Important: Skipping backfill may cause data inconsistency because changes that occur during the snapshot phase might be replayed. Only at-least-once semantics are guaranteed.

scan.parse.online.schema.changes.enabled
  Description: Specifies whether to try to parse RDS lockless change DDL events in the incremental phase.
  Data type: BOOLEAN. Required: No. Default value: false.
  Notes: Valid values:
    • true: Parses RDS lockless change DDL events.
    • false (default): Does not parse RDS lockless change DDL events.
  This is an experimental feature. Before you perform an online lockless change, create a savepoint for the Flink job to facilitate recovery.

scan.only.deserialize.captured.tables.changelog.enabled
  Description: Specifies whether to deserialize only the change events of the specified tables in the incremental phase.
  Data type: BOOLEAN. Required: No. Default value: true.
  Notes: Valid values:
    • true (default): Deserializes only the change data for the target tables to speed up binary log reading.
    • false: Deserializes the change data for all tables.

scan.read-changelog-as-append-only.enabled
  Description: Specifies whether to convert the changelog stream to an append-only stream.
  Data type: BOOLEAN. Required: No. Default value: false.
  Notes: Valid values:
    • true: All types of messages, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER, are converted to INSERT messages. Enable this option only in specific scenarios, such as when you need to save delete messages from the upstream table.
    • false (default): All types of messages are sent downstream as they are.

scan.parallel-deserialize-changelog.enabled
  Description: Specifies whether to use multiple threads to parse change events in the incremental phase.
  Data type: BOOLEAN. Required: No. Default value: false.
  Notes: Valid values:
    • true: Uses multi-threaded processing during the change event deserialization phase while maintaining the order of binary log events, to speed up reading.
    • false (default): Uses single-threaded processing during the event deserialization phase.

scan.parallel-deserialize-changelog.handler.size
  Description: The number of event handlers when using multiple threads to parse change events.
  Data type: INTEGER. Required: No. Default value: 2.

scan.incremental.snapshot.unbounded-chunk-first.enabled
  Description: Specifies whether to distribute unbounded chunks first during the snapshot reading phase.
  Data type: BOOLEAN. Required: No. Default value: false.
  Notes: Valid values:
    • true: Distributes unbounded chunks first during the snapshot reading phase.
    • false (default): Does not distribute unbounded chunks first during the snapshot reading phase.
  This is an experimental feature. Enabling it can reduce the risk of OOM errors when a TaskManager synchronizes the last chunk during the snapshot phase. Add this configuration before the job starts for the first time.

polardbx.binlog.ignore.archive-events.enabled
  Description: Specifies whether to ignore archive events (mainly DELETE events) in the PolarDB-X binary log.
  Data type: BOOLEAN. Required: No. Default value: false.

polardbx.binlog.ignore.query-events.enabled
  Description: Specifies whether to ignore query events in the PolarDB-X binary log.
  Data type: BOOLEAN. Required: No. Default value: false.

polardbx.binlog.include.tables
  Description: Subscribes to the binary logs of only the specified tables. Separate multiple table names with commas (,).
  Data type: STRING. Required: No. Default value: None.

polardbx.binlog.exclude.tables
  Description: Does not subscribe to the binary logs of the specified tables. Separate multiple table names with commas (,).
  Data type: STRING. Required: No. Default value: None.
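Putting the PolarDB-X-specific options together, a hypothetical job that subscribes to two tables on the server side and drops query events from the log stream could look like this (table names are illustrative):

CREATE TABLE polardbx_filtered (
  ... -- Define the table schema here
) WITH (
  'connector' = 'polardbx-cdc',
  ..., -- Connection parameters
  'polardbx.binlog.include.tables' = 'db.table1,db.table2', -- Server-side allowlist
  'polardbx.binlog.ignore.query-events.enabled' = 'true'    -- Do not deliver query events
);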

Type mapping

The PolarDB-X data types on the left map to the Flink data types on the right.

  • TINYINT → TINYINT
  • SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL → SMALLINT
  • INT, MEDIUMINT, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL → INT
  • BIGINT, INT UNSIGNED, INT UNSIGNED ZEROFILL, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL → BIGINT
  • BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL → DECIMAL(20, 0)
  • FLOAT [UNSIGNED] [ZEROFILL] → FLOAT
  • DOUBLE [UNSIGNED] [ZEROFILL], DOUBLE PRECISION [UNSIGNED] [ZEROFILL], REAL [UNSIGNED] [ZEROFILL] → DOUBLE
  • NUMERIC(p, s) [UNSIGNED] [ZEROFILL], DECIMAL(p, s) [UNSIGNED] [ZEROFILL] → DECIMAL(p, s)
  • BOOLEAN, TINYINT(1) → BOOLEAN
  • DATE → DATE
  • TIME [(p)] → TIME [(p)] [WITHOUT TIME ZONE]
  • DATETIME [(p)] → TIMESTAMP [(p)] [WITHOUT TIME ZONE]
  • TIMESTAMP [(p)] → TIMESTAMP [(p)] or TIMESTAMP [(p)] WITH LOCAL TIME ZONE
  • CHAR(n), VARCHAR(n), TEXT → STRING
  • BINARY, VARBINARY, BLOB → BYTES
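As an illustration of the mapping, a hypothetical PolarDB-X table with BIGINT UNSIGNED, DECIMAL, DATETIME, and TEXT columns would be declared in Flink SQL as follows:

-- Hypothetical PolarDB-X table:
--   CREATE TABLE orders (
--     order_id BIGINT UNSIGNED PRIMARY KEY,
--     amount   DECIMAL(10, 2),
--     updated  DATETIME(3),
--     note     TEXT
--   );
-- Corresponding Flink DDL, following the mapping above:
CREATE TABLE orders_source (
  `order_id` DECIMAL(20, 0), -- BIGINT UNSIGNED maps to DECIMAL(20, 0)
  `amount`   DECIMAL(10, 2), -- DECIMAL(p, s) maps to DECIMAL(p, s)
  `updated`  TIMESTAMP(3),   -- DATETIME(p) maps to TIMESTAMP(p)
  `note`     STRING,         -- TEXT maps to STRING
  PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  ... -- Connection parameters
);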