This topic describes how to use the MySQL connector.
Background information
The MySQL connector supports all databases that are compatible with the MySQL protocol, including ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (MySQL mode), and self-managed MySQL.
When you use the MySQL connector to read from OceanBase, ensure that binary logging (binlog) for OceanBase is enabled and correctly configured. For more information, see Related operations. This feature is in public preview. We recommend that you evaluate it thoroughly and use it with caution.
The following table describes the capabilities of the MySQL connector.
Category | Details |
Supported type | Source table, dimension table, sink table, and data ingestion source |
Runtime mode | Streaming mode only |
Data format | Not applicable |
Specific monitoring metrics | |
API type | DataStream, SQL, and data ingestion YAML |
Can I update or delete data in a sink table? | Yes |
Features
The MySQL Change Data Capture (CDC) source table is a streaming source table that first reads the full historical data from a database. It then smoothly switches to reading the binlog to ensure that data is not read more than once or missed. Even if a failure occurs, the data is processed with exactly-once semantics. The MySQL CDC source table supports the concurrent reading of full data and uses an incremental snapshot algorithm to achieve lock-free reading and resumable data transfer. For more information, see About MySQL CDC source tables.
Unified batch and stream processing: Reads both full and incremental data, eliminating the need for separate pipelines.
Concurrent full data reading: Horizontally scales performance.
Seamless switch from full to incremental reading: Automatically scales in to save computing resources.
Resumable data transfer: Supports resumable data transfer during the full data reading phase for enhanced stability.
Lock-free reading: Reads full data without affecting online business operations.
Supports reading backup logs from ApsaraDB RDS for MySQL.
Parallel parsing of binlog files reduces read latency.
Prerequisites
You must configure your MySQL database as described in Configure MySQL before you can use a MySQL CDC source table.
ApsaraDB RDS for MySQL
Perform a network probe with Realtime Compute for Apache Flink to ensure network connectivity.
MySQL versions: 5.6, 5.7, and 8.0.x.
Enable the binlog. This is enabled by default.
Set the binlog format to ROW. This is the default format.
Set binlog_row_image to FULL. This is the default setting.
Disable Binary Log Transaction Compression. This feature was introduced in MySQL 8.0.20 and is disabled by default.
Create a MySQL user with the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
Create a MySQL database and table. For more information, see Create a database and an account for ApsaraDB RDS for MySQL. Use a privileged account to create the MySQL database to prevent operational failures caused by insufficient permissions.
Configure an IP address whitelist. For more information, see Configure a whitelist for ApsaraDB RDS for MySQL.
PolarDB for MySQL
Perform a network probe with Realtime Compute for Apache Flink to ensure network connectivity.
MySQL versions: 5.6, 5.7, and 8.0.x.
Enable the binlog. This is disabled by default.
Set the binlog format to ROW. This is the default format.
Set binlog_row_image to FULL. This is the default setting.
Disable Binary Log Transaction Compression. This feature was introduced in MySQL 8.0.20 and is disabled by default.
Create a MySQL user with the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
Create a MySQL database and table. For more information, see Create a database and an account for PolarDB for MySQL. Use a privileged account to create the MySQL database to prevent operational failures caused by insufficient permissions.
Configure an IP address whitelist. For more information, see Configure a whitelist for a PolarDB for MySQL cluster.
Self-managed MySQL
Perform a network probe with Realtime Compute for Apache Flink to ensure network connectivity.
MySQL versions: 5.6, 5.7, and 8.0.x.
Enable the binlog. This is disabled by default.
Set the binlog format to ROW. The default format is STATEMENT.
Set binlog_row_image to FULL. This is the default setting.
Disable Binary Log Transaction Compression. This feature was introduced in MySQL 8.0.20 and is disabled by default.
Create a MySQL user with the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions (see the example statements after these prerequisites).
Create a MySQL database and table. For more information, see Create a database and an account for a self-managed MySQL instance. Use a privileged account to create the MySQL database to prevent operational failures caused by insufficient permissions.
Configure an IP address whitelist. For more information, see Configure a whitelist for a self-managed MySQL instance.
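The following statements are a sketch of how to verify the binlog settings and create a user with the required permissions. The user name flinkuser and the password are placeholders; on ApsaraDB RDS for MySQL and PolarDB for MySQL, some of these settings are managed in the console instead of through SQL.

-- Check that the binlog is enabled and configured as required.
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';
SHOW VARIABLES LIKE 'binlog_transaction_compression';

-- Create a user for the connector and grant the required permissions.
CREATE USER 'flinkuser'@'%' IDENTIFIED BY '<yourPassword>';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser'@'%';
FLUSH PRIVILEGES;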
Limits
General limits
The MySQL CDC source table does not support defining a watermark.
In CREATE TABLE AS SELECT (CTAS) and CREATE DATABASE AS SELECT (CDAS) jobs, the MySQL CDC source table supports partial schema change synchronization. For more information about the supported change types, see Schema evolution synchronization policies.
The MySQL CDC connector does not support Binary Log Transaction Compression. Therefore, when you use the MySQL CDC connector to consume incremental data, ensure that this feature is disabled. Otherwise, incremental data retrieval may fail.
Limits for ApsaraDB RDS for MySQL
For ApsaraDB RDS for MySQL, we do not recommend reading data from a secondary database or a read-only replica. The binlog retention period for these instances is short by default. If the binlog expires and is cleared, the job may fail to consume binlog data, causing an error.
By default, ApsaraDB RDS for MySQL enables parallel replication between the primary and secondary databases and does not guarantee transaction order consistency. This may cause some data to be lost during a primary-secondary switchover and checkpoint recovery. You can manually enable the slave_preserve_commit_order option in ApsaraDB RDS for MySQL to address this issue.
Limits for PolarDB for MySQL
MySQL CDC source tables do not support reading from a Multi-master Cluster of PolarDB for MySQL 1.0.19 or earlier. For more information, see What is a Multi-master Cluster?. The binlog generated by these clusters may contain duplicate table IDs, which can cause schema mapping errors in the CDC source table, leading to errors during binlog data parsing.
Limits for open source MySQL
By default, MySQL maintains transaction order during primary-replica Binlog replication. If a MySQL replica enables parallel replication (slave_parallel_workers > 1) but does not have slave_preserve_commit_order = ON, its transaction commit order may be inconsistent with the primary database. When Flink CDC recovers from a checkpoint, it may miss data due to this order inconsistency. We recommend that you set slave_preserve_commit_order = ON on the MySQL replica, or set slave_parallel_workers = 1. The latter option may sacrifice replication performance.
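The following statements are a sketch of how to check and adjust these settings on a self-managed MySQL replica. The variable names apply to MySQL 5.7 and 8.0 (MySQL 8.0.26 and later also provide replica_* aliases and STOP/START REPLICA), and changing slave_preserve_commit_order requires the replication SQL thread to be stopped.

-- Check the current replica settings.
SHOW VARIABLES LIKE 'slave_parallel_workers';
SHOW VARIABLES LIKE 'slave_preserve_commit_order';

-- Option 1: keep parallel replication but preserve the transaction commit order.
STOP SLAVE SQL_THREAD;
SET GLOBAL slave_preserve_commit_order = ON;
START SLAVE SQL_THREAD;

-- Option 2: disable parallel replication (may reduce replication performance).
-- SET GLOBAL slave_parallel_workers = 1;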
Notes
Sink table
Do not declare an auto-increment primary key in the DDL statement. MySQL automatically populates this field when writing data.
Declare at least one non-primary key field. Otherwise, an error is reported.
NOT ENFORCED in the DDL statement indicates that Flink does not perform a validity check on the primary key. You must ensure the correctness and integrity of the primary key. For more information, see Validity Check.
Dimension table
To use indexes for query acceleration, the field order in the JOIN clause must match the index definition order (the leftmost prefix rule). For example, if the index is on (a, b, c), the JOIN condition should be ON t.a = x AND t.b = y (see the example below). Flink-generated SQL may be rewritten by the optimizer, which can prevent the index from being used in the actual database query. To verify whether an index is used, check the execution plan (EXPLAIN) or the slow query log in MySQL to see the actual SELECT statement that is executed.
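The following sketch is a self-contained illustration of the leftmost prefix rule. The names src, dim_user, sink_table, and the (a, b, c) index are hypothetical; only the order of the columns in the JOIN condition matters.

-- MySQL side (hypothetical): CREATE INDEX idx_abc ON dim_user (a, b, c);

CREATE TEMPORARY TABLE src (
  a INT,
  b BIGINT,
  id INT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE dim_user (
  a INT,
  b BIGINT,
  c STRING
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE sink_table (
  id INT,
  c STRING
) WITH (
  'connector' = 'blackhole'
);

-- The JOIN condition lists the columns in the same order as the (a, b, c) index,
-- so the generated lookup query can use the leftmost prefix of the index.
INSERT INTO sink_table
SELECT s.id, d.c
FROM src AS s
JOIN dim_user FOR SYSTEM_TIME AS OF s.`proctime` AS d
ON d.a = s.a AND d.b = s.b;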
SQL
You can use the MySQL connector in SQL jobs as a source table, dimension table, or sink table.
Syntax
CREATE TEMPORARY TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);How the connector writes to a sink table: For each record received, the connector constructs and executes a single SQL statement. The specific SQL statement depends on the table schema:
For a sink table without a primary key, the connector executes an INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...); statement.
For a sink table with a primary key, the connector executes an INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...; statement.
Note: If the physical table has a unique index constraint other than the primary key, inserting two records with different primary keys but the same unique index value causes a unique index conflict in the downstream database, which can lead to data overwrites and potential data loss.
If you define an auto-increment primary key in the MySQL database, do not declare the auto-increment field in the Flink DDL statement. The database automatically populates this field during data insertion. The connector only supports writing and deleting data with an auto-increment field and does not support updates.
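For example, for a hypothetical sink table orders with columns order_id and order_status and primary key order_id, the statements that the connector issues look roughly like the following; the values are illustrative only.

-- Sink table without a primary key:
INSERT INTO orders (order_id, order_status) VALUES (1001, 'paid');

-- Sink table with primary key order_id:
INSERT INTO orders (order_id, order_status)
VALUES (1001, 'paid')
ON DUPLICATE KEY UPDATE
  order_id = VALUES(order_id),
  order_status = VALUES(order_status);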
WITH parameters
General
Parameter
Description
Required
Data type
Default value
Remarks
connector
The type of the table.
Yes
STRING
None
When used as a source table, set this option to mysql-cdc or mysql; the two values are equivalent. When used as a dimension table or sink table, set this option to mysql.
hostname
The IP address or hostname of the MySQL database.
Yes
STRING
None
We recommend entering the VPC address.
Note: If the MySQL database and Realtime Compute for Apache Flink are not in the same VPC, you must establish a cross-VPC network connection or use the public network for access. For more information, see Manage and operate workspaces and How can a fully managed Flink cluster access the public network?.
username
The username for the MySQL database service.
Yes
STRING
None
None.
password
The password for the MySQL database service.
Yes
STRING
None
None.
database-name
The name of the MySQL database.
Yes
STRING
None
When you use a database as a source table, you can use a regular expression for the database name to read data from multiple databases.
When you use a regular expression, avoid using the ^ and $ symbols to match the start and end of the string. For more information, see the Remarks column for the table-name option.
table-name
The name of the MySQL table.
Yes
STRING
None
You can use a regular expression for the source table name to read data from multiple tables.
When you read data from multiple MySQL tables, submit multiple CTAS statements as a single job. This avoids enabling multiple binary log listeners and improves performance and efficiency. For more information, see Multiple CTAS statements: Submit as a single job.
When you use a regular expression, avoid using the ^ and $ symbols to match the start and end of the string. For more information, see the following note.
Note: When a MySQL CDC source table matches table names, it combines the database-name and table-name that you specify into a full-path regular expression using the string \\. (the character . is used in VVR versions earlier than 8.0.1). It then uses this regular expression to match the fully qualified names of tables in the MySQL database.
For example, if you set 'database-name'='db_.*' and 'table-name'='tb_.+', the connector uses the regular expression db_.*\\.tb_.+ (db_.*.tb_.+ in versions earlier than 8.0.1) to match the fully qualified table names to determine which tables to read.
port
The port number of the MySQL database service.
No
INTEGER
3306
None.
Source-specific
Parameter
Description
Required
Data type
Default value
Remarks
server-id
A numeric ID for the database client.
No
STRING
A random value between 5400 and 6400 is generated.
This ID must be globally unique within the MySQL cluster. We recommend that you set a different ID for each job that connects to the same database.
This option also supports an ID range, such as 5400-5408. When incremental reading is enabled, concurrent reading is supported. In this case, we recommend that you set an ID range so that each concurrent reader uses a different ID. For more information, see Use of Server ID.
scan.incremental.snapshot.enabled
Specifies whether to enable incremental snapshots.
No
BOOLEAN
true
Incremental snapshot is enabled by default. Incremental snapshot is a new mechanism for reading full data snapshots. Compared to the old snapshot reading method, incremental snapshots offer several advantages:
The source can read full data in parallel.
The source supports chunk-level checkpoints when reading full data.
The source does not need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) when reading full data.
If you want the source to support concurrent reading, each concurrent reader needs a unique server ID. Therefore, server-id must be set to a range, such as 5400-6400, and the number of IDs in the range must be greater than or equal to the degree of parallelism.
Note: This configuration item is removed in Ververica Runtime (VVR) 11.1 and later.
scan.incremental.snapshot.chunk.size
The size of each chunk in number of rows.
No
INTEGER
8096
When incremental snapshot reading is enabled, the table is split into multiple chunks for reading. The data of a chunk is buffered in memory before it is fully read.
The fewer rows a chunk contains, the greater the total number of chunks in the table. Smaller chunks provide finer-grained fault recovery, but too many chunks can cause out-of-memory (OOM) issues and reduce overall throughput. Therefore, weigh the trade-offs and set a reasonable chunk size.
scan.snapshot.fetch.size
The maximum number of records to fetch at a time when reading the full data of a table.
No
INTEGER
1024
None.
scan.startup.mode
The startup mode for data consumption.
No
STRING
initial
Valid values:
initial (Default): Scans the full historical data first and then reads the latest binlog data upon the first startup.
latest-offset: Does not scan historical data upon the first startup. It starts reading from the end of the binlog, which means it only reads the latest changes made after the connector starts.
earliest-offset: Does not scan historical data. It starts reading from the earliest available binlog.
specific-offset: Does not scan historical data. It starts from a specific binlog offset that you specify. You can specify the offset by configuring both scan.startup.specific-offset.file and scan.startup.specific-offset.pos, or by configuring only scan.startup.specific-offset.gtid-set to start from a specific GTID set.
timestamp: Does not scan historical data. It starts reading the binlog from a specified timestamp. The timestamp is specified by scan.startup.timestamp-millis in milliseconds.
Important: When you use the earliest-offset, specific-offset, or timestamp startup mode, make sure that the schema of the corresponding table does not change between the specified binlog consumption position and the job startup time to avoid errors due to schema mismatch.
scan.startup.specific-offset.file
The binlog filename of the start offset when using the specific offset startup mode.
No
STRING
None
When you use this configuration, you must set scan.startup.mode to specific-offset. The filename format is, for example, mysql-bin.000003.
scan.startup.specific-offset.pos
The offset in the specified binlog file to start from when using the specific offset startup mode.
No
INTEGER
None
When you use this configuration, you must set scan.startup.mode to specific-offset.
scan.startup.specific-offset.gtid-set
The GTID set of the start offset when using the specific offset startup mode.
No
STRING
None
When you use this configuration, you must set scan.startup.mode to specific-offset. The GTID set format is, for example, 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.
scan.startup.timestamp-millis
The start offset as a millisecond timestamp when using the timestamp startup mode.
No
LONG
None
When you use this configuration, you must set scan.startup.mode to timestamp. The timestamp is in milliseconds.
Important: When you specify a time, MySQL CDC attempts to read the initial event of each binlog file to determine its timestamp and locate the binlog file corresponding to the specified time. Make sure that the binlog file for the specified timestamp has not been cleared from the database and can be read.
server-time-zone
The session time zone used by the database.
No
STRING
If you do not specify this option, the system uses the time zone of the Flink job's runtime environment, that is, the time zone of the zone that you selected, as the database server time zone.
Example: Asia/Shanghai. This option controls how TIMESTAMP types in MySQL are converted to STRING types. For more information, see Debezium temporal values.
debezium.min.row.count.to.stream.results
When the number of rows in a table exceeds this value, batch reading mode is used.
No
INTEGER
1000
Flink reads data from a MySQL source table in the following ways:
Full read: Reads the entire table's data directly into memory. This method is fast but consumes a corresponding amount of memory. If the source table is very large, it may pose an OOM risk.
Batch read: Reads data in multiple batches, with a certain number of rows per batch, until all data is read. This method avoids OOM risks when reading large tables but is relatively slower.
connect.timeout
The maximum time to wait when connecting to the MySQL database server before the connection attempt times out and is retried.
No
DURATION
30s
None.
connect.max-retries
The maximum number of retries after a failed connection to the MySQL database service.
No
INTEGER
3
None.
connection.pool.size
The size of the database connection pool.
No
INTEGER
20
The database connection pool is used to reuse connections, which can reduce the number of database connections.
jdbc.properties.*
Custom connection options in the JDBC URL.
No
STRING
None
You can pass custom connection options. For example, to not use the SSL protocol, you can set 'jdbc.properties.useSSL' = 'false'.
For more information about supported connection options, see MySQL Configuration Properties.
debezium.*
Custom options for Debezium to read binlogs.
No
STRING
None
You can pass custom Debezium options. For example, use 'debezium.event.deserialization.failure.handling.mode'='ignore' to specify the handling logic for parsing errors.
heartbeat.interval
The interval at which the source uses heartbeat events to advance the binlog offset.
No
DURATION
30s
Heartbeat events are used to advance the binlog offset in the source, which is very useful for slowly updated tables in MySQL. For such tables, the binlog offset does not advance automatically. Heartbeat events can push the binlog offset forward, preventing issues where an expired binlog offset causes the job to fail and require a stateless restart.
scan.incremental.snapshot.chunk.key-column
The column used to split chunks during the snapshot phase.
See Remarks.
STRING
None
Required for tables without a primary key. The selected column must be of a non-null type (NOT NULL).
Optional for tables with a primary key. You can only select one column from the primary key.
rds.region-id
The region ID of the ApsaraDB RDS for MySQL instance.
Required when reading archived logs from OSS.
STRING
None
For more information about region IDs, see Regions and zones.
rds.access-key-id
The AccessKey ID of the account for the ApsaraDB RDS for MySQL instance.
Required when reading archived logs from OSS.
STRING
None
For more information, see How do I view the AccessKey ID and AccessKey secret?.
Important: To prevent your AccessKey information from being leaked, we recommend that you manage your AccessKey ID using secrets management. For more information, see Manage variables.
rds.access-key-secret
The AccessKey secret of the account for the ApsaraDB RDS for MySQL instance.
Required when reading archived logs from OSS.
STRING
None
For more information, see How do I view the AccessKey ID and AccessKey secret?
Important: To prevent your AccessKey information from being leaked, we recommend that you manage your AccessKey secret using secrets management. For more information, see Manage variables.
rds.db-instance-id
The instance ID of the ApsaraDB RDS for MySQL instance.
Required when reading archived logs from OSS.
STRING
None
None.
rds.main-db-id
The ID of the primary database of the ApsaraDB RDS for MySQL instance.
No
STRING
None
For more information about how to obtain the primary database ID, see Log backup for ApsaraDB RDS for MySQL.
Supported only in VVR 8.0.7 and later.
rds.download.timeout
The timeout period for downloading a single archived log from OSS.
No
DURATION
60s
None.
rds.endpoint
The service endpoint for obtaining OSS binlog information.
No
STRING
None
For more information about valid values, see Endpoints.
Supported only in VVR 8.0.8 and later.
scan.incremental.close-idle-reader.enabled
Specifies whether to close idle readers after the snapshot phase ends.
No
BOOLEAN
false
Supported only in VVR 8.0.1 and later.
For this configuration to take effect, you must set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true.
scan.read-changelog-as-append-only.enabled
Specifies whether to convert the changelog stream to an append-only stream.
No
BOOLEAN
false
Valid values:
true: All types of messages, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER, are converted to INSERT messages. Enable this only in special scenarios, such as when you need to save delete messages from the upstream table.
false (Default): All types of messages are sent downstream as they are.
Note: Supported only in VVR 8.0.8 and later.
scan.only.deserialize.captured.tables.changelog.enabled
During the incremental phase, specifies whether to deserialize change events only for the specified tables.
No
BOOLEAN
The default value is false in VVR 8.x versions.
The default value is true in VVR 11.1 and later.
Valid values:
true: Deserializes change data only for the target tables to accelerate binlog reading.
false (Default): Deserializes change data for all tables.
Note: Supported only in VVR 8.0.7 and later.
When used in VVR 8.0.8 and earlier, the option name must be changed to debezium.scan.only.deserialize.captured.tables.changelog.enable.
scan.parse.online.schema.changes.enabled
During the incremental phase, specifies whether to attempt to parse RDS lockless change DDL events.
No
BOOLEAN
false
Valid values:
true: Parses RDS lockless change DDL events.
false (Default): Does not parse RDS lockless change DDL events.
This is an experimental feature. We recommend that you take a snapshot of the Flink job for recovery before you perform online lockless changes.
Note: Supported only in VVR 11.1 and later.
scan.incremental.snapshot.backfill.skip
Specifies whether to skip backfill during the snapshot reading phase.
No
BOOLEAN
false
Valid values:
true: Skips backfill during the snapshot reading phase.
false (Default): Does not skip backfill during the snapshot reading phase.
If you skip backfill, changes to the table during the snapshot phase are read in the later incremental phase instead of being merged into the snapshot.
Important: Skipping backfill may lead to data inconsistency because changes that occur during the snapshot phase might be replayed. Only at-least-once semantics are guaranteed.
Note: Supported only in VVR 11.1 and later.
scan.incremental.snapshot.unbounded-chunk-first.enabled
Specifies whether to distribute unbounded chunks first during the snapshot reading phase.
No
BOOLEAN
false
Valid values:
true: Distributes unbounded chunks first during the snapshot reading phase.
false (Default): Does not distribute unbounded chunks first during the snapshot reading phase.
This is an experimental feature. Enabling it can reduce the risk of out-of-memory (OOM) errors when a TaskManager synchronizes the last chunk during the snapshot phase. We recommend that you add this before the job's first startup.
Note: Supported only in VVR 11.1 and later.
binlog.session.network.timeout
The network read/write timeout for the binlog connection.
No
DURATION
10m
If you set this option to 0s, the default timeout of the MySQL server is used.
Note: Supported only in VVR 11.5 and later.
scan.rate-limit.records-per-second
The maximum number of records that the source can send per second.
No
LONG
None
This option applies to scenarios where you need to limit data reading. The limit takes effect in both the full and incremental phases.
The numRecordsOutPerSecond metric of the source reflects the number of records that the data stream outputs per second. You can adjust this option based on this metric.
In the full data reading phase, you typically also need to reduce the number of records read in each batch. To do so, reduce the value of the scan.incremental.snapshot.chunk.size option.
Note: Supported only in VVR 11.5 and later.
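The following DDL sketch combines several of the source options described above. The connection values are placeholders and the option values are examples only; the server-id range 5404-5412 allows a parallelism of up to 9, and the options you can use depend on your VVR version.

CREATE TEMPORARY TABLE mysqlcdc_source (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- One server ID per parallel reader in the full reading phase.
  'server-id' = '5404-5412',
  -- Read a full snapshot first and then switch to the binlog (default behavior).
  'scan.startup.mode' = 'initial',
  -- Smaller chunks lower the memory footprint of each reader but create more chunks.
  'scan.incremental.snapshot.chunk.size' = '4096',
  -- Release idle readers after the snapshot phase (VVR 8.0.1+); requires
  -- execution.checkpointing.checkpoints-after-tasks-finish.enabled = true.
  'scan.incremental.close-idle-reader.enabled' = 'true'
);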
Dimension table-specific
Parameter
Description
Required
Data type
Default value
Remarks
url
The MySQL JDBC URL.
No
STRING
None
The URL format is jdbc:mysql://<endpoint>:<port>/<database_name>.
lookup.max-retries
The maximum number of retries after a failed data read.
No
INTEGER
3
Supported only in VVR 6.0.7 and later.
lookup.cache.strategy
The cache policy.
No
STRING
None
Supports three cache policies: None, LRU, and ALL. For more information about the values, see Background information.
Note: When you use the LRU cache policy, you must also configure the lookup.cache.max-rows option.
lookup.cache.max-rows
The maximum number of cached rows.
No
INTEGER
100000
If you select the LRU cache policy, you must set the cache size.
If you select the ALL cache policy, you do not need to set the cache size.
lookup.cache.ttl
The cache time-to-live (TTL).
No
DURATION
10s
The configuration of lookup.cache.ttl depends on lookup.cache.strategy:
If lookup.cache.strategy is set to None, you do not need to configure lookup.cache.ttl. This means the cache does not expire.
If lookup.cache.strategy is set to LRU, lookup.cache.ttl is the cache TTL. By default, it does not expire.
If lookup.cache.strategy is set to ALL, lookup.cache.ttl is the cache reload time. By default, it is not reloaded.
Use a time format, such as 1min or 10s.
lookup.max-join-rows
The maximum number of results returned when querying the dimension table for each record in the primary table.
No
INTEGER
1024
None.
lookup.filter-push-down.enabled
Specifies whether to enable filter pushdown for the dimension table.
No
BOOLEAN
false
Valid values:
true: Enables filter pushdown for the dimension table. When loading data from the MySQL database table, the dimension table filters data in advance based on the conditions set in the SQL job.
false (Default): Disables filter pushdown for the dimension table. When loading data from the MySQL database table, the dimension table loads all data.
Note: Supported only in VVR 8.0.7 and later.
Important: Filter pushdown for a dimension table should only be enabled when the Flink table is used as a dimension table. MySQL source tables do not support filter pushdown. If a Flink table is used as both a source table and a dimension table, and filter pushdown is enabled for the dimension table, you must explicitly set this configuration item to false for the source table using SQL hints (see the example after this table). Otherwise, the job may run abnormally.
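The following sketch shows how the lookup cache and filter pushdown options might be combined in a dimension table definition, and how an SQL hint can disable filter pushdown when the same table is read as a source. All connection values are placeholders.

CREATE TEMPORARY TABLE mysql_dim (
  id INT,
  name STRING,
  status STRING
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Cache at most 100,000 rows for 10 minutes using the LRU policy.
  'lookup.cache.strategy' = 'LRU',
  'lookup.cache.max-rows' = '100000',
  'lookup.cache.ttl' = '10min',
  -- Push the filters of the SQL job down to MySQL when loading dimension data (VVR 8.0.7+).
  'lookup.filter-push-down.enabled' = 'true'
);

-- If the same table definition is also queried as a source table, disable filter pushdown
-- for that usage with an SQL hint:
-- SELECT * FROM mysql_dim /*+ OPTIONS('lookup.filter-push-down.enabled' = 'false') */;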
Sink-specific
Parameter
Description
Required
Data type
Default value
Remarks
url
The MySQL JDBC URL.
No
STRING
None
The URL format is jdbc:mysql://<endpoint>:<port>/<database_name>.
sink.max-retries
The maximum number of retries after a failed data write.
No
INTEGER
3
None.
sink.buffer-flush.batch-size
The number of records in a single batch write.
No
INTEGER
4096
None.
sink.buffer-flush.max-rows
The number of data records buffered in memory.
No
INTEGER
10000
This option takes effect only after a primary key is specified.
sink.buffer-flush.interval
The time interval for flushing the buffer. If the data in the buffer does not meet the output conditions after the specified waiting time, the system automatically outputs all data in the buffer.
No
DURATION
1s
None.
sink.ignore-delete
Specifies whether to ignore DELETE operations.
No
BOOLEAN
false
When the stream generated by Flink SQL contains delete or update-before records, if multiple output tasks update different fields of the same table simultaneously, data inconsistency may occur.
For example, after a record is deleted, another task updates only some fields. The un-updated fields will become null or their default values, causing data errors.
By setting sink.ignore-delete to true, you can ignore upstream DELETE and UPDATE_BEFORE operations to avoid such issues.
Note: UPDATE_BEFORE is part of Flink's retraction mechanism and is used to retract the old value in an update operation.
When sink.ignore-delete is set to true, all DELETE and UPDATE_BEFORE records are skipped, and only INSERT and UPDATE_AFTER records are processed (see the example after this table).
sink.ignore-null-when-update
When updating data, specifies whether to update the corresponding field to null or skip the update for that field if the incoming data field is null.
No
BOOLEAN
false
Valid values:
true: Does not update the field. This option can be set to true only when a primary key is set for the Flink table. When set to true:
In VVR 8.0.6 and earlier, batch execution is not supported for writing data to the sink table.
In VVR 8.0.7 and later, batch execution is supported for writing data to the sink table.
Although batch writing can significantly improve write efficiency and overall throughput, it can cause data latency and OOM risks. Therefore, make a trade-off based on your actual business scenario.
false: Updates the field to null.
Note: This option is supported only in VVR 8.0.5 and later.
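The following sketch shows a sink table that tunes the batching options described above and ignores upstream DELETE and UPDATE_BEFORE records. The connection values are placeholders.

CREATE TEMPORARY TABLE mysql_sink (
  `name` VARCHAR,
  `age` INT,
  PRIMARY KEY (`name`) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Flush a batch every 4096 records, buffer at most 10000 rows, and flush at least every second.
  'sink.buffer-flush.batch-size' = '4096',
  'sink.buffer-flush.max-rows' = '10000',
  'sink.buffer-flush.interval' = '1s',
  -- Skip DELETE and UPDATE_BEFORE records from the upstream changelog.
  'sink.ignore-delete' = 'true'
);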
Type mapping
CDC source table
MySQL CDC field type | Flink field type |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | TIMESTAMP [(p)] or TIMESTAMP [(p)] WITH LOCAL TIME ZONE |
CHAR(n) | STRING |
VARCHAR(n) | |
TEXT | |
BINARY | BYTES |
VARBINARY | |
BLOB | |
Important: We recommend that you do not use the TINYINT(1) type in MySQL to store values other than 0 and 1. When property-version = 0, the MySQL CDC source table maps TINYINT(1) to Flink's BOOLEAN type by default, which can cause data inaccuracies. To use the TINYINT(1) type to store values other than 0 and 1, see the configuration option catalog.table.treat-tinyint1-as-boolean.
Dimension table and sink table
MySQL field type | Flink field type |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
BIGINT | BIGINT |
INT UNSIGNED | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) Note: p must be less than or equal to 38. |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈n/8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
TINYBLOB | BYTES Important: Flink supports MySQL BLOB type records only up to 2,147,483,647 (2^31 - 1) bytes. |
BLOB | |
MEDIUMBLOB | |
LONGBLOB | |
Data ingestion
You can use the MySQL connector as a data source in a data ingestion YAML job.
Syntax
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: <username>
  password: <password>
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: xxx

Configuration items
Parameter | Description | Required | Data type | Default value | Remarks |
type | The type of the data source. | Yes | STRING | None | The static field is set to mysql. |
name | The name of the data source. | No | STRING | None | None. |
hostname | The IP address or hostname of the MySQL database. | Yes | STRING | None | We recommend entering the Virtual Private Cloud (VPC) address. Note If the MySQL database and Realtime Compute for Apache Flink are not in the same VPC, you must establish a cross-VPC network connection or use the public network for access. For more information, see Manage and operate workspaces and How can a fully managed Flink cluster access the public network?. |
username | The username for the MySQL database service. | Yes | STRING | None | None. |
password | The password for the MySQL database service. | Yes | STRING | None | None. |
tables | The MySQL data tables to synchronize. | Yes | STRING | None | Note A period is used to separate the database name and table name. To use a period to match any character, you must escape the period with a backslash. For example: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*. |
tables.exclude | The tables to exclude from synchronization. | No | STRING | None |
Note A period is used to separate the database name and table name. To use a period to match any character, you must escape the period with a backslash. For example: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*. |
port | The port number of the MySQL database service. | No | INTEGER | 3306 | None. |
schema-change.enabled | Specifies whether to send schema change events. | No | BOOLEAN | true | None. |
server-id | The numeric ID or range of IDs for the database client used for synchronization. | No | STRING | A random value between 5400 and 6400 is generated. | This ID must be globally unique within the MySQL cluster. We recommend that you set a different ID for each job that connects to the same database. This option also supports an ID range, such as 5400-5408. When incremental reading is enabled, concurrent reading is supported. In this case, we recommend that you set an ID range so that each concurrent reader uses a different ID. |
jdbc.properties.* | Custom connection options in the JDBC URL. | No | STRING | None | You can pass custom connection options. For example, to not use the SSL protocol, you can set 'jdbc.properties.useSSL' = 'false'. For more information about supported connection options, see MySQL Configuration Properties. |
debezium.* | Custom options for Debezium to read binlogs. | No | STRING | None | You can pass custom Debezium options. For example, use 'debezium.event.deserialization.failure.handling.mode'='ignore' to specify the handling logic for parsing errors. |
scan.incremental.snapshot.chunk.size | The size of each chunk in number of rows. | No | INTEGER | 8096 | A MySQL table is split into multiple chunks for reading. The data of a chunk is buffered in memory before it is fully read. While using fewer rows per chunk provides finer granularity for fault recovery, it can also cause out-of-memory (OOM) errors and reduce overall throughput. Therefore, you must set a reasonable chunk size that balances these trade-offs. |
scan.snapshot.fetch.size | The maximum number of records to fetch at a time when reading the full data of a table. | No | INTEGER | 1024 | None. |
scan.startup.mode | The startup mode for data consumption. | No | STRING | initial | Valid values:
initial (Default): Scans the full historical data first and then reads the latest binlog data upon the first startup.
latest-offset: Does not scan historical data upon the first startup. It starts reading from the end of the binlog, which means it only reads the latest changes made after the connector starts.
earliest-offset: Does not scan historical data. It starts reading from the earliest available binlog.
specific-offset: Does not scan historical data. It starts from a binlog offset that you specify. You can specify the offset by configuring both scan.startup.specific-offset.file and scan.startup.specific-offset.pos, or by configuring only scan.startup.specific-offset.gtid-set to start from a specific GTID set.
timestamp: Does not scan historical data. It starts reading the binlog from the timestamp specified by scan.startup.timestamp-millis, in milliseconds.
Important For the earliest-offset, specific-offset, and timestamp startup modes, if the table schema at the startup time is different from the schema at the specified start offset time, the job fails due to a schema mismatch. In other words, when you use these three startup modes, you must ensure that the schema of the corresponding table does not change between the specified binlog consumption position and the job startup time. |
scan.startup.specific-offset.file | The binlog filename of the start offset when using the specific offset startup mode. | No | STRING | None | When you use this configuration, you must set scan.startup.mode to specific-offset. The filename format is, for example, mysql-bin.000003. |
scan.startup.specific-offset.pos | The offset in the specified binlog file to start from when using the specific offset startup mode. | No | INTEGER | None | When you use this configuration, you must set scan.startup.mode to specific-offset. |
scan.startup.specific-offset.gtid-set | The GTID set of the start offset when using the specific offset startup mode. | No | STRING | None | When you use this configuration, you must set scan.startup.mode to specific-offset. The GTID set format is, for example, 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19. |
scan.startup.timestamp-millis | The start offset as a millisecond timestamp when using the timestamp startup mode. | No | LONG | None | When you use this configuration, you must set scan.startup.mode to timestamp. The timestamp is in milliseconds. Important When you specify a time, MySQL CDC attempts to read the initial event of each binlog file to determine its timestamp and locate the binlog file corresponding to the specified time. Make sure that the binlog file for the specified timestamp has not been cleared from the database and can be read. |
server-time-zone | The session time zone used by the database. | No | STRING | If you do not specify this option, the system uses the time zone of the Flink job's runtime environment as the database server time zone. This is the time zone of the zone you selected. | Example: Asia/Shanghai. This option controls how TIMESTAMP types in MySQL are converted to STRING types. For more information, see Debezium temporal values. |
scan.startup.specific-offset.skip-events | The number of binlog events to skip when reading from a specific offset. | No | INTEGER | None | When you use this configuration, scan.startup.mode must be set to specific-offset. |
scan.startup.specific-offset.skip-rows | The number of row changes to skip when reading from a specific offset. A single binlog event may correspond to multiple row changes. | No | INTEGER | None | When you use this configuration, scan.startup.mode must be set to specific-offset. |
connect.timeout | The maximum time to wait when connecting to the MySQL database server before the connection attempt times out and is retried. | No | DURATION | 30s | None. |
connect.max-retries | The maximum number of retries after a failed connection to the MySQL database service. | No | INTEGER | 3 | None. |
connection.pool.size | The size of the database connection pool. | No | INTEGER | 20 | The database connection pool is used to reuse connections, which can reduce the number of database connections. |
heartbeat.interval | The interval at which the source uses heartbeat events to advance the binlog offset. | No | DURATION | 30s | Heartbeat events are used to advance the binlog offset in the source, which is very useful for slowly updated tables in MySQL. For such tables, the binlog offset does not advance automatically. Heartbeat events can push the binlog offset forward, preventing issues where an expired binlog offset causes the job to fail and require a stateless restart. |
scan.incremental.snapshot.chunk.key-column | The column used to split chunks during the snapshot phase. | No. | STRING | None | You can only select one column from the primary key. |
rds.region-id | The region ID of the ApsaraDB RDS for MySQL instance. | Required when reading archived logs from OSS. | STRING | None | For more information about region IDs, see Regions and zones. |
rds.access-key-id | The AccessKey ID of the account for the ApsaraDB RDS for MySQL instance. | Required when reading archived logs from OSS. | STRING | None | For more information, see How do I view the AccessKey ID and AccessKey secret? Important To prevent your AccessKey information from being leaked, we recommend that you manage your AccessKey ID using secrets management. For more information, see Manage variables. |
rds.access-key-secret | The AccessKey secret of the account for the ApsaraDB RDS for MySQL instance. | Required when reading archived logs from OSS. | STRING | None | For more information, see How do I view the AccessKey ID and AccessKey secret? Important To prevent your AccessKey information from being leaked, we recommend that you manage your AccessKey secret using secrets management. For more information, see Manage variables. |
rds.db-instance-id | The instance ID of the ApsaraDB RDS for MySQL instance. | Required when reading archived logs from OSS. | STRING | None | None. |
rds.main-db-id | The ID of the primary database of the ApsaraDB RDS for MySQL instance. | No | STRING | None | For more information about how to obtain the primary database ID, see Log backup for ApsaraDB RDS for MySQL. |
rds.download.timeout | The timeout period for downloading a single archived log from OSS. | No | DURATION | 60s | None. |
rds.endpoint | The service endpoint for obtaining OSS binlog information. | No | STRING | None | For more information about valid values, see Endpoints. |
rds.binlog-directory-prefix | The directory prefix for storing binlog files. | No | STRING | rds-binlog- | None. |
rds.use-intranet-link | Specifies whether to use an internal network to download binlog files. | No | BOOLEAN | true | None. |
rds.binlog-directories-parent-path | The absolute path of the parent directory where binlog files are stored. | No | STRING | None | None. |
chunk-meta.group.size | The size of the chunk metadata. | No | INTEGER | 1000 | If the metadata is larger than this value, it is transmitted in multiple parts. |
chunk-key.even-distribution.factor.lower-bound | The lower bound of the chunk distribution factor for even splitting. | No | DOUBLE | 0.05 | If the distribution factor is less than the value of this parameter, chunks are not evenly distributed. Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / total number of data rows. |
chunk-key.even-distribution.factor.upper-bound | The upper bound of the chunk distribution factor for even splitting. | No | DOUBLE | 1000.0 | If the distribution factor is greater than this value, uneven splitting is used. Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows. |
scan.incremental.close-idle-reader.enabled | Specifies whether to close idle readers after the snapshot phase ends. | No | BOOLEAN | false | For this configuration to take effect, you must set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true. |
scan.only.deserialize.captured.tables.changelog.enabled | During the incremental phase, specifies whether to deserialize change events only for the specified tables. | No | BOOLEAN | The default value is false in VVR 8.x versions. The default value is true in VVR 11.1 and later. | Valid values:
true: Deserializes change data only for the target tables to accelerate binlog reading.
false: Deserializes change data for all tables. |
scan.parallel-deserialize-changelog.enabled | During the incremental phase, specifies whether to use multiple threads to parse change events. | No | BOOLEAN | false | Valid values:
true: Uses multiple threads to parse change events during the incremental phase.
false (Default): Uses a single thread to parse change events.
Note Supported only in VVR 8.0.11 and later. |
scan.parallel-deserialize-changelog.handler.size | The number of event handlers when using multiple threads to parse change events. | No | INTEGER | 2 | Note Supported only in VVR 8.0.11 and later. |
metadata-column.include-list | The metadata columns to pass to the downstream. | No | STRING | None | The available metadata keys are described in the Metadata section of this topic. |
scan.newly-added-table.enabled | When restarting from a checkpoint, specifies whether to synchronize newly added tables that were not matched in the previous run or to remove currently unmatched tables that are saved in the state. | No | BOOLEAN | false | This takes effect when restarting from a checkpoint or savepoint. |
scan.binlog.newly-added-table.enabled | During the incremental phase, specifies whether to send data from newly added tables that are matched. | No | BOOLEAN | false | Cannot be enabled at the same time as scan.newly-added-table.enabled. |
scan.incremental.snapshot.chunk.key-column | Specifies a column for certain tables to be used as the splitting key for chunks during the snapshot phase. | No | STRING | None |
|
scan.parse.online.schema.changes.enabled | During the incremental phase, specifies whether to attempt to parse RDS lockless change DDL events. | No | BOOLEAN | false | Valid values:
true: Parses RDS lockless change DDL events.
false (Default): Does not parse RDS lockless change DDL events.
This is an experimental feature. We recommend that you take a snapshot of the Flink job for recovery before you perform online lockless changes. Note Supported only in VVR 11.0 and later. |
scan.incremental.snapshot.backfill.skip | Specifies whether to skip backfill during the snapshot reading phase. | No | BOOLEAN | false | Valid values:
true: Skips backfill during the snapshot reading phase.
false (Default): Does not skip backfill during the snapshot reading phase.
If backfill is skipped, changes to the table during the snapshot phase are read in the later incremental phase instead of being merged into the snapshot. Important Skipping backfill may cause data inconsistency because changes that occur during the snapshot phase might be replayed, which guarantees only at-least-once semantics. Note This feature is available only for Flink compute engine Ververica Runtime (VVR) 11.1 and later. |
treat-tinyint1-as-boolean.enabled | Specifies whether to treat the TINYINT(1) type as the Boolean type. | No | BOOLEAN | true | Valid values:
true (Default): Treats the TINYINT(1) type as the BOOLEAN type.
false: Treats the TINYINT(1) type as the TINYINT type. |
treat-timestamp-as-datetime-enabled | Specifies whether to treat the TIMESTAMP type as the DATETIME type. | No | BOOLEAN | false | Valid values:
true: Treats the TIMESTAMP type as the DATETIME type.
false (Default): Does not treat the TIMESTAMP type as the DATETIME type.
The MySQL TIMESTAMP type stores UTC time and is affected by time zones. The MySQL DATETIME type stores literal time and is not affected by time zones. When enabled, this feature transforms MySQL TIMESTAMP data to the DATETIME type based on the server-time-zone. |
include-comments.enabled | Specifies whether to sync table comments and field comments. | No | BOOLEAN | false | Valid values:
true: Syncs table comments and field comments.
false (Default): Does not sync table comments or field comments.
Enabling this feature increases the memory usage of the job. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Specifies whether to distribute unbounded chunks first during the snapshot reading phase. | No | BOOLEAN | false | Valid values:
true: Distributes unbounded chunks first during the snapshot reading phase.
false (Default): Does not distribute unbounded chunks first during the snapshot reading phase.
This is an experimental feature. Enabling it can reduce the risk of a TaskManager encountering an out-of-memory (OOM) error when it synchronizes the last chunk during the snapshot phase. Add this parameter before starting the job for the first time. Note This feature is available only for Flink compute engine VVR 11.1 and later. |
binlog.session.network.timeout | The network timeout duration for the binary logging connection. | No | DURATION | 10m | If this parameter is set to 0 s, the default server-side timeout duration of MySQL is used. Note This feature is available only for Flink compute engine VVR 11.5 and later. |
scan.rate-limit.records-per-second | The maximum number of records that the source can send per second. | No | LONG | None | This parameter applies to scenarios where data reads must be limited. The limit takes effect in both the full and incremental phases. The numRecordsOutPerSecond metric of the source reflects the number of records that the data stream outputs per second; adjust this parameter based on that metric. During the full read phase, also reduce the number of records read in each batch by decreasing the value of the scan.incremental.snapshot.chunk.size parameter. Note This feature is available only for Flink compute engine VVR 11.5 and later. |
Type mapping
The following table shows the type mappings for data ingestion.
MySQL field type | Flink CDC field type |
TINYINT(n) | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
YEAR | |
INT | INT |
MEDIUMINT | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | |
BOOLEAN | BOOLEAN |
BIT(1) | |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] | Depending on the value of the treat-timestamp-as-datetime-enabled option: TIMESTAMP_LTZ [(p)] when the option is false (default), or TIMESTAMP [(p)] when it is true. |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | STRING Note MySQL supports a decimal precision up to 65, but Flink limits decimal precision to 38. If a decimal column has a precision greater than 38, it is mapped to STRING to prevent precision loss. |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
ENUM | |
JSON | STRING Note The JSON data type is transformed into a JSON-formatted string in Flink. |
GEOMETRY | STRING Note MySQL spatial data types are transformed into strings with a fixed JSON format. For more information, see MySQL spatial data type mapping. |
POINT | |
LINESTRING | |
POLYGON | |
MULTIPOINT | |
MULTILINESTRING | |
MULTIPOLYGON | |
GEOMETRYCOLLECTION | |
TINYBLOB | BYTES Note For MySQL BLOB data types, the maximum supported length is 2,147,483,647 (2^31 - 1) bytes. |
BLOB | |
MEDIUMBLOB | |
LONGBLOB |
Examples
CDC source table
CREATE TEMPORARY TABLE mysqlcdc_source (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  order_id INT,
  customer_name STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT order_id, customer_name FROM mysqlcdc_source;

Dimension table
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mysql_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;

Sink table
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mysql_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

INSERT INTO mysql_sink SELECT * FROM datagen_source;

Data ingestion source
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
About MySQL CDC source tables
How it works
When the MySQL CDC source table starts, it scans the entire table, splits it into multiple chunks based on the primary key, and records the current binlog position. It then uses the incremental snapshot algorithm to read the data from each chunk using SELECT statements. The job periodically performs checkpoints to record the completed chunks. If a failover occurs, the job continues reading from the unfinished chunks. After all chunks are read, the job starts reading incremental change records from the previously recorded binlog position. The Flink job continues to perform periodic checkpoints to record the binlog position. If the job fails, it resumes processing from the last recorded binlog position to achieve exactly-once semantics.
For a more detailed explanation of the incremental snapshot algorithm, see MySQL CDC Connector.
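For intuition, assume an orders table with an integer primary key order_id and the default scan.incremental.snapshot.chunk.size of 8096. Conceptually, the full phase issues chunk queries similar to the following sketch; the actual statements generated by the connector differ in detail (for example, they also record low and high watermarks for each chunk).

-- Chunk 1: order_id in [1, 8097)
SELECT * FROM orders WHERE order_id >= 1 AND order_id < 8097;
-- Chunk 2: order_id in [8097, 16193)
SELECT * FROM orders WHERE order_id >= 8097 AND order_id < 16193;
-- ... remaining chunks ...
-- After all chunks are read, the source continues from the binlog position
-- that was recorded when the snapshot started.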
Metadata
Metadata is useful for merging sharded databases and tables. After merging, you may need to identify the source database and table for each row of data. Metadata columns allow you to access this information, making it easy to merge multiple sharded tables into a single destination table.
The MySQL CDC Source supports metadata column syntax. You can access the following metadata through metadata columns.
Metadata key
Metadata type
Description
database_name
STRING NOT NULL
The name of the database that contains the row.
table_name
STRING NOT NULL
The name of the table that contains the row.
op_ts
TIMESTAMP_LTZ(3) NOT NULL
The time when the row was changed in the database. If the record is from the table's existing historical data instead of the binlog, this value is always 0.
op_type
STRING NOT NULL
The change type of the row.
+I: INSERT message
-D: DELETE message
-U: UPDATE_BEFORE message
+U: UPDATE_AFTER message
Note: Supported only in VVR 8.0.7 and later.
query_log
STRING NOT NULL
The MySQL query log record corresponding to the row read.
Note: To obtain the query log, you must enable the binlog_rows_query_log_events parameter in MySQL (see the example after this table).
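For a self-managed MySQL instance, the following statements are one way to check and enable this parameter; on ApsaraDB RDS for MySQL, change it in the console parameter settings instead.

-- Check whether the original query is written to the binlog.
SHOW VARIABLES LIKE 'binlog_rows_query_log_events';

-- Enable it dynamically (also persist it in the MySQL configuration file so it survives restarts).
SET GLOBAL binlog_rows_query_log_events = ON;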
The following example shows how to merge multiple orders tables from different sharded databases in a MySQL instance and synchronize them to a holo_orders table in Hologres.
CREATE TEMPORARY TABLE mysql_orders (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,          -- Read the database name.
  table_name STRING METADATA FROM 'table_name' VIRTUAL,          -- Read the table name.
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,   -- Read the change timestamp.
  op_type STRING METADATA FROM 'op_type' VIRTUAL,                -- Read the change type.
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb_.*',   -- Match multiple sharded databases using a regular expression.
  'table-name' = 'orders_.*'     -- Match multiple sharded tables using a regular expression.
);

INSERT INTO holo_orders SELECT * FROM mysql_orders;

Based on the preceding code, if you set the scan.read-changelog-as-append-only.enabled option to true in the WITH clause, the output varies based on the primary key settings of the downstream table:
If the primary key of the downstream table is order_id, the output includes only the last change for each primary key in the source table. For example, if the last change for a primary key was a delete operation, a record with the same primary key and an op_type of -D appears in the downstream table.
If the primary key of the downstream table is a composite of order_id, operation_ts, and op_type, the output includes the complete change history for each primary key in the source table.
Regular expression support
The MySQL CDC source table supports using regular expressions in the table name or database name to match multiple tables or databases. The following example shows how to specify multiple tables using a regular expression.
CREATE TABLE products (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  table_name STRING METADATA FROM 'table_name' VIRTUAL,
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})',   -- Match multiple databases using a regular expression.
  'table-name' = '(t[5-8]|tt)'                                -- Match multiple tables using a regular expression.
);

Explanation of the regular expressions in the preceding example:
^(test).* is a prefix match example. This expression can match database names that start with test, such as test1 or test2.
.*[p$] is a suffix match example. This expression can match database names that end with p, such as cdcp or edcp.
txc is a specific match. It can match a specific database name, such as txc.
When matching a fully qualified table name, MySQL CDC uses both the database name and table name to uniquely identify a table. It uses the pattern database-name.table-name for matching. For example, the pattern (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) can match tables such as txc.tt and test2.test5 in the database.
Important: In SQL job configurations, the table-name and database-name options do not support using a comma (,) to specify multiple tables or databases.
To match multiple tables or combine multiple regular expressions, join them with a vertical bar (|) and enclose them in parentheses. For example, to read the user and product tables, configure table-name as (user|product). If a regular expression contains a comma, you must rewrite it using the vertical bar (|) operator. For example, the regular expression mytable_\d{1, 2} must be rewritten as the equivalent (mytable_\d{1}|mytable_\d{2}) to avoid the comma.
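For illustration, the following minimal sketch (connection options omitted, database name is a placeholder) shows the vertical-bar form in a WITH clause:
CREATE TEMPORARY TABLE multi_table_source (...) WITH (
    'connector' = 'mysql-cdc',
    ...
    'database-name' = 'mydb',          -- Placeholder database name.
    'table-name' = '(user|product)'    -- Vertical bar instead of a comma-separated list.
);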
Concurrency control
The MySQL connector supports concurrently reading full data, which improves data loading efficiency. When combined with Autopilot in the Realtime Compute for Apache Flink console, it can automatically scale in during the incremental phase after the concurrent reading is complete, which saves computing resources.
In the Realtime Compute for Apache Flink development console, you can set the job's parallelism on the Resource Configuration page in either Basic mode or Expert mode. The differences are as follows:
The parallelism set in Basic mode is the global parallelism for the entire job.

Expert mode lets you set the parallelism for a specific VERTEX as needed.

For more information about resource configuration, see Configure a job deployment.
Important: Regardless of whether you use Basic mode or Expert mode, the number of unique server IDs in the server-id range declared for the table must be greater than or equal to the job's parallelism. For example, the range 5404-5412 contains 9 unique server IDs, so the maximum job parallelism is 9. Different jobs that read from the same MySQL instance must use non-overlapping server-id ranges, which means that each job must explicitly configure its own unique server-id range.
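For illustration, a sketch of declaring the server-id range in the WITH clause (the range is the one used in the example above; other options are omitted):
CREATE TEMPORARY TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    ...
    'server-id' = '5404-5412'   -- 9 unique server IDs, so the job parallelism can be at most 9.
);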
Autopilot auto-scaling
The full data phase accumulates a large amount of historical data. To improve reading efficiency, historical data is typically read concurrently. In the incremental binlog phase, however, because the volume of binlog data is small and a global order must be maintained, a single parallelism is usually sufficient. Autopilot can automatically balance performance and resources to meet these different requirements of the full and incremental phases.
Autopilot monitors the traffic of each task in the MySQL CDC Source. When the job enters the binlog phase, if only one task is responsible for reading the binlog and the other tasks are idle, Autopilot will automatically reduce the CU count and parallelism of the Source. To enable Autopilot, set the Autopilot mode to Active on the job's O&M page.
Note: The minimum trigger interval for reducing parallelism is 24 hours by default. For more information about Autopilot options and details, see Configure Autopilot.
Startup modes
You can use the scan.startup.mode option to specify the startup mode for the MySQL CDC source table. The options include the following:
initial (default): Performs a full read of the database table upon the first startup, and then switches to incremental mode to read the binlog.
earliest-offset: Skips the snapshot phase and starts reading from the earliest available binlog position.
latest-offset: Skips the snapshot phase and starts reading from the end of the binlog. In this mode, the source table can only read data changes that occur after the job starts.
specific-offset: Skips the snapshot phase and starts reading from a specified binlog position. The position can be specified by the binlog filename and position, or using a GTID set.
timestamp: Skips the snapshot phase and starts reading binlog events from a specified timestamp.
Example:
CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Specify only one of the following startup modes in a job.
    'scan.startup.mode' = 'earliest-offset',  -- Start from the earliest offset.
    'scan.startup.mode' = 'latest-offset',    -- Start from the latest offset.
    'scan.startup.mode' = 'specific-offset',  -- Start from a specific offset.
    'scan.startup.mode' = 'timestamp',        -- Start from a specific timestamp.
    'scan.startup.specific-offset.file' = 'mysql-bin.000003',  -- Specify the binlog filename for the specific-offset mode.
    'scan.startup.specific-offset.pos' = '4',                  -- Specify the binlog position for the specific-offset mode.
    'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19',  -- Specify the GTID set for the specific-offset mode.
    'scan.startup.timestamp-millis' = '1667232000000'          -- Specify the startup timestamp for the timestamp mode.
    ...
)

Important: The MySQL source logs the current binlog position at the INFO level during each checkpoint. The log message is prefixed with Binlog offset on checkpoint {checkpoint-id}. This log can help you restart the job from a specific position. If the schema of the table being read has changed in the past, starting from earliest-offset, a specific-offset, or a timestamp may cause errors, because the Debezium reader internally stores the latest table schema and cannot correctly parse older data whose schema does not match.
About keyless CDC source tables
To read a table that does not have a primary key, you must set the scan.incremental.snapshot.chunk.key-column option and specify a non-nullable column.
The processing semantics of a keyless CDC source table are determined by the behavior of the column specified in scan.incremental.snapshot.chunk.key-column:
If the specified column is not updated, exactly-once semantics can be guaranteed.
If the specified column is updated, only at-least-once semantics can be guaranteed. However, you can ensure data correctness by combining it with a downstream system, specifying a downstream primary key, and using idempotent operations.
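A minimal sketch of a keyless source table, assuming a non-nullable column named user_id is chosen as the chunk key (table, column, and connection values are placeholders):
CREATE TEMPORARY TABLE keyless_source (
    user_id INT NOT NULL,    -- Non-nullable column used as the chunk key.
    user_name STRING,
    login_time TIMESTAMP(0)
    -- No PRIMARY KEY clause: the source table has no primary key.
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    'table-name' = 'user_logins',
    'scan.incremental.snapshot.chunk.key-column' = 'user_id'
);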
Read backup logs from ApsaraDB RDS for MySQL
The MySQL CDC source table supports reading backup logs from ApsaraDB RDS for MySQL. This is useful when the full snapshot phase takes a long time, which can cause local binlog files to be cleaned up before they are read. If backup files are available, the connector can read from them instead.
Example:
CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    'rds.region-id' = 'cn-beijing',
    'rds.access-key-id' = 'xxxxxxxxx',
    'rds.access-key-secret' = 'xxxxxxxxx',
    'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx',
    'rds.main-db-id' = '12345678',
    'rds.download.timeout' = '60s'
    ...
)

Enable CDC Source reuse
In a single job, each MySQL CDC source table starts its own binlog client. If the source tables read from the same MySQL instance, the additional connections increase pressure on the database. For more information, see MySQL CDC FAQ.
Solution
VVR 8.0.7 and later support MySQL CDC Source reuse. This feature merges eligible MySQL CDC source tables in the same job. Source tables are eligible for merging if their configuration items are identical except for the database name, table name, and server-id. After the feature is enabled, the engine merges eligible MySQL CDC sources automatically.

Procedure
Run the SET command in the SQL job:

SET 'table.optimizer.source-merge.enabled' = 'true';
-- For VVR 8.0.8 and 8.0.9, additionally set the following item:
SET 'sql-gateway.exec-plan.enabled' = 'false';

Source reuse is enabled by default in VVR 11.1 and later.
Start the job without a state. Modifying the Source reuse configuration item changes the job topology, so you must start the job without a state; otherwise, the job may fail to start or may lose data. If sources are merged, a MergetableSourceScan node appears in the job topology.
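As an illustrative sketch only (connection values are placeholders), two source tables that differ only in the table-name option are eligible for merging after reuse is enabled:
SET 'table.optimizer.source-merge.enabled' = 'true';

CREATE TEMPORARY TABLE orders_source (...) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

CREATE TEMPORARY TABLE shipments_source (...) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    'table-name' = 'shipments'   -- Only table-name differs, so the two sources can share one binlog client.
);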
Important: After you enable reuse, we recommend that you do not disable operator chaining. Setting pipeline.operator-chaining to false increases the overhead of data serialization and deserialization, and the more sources are merged, the greater the overhead. In VVR 8.0.7, disabling operator chaining also causes a serialization issue.
Accelerate binlog reading
When you use the MySQL connector as a source table or a data ingestion source, it parses binlog files to generate various change messages during the incremental phase. The binlog file records all table changes in a binary format. You can accelerate binlog file parsing in the following ways.
Enable the parsing filter
Use the scan.only.deserialize.captured.tables.changelog.enabled option to parse change events only for the specified tables.
Optimize Debezium options
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50

debezium.max.queue.size: The maximum number of records that the blocking queue can hold. When Debezium reads an event stream from the database, it places the events in a blocking queue before writing them downstream. The default value is 8192.
debezium.max.batch.size: The maximum number of events that the connector processes in each iteration. The default value is 2048.
debezium.poll.interval.ms: The number of milliseconds the connector waits before requesting new change events. The default value is 1000 milliseconds (1 second).
Example:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium configuration
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- Enable parsing filter
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- Parses change events only for specified tables.
...
)

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Debezium configuration
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # Enable parsing filter
  scan.only.deserialize.captured.tables.changelog.enabled: true

The Enterprise Edition of MySQL CDC has a binlog consumption capacity of 85 MB/s, which is about twice that of the open source community version. If the binlog generation speed exceeds 85 MB/s (equivalent to one 512 MB file every 6 seconds), the Flink job's latency continues to increase. The processing latency gradually decreases after the binlog generation speed slows down. When a binlog file contains large transactions, processing latency may temporarily increase and then decrease after the transaction log is read.
MySQL CDC DataStream API
To read and write data using the DataStream API, you must use the corresponding DataStream connector to connect to Flink. For information on how to set up the DataStream connector, see Integrate and use connectors in DataStream programs.
The following examples show how to create a DataStream API program and use MySqlSource, including the required pom dependencies.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("yourHostname")
                .port(yourPort)
                .databaseList("yourDatabaseName") // Set the database to capture.
                .tableList("yourDatabaseName.yourTableName") // Set the table to capture.
                .username("yourUsername")
                .password("yourPassword")
                .deserializer(new JsonDebeziumDeserializationSchema()) // Converts a SourceRecord to a JSON string.
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing.
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // Set 4 parallel source tasks.
                .setParallelism(4)
                .print().setParallelism(1); // Use a parallelism of 1 for the sink to maintain message order.

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>

When you build MySqlSource, you must specify the following parameters in your code:
Parameter | Description |
hostname | The IP address or hostname of the MySQL database. |
port | The port number of the MySQL database service. |
databaseList | The name of the MySQL database. Note The database name supports regular expressions to read data from multiple databases. Use |
username | The username for the MySQL database service. |
password | The password for the MySQL database service. |
deserializer | A deserializer that deserializes records of the SourceRecord type to a specified type. The parameter can be set to one of the following values:
|
You must specify the following parameters in your pom dependencies:
${vvr.version} | The version of the Realtime Compute for Apache Flink engine. For example, Note Use the version number displayed on Maven. We periodically release hotfix versions, and these updates might not be announced through other channels. |
${flink.version} | The Apache Flink version. For example: Important Ensure that your Apache Flink version matches the engine version of Realtime Compute for Apache Flink to avoid incompatibility issues during job runtime. For more information about the version mapping, see Engine. |
FAQ
For information about issues you might encounter when using a CDC source table, see CDC issues.