This topic describes how to use the PolarDB-X connector.
Background information
PolarDB for Xscale (PolarDB-X) is a high-performance, cloud-native distributed database service from Alibaba Cloud. It provides high throughput, large storage capacity, low latency, easy scalability, and high availability.
This connector supports Ververica Runtime (VVR) 11.5 or later and must be used with PolarDB-X 2.0 or later.
The PolarDB-X CDC connector can be used only as a source table. To query a dimension table or write to a sink table in a PolarDB-X instance, use the MySQL connector (public preview).
Category | Details |
Supported type | Source table |
Runtime mode | Streaming mode only |
Data format | Not applicable |
Specific monitoring metrics | |
API type | SQL |
Supports updates or deletions to sink tables | No |
Features
The PolarDB-X CDC connector improves performance in the binary log parsing phase by filtering and trimming irrelevant binary log events on the server side. This increases throughput and saves network bandwidth.
Example of on-demand binary log subscription
Server-side filtering of binary logs sends only the required change logs to the client, which reduces network traffic and improves log consumption throughput.
For example, to subscribe only to the change data of the db.table1 and db.table2 tables on the PolarDB-X server, configure the Flink SQL job as follows:
CREATE TABLE polardbx_table_foo (
... -- Define the table schema here
) WITH (
'connector' = 'polardbx-cdc',
'database-name' = 'db',
'table-name' = '.*',
..., -- Other parameters
'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- Subscribe to data from only the corresponding tables
);

Unlike the MySQL CDC connector, which loads all change logs from an entire instance and filters them on the client, the PolarDB-X CDC connector can filter binary logs on the server. This allows the client to subscribe to binary logs as needed and significantly reduces network I/O overhead.
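Conversely, if it is easier to list the tables to skip than the tables to keep, the polardbx.binlog.exclude.tables parameter (described in the WITH parameters section below) subscribes to everything except the listed tables. The following is a minimal sketch; the table name db.audit_log is a hypothetical example.
CREATE TABLE polardbx_table_bar (
... -- Define the table schema here
) WITH (
'connector' = 'polardbx-cdc',
'database-name' = 'db',
'table-name' = '.*',
..., -- Other parameters
'polardbx.binlog.exclude.tables' = 'db.audit_log' -- Do not subscribe to the binary logs of this table (hypothetical table name)
);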
Limits
Server-side binary log filtering and subscription to specific tables require PolarDB-X server version 2.5.0 or later and Simple Log Service component version 5.4.20 or later.
SQL
Syntax
CREATE TABLE polardbx_customer_table (
`id` STRING,
[columnName dataType,]*
PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
'connector' = 'polardbx-cdc',
'hostname' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
'username' = 'pdx_user',
'password' = 'pdx_password',
'database-name' = 'full_db',
'table-name' = 'customers'
);

WITH parameters
Parameter | Description | Data type | Required | Default value | Notes |
connector | The name of the connector. | STRING | Yes | None | The value must be polardbx-cdc. |
hostname | The IP address or hostname of the PolarDB-X database. | STRING | Yes | None | Specify the cluster endpoint of the instance. |
port | The service port number of the PolarDB-X database. | INTEGER | No | 3306 | None. |
username | The username for the PolarDB-X database service. | STRING | Yes | None | None. |
password | The password for the PolarDB-X database service. | STRING | Yes | None | None. |
database-name | The name of the PolarDB-X database. | STRING | Yes | None | You can use a regular expression to read data from multiple databases. Note When you use a regular expression, do not use the ^ and $ symbols to match the start and end of the string. |
table-name | The name of the PolarDB-X table. | STRING | Yes | None | You can use a regular expression to read data from multiple tables. Note When you use a regular expression, do not use the ^ and $ symbols to match the start and end of the string. |
server-time-zone | The session time zone used by the database. | STRING | No | The time zone of the region where the job runs. | Specify an IANA time zone identifier, such as Asia/Shanghai. This parameter controls how TIMESTAMP types in the source table are converted to STRING types. |
scan.incremental.snapshot.chunk.size | The size (number of rows) of each chunk when reading data from an incremental snapshot. | INTEGER | No | 8096 | PolarDB-X splits a table into multiple chunks for reading and caches the chunk data in memory. Reducing the number of rows per chunk increases the total number of chunks. This provides finer-grained fault recovery but also increases the risk of out-of-memory (OOM) errors and reduces throughput. Configure a reasonable chunk size to balance performance. |
scan.snapshot.fetch.size | The maximum number of records to pull at a time when reading full data from a table. | INTEGER | No | 1024 | None. |
connect.timeout | The maximum time that the connector waits after trying to connect to the PolarDB-X database server before the connection attempt times out. | DURATION | No | 30s | None. |
connection.pool.size | The size of the database connection pool. | INTEGER | No | 20 | The database connection pool reuses connections to reduce the number of database connections. |
connect.max-retries | The maximum number of retries after a failed attempt to connect to the PolarDB-X database service. | INTEGER | No | 3 | None. |
scan.startup.mode | The startup mode for data consumption. | STRING | No | initial | Valid values: initial (default): on the first startup, reads a full snapshot of the table and then continues with the latest binary log; earliest-offset: skips the snapshot phase and reads from the earliest accessible binary log offset; latest-offset: skips the snapshot phase and reads only the latest binary log changes; specific-offset: skips the snapshot phase and starts from a specified binary log offset; timestamp: skips the snapshot phase and reads binary log events starting from a specified timestamp. Important: For the earliest-offset, specific-offset, and timestamp startup modes, the table schema at startup must match the schema at the specified offset. A schema mismatch causes the job to fail. Ensure that the table schema does not change between the specified binary log offset and the job startup. |
scan.startup.specific-offset.file | The name of the binary log file for the start offset when using the specific offset mode. | STRING | No | None | When you use this parameter, you must set scan.startup.mode to specific-offset. Example file name format: |
scan.startup.specific-offset.pos | The position in the specified binary log file for the start offset when using the specific offset mode. | INTEGER | No | None | When you use this parameter, you must set scan.startup.mode to specific-offset. |
scan.startup.specific-offset.gtid-set | The GTID set for the start offset when using the specific offset mode. | STRING | No | None | When you use this parameter, you must set scan.startup.mode to specific-offset. Example GTID set format: |
scan.startup.timestamp-millis | The timestamp in milliseconds for the start offset when using the specific time mode. | LONG | No | None | When you use this parameter, you must set scan.startup.mode to timestamp. The timestamp is in milliseconds. |
scan.startup.specific-offset.skip-events | The number of binary log events to skip when reading from a specified offset. | INTEGER | No | None | When you use this parameter, you must set scan.startup.mode to specific-offset. |
scan.startup.specific-offset.skip-rows | The number of row changes to skip when reading from a specified offset. A single binary log event can correspond to multiple row changes. | INTEGER | No | None | When you use this parameter, you must set scan.startup.mode to specific-offset. |
heartbeat.interval | The interval at which the source uses heartbeat events to advance the binary log offset. | DURATION | No | None | Heartbeat events force the binary log offset to advance on the source side. This mechanism prevents the binary log from expiring due to infrequent updates. An expired binary log causes the job to fail and can be recovered only by a stateless restart. |
chunk-meta.group.size | The group size of the chunk metadata. | INTEGER | No | 1000 | If the number of chunk metadata entries exceeds this value, the metadata is split into multiple groups for transmission. |
chunk-key.even-distribution.factor.upper-bound | The upper bound of the chunk distribution factor for even sharding. | DOUBLE | No | 1000.0 | If the distribution factor is greater than this value, uneven sharding is used. Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows. |
chunk-key.even-distribution.factor.lower-bound | The lower bound of the chunk distribution factor for even sharding. | DOUBLE | No | 0.05 | If the distribution factor is less than this value, uneven sharding is used. Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows. |
scan.newly-added-table.enabled | Specifies whether to scan newly added captured tables when the job restarts from a checkpoint. | BOOLEAN | No | false | If enabled, the system synchronizes newly added tables that did not match before and removes tables that no longer match from the state. This takes effect when restarting from a checkpoint or savepoint. |
scan.incremental.snapshot.chunk.key-column | Specifies the column used for data sharding during the snapshot phase. | STRING | See Notes | None | For a table with a primary key, this parameter is optional and the specified column must be a column of the primary key. For a table without a primary key, this parameter is required and the specified column must not contain NULL values. |
scan.incremental.close-idle-reader.enabled | Specifies whether to shut down idle readers after the snapshot phase ends. | BOOLEAN | No | false | For this configuration to take effect, you must also set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true. |
scan.incremental.snapshot.backfill.skip | Specifies whether to skip backfill during the snapshot reading phase. | BOOLEAN | No | false | Valid values: true: skip backfill; false (default): do not skip backfill. If backfill is skipped, changes to the table during the snapshot phase are read in the later incremental phase instead of being merged into the snapshot. Important: Skipping backfill may cause data inconsistency because changes that occur during the snapshot phase might be replayed. Only at-least-once semantics are guaranteed. |
scan.parse.online.schema.changes.enabled | In the incremental phase, specifies whether to try to parse the DDL events generated by RDS lockless schema changes. | BOOLEAN | No | false | Valid values: true: try to parse RDS lockless change DDL events; false (default): do not parse them. This is an experimental feature. Before you perform an online lockless change, create a savepoint for the Flink job to facilitate recovery. |
scan.only.deserialize.captured.tables.changelog.enabled | In the incremental phase, specifies whether to deserialize only the change events of the specified tables. | BOOLEAN | No | true | Valid values: true (default): deserialize only the change data of the specified tables; false: deserialize the change data of all tables. |
scan.read-changelog-as-append-only.enabled | Specifies whether to convert the changelog stream to an append-only stream. | BOOLEAN | No | false | Valid values: true: all types of change messages, including INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE, are converted to INSERT messages; false (default): all types of change messages are sent as is. |
scan.parallel-deserialize-changelog.enabled | In the incremental phase, specifies whether to use multiple threads to parse change events. | BOOLEAN | No | false | Valid values: true: use multiple threads to parse change events, which can improve consumption throughput in the incremental phase; false (default): use a single thread. |
scan.parallel-deserialize-changelog.handler.size | The number of event handlers when using multiple threads to parse change events. | INTEGER | No | 2 | None. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Specifies whether to distribute the unbounded chunk first during the snapshot reading phase. | BOOLEAN | No | false | Valid values: true: distribute the unbounded chunk first; false (default): do not distribute the unbounded chunk first. This is an experimental feature. Enabling it can reduce the risk of OOM errors when a TaskManager synchronizes the last chunk during the snapshot phase. Add this configuration before the job starts for the first time. |
polardbx.binlog.ignore.archive-events.enabled | Specifies whether to ignore archive events (mainly `DELETE` events) in the PolarDB-X binary log. | BOOLEAN | No | false | |
polardbx.binlog.ignore.query-events.enabled | Specifies whether to ignore query events in the PolarDB-X binary log. | BOOLEAN | No | false | |
polardbx.binlog.include.tables | Subscribes to the binary logs of only the specified tables. Specify each table in the database.table format and separate multiple tables with commas (,). | STRING | No | None | |
polardbx.binlog.exclude.tables | Does not subscribe to the binary logs of the specified tables. Specify each table in the database.table format and separate multiple tables with commas (,). | STRING | No | None | |
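The following end-to-end sketch shows how these WITH parameters fit together. The database name full_db, the orders table, its columns, and the credentials are assumptions used only for illustration; replace them with the values of your own instance. The job reads a full snapshot first, continues with incremental changes, and writes the captured changelog to a print sink for inspection.
-- Source table that captures changes from PolarDB-X.
-- The database, table, columns, and credentials below are example values.
CREATE TEMPORARY TABLE polardbx_orders_source (
  `order_id` BIGINT,
  `customer_id` BIGINT,
  `order_status` STRING,
  `order_time` TIMESTAMP(3),
  PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'pdx_user',
  'password' = 'pdx_password',
  'database-name' = 'full_db',
  'table-name' = 'orders',
  'server-time-zone' = 'Asia/Shanghai',
  -- Read the full snapshot first, then continue with incremental changes.
  'scan.startup.mode' = 'initial'
);

-- Print sink used only to inspect the captured changelog.
CREATE TEMPORARY TABLE print_sink (
  `order_id` BIGINT,
  `customer_id` BIGINT,
  `order_status` STRING,
  `order_time` TIMESTAMP(3)
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink
SELECT `order_id`, `customer_id`, `order_status`, `order_time`
FROM polardbx_orders_source;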
Type mapping
PolarDB-X data type | Flink data type |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | SMALLINT |
TINYINT UNSIGNED ZEROFILL | SMALLINT |
INT | INT |
MEDIUMINT | INT |
SMALLINT UNSIGNED | INT |
SMALLINT UNSIGNED ZEROFILL | INT |
BIGINT | BIGINT |
INT UNSIGNED | BIGINT |
INT UNSIGNED ZEROFILL | BIGINT |
MEDIUMINT UNSIGNED | BIGINT |
MEDIUMINT UNSIGNED ZEROFILL | BIGINT |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | DECIMAL(20, 0) |
SERIAL | DECIMAL(20, 0) |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | DOUBLE |
REAL [UNSIGNED] [ZEROFILL] | DOUBLE |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
TINYINT(1) | BOOLEAN |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | TIMESTAMP [(p)] or TIMESTAMP [(p)] WITH LOCAL TIME ZONE |
CHAR(n) | STRING |
VARCHAR(n) | STRING |
TEXT | STRING |
BINARY | BYTES |
VARBINARY | BYTES |
BLOB | BYTES |
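To illustrate the mapping, the following sketch pairs a hypothetical PolarDB-X table with the Flink DDL that a CDC source table for it could use. The products table, its columns, and the connection values are assumptions for illustration only; only the type pairs follow the table above.
-- Hypothetical table created in PolarDB-X (MySQL-compatible DDL).
CREATE TABLE full_db.products (
  id BIGINT UNSIGNED NOT NULL,
  name VARCHAR(255),
  price DECIMAL(10, 2),
  in_stock TINYINT(1),
  updated_at DATETIME(3),
  PRIMARY KEY (id)
);

-- Corresponding Flink source table, with each column declared using the mapped Flink type.
CREATE TABLE products_source (
  `id` DECIMAL(20, 0),        -- BIGINT UNSIGNED maps to DECIMAL(20, 0)
  `name` STRING,              -- VARCHAR(n) maps to STRING
  `price` DECIMAL(10, 2),     -- DECIMAL(p, s) maps to DECIMAL(p, s)
  `in_stock` BOOLEAN,         -- TINYINT(1) maps to BOOLEAN
  `updated_at` TIMESTAMP(3),  -- DATETIME(p) maps to TIMESTAMP(p)
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hostname' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
  'username' = 'pdx_user',
  'password' = 'pdx_password',
  'database-name' = 'full_db',
  'table-name' = 'products'
);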