This topic describes how to use the MySQL connector.
Overview
The MySQL connector supports all databases that are compatible with the MySQL protocol, including ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (MySQL mode), and self-managed MySQL.
When you use the MySQL connector to read from OceanBase, ensure that OceanBase Binlog is enabled and correctly configured. For more information, see Related operations. This feature is in public preview. Please evaluate it thoroughly and use it with caution.
The following table describes the support for the MySQL connector.
Category | Details |
Supported type | Source table, dimension table, sink table, and data ingestion source |
Runtime mode | Streaming mode only |
Data format | Not applicable |
Specific monitoring metrics | |
API type | DataStream, SQL, and data ingestion YAML |
Update or delete data in sink tables | Supported |
Features
The MySQL CDC source first takes a consistent snapshot of the existing data in a database and then seamlessly switches to reading the binary log (binlog) for change events. This process guarantees exactly-once semantics, ensuring no data is missed or duplicated, even during failures. The MySQL CDC source table supports concurrent reading of full data and implements lock-free reading and resumable data transfer by using an incremental snapshot algorithm. For more information, see About MySQL CDC source tables.
Unified batch and stream processing: Reads both full and incremental data, which eliminates the need to maintain separate pipelines.
Concurrent full data reading: Horizontally scales performance.
Seamless switch from full to incremental reading: Automatically scales in to save computing resources.
Resumable data transfer: Supports resumable data transfer during the full data reading phase for enhanced stability.
Lock-free reading: Reads full data without affecting online business operations.
Supports reading backup logs from ApsaraDB RDS for MySQL.
Parallel parsing of binlog files reduces read latency.
Prerequisites
Before you use a MySQL CDC source table, complete the following required configurations in your MySQL database. For more information, see Configure MySQL.
ApsaraDB RDS for MySQL
Perform a network probe with Realtime Compute for Apache Flink to ensure network connectivity.
MySQL versions: 5.6, 5.7, and 8.0.x.
Enable the binary log (binlog). This is enabled by default.
Set the binlog format to ROW. This is the default format.
Set binlog_row_image to FULL. This is the default setting.
Disable Binary Log Transaction Compression. This feature was introduced in MySQL 8.0.20 and is disabled by default.
Create a MySQL user and grant the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
Create a MySQL database and table. For more information, see Create a database and an account for ApsaraDB RDS for MySQL. Use a privileged account to create the MySQL database to avoid operational failures due to insufficient permissions.
Configure an IP whitelist. For more information, see Create a database and an account for ApsaraDB RDS for MySQL.
PolarDB for MySQL
Perform a network probe with Realtime Compute for Apache Flink to ensure network connectivity.
MySQL versions: 5.6, 5.7, and 8.0.x.
Enable the binary log (binlog). This is disabled by default.
Set the binlog format to ROW. This is the default format.
Set binlog_row_image to FULL. This is the default setting.
Disable Binary Log Transaction Compression. This feature was introduced in MySQL 8.0.20 and is disabled by default.
Create a MySQL user and grant the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
Create a MySQL database and table. For more information, see Create a database and an account for PolarDB for MySQL. Use a privileged account to create the MySQL database to avoid operational failures due to insufficient permissions.
Configure an IP whitelist. For more information, see Configure a whitelist for a PolarDB for MySQL cluster.
Self-managed MySQL
Perform a network probe with Realtime Compute for Apache Flink to ensure network connectivity.
MySQL versions: 5.6, 5.7, and 8.0.x.
Enable the binary log (binlog). This is disabled by default.
Set the binlog format to ROW. The default format is STATEMENT.
Set binlog_row_image to FULL. This is the default setting.
Disable Binary Log Transaction Compression. This feature was introduced in MySQL 8.0.20 and is disabled by default.
Create a MySQL user and grant the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions. A sample set of statements follows this list.
Create a MySQL database and table. For more information, see Create a database and an account for a self-managed MySQL instance. Use a privileged account to create the MySQL database to avoid operational failures due to insufficient permissions.
Configure an IP whitelist. For more information, see Configure a whitelist for a self-managed MySQL instance.
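The following is a minimal sketch of the MySQL statements that cover these checks and grants. The user name, host pattern, and password are placeholders; adjust them to your environment and MySQL version.

-- Verify binlog settings; the expected values are ON, ROW, and FULL.
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';
-- On MySQL 8.0.20 and later, confirm that binlog transaction compression is disabled (OFF).
SHOW VARIABLES LIKE 'binlog_transaction_compression';

-- Create a user for the CDC source and grant the required permissions.
CREATE USER 'flinkuser'@'%' IDENTIFIED BY '<yourPassword>';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser'@'%';
FLUSH PRIVILEGES;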
Limitations
General limitations
The MySQL CDC source table does not support defining a watermark.
In CTAS and CDAS jobs, the MySQL CDC source table can synchronize partial schema changes. For more information about the supported change types, see Schema evolution synchronization policies.
The MySQL CDC connector does not support Binary Log Transaction Compression. Therefore, when you use the MySQL CDC connector to consume incremental data, ensure that this feature is disabled. Otherwise, incremental data may fail to be retrieved.
Limitations for ApsaraDB RDS for MySQL
For ApsaraDB RDS for MySQL, we do not recommend reading data from a secondary database or a read-only replica. The binlog retention period for these instances is short by default. If the binlog expires and is cleared, the job may fail to consume binlog data and report an error.
By default, ApsaraDB RDS for MySQL enables parallel synchronization between the primary and secondary databases and does not guarantee transaction order consistency. This may cause some data to be missed during a primary-secondary switchover and checkpoint recovery. You can manually enable the slave_preserve_commit_order option in ApsaraDB RDS for MySQL to resolve this issue.
Limitations for PolarDB for MySQL
MySQL CDC source tables do not support reading from a Multi-master Cluster of PolarDB for MySQL 1.0.19 or earlier. For more information, see What is a Multi-master Cluster?. The binlog generated by these clusters may contain duplicate table IDs, which can cause schema mapping errors in the CDC source table and lead to errors when parsing binlog data.
Limitations for open source MySQL
By default, MySQL maintains transaction order during primary-replica binlog replication. If a MySQL replica enables parallel replication (slave_parallel_workers > 1) but does not have slave_preserve_commit_order=ON, its transaction commit order may be inconsistent with the primary database. When Flink CDC recovers from a checkpoint, it may miss data due to this order inconsistency. We recommend setting slave_preserve_commit_order = ON on the MySQL replica, or setting slave_parallel_workers = 1, although single-threaded replication may reduce replication performance.
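The following sketch shows how you might check and adjust these settings on a MySQL 5.7 or 8.0 replica; newer versions use the replica_* variable aliases. Changing these variables typically requires stopping replication first, so treat this as an illustration rather than a complete procedure.

-- Run on the MySQL replica to inspect parallel replication settings.
SHOW VARIABLES LIKE 'slave_parallel_workers';
SHOW VARIABLES LIKE 'slave_preserve_commit_order';

-- Option 1: keep parallel replication but preserve the commit order.
SET GLOBAL slave_preserve_commit_order = ON;

-- Option 2: fall back to single-threaded replication, which may reduce replication throughput.
SET GLOBAL slave_parallel_workers = 1;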
Notes
Sink table
Do not declare an auto-increment primary key in the DDL statement. MySQL automatically populates this field when writing data.
Declare at least one non-primary-key field; otherwise, an error is reported.
NOT ENFORCED in the DDL statement indicates that Flink does not perform a validity check on the primary key. You must ensure the correctness and integrity of the primary key. For more information, see Validity Check.
Dimension table
To use indexes for query acceleration, the field order in the JOIN clause must match the index definition order. This is known as the leftmost prefix rule. For example, if the index is on (a, b, c), the JOIN condition should be ON t.a = x AND t.b = y, as shown in the example below.
Flink-generated SQL may be rewritten by the optimizer, which can prevent the index from being used during the actual database query. To verify whether an index is used, check the execution plan (EXPLAIN) or the slow query log in MySQL to see the actual SELECT statement that is executed.
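The following is a minimal sketch of a lookup join that follows the leftmost prefix rule. All table and column names are hypothetical; it assumes the MySQL table behind mysql_dim has a composite index on (a, b) and that the probe table src has a processing-time attribute.

-- The ON clause lists a before b, matching the index definition (a, b).
INSERT INTO result_sink
SELECT s.id, d.c
FROM src AS s
JOIN mysql_dim FOR SYSTEM_TIME AS OF s.`proctime` AS d
  ON d.a = s.id AND d.b = s.category;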
SQL
You can use the MySQL connector in SQL jobs as a source table, dimension table, or sink table.
Syntax
CREATE TEMPORARY TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);

How the connector writes to a sink table: For each record received, the connector constructs and executes a single SQL statement. The specific SQL statement depends on the table structure:
For a sink table without a primary key, the connector executes an INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...); statement.
For a sink table with a primary key, the connector executes an INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...; statement. Note: If the physical table has a unique index constraint other than the primary key, inserting two records with different primary keys but the same unique index value causes a unique index conflict in the downstream database. This conflict results in data overwrites and potential data loss.
If you define an auto-increment primary key in the MySQL database, do not declare the auto-increment field in the Flink DDL statement. The database automatically populates this field during data insertion. The connector only supports writing and deleting data with an auto-increment field and does not support updates.
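The following sketch illustrates this for a hypothetical MySQL table orders(id BIGINT AUTO_INCREMENT PRIMARY KEY, customer_name VARCHAR(255), price DECIMAL(10, 2)). The auto-increment column id is omitted from the Flink DDL, and MySQL fills it in on insert.

CREATE TEMPORARY TABLE mysql_orders_sink (
  customer_name STRING,
  price DECIMAL(10, 2)
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = 'orders'
);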
Connector options
General
Option
Description
Required
Data type
Default value
Remarks
connector
The type of the table.
Yes
STRING
None
When used as a source table, set this option to mysql-cdc or mysql. They are equivalent. When used as a dimension or sink table, set this option to mysql.
hostname
The IP address or hostname of the MySQL database.
Yes
STRING
None
We recommend that you use a VPC endpoint.
Note: If the MySQL database and Realtime Compute for Apache Flink are not in the same VPC, you must establish a cross-VPC network connection or use the Internet for access. For more information, see Manage and operate workspaces and How can a fully managed Flink cluster access the Internet?.
username
The username for the MySQL database service.
Yes
STRING
None
None.
password
The password for the MySQL database service.
Yes
STRING
None
None.
database-name
The name of the MySQL database.
Yes
STRING
None
When used as a source table, this option supports regular expressions to read data from multiple databases.
When you use a regular expression, avoid using the ^ and $ symbols to match the start and end of the string. For more information, see the Remarks column for the table-name option.
table-name
The name of the MySQL table.
Yes
STRING
None
When used as a source table, this option supports regular expressions to read data from multiple tables.
When you read data from multiple MySQL tables, submit multiple CTAS statements as a single job. This avoids enabling multiple Binlog listeners and improves performance and efficiency. For more information, see Multiple CTAS statements: Submit as a single job.
When you use a regular expression, avoid using the ^ and $ symbols to match the start and end of the string. For more information, see the following note.
Note: When a MySQL CDC source table matches table names, it combines the database-name and table-name that you specify into a full-path regular expression using the string \\. (the character . is used in VVR versions earlier than 8.0.1). It then uses this regular expression to match the fully qualified names of tables in the MySQL database.
For example, if you set 'database-name'='db_.*' and 'table-name'='tb_.+', the connector uses the regular expression db_.*\\.tb_.+ (db_.*.tb_.+ in versions earlier than 8.0.1) to match the fully qualified table names to determine which tables to read. See the example after this table.
port
The port number of the MySQL database service.
No
INTEGER
3306
None.
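The following sketch shows the regex matching described in the note above. The database and table name patterns are placeholders; with these options, the connector reads every table whose fully qualified name matches db_.*\.tb_.+.

CREATE TEMPORARY TABLE mysql_sharded_source (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = 'db_.*',   -- Matches databases such as db_order and db_user.
  'table-name' = 'tb_.+'       -- Matches tables such as tb_2024 and tb_info.
);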
Source-specific
Option
Description
Required
Data type
Default value
Remarks
server-id
A numeric ID for the database client.
No
STRING
A random value between 5400 and 6400 is generated.
This ID must be globally unique within the MySQL cluster. We recommend that you set a different ID for each job that connects to the same database.
This option also supports an ID range, such as 5400-5408. When incremental reading is enabled, concurrent reading is supported. In this case, we recommend that you set an ID range so that each concurrent reader uses a different ID. For more information, see Use of server ID.
scan.incremental.snapshot.enabled
Specifies whether to enable incremental snapshots.
No
BOOLEAN
true
Incremental snapshot is enabled by default. Incremental snapshot is a new mechanism for reading full data snapshots. Compared to the old snapshot reading method, incremental snapshots offer several advantages:
The source can read full data in parallel.
The source supports chunk-level checkpoints when reading full data.
The source does not need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) when reading full data.
If you want the source to support concurrent reading, each concurrent reader needs a unique server ID. Therefore, server-id must be a range, such as 5400-6400, and the range must be greater than or equal to the degree of parallelism.
Note: This configuration item is removed in Ververica Runtime (VVR) 11.1 and later.
scan.incremental.snapshot.chunk.size
The size of each chunk in number of rows.
No
INTEGER
8096
When incremental snapshot reading is enabled, the table is split into multiple chunks for reading. The data of a chunk is buffered in memory before it is fully read.
A smaller number of rows per chunk results in a larger total number of chunks in the table. While this improves the granularity of fault recovery, it may lead to out-of-memory (OOM) errors and lower overall throughput. Therefore, you need to find a balance and set a reasonable chunk size.
scan.snapshot.fetch.size
The maximum number of records to fetch at a time when reading the full data of a table.
No
INTEGER
1024
None.
scan.startup.mode
The startup mode for data consumption.
No
STRING
initial
Valid values:
initial (Default): Scans the full historical data first and then reads the latest Binlog data upon the first startup.
latest-offset: Does not scan historical data upon the first startup. It starts reading from the end of the Binlog, which means it only reads the latest changes made after the connector starts.
earliest-offset: Does not scan historical data. It starts reading from the earliest available Binlog.
specific-offset: Does not scan historical data. It starts from a specific Binlog offset that you specify. You can specify the offset by configuring both scan.startup.specific-offset.file and scan.startup.specific-offset.pos, or by configuring only scan.startup.specific-offset.gtid-set to start from a specific GTID set.
timestamp: Does not scan historical data. It starts reading the Binlog from a specified timestamp. The timestamp is specified by scan.startup.timestamp-millis in milliseconds.
Important: When you use the earliest-offset, specific-offset, or timestamp startup mode, make sure that the schema of the corresponding table does not change between the specified Binlog consumption position and the job startup time to avoid errors due to schema mismatch.
scan.startup.specific-offset.file
The Binlog filename of the start offset when using the specific offset startup mode.
No
STRING
None
When you use this configuration, you must set scan.startup.mode to specific-offset. The filename format is, for example, mysql-bin.000003.
scan.startup.specific-offset.pos
The offset in the specified Binlog file to start from when using the specific offset startup mode.
No
INTEGER
None
When you use this configuration, you must set scan.startup.mode to specific-offset.
scan.startup.specific-offset.gtid-set
The GTID set of the start offset when using the specific offset startup mode.
No
STRING
None
When you use this configuration, you must set scan.startup.mode to specific-offset. The GTID set format is, for example, 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.
scan.startup.timestamp-millis
The start offset as a millisecond timestamp when using the timestamp startup mode.
No
LONG
None
When you use this configuration, you must set scan.startup.mode to timestamp. The timestamp is in milliseconds.
Important: When you specify a time, MySQL CDC attempts to read the initial event of each Binlog file to determine its timestamp and locate the Binlog file corresponding to the specified time. Make sure that the Binlog file for the specified timestamp has not been cleared from the database and can be read.
server-time-zone
The session time zone used by the database.
No
STRING
If you do not specify this option, the system uses the time zone of the Flink job's runtime environment as the database server time zone. This is the time zone of the zone you selected.
Example: Asia/Shanghai. This option controls how TIMESTAMP types in MySQL are converted to STRING types. For more information, see Debezium temporal values.
debezium.min.row.count.to.stream.results
When the number of rows in a table exceeds this value, batch reading mode is used.
No
INTEGER
1000
Flink reads data from a MySQL source table in the following ways:
Full read: Reads the entire table's data directly into memory. This method is fast but consumes a corresponding amount of memory. If the source table is very large, it may pose an OOM risk.
Batch read: Reads data in multiple batches, with a certain number of rows per batch, until all data is read. This method avoids OOM risks when reading large tables but is relatively slower.
connect.timeout
The maximum time to wait for a connection to the MySQL database server to time out before retrying.
No
DURATION
30s
None.
connect.max-retries
The maximum number of retries after a failed connection to the MySQL database service.
No
INTEGER
3
None.
connection.pool.size
The size of the database connection pool.
No
INTEGER
20
The database connection pool is used to reuse connections, which can reduce the number of database connections.
jdbc.properties.*
Custom connection options in the JDBC URL.
No
STRING
None
You can pass custom connection options. For example, to not use the SSL protocol, you can set 'jdbc.properties.useSSL' = 'false'.
For more information about supported connection options, see MySQL Configuration Properties.
debezium.*
Custom options for Debezium to read Binlogs.
No
STRING
None
You can pass custom Debezium options. For example, use 'debezium.event.deserialization.failure.handling.mode'='ignore' to specify the handling logic for parsing errors.
heartbeat.interval
The interval at which the source uses heartbeat events to advance the Binlog offset.
No
DURATION
30s
Heartbeat events are used to advance the Binlog offset in the source, which is very useful for slowly updated tables in MySQL. For such tables, the Binlog offset does not advance automatically. Heartbeat events can push the Binlog offset forward, preventing issues where an expired Binlog offset causes the job to fail and require a stateless restart.
scan.incremental.snapshot.chunk.key-column
The column used to split chunks during the snapshot phase.
See Remarks.
STRING
None
Required for tables without a primary key. The selected column must be of a non-null type (NOT NULL).
Optional for tables with a primary key. You can only select one column from the primary key.
rds.region-id
The region ID of the ApsaraDB RDS for MySQL instance.
Required when reading archived logs from OSS.
STRING
None
For more information about region IDs, see Regions and zones.
rds.access-key-id
The AccessKey ID of the account for the ApsaraDB RDS for MySQL instance.
Required when reading archived logs from OSS.
STRING
None
For more information, see How do I view the AccessKey ID and AccessKey secret?.
Important: To prevent your AccessKey information from being leaked, we recommend that you manage your AccessKey ID using secrets management. For more information, see Manage variables.
rds.access-key-secret
The AccessKey secret of the account for the ApsaraDB RDS for MySQL instance.
Required when reading archived logs from OSS.
STRING
None
For more information, see How do I view the AccessKey ID and AccessKey secret?
Important: To prevent your AccessKey information from being leaked, we recommend that you manage your AccessKey secret using secrets management. For more information, see Manage variables.
rds.db-instance-id
The instance ID of the ApsaraDB RDS for MySQL instance.
Required when reading archived logs from OSS.
STRING
None
None.
rds.main-db-id
The ID of the primary database of the ApsaraDB RDS for MySQL instance.
No
STRING
None
For more information about how to obtain the primary database ID, see Log backup for ApsaraDB RDS for MySQL.
Supported only in VVR 8.0.7 and later.
rds.download.timeout
The timeout period for downloading a single archived log from OSS.
No
DURATION
60s
None.
rds.endpoint
The service endpoint for obtaining OSS Binlog information.
No
STRING
None
For more information about valid values, see Endpoints.
Supported only in VVR 8.0.8 and later.
scan.incremental.close-idle-reader.enabled
Specifies whether to close idle readers after the snapshot phase ends.
No
BOOLEAN
false
Supported only in VVR 8.0.1 and later.
For this configuration to take effect, you must set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true.
scan.read-changelog-as-append-only.enabled
Specifies whether to convert the changelog stream to an append-only stream.
No
BOOLEAN
false
Valid values:
true: All types of messages (including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER) are converted to INSERT messages. Enable this only in special scenarios, such as when you need to save delete messages from the upstream table.
false (Default): All types of messages are sent downstream as they are.
Note: Supported only in VVR 8.0.8 and later.
scan.only.deserialize.captured.tables.changelog.enabled
During the incremental phase, specifies whether to deserialize change events only for the specified tables.
No
BOOLEAN
The default value is false in VVR 8.x versions.
The default value is true in VVR 11.1 and later.
Valid values:
true: Deserializes change data only for the target tables to accelerate Binlog reading.
false (Default): Deserializes change data for all tables.
Note: Supported only in VVR 8.0.7 and later.
When used in VVR 8.0.8 and earlier, the option name must be changed to debezium.scan.only.deserialize.captured.tables.changelog.enable.
scan.parse.online.schema.changes.enabled
During the incremental phase, specifies whether to attempt to parse RDS lock-free DDL events.
No
BOOLEAN
false
Valid values:
true: Parses RDS lock-free DDL events.
false (Default): Does not parse RDS lock-free DDL events.
This is an experimental feature. We recommend that you take a snapshot of the Flink job for recovery before performing online lock-free changes.
Note: Supported only in VVR 11.1 and later.
scan.incremental.snapshot.backfill.skip
Specifies whether to skip backfill during the snapshot reading phase.
No
BOOLEAN
false
Valid values:
true: Skips backfill during the snapshot reading phase.
false (Default): Does not skip backfill during the snapshot reading phase.
If you skip backfill, changes to the table during the snapshot phase are read in the later incremental phase instead of being merged into the snapshot.
Important: Skipping backfill may lead to data inconsistency because changes that occur during the snapshot phase might be replayed. Only at-least-once semantics are guaranteed.
Note: Supported only in VVR 11.1 and later.
scan.incremental.snapshot.unbounded-chunk-first.enabled
Specifies whether to distribute unbounded chunks first during the snapshot reading phase.
No
BOOLEAN
false
Valid values:
true: Distributes unbounded chunks first during the snapshot reading phase.
false (Default): Does not distribute unbounded chunks first during the snapshot reading phase.
This is an experimental feature. Enabling it can reduce the risk of OOM errors when a TaskManager synchronizes the last chunk during the snapshot phase. We recommend adding this before the job's first startup.
Note: Supported only in VVR 11.1 and later.
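The following sketch combines several of the source options described above. All connection values are placeholders, and the option values are only examples; a server-id range of 5401-5404 allows a source parallelism of up to 4.

CREATE TEMPORARY TABLE mysql_source (
  order_id INT,
  order_status BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  'server-id' = '5401-5404',                         -- One ID per parallel source instance.
  'server-time-zone' = 'Asia/Shanghai',              -- Session time zone of the MySQL server.
  'scan.startup.mode' = 'timestamp',                 -- Start from a specified timestamp.
  'scan.startup.timestamp-millis' = '1667232000000'
);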
Dimension table-specific
Option
Description
Required
Data type
Default value
Remarks
url
The MySQL JDBC URL.
No
STRING
None
The URL format is jdbc:mysql://<endpoint>:<port>/<database_name>.
lookup.max-retries
The maximum number of retries after a failed data read.
No
INTEGER
3
Supported only in VVR 6.0.7 and later.
lookup.cache.strategy
The cache policy.
No
STRING
None
Supports three cache policies: None, LRU, and ALL. For more information about the values, see Background information.
Note: When you use the LRU cache policy, you must also configure the lookup.cache.max-rows option.
lookup.cache.max-rows
The maximum number of cached rows.
No
INTEGER
100000
If you select the LRU cache policy, you must set the cache size.
If you select the ALL cache policy, you do not need to set the cache size.
lookup.cache.ttl
The cache time-to-live (TTL).
No
DURATION
10s
The configuration of lookup.cache.ttl depends on lookup.cache.strategy:
If lookup.cache.strategy is set to None, you do not need to configure lookup.cache.ttl. This means the cache does not expire.
If lookup.cache.strategy is set to LRU, lookup.cache.ttl is the cache TTL. By default, it does not expire.
If lookup.cache.strategy is set to ALL, lookup.cache.ttl is the cache reload time. By default, it is not reloaded.
Use a time format, such as 1min or 10s.
lookup.max-join-rows
The maximum number of results returned when querying the dimension table for each record in the primary table.
No
INTEGER
1024
None.
lookup.filter-push-down.enabled
Specifies whether to enable filter pushdown for the dimension table.
No
BOOLEAN
false
Valid values:
true: Enables filter pushdown for the dimension table. When loading data from the MySQL database table, the dimension table filters data in advance based on the conditions set in the SQL job.
false (Default): Disables filter pushdown for the dimension table. When loading data from the MySQL database table, the dimension table loads all data.
Note: Supported only in VVR 8.0.7 and later.
Important: Filter pushdown for a dimension table should only be enabled when the Flink table is used as a dimension table. MySQL source tables do not support enabling filter pushdown. If a Flink table is used as both a source table and a dimension table, and filter pushdown is enabled for the dimension table, you must explicitly set this configuration item to false for the source table using SQL Hints. Otherwise, the job may run abnormally. See the hint example after this table.
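The following sketch shows how the same Flink table, defined with filter pushdown enabled for dimension-table use, can override the option back to false through SQL hints when it is read as a source. The table and column names are placeholders.

-- mysql_table is declared with 'lookup.filter-push-down.enabled' = 'true' in its WITH clause.
-- When the same table is consumed as a source, disable filter pushdown with a dynamic table option.
INSERT INTO some_sink
SELECT id, name
FROM mysql_table /*+ OPTIONS('lookup.filter-push-down.enabled' = 'false') */;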
Sink-specific
Option
Description
Required
Data type
Default value
Remarks
url
The MySQL JDBC URL.
No
STRING
None
The URL format is jdbc:mysql://<endpoint>:<port>/<database_name>.
sink.max-retries
The maximum number of retries after a failed data write.
No
INTEGER
3
None.
sink.buffer-flush.batch-size
The number of records in a single batch write.
No
INTEGER
4096
None.
sink.buffer-flush.max-rows
The number of data records buffered in memory.
No
INTEGER
10000
This option takes effect only after a primary key is specified.
sink.buffer-flush.interval
The time interval for flushing the buffer. If the data in the buffer does not meet the output conditions after the specified waiting time, the system automatically outputs all data in the buffer.
No
DURATION
1s
None.
sink.ignore-delete
Specifies whether to ignore DELETE operations.
No
BOOLEAN
false
When the stream generated by Flink SQL contains delete or update-before records, if multiple output tasks update different fields of the same table simultaneously, data inconsistency may occur.
For example, after a record is deleted, another task updates only some fields. The un-updated fields will become null or their default values, causing data errors.
By setting sink.ignore-delete to true, you can ignore upstream DELETE and UPDATE_BEFORE operations to avoid such issues.
Note: UPDATE_BEFORE is part of Flink's retraction mechanism, used to "retract" the old value in an update operation.
When ignoreDelete = true, all DELETE and UPDATE_BEFORE records are skipped. Only INSERT and UPDATE_AFTER records are processed.
sink.ignore-null-when-update
When updating data, specifies whether to update the corresponding field to null or skip the update for that field if the incoming data field is null.
No
BOOLEAN
false
Valid values:
true: Does not update the field. This option can be set to true only when a primary key is set for the Flink table. When set to true:
In VVR 8.0.6 and earlier, batch execution is not supported for writing data to the sink table.
In VVR 8.0.7 and later, batch execution is supported for writing data to the sink table.
Although batch writing can significantly improve write efficiency and overall throughput, it can cause data latency and OOM risks. Therefore, make a trade-off based on your actual business scenario.
false: Updates the field to null.
Note: This option is supported only in VVR 8.0.5 and later.
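The following sketch combines several of the sink options described above. Connection values are placeholders; the buffer options enable batched writes, and sink.ignore-delete skips DELETE and UPDATE_BEFORE records.

CREATE TEMPORARY TABLE mysql_buffered_sink (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  'sink.buffer-flush.max-rows' = '10000',  -- Buffered rows; takes effect only with a primary key.
  'sink.buffer-flush.interval' = '1s',     -- Flush the buffer at least once per second.
  'sink.ignore-delete' = 'true'            -- Skip DELETE and UPDATE_BEFORE records.
);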
Type mapping
CDC source table
MySQL CDC field type | Flink field type |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | TIMESTAMP [(p)] or TIMESTAMP [(p)] WITH LOCAL TIME ZONE |
CHAR(n) | STRING |
VARCHAR(n) | |
TEXT | |
BINARY | BYTES |
VARBINARY | |
BLOB | |
Important: We recommend that you do not use the TINYINT(1) type in MySQL to store values other than 0 and 1. When property-version=0, the MySQL CDC source table maps TINYINT(1) to Flink's BOOLEAN type by default, which can cause data inaccuracies. If you need to use the TINYINT(1) type to store values other than 0 and 1, see the configuration option catalog.table.treat-tinyint1-as-boolean.
Dimension table and sink table
MySQL field type | Flink field type |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
BIGINT | BIGINT |
INT UNSIGNED | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) Note: p must be less than or equal to 38. |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈n/8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
TINYBLOB | BYTES Important: Flink only supports MySQL BLOB type records that are less than or equal to 2,147,483,647 (2^31 - 1) bytes. |
BLOB | |
MEDIUMBLOB | |
LONGBLOB | |
Data ingestion
You can use the MySQL connector as a data source in a data ingestion YAML job.
Syntax
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: <username>
  password: <password>
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: xxx

Connector options
Option | Description | Required | Data type | Default value | Remarks |
type | The type of the data source. | Yes | STRING | None | Set this option to mysql. |
name | The name of the data source. | No | STRING | None | None. |
hostname | The IP address or hostname of the MySQL database. | Yes | STRING | None | We recommend that you use a VPC endpoint. Note: If the MySQL database and Realtime Compute for Apache Flink are not in the same VPC, you must establish a cross-VPC network connection or use the Internet for access. For more information, see Workspace and namespace management. |
username | The username for the MySQL database service. | Yes | STRING | None | None. |
password | The password for the MySQL database service. | Yes | STRING | None | None. |
tables | The MySQL data tables to synchronize. | Yes | STRING | None | Note: A period (.) separates the database name and table name. To use a period to match any character, escape it with a backslash. For example: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*. |
tables.exclude | The tables to exclude from synchronization. | No | STRING | None |
Note A period is used to separate the database name and table name. To use a period to match any character, you must escape the period with a backslash. For example: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*. |
port | The port number of the MySQL database service. | No | INTEGER | 3306 | None. |
schema-change.enabled | Specifies whether to send schema change events. | No | BOOLEAN | true | None. |
server-id | The numeric ID or range of IDs for the database client used for synchronization. | No | STRING | A random value between 5400 and 6400 is generated. | This ID must be globally unique within the MySQL cluster. We recommend that you set a different ID for each job that connects to the same database. This option also supports an ID range, such as 5400-5408. When incremental reading is enabled, concurrent reading is supported. In this case, we recommend that you set an ID range so that each concurrent reader uses a different ID. |
jdbc.properties.* | Custom connection options in the JDBC URL. | No | STRING | None | You can pass custom connection options. For example, to not use the SSL protocol, you can set 'jdbc.properties.useSSL' = 'false'. For more information about supported connection options, see MySQL Configuration Properties. |
debezium.* | Custom options for Debezium to read Binlogs. | No | STRING | None | You can pass custom Debezium options. For example, use 'debezium.event.deserialization.failure.handling.mode'='ignore' to specify the handling logic for parsing errors. |
scan.incremental.snapshot.chunk.size | The size of each chunk in number of rows. | No | INTEGER | 8096 | A MySQL table is split into multiple chunks for reading. The data of a chunk is buffered in memory before it is fully read. A smaller number of rows per chunk results in a larger total number of chunks in the table. While this improves the granularity of fault recovery, it may lead to OOM errors and lower overall throughput. Therefore, you need to find a balance and set a reasonable chunk size. |
scan.snapshot.fetch.size | The maximum number of records to fetch at a time when reading the full data of a table. | No | INTEGER | 1024 | None. |
scan.startup.mode | The startup mode for data consumption. | No | STRING | initial | Valid values: initial (default): scans full historical data upon the first startup and then reads the latest Binlog data; latest-offset: starts reading from the end of the Binlog; earliest-offset: starts reading from the earliest available Binlog; specific-offset: starts from a specified Binlog offset; timestamp: starts reading the Binlog from a specified timestamp. Important: For the earliest-offset, specific-offset, and timestamp startup modes, if the table schema at the startup time is different from the schema at the specified start offset time, the job will fail due to a schema mismatch. In other words, when using these three startup modes, you must ensure that the schema of the corresponding table does not change between the specified Binlog consumption position and the job startup time. |
scan.startup.specific-offset.file | The Binlog filename of the start offset when using the specific offset startup mode. | No | STRING | None | When you use this configuration, you must set scan.startup.mode to specific-offset. The filename format is, for example, mysql-bin.000003. |
scan.startup.specific-offset.pos | The offset in the specified Binlog file to start from when using the specific offset startup mode. | No | INTEGER | None | When you use this configuration, you must set scan.startup.mode to specific-offset. |
scan.startup.specific-offset.gtid-set | The GTID set of the start offset when using the specific offset startup mode. | No | STRING | None | When you use this configuration, you must set scan.startup.mode to specific-offset. The GTID set format is, for example, 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19. |
scan.startup.timestamp-millis | The start offset as a millisecond timestamp when using the timestamp startup mode. | No | LONG | None | When you use this configuration, you must set scan.startup.mode to timestamp. The timestamp is in milliseconds. Important When you specify a time, MySQL CDC attempts to read the initial event of each Binlog file to determine its timestamp and locate the Binlog file corresponding to the specified time. Make sure that the Binlog file for the specified timestamp has not been cleared from the database and can be read. |
server-time-zone | The session time zone used by the database. | No | STRING | If you do not specify this option, the system uses the time zone of the Flink job's runtime environment as the database server time zone. This is the time zone of the zone you selected. | Example: Asia/Shanghai. This option controls how TIMESTAMP types in MySQL are converted to STRING types. For more information, see Debezium temporal values. |
scan.startup.specific-offset.skip-events | The number of Binlog events to skip when reading from a specific offset. | No | INTEGER | None | When you use this configuration, you must set scan.startup.mode to specific-offset. |
scan.startup.specific-offset.skip-rows | The number of row changes to skip when reading from a specific offset. A single Binlog event may correspond to multiple row changes. | No | INTEGER | None | When you use this configuration, you must set scan.startup.mode to specific-offset. |
connect.timeout | The maximum time to wait for a connection to the MySQL database server to time out before retrying. | No | DURATION | 30s | None. |
connect.max-retries | The maximum number of retries after a failed connection to the MySQL database service. | No | INTEGER | 3 | None. |
connection.pool.size | The size of the database connection pool. | No | INTEGER | 20 | The database connection pool is used to reuse connections, which can reduce the number of database connections. |
heartbeat.interval | The interval at which the source uses heartbeat events to advance the Binlog offset. | No | DURATION | 30s | Heartbeat events are used to advance the Binlog offset in the source, which is very useful for slowly updated tables in MySQL. For such tables, the Binlog offset does not advance automatically. Heartbeat events can push the Binlog offset forward, preventing issues where an expired Binlog offset causes the job to fail and require a stateless restart. |
scan.incremental.snapshot.chunk.key-column | The column used to split chunks during the snapshot phase. | No. | STRING | None | You can only select one column from the primary key. |
rds.region-id | The region ID of the ApsaraDB RDS for MySQL instance. | Required when reading archived logs from OSS. | STRING | None | For more information about region IDs, see Regions and zones. |
rds.access-key-id | The AccessKey ID of the account for the ApsaraDB RDS for MySQL instance. | Required when reading archived logs from OSS. | STRING | None | For more information, see How do I view the AccessKey ID and AccessKey secret? Important To prevent your AccessKey information from being leaked, we recommend that you manage your AccessKey ID using secrets management. For more information, see Manage variables. |
rds.access-key-secret | The AccessKey secret of the account for the ApsaraDB RDS for MySQL instance. | Required when reading archived logs from OSS. | STRING | None | For more information, see How do I view the AccessKey ID and AccessKey secret? Important To prevent your AccessKey information from being leaked, we recommend that you manage your AccessKey secret using secrets management. For more information, see Manage variables. |
rds.db-instance-id | The instance ID of the ApsaraDB RDS for MySQL instance. | Required when reading archived logs from OSS. | STRING | None | None. |
rds.main-db-id | The ID of the primary database of the ApsaraDB RDS for MySQL instance. | No | STRING | None | For more information about how to obtain the primary database ID, see Log backup for ApsaraDB RDS for MySQL. |
rds.download.timeout | The timeout period for downloading a single archived log from OSS. | No | DURATION | 60s | None. |
rds.endpoint | The service endpoint for obtaining OSS Binlog information. | No | STRING | None | For more information about valid values, see Endpoints. |
rds.binlog-directory-prefix | The directory prefix for storing Binlog files. | No | STRING | rds-binlog- | None. |
rds.use-intranet-link | Specifies whether to use an internal network to download Binlog files. | No | BOOLEAN | true | None. |
rds.binlog-directories-parent-path | The absolute path of the parent directory where Binlog files are stored. | No | STRING | None | None. |
chunk-meta.group.size | The size of the chunk metadata. | No | INTEGER | 1000 | If the metadata is larger than this value, it is transmitted in multiple parts. |
chunk-key.even-distribution.factor.lower-bound | The lower bound of the chunk distribution factor for even splitting. | No | DOUBLE | 0.05 | If the distribution factor is less than this value, uneven splitting is used. Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows. |
chunk-key.even-distribution.factor.upper-bound | The upper bound of the chunk distribution factor for even splitting. | No | DOUBLE | 1000.0 | If the distribution factor is greater than this value, uneven splitting is used. Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows. |
scan.incremental.close-idle-reader.enabled | Specifies whether to close idle readers after the snapshot phase ends. | No | BOOLEAN | false | For this configuration to take effect, you must set |
scan.only.deserialize.captured.tables.changelog.enabled | During the incremental phase, specifies whether to deserialize change events only for the specified tables. | No | BOOLEAN | | Valid values: true: deserializes change data only for the target tables to accelerate Binlog reading; false: deserializes change data for all tables. |
metadata-column.include-list | The metadata columns to pass to the downstream. | No | STRING | None | Available metadata includes Note The MySQL CDC YAML connector does not need to and does not support adding the Important The |
scan.newly-added-table.enabled | When restarting from a checkpoint, specifies whether to synchronize newly added tables that were not matched in the previous run or to remove currently unmatched tables that are saved in the state. | No | BOOLEAN | false | This takes effect when restarting from a checkpoint or savepoint. |
scan.binlog.newly-added-table.enabled | During the incremental phase, specifies whether to send data from newly added tables that are matched. | No | BOOLEAN | false | Cannot be enabled at the same time as |
scan.incremental.snapshot.chunk.key-column | Specifies a column for certain tables to be used as the splitting key for chunks during the snapshot phase. | No | STRING | None |
|
scan.parse.online.schema.changes.enabled | Specifies whether to parse RDS lock-free DDL events during the incremental phase. | No | BOOLEAN | false | Valid values: true: parses RDS lock-free DDL events; false (default): does not parse RDS lock-free DDL events. This is an experimental feature. Before performing an online lock-free DDL change, take a snapshot of the Flink job for recovery. Note: This feature is supported only by Flink compute engine Ververica Runtime (VVR) 11.0 and later. |
scan.incremental.snapshot.backfill.skip | Specifies whether to skip the backfill during the snapshot read phase. | No | BOOLEAN | false | Valid values: true: skips the backfill during the snapshot read phase; false (default): does not skip the backfill. If the backfill is skipped, changes to the table during the snapshot phase are read in a later incremental phase instead of being merged into the snapshot. Important: Skipping the backfill may cause data inconsistency because changes that occur during the snapshot phase may be replayed. This only guarantees at-least-once semantics. Note: This option is supported only by Flink compute engine Ververica Runtime (VVR) 11.1 and later. |
treat-tinyint1-as-boolean.enabled | Specifies whether to treat the TINYINT(1) type as the Boolean type. | No | BOOLEAN | true | Valid values: true (default): treats the TINYINT(1) type as the Boolean type; false: does not treat TINYINT(1) as the Boolean type. |
treat-timestamp-as-datetime-enabled | Specifies whether to process | No | BOOLEAN | false | Valid values:
MySQL Enabling this will convert MySQL |
include-comments.enabled | Specifies whether to sync table and field comments. | No | BOOLEAN | false | Valid values: true: syncs table and field comments; false (default): does not sync comments. Enabling this option increases the memory usage of the job. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Specifies whether to distribute unbounded shards first during the snapshot read phase. | No | BOOLEAN | false | Valid values: true: distributes unbounded shards first during the snapshot read phase; false (default): does not distribute unbounded shards first. This is an experimental feature. Enabling this feature reduces the risk of an out-of-memory (OOM) error when a TaskManager synchronizes the last shard during the snapshot phase. Add this option before the job starts for the first time. Note: This feature is available only in Flink compute engine Ververica Runtime (VVR) 11.1 and later. |
Type mapping
The following table shows the type mappings for data ingestion.
MySQL CDC field type | CDC field type |
TINYINT(n) | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
YEAR | |
INT | INT |
MEDIUMINT | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | |
BOOLEAN | BOOLEAN |
BIT(1) | |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] | The Flink CDC type is determined by the treat-timestamp-as-datetime-enabled option. |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | STRING Note MySQL supports a decimal precision up to 65. Flink limits decimal precision to 38. If a decimal column has a precision greater than 38, map it to a string to prevent precision loss. |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
ENUM | |
JSON | STRING Note The JSON data type is transformed into a JSON-formatted string in Flink. |
GEOMETRY | STRING Note MySQL spatial data types are transformed into strings with a fixed JSON format. For more information, see MySQL spatial data type mapping. |
POINT | |
LINESTRING | |
POLYGON | |
MULTIPOINT | |
MULTILINESTRING | |
MULTIPOLYGON | |
GEOMETRYCOLLECTION | |
TINYBLOB | BYTES Note For MySQL BLOB data types, the maximum supported length is 2,147,483,647 (2**31 - 1). |
BLOB | |
MEDIUMBLOB | |
LONGBLOB |
Examples
CDC source table
CREATE TEMPORARY TABLE mysqlcdc_source (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE blackhole_sink(
  order_id INT,
  customer_name STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT order_id, customer_name FROM mysqlcdc_source;

Dimension table
CREATE TEMPORARY TABLE datagen_source(
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mysql_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE blackhole_sink(
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;

Sink table
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mysql_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

INSERT INTO mysql_sink SELECT * FROM datagen_source;

Data ingestion source
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
MySQL CDC source tables
How it works
When the MySQL CDC source table starts, it scans the entire table and splits it into multiple chunks based on the primary key, recording the current binlog position. It then uses the incremental snapshot algorithm to read the data from each chunk one by one by using SELECT statements. The job periodically performs checkpoints to record the completed chunks. If a failover occurs, the job only needs to continue reading the unfinished chunks. After all chunks are read, it starts reading incremental change records from the previously recorded binlog position. The Flink job continues to perform periodic checkpoints to record the binlog position. If the job fails over, it resumes processing from the last recorded binlog position, thus achieving exactly-once semantics.
For a more detailed explanation of the incremental snapshot algorithm, see MySQL CDC Connector.
Metadata
Metadata is highly useful in scenarios where you merge sharded databases and tables. After merging, businesses often still need to identify the source database and table for each row of data. Metadata columns allow you to access this information. Therefore, you can easily merge multiple sharded tables into a single destination table by using metadata columns.
The MySQL CDC Source supports metadata column syntax. You can access the following metadata through metadata columns.
Metadata key
Metadata type
Description
database_name
STRING NOT NULL
The name of the database that contains the row.
table_name
STRING NOT NULL
The name of the table that contains the row.
op_ts
TIMESTAMP_LTZ(3) NOT NULL
The time when the row was changed in the database. If the record is from the table's existing historical data instead of the Binlog, this value is always 0.
op_type
STRING NOT NULL
The change type of the row.
+I: INSERT message
-D: DELETE message
-U: UPDATE_BEFORE message
+U: UPDATE_AFTER message
Note: Supported only in Ververica Runtime (VVR) 8.0.7 and later.
query_log
STRING NOT NULL
The MySQL query log record corresponding to the row read.
Note: To log queries, MySQL requires enabling binlog_rows_query_log_events.
The following example shows how to merge multiple orders tables from different sharded databases in a MySQL instance and synchronize them to a holo_orders table in Hologres.
CREATE TEMPORARY TABLE mysql_orders (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,         -- Read the database name.
  table_name STRING METADATA FROM 'table_name' VIRTUAL,         -- Read the table name.
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,  -- Read the change timestamp.
  op_type STRING METADATA FROM 'op_type' VIRTUAL,               -- Read the change type.
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb_.*',  -- Match multiple sharded databases using a regular expression.
  'table-name' = 'orders_.*'    -- Match multiple sharded tables using a regular expression.
);

INSERT INTO holo_orders SELECT * FROM mysql_orders;

Based on the code above, if you set the scan.read-changelog-as-append-only.enabled option to true in the WITH clause, the output varies based on the primary key settings of the downstream table:
If the primary key of the downstream table is order_id, the output includes only the last change for each primary key in the source table. For example, if the last change for a primary key was a delete operation, you see a record in the downstream table with the same primary key and an op_type of -D.
If the primary key of the downstream table is a composite of order_id, operation_ts, and op_type, the output includes the complete change history for each primary key in the source table.
Regular expression support
The MySQL CDC source table supports using regular expressions in the table name or database name to match multiple tables or databases. The following example shows how to specify multiple tables by using a regular expression.
CREATE TABLE products (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  table_name STRING METADATA FROM 'table_name' VIRTUAL,
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})',  -- Match multiple databases using a regular expression.
  'table-name' = '(t[5-8]|tt)'                                -- Match multiple tables using a regular expression.
);

Explanation of the regular expressions in the preceding example:
^(test).* is a prefix match example. This expression can match database names that start with test, such as test1 or test2.
.*[p$] is a suffix match example. This expression can match database names that end with p, such as cdcp or edcp.
txc is a specific match. It can match a specific database name, such as txc.
When matching a fully qualified table name, MySQL CDC uses both the database name and table name to uniquely identify a table. It uses the pattern database-name.table-name for matching. For example, the pattern (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) can match tables such as txc.tt and test2.test5 in the database.
Important: In SQL job configurations, the table-name and database-name options do not support using a comma (,) to specify multiple tables or databases.
To match multiple tables or use multiple regular expressions, connect them with a vertical bar (|) and enclose them in parentheses. For example, to read the user and product tables, you can configure table-name as (user|product).
If a regular expression contains a comma, you must rewrite it using the vertical bar (|) operator. For example, the regular expression mytable_\d{1, 2} needs to be rewritten as the equivalent (mytable_\d{1}|mytable_\d{2}) to avoid using a comma.
Concurrency control
The MySQL connector supports reading full data with multiple concurrencies, which improves data loading efficiency. When combined with Autopilot in the Realtime Compute for Apache Flink console, it can automatically scale in during the incremental phase after the multi-concurrency reading is complete, thereby saving computing resources.
In the Realtime Compute development console, you can set the job's parallelism on the Resource Configuration page in either Basic mode or Expert mode. The differences are as follows:
The parallelism set in Basic mode is the global parallelism for the entire job.

Expert mode allows you to set the parallelism for a specific VERTEX as needed.

For more information about resource configuration, see Configure a job deployment.
Important: Whether you use Basic mode or Expert mode, the server-id range declared in the table must be greater than or equal to the job's parallelism. For example, the server-id range 5404-5412 contains 9 unique server IDs, so the job can have a maximum of 9 parallel instances. Different jobs for the same MySQL instance must have non-overlapping server-id ranges, meaning each job must explicitly configure a different server-id.
Autopilot auto-scaling
The full data phase accumulates a large amount of historical data. To improve reading efficiency, historical data is typically read concurrently. In the incremental binlog phase, however, because the volume of binlog data is small and a global order must be maintained, a single concurrency is usually sufficient. Autopilot can automatically balance performance and resources to meet these different requirements of the full and incremental phases.
Autopilot monitors the traffic of each task in the MySQL CDC Source. When the job enters the binlog phase, if only one task is responsible for reading the binlog and the other tasks are idle, Autopilot will automatically reduce the CU count and parallelism of the Source. To enable Autopilot, set the Autopilot mode to Active on the job's Operations and Maintenance page.
Note
The minimum trigger interval for reducing parallelism is 24 hours by default. For more information about Autopilot options, see Configure Autopilot.
Startup modes
You can use the scan.startup.mode option to specify the startup mode of the MySQL CDC source table. The options include:
initial (default): Performs a full read of the database table upon the first startup, and then switches to incremental mode to read the binlog.
earliest-offset: Skips the snapshot phase and starts reading from the earliest available binlog position.
latest-offset: Skips the snapshot phase and starts reading from the end of the binlog. In this mode, the source table can only read data changes that occur after the job starts.
specific-offset: Skips the snapshot phase and starts reading from a specified binlog position. The position can be specified by a binlog file name and position, or by a GTID set.
timestamp: Skips the snapshot phase and starts reading binlog events from a specified timestamp.
Example:
CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    'scan.startup.mode' = 'earliest-offset', -- Start from the earliest offset.
    'scan.startup.mode' = 'latest-offset', -- Start from the latest offset.
    'scan.startup.mode' = 'specific-offset', -- Start from a specific offset.
    'scan.startup.mode' = 'timestamp', -- Start from a specific timestamp.
    'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Specify the binlog file name for the specific-offset mode.
    'scan.startup.specific-offset.pos' = '4', -- Specify the binlog position for the specific-offset mode.
    'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Specify the GTID set for the specific-offset mode.
    'scan.startup.timestamp-millis' = '1667232000000' -- Specify the startup timestamp in milliseconds for the timestamp mode.
    ...
)
Important
During a checkpoint, the MySQL source logs the current binlog position at the INFO level with the prefix Binlog offset on checkpoint {checkpoint-id}. This log can help you restart the job from a specific position.
If the schema of the table has changed in the past, starting from earliest-offset, a specific-offset, or a timestamp may cause errors, because the Debezium reader internally stores only the latest schema and older data with a mismatched schema cannot be parsed correctly.
Keyless CDC source tables
To read a table that has no primary key, you must set scan.incremental.snapshot.chunk.key-column, and the specified column must be non-nullable. A configuration sketch follows this list.
The processing semantics of a keyless CDC source table are determined by the behavior of the column specified in scan.incremental.snapshot.chunk.key-column:
If the values of the specified column are not updated, exactly-once semantics can be guaranteed.
If the values of the specified column are updated, only at-least-once semantics can be guaranteed. However, you can still achieve data correctness by specifying a primary key in the downstream system and relying on idempotent writes.
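The following is a minimal sketch for a keyless table. The table name orders_nopk, the column names, and the connection settings are hypothetical; the example only shows where the chunk key column is declared.
CREATE TABLE orders_nopk_source (
    order_time TIMESTAMP(3),
    customer_name STRING,
    price DECIMAL(10, 2)
    -- No PRIMARY KEY clause: the MySQL table has no primary key.
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'orders_nopk',
    'scan.incremental.snapshot.chunk.key-column' = 'order_time' -- Must be a non-nullable column in MySQL.
);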
Read backup logs from ApsaraDB RDS for MySQL
The MySQL CDC source table supports reading backup logs from Alibaba Cloud ApsaraDB RDS for MySQL. This is particularly useful in scenarios where the full snapshot phase takes a long time, causing local binlog files to be automatically cleaned up, while automatically or manually uploaded backup files still exist.
Example:
CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    'rds.region-id' = 'cn-beijing',
    'rds.access-key-id' = 'xxxxxxxxx',
    'rds.access-key-secret' = 'xxxxxxxxx',
    'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx',
    'rds.main-db-id' = '12345678',
    'rds.download.timeout' = '60s'
    ...
)
Enable CDC source reuse
When a single job has multiple MySQL CDC source tables, each source table starts a corresponding binlog client. If there are many source tables and they all read from the same MySQL instance, it can put significant pressure on the database. For more information, see MySQL CDC FAQ.
VVR 8.0.7 and later versions support MySQL CDC source reuse. CDC source tables can be merged if all of their configuration options are identical except for the database name, table name, and server-id. After you enable source reuse, VVR merges as many compatible MySQL CDC source tables as possible within the same job.
Procedure
Enable the source reuse feature in your SQL job draft using the SET command:
SET 'table.optimizer.source-merge.enabled' = 'true';
Start the job without states. After you enable source reuse for an existing job, you must perform a stateless restart, because source reuse changes the job topology and restarting from the old job state may fail or lead to data loss. If sources are reused, a MergetableSourceScan operator appears in the job topology.
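For illustration, the following minimal sketch shows two CDC source tables that differ only in table name and server-id, so they are eligible to be merged into a single binlog client when source reuse is enabled. All table names, column names, and connection settings are placeholders.
SET 'table.optimizer.source-merge.enabled' = 'true';

CREATE TABLE orders_source (
    order_id INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'orders',
    'server-id' = '5501-5504'
);

CREATE TABLE products_source (
    product_id INT,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'products',  -- Differs only in table name and server-id, so the two sources can be merged.
    'server-id' = '5505-5508'
);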
Important
After you enable CDC source reuse, do not set pipeline.operator-chaining to false. Disabling operator chaining adds serialization and deserialization overhead, and the more sources are merged, the greater the overhead becomes.
In VVR 8.0.7, disabling operator chaining causes a serialization issue.
Accelerate binlog reading
When you use the MySQL connector as a source table or a data ingestion source, it parses binlog files to generate various change messages during the incremental phase. The binlog file records all table changes in a binary format. You can accelerate binlog file parsing in the following ways.
Enable parsing filter configuration
Use the scan.only.deserialize.captured.tables.changelog.enabled option to parse change events only for the specified tables.
Optimize Debezium options
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50
debezium.max.queue.size: The maximum number of records that the blocking queue can hold. When Debezium reads an event stream from the database, it places the events in a blocking queue before writing them downstream. The default value is 8192.
debezium.max.batch.size: The maximum number of events that the connector processes in each iteration. The default value is 2048.
debezium.poll.interval.ms: The number of milliseconds that the connector waits before requesting new change events. The default value is 1000 milliseconds (1 second).
Example:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium configuration
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- Enable parsing filter
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- Parses change events only for specified tables.
...
)
The following shows the corresponding configuration in a data ingestion YAML job:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
# Debezium configuration
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50
# Enable parsing filter
scan.only.deserialize.captured.tables.changelog.enabled: true
The enterprise version of MySQL CDC has a binlog consumption capacity of 85 MB/s, which is about twice that of the open-source community version. If the binlog generation speed exceeds 85 MB/s (equivalent to one 512 MB file every 6 seconds), the latency of the Flink job keeps increasing. The processing latency gradually decreases after the binlog generation speed slows down. When a binlog file contains large transactions, processing latency may temporarily increase and decreases after the transaction's log has been read.
MySQL CDC DataStream API
To read and write data by using the DataStream API, you must use the corresponding DataStream connector to connect to Flink. For information on how to set up the DataStream connector, see Integrate and use connectors in DataStream programs.
The following examples show how to create a DataStream API program and use MySqlSource, including the required pom dependencies.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // Set the database to capture.
.tableList("yourDatabaseName.yourTableName") // Set the table to capture.
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // Converts a SourceRecord to a JSON string.
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing.
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// Set 4 parallel source tasks.
.setParallelism(4)
.print().setParallelism(1); // Use a parallelism of 1 for the sink to maintain message order.
env.execute("Print MySQL Snapshot + Binary Log");
}
}
The required pom dependencies are as follows:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
When you build MySqlSource, you can specify the following parameters in your code:
Parameter | Description |
hostname | The IP address or hostname of the MySQL database. |
port | The port number of the MySQL database service. |
databaseList | The name of the MySQL database. Note: The database name supports regular expressions, so you can use a regular expression to read data from multiple databases. |
username | The username for the MySQL database service. |
password | The password for the MySQL database service. |
deserializer | A deserializer that converts records of the SourceRecord type to a specified type. For example, the JsonDebeziumDeserializationSchema used in the preceding code converts a SourceRecord to a JSON string. |
You must specify the following parameters in your pom dependencies:
${vvr.version} | The version of the Realtime Compute for Apache Flink engine. For example, Note Use the version number displayed on Maven. We periodically release hotfix versions, and these updates might not be announced through other channels. |
${flink.version} | The Apache Flink version. For example: Important Ensure that your Apache Flink version matches the engine version of Realtime Compute for Apache Flink to avoid incompatibility issues during job runtime. For more information about the version mapping, see Engine. |
FAQ
For information about issues you might encounter when using a CDC source table, see CDC issues.