This topic describes how to create a Change Data Capture (CDC) source table of MySQL. It also describes the data definition language (DDL) statements, parameters in the WITH clause, field type mappings, and sample code used when you create a CDC source table of MySQL.
Introduction to the CDC source table of MySQL
- We recommend that you grant the RELOAD permission to the MySQL user.
If the user does not have the RELOAD permission, the global read lock is downgraded to a table-level read lock, and the lock is released only after a full table scan is complete. This prolongs the locking duration and affects online business because the read lock blocks writes to the tables.
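For example, the required permissions can be granted with a statement similar to the following sketch. The account name flink_user and the host % are placeholders; adjust them to your environment:

```sql
-- Grant the permissions that a CDC connector typically needs:
-- SELECT for full table scans, RELOAD for the global read lock used by
-- consistent snapshots, and REPLICATION SLAVE/CLIENT for binary log reading.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'flink_user'@'%';
FLUSH PRIVILEGES;
```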
- Online business may be affected during the execution of FLUSH TABLES WITH READ LOCK.
During the execution of FLUSH TABLES WITH READ LOCK, Flink obtains the binary log file position and the table schemas. Therefore, the locking duration increases with the number of tables and may reach several seconds. Online business may be affected because the read lock blocks data writing. If you can tolerate non-exactly-once semantics, you can add the 'debezium.snapshot.locking.mode' = 'none' property to explicitly skip the lock stage.
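As a sketch, the property is added to the WITH clause of the table DDL. The table name, columns, and connection values below are placeholders:

```sql
CREATE TABLE mysqlcdc_source_nolock (
  order_id INT,
  order_status BOOLEAN
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Skip the lock stage. The snapshot is no longer exactly-once.
  'debezium.snapshot.locking.mode' = 'none'
);
```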
- Unique server IDs must be explicitly configured for each job.
Each client that synchronizes data from a database has a unique server ID. The MySQL server maintains network connections and binary log file positions based on server IDs. If a large number of clients with different server IDs connect to the MySQL server, the CPU utilization of the MySQL server may increase suddenly, which affects the stability of online business. If multiple jobs share the same server ID, the checkpoints for the binary log file positions may be disordered, which affects the accuracy of the data that is read. Therefore, we recommend that you use dynamic hints to configure a different server ID for each CDC job, for example, SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */;. For more information about dynamic hints, see Dynamic hints.
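As an illustration, two jobs that read the same source table can be kept apart by distinct server IDs in their dynamic hints. The IDs 123456 and 123457 are placeholders:

```sql
-- Job A and Job B read the same table but use distinct server IDs,
-- so the MySQL server tracks their binary log positions independently.
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */;  -- Job A
SELECT * FROM source_table /*+ OPTIONS('server-id'='123457') */;  -- Job B
```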
- No checkpoint can be triggered during a full table scan.
During a full table scan, no binary log file position is available for data restoration, so Flink must not complete a checkpoint. To achieve this, the CDC source table of MySQL keeps the running checkpoint waiting until the scan is complete. If the scanned table is excessively large, the scan is time-consuming and the checkpoint times out. A checkpoint that times out is considered failed, and with the default Flink configuration a failed checkpoint may trigger a job failover. To prevent job failovers caused by checkpoint timeouts, we recommend that you configure the following job parameters for excessively large tables:
```
execution.checkpointing.interval: 10min                   # The interval at which checkpoints are triggered.
execution.checkpointing.tolerable-failed-checkpoints: 100 # The maximum number of failed checkpoints allowed.
restart-strategy: fixed-delay                             # The restart policy.
restart-strategy.fixed-delay.attempts: 2147483647         # The maximum number of restart attempts.
```
```sql
CREATE TABLE mysqlcdc_source (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);
```
Parameters in the WITH clause
| Parameter | Description | Required | Data type | Remarks |
| --- | --- | --- | --- | --- |
| connector | The type of the source table. | Yes | STRING | Set the value to mysql-cdc. |
| hostname | The IP address or hostname of the MySQL database. | Yes | STRING | None. |
| username | The username that is used to access the MySQL database. | Yes | STRING | None. |
| password | The password that is used to access the MySQL database. | Yes | STRING | None. |
| database-name | The name of the MySQL database. | Yes | STRING | The database name can be a regular expression so that data of multiple databases can be read. |
| table-name | The name of the MySQL table. | Yes | STRING | The table name can be a regular expression so that data of multiple tables can be read. |
| port | The port that is used to access the MySQL database. | No | INTEGER | Default value: 3306. |
| server-id | The ID that is allocated to a client. | No | STRING | The ID must be unique in a MySQL cluster. We recommend that you configure a unique server ID for each job that reads from the same database. By default, a value in the range of 5400 to 6400 is randomly generated. |
| server-time-zone | The time zone of the session that is used by the database. | No | STRING | Example: Asia/Shanghai. This parameter determines how data of the MySQL TIMESTAMP type is converted to the STRING type. |
| debezium.min.row.count.to.stream.results | The minimum number of data records in a table that triggers the batch read mode. If the number of data records in a table is greater than the value of this parameter, the batch read mode is used. | No | INTEGER | Default value: 1000. Flink reads data from a MySQL source table in one of the following modes: full read, which reads all data in the table at a time, and batch read, which reads a specified number of rows at a time. |
| debezium.snapshot.fetch.size | The maximum number of rows that can be read at a time from a MySQL source table at the snapshot stage. | No | INTEGER | This parameter takes effect only when the batch read mode is used. |
| debezium.* | The Debezium property parameters. | No | STRING | These parameters provide fine-grained control over the behavior of the Debezium connector. For example, you can set 'debezium.snapshot.mode' = 'never' to specify that snapshots are never read. |
Field type mapping
| Data type of MySQL CDC | Data type of Flink |
| --- | --- |
| BIGINT UNSIGNED | DECIMAL(20, 0) |
| NUMERIC(p, s) | DECIMAL(p, s) |
| TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
| TIMESTAMP [(p)] | TIMESTAMP [(p)] WITH LOCAL TIME ZONE |
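For illustration, the mappings above can be applied in a Flink DDL as follows. The table name and column names are hypothetical, and the connection values are placeholders:

```sql
CREATE TABLE type_mapping_example (
  big_unsigned_col DECIMAL(20, 0),           -- MySQL BIGINT UNSIGNED
  numeric_col DECIMAL(10, 2),                -- MySQL NUMERIC(10, 2)
  time_col TIME(3),                          -- MySQL TIME(3)
  datetime_col TIMESTAMP(3),                 -- MySQL DATETIME(3)
  ts_col TIMESTAMP(0) WITH LOCAL TIME ZONE   -- MySQL TIMESTAMP(0)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);
```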
- Q: How do I enable the system to skip the snapshot stage and read only the incremental data?
A: You can use the debezium.snapshot.mode parameter in the WITH clause to control whether the snapshot stage is skipped. Valid values:
- never: When a job starts to run, the system directly reads the incremental data from the start position of the binary log without reading snapshots from the database. Old incremental data in the binary log may have been automatically deleted, so the incremental data may not contain the full data and the data that the system reads may be incomplete.
- schema_only: If you do not need to ensure data consistency and focus only on the incremental data in the database, you can set this parameter to schema_only. This way, only schema snapshots are generated and the system reads data from the latest incremental data.
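As a sketch, the parameter is set in the WITH clause of the table DDL; the table name, columns, and connection values are placeholders:

```sql
CREATE TABLE mysqlcdc_incremental (
  order_id INT,
  order_status BOOLEAN
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Take only schema snapshots and read from the latest incremental data.
  'debezium.snapshot.mode' = 'schema_only'
);
```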
- Q: How does the system read data from a MySQL database on which table sharding is performed?
A: If a MySQL database contains multiple sharded tables that share the same schema, such as user_00, user_02, and user_99, you can set the table-name option to a regular expression that matches multiple tables whose data you want to read. For example, you can set table-name to user_.* to monitor all tables with the user_ prefix. You can use the database-name option in the same way if the schemas of all tables in the matched databases are identical.
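The approach can be sketched as follows. The columns user_id and user_name are hypothetical, and the connection values are placeholders:

```sql
-- Read all sharded tables whose names start with user_ as one source.
-- The regular expression user_.* in table-name matches user_00 ... user_99.
CREATE TABLE user_shards_source (
  user_id INT,
  user_name STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = 'user_.*'
);
```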
- Q: What do I do if the data reading efficiency is low and backpressure exists when full data is read from tables?
A: If the downstream node processes data at a low speed, backpressure may occur. First check whether the downstream node has backpressure. If it does, resolve the backpressure issue on the downstream node by using one of the following methods:
- Increase the parallelism of the job.
- Enable aggregate optimization features, such as miniBatch.