This topic provides the DDL syntax that is used to create a MySQL Change Data Capture (CDC) source table, describes the parameters in the WITH clause, and provides data type mappings and sample code.
What is a MySQL CDC source table?
A MySQL CDC source table is a streaming source table that reads full data and then incremental data from a MySQL database with exactly-once semantics. Before Flink starts a full table scan, it acquires a global read lock by executing FLUSH TABLES WITH READ LOCK, records the current binary log file position and the table schemas, and then releases the global read lock. After the full table scan is complete, Flink reads the incremental change records starting from the recorded binary log file position. While a Flink job is running, checkpoints are periodically generated to record the binary log file position. If the job recovers from a failure or is resumed, Flink continues to process data from the binary log file position recorded in the latest checkpoint. This is how exactly-once semantics is achieved.
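The locking sequence described above corresponds roughly to the following MySQL statements. This is an illustrative sketch only: the statements that the connector actually issues may differ, and orders is a hypothetical table name.

```sql
-- Illustrative only: orders is a hypothetical table name.
-- 1. Acquire the global read lock so that no writes occur while the position is recorded.
FLUSH TABLES WITH READ LOCK;
-- 2. Record the current binary log file name and position.
SHOW MASTER STATUS;
-- 3. Read the schema of each captured table.
SHOW CREATE TABLE orders;
-- 4. Release the global read lock before the full table scan proceeds.
UNLOCK TABLES;
```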
Prerequisites
A self-managed MySQL database or an ApsaraDB RDS for MySQL database is created, and tables are created in the database. For more information about how to create an ApsaraDB RDS for MySQL database and create a table, see Create accounts and databases for an ApsaraDB RDS for MySQL instance.
Limits
- MySQL CDC data can be consumed only in a single job.
- Only Flink that uses Ververica Runtime (VVR) 2.1.2 or later supports MySQL CDC connectors.
- MySQL CDC source tables do not support watermarks. If you need to perform window aggregation on a MySQL CDC source table, you can use a non-window aggregation method to aggregate data instead.
- Only MySQL users who have the required permissions, such as SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT, can read full data and incremental data from a MySQL CDC source table.
- Flink can use a MySQL CDC connector to read data only from MySQL V5.7 and MySQL V8.0.
- When Flink reads full data from a MySQL CDC source table, checkpointing does not take effect. We recommend that you do not enable Autopilot in this scenario.
Precautions
- We recommend that you grant MySQL users the RELOAD permission.
If you do not grant MySQL users the RELOAD permission, the global read lock is downgraded to a table read lock. The table read lock is released only after the full table scan is complete, which prolongs the locking duration. Online business is also affected because the table read lock blocks writes to the tables.
Note: If you have configured 'debezium.snapshot.locking.mode' = 'none' to skip the read lock stage, you do not need to grant the RELOAD permission.
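The permissions named in this topic could be granted with MySQL statements along the following lines. This is a minimal sketch: the flink_cdc account name and the '%' host pattern are assumptions for illustration, and the password placeholder must be replaced.

```sql
-- Hypothetical account name; restrict the host pattern as your environment requires.
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY '<yourPassword>';

-- RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT are
-- global privileges, so the grant is made ON *.*.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flink_cdc'@'%';

-- Verify the result.
SHOW GRANTS FOR 'flink_cdc'@'%';
```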
- Online business may be affected when FLUSH TABLES WITH READ LOCK is executed.
While the global read lock acquired by FLUSH TABLES WITH READ LOCK is held, Flink obtains the binary log file position and table schemas. Therefore, the locking duration increases with the number of tables and may reach several seconds. For example, locking thousands of tables takes about 2 to 3 seconds. Online business may be affected because the read lock blocks data writing. If you want to skip the lock stage and can tolerate the loss of exactly-once semantics, you can add the 'debezium.snapshot.locking.mode' = 'none' property to explicitly skip the lock stage.
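As a sketch of how the property is added, the following DDL sets 'debezium.snapshot.locking.mode' = 'none' in the WITH clause. The table name and columns are illustrative, and exactly-once semantics is not guaranteed with this setting.

```sql
-- Illustrative table definition; only the last property is the point of this example.
CREATE TABLE mysqlcdc_source_nolock (
  order_id INT,
  customer_name STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Skip the read lock stage; exactly-once semantics is no longer guaranteed.
  'debezium.snapshot.locking.mode' = 'none'
);
```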
- A unique server ID must be explicitly configured for each job.
Each client that synchronizes data from a database has a unique server ID. The MySQL server maintains network connections and binary log file positions based on server IDs. If a large number of clients with different server IDs connect to the MySQL server, the CPU utilization of the MySQL server may spike, which affects the stability of online business. If multiple jobs share the same server ID, the checkpoints for the binary log file positions may become disordered, which affects data reading accuracy. Therefore, we recommend that you use dynamic hints to configure a different server ID for each CDC job. Example:
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */;
For more information about dynamic hints, see Dynamic hints.
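The server-id parameter is also listed among the WITH clause parameters of this topic, so a fixed server ID could be declared in the DDL instead of in a hint. The following is a minimal sketch with an illustrative table name, columns, and ID value. A DDL that is shared by several jobs would still need a different ID for each job, which is why dynamic hints are recommended.

```sql
-- Illustrative table; the server ID value 123456 is an example only.
CREATE TABLE mysqlcdc_source_with_id (
  order_id INT,
  customer_name STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Must be unique in the MySQL cluster and must not be reused by another job.
  'server-id' = '123456'
);
```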
- No checkpoint can be generated during a full table scan.
During a full table scan, Flink cannot trigger a checkpoint because no binary log file position is available to restore data from. To prevent a checkpoint from being triggered, the MySQL CDC source table keeps any running checkpoint waiting until the scan is complete. If the scanned table is very large, the scan takes a long time and the waiting checkpoint may time out. A checkpoint that times out is considered a failed checkpoint. With the default Flink configuration, a failed checkpoint may trigger a job failover. To prevent job failures caused by checkpoint timeouts, we recommend that you configure the following job parameters for very large tables:
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
|Parameter||Description|
|execution.checkpointing.interval||The checkpoint interval. In the preceding example, the interval is 10 minutes.|
|execution.checkpointing.tolerable-failed-checkpoints||The number of checkpoint failures that can be tolerated.|
|restart-strategy||The restart policy that is used when the job fails. Valid values: fixed-delay, failure-rate, and none.|
|restart-strategy.fixed-delay.attempts||The maximum number of restart attempts when restart-strategy is set to fixed-delay.|
Valid values of restart-strategy:
- fixed-delay: This is the recommended value. If an application fails, the system restarts the application at a fixed interval. The default interval is 1 second. The system restarts the application at most the number of times that is specified by restart-strategy.fixed-delay.attempts.
- failure-rate: If an application fails, the system restarts the application at a fixed interval. The default interval is 1 second. If the failure frequency exceeds the specified upper limit, the application is no longer restarted. The failure frequency is controlled by the following parameters:
  - restart-strategy.failure-rate.failure-rate-interval: the time interval over which failures are counted.
  - restart-strategy.failure-rate.max-failures-per-interval: the maximum number of failures allowed in the specified interval.
- none: If an application fails, the system does not restart the application.
DDL syntax
CREATE TABLE mysqlcdc_source (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);
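Because MySQL CDC source tables do not support watermarks, aggregation over the table declared above has to be written without windows. The following query is a minimal sketch of such a non-window aggregation. The output column names are illustrative.

```sql
-- Continuous (non-window) aggregation over the CDC source table.
-- The result is updated as inserts, updates, and deletes arrive from the binary log.
SELECT
  product_id,
  COUNT(*) AS order_cnt,      -- illustrative alias
  SUM(price) AS total_price   -- illustrative alias
FROM mysqlcdc_source
GROUP BY product_id;
```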
Parameters in the WITH clause
|Parameter||Description||Required||Data type||Remarks|
|connector||The type of the source table.||Yes||STRING||Set the value to mysql-cdc.|
|hostname||The IP address or hostname of the MySQL database.||Yes||STRING||N/A.|
|username||The username that is used to access the MySQL database.||Yes||STRING||N/A.|
|password||The password that is used to access the MySQL database.||Yes||STRING||N/A.|
|database-name||The name of the MySQL database.||Yes||STRING||You can set this parameter to a regular expression to read data from multiple databases.|
|table-name||The name of the MySQL table.||Yes||STRING||You can set this parameter to a regular expression to read data from multiple tables.|
|port||The port number that is used to access the MySQL database.||No||INTEGER||Default value: 3306.|
|server-id||The ID allocated to a database client.||No||STRING||The ID must be unique in a MySQL cluster. We recommend that you configure a unique server ID for each job in the same database. By default, a value in the range of 5400 to 6400 is randomly generated.|
|scan.startup.mode||The startup mode of the consumer.||No||STRING||For more information, see Start mode.|
|server-time-zone||The time zone of the session used by the database.||No||STRING||Example: Asia/Shanghai. This parameter determines how to convert data in the MySQL database from the TIMESTAMP type to the STRING type.|
|debezium.min.row.count.to.stream.results||The minimum number of data records allowed in a table to trigger the batch data read mode. If the number of data records in a table is greater than the value of this parameter, the batch data read mode is used.||No||INTEGER||Default value: 1000. Flink reads data from a MySQL source table in one of the following modes:
|debezium.snapshot.fetch.size||The maximum number of rows that can be read at a time from a MySQL source table at the snapshot stage.||No||INTEGER||This parameter is valid only when the batch read mode is used.|
|debezium.*||The Debezium property parameters.||No||STRING||The parameters are used to impose fine-grained control over the behavior of Debezium clients. For example, you can set
Start mode
The scan.startup.mode parameter supports the following values:
- initial: This is the default value. When a MySQL CDC connector starts for the first time, it scans all historical data and then reads the latest binary log data.
- latest-offset: When a MySQL CDC connector starts for the first time, it does not scan historical data. Instead, it reads binary log data from the latest offset, so Flink reads only the incremental data that is generated after the connector starts.
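As a sketch of how a start mode is selected, the following DDL sets scan.startup.mode to latest-offset so that the historical scan is skipped. The table name and columns are illustrative.

```sql
-- Illustrative table that reads only changes made after the connector starts.
CREATE TABLE mysqlcdc_source_latest (
  order_id INT,
  customer_name STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Omit this property, or set it to 'initial', to scan historical data first.
  'scan.startup.mode' = 'latest-offset'
);
```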
Data type mapping
|Data type of MySQL CDC||Data type of Flink|
|BIGINT UNSIGNED||DECIMAL(20, 0)|
|NUMERIC(p, s)||DECIMAL(p, s)|
|TIME [(p)]||TIME [(p)] [WITHOUT TIME ZONE]|
|DATETIME [(p)]||TIMESTAMP [(p)] [WITHOUT TIME ZONE]|
|TIMESTAMP [(p)]||TIMESTAMP [(p)] or TIMESTAMP [(p)] WITH LOCAL TIME ZONE|
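To show how the mappings above translate into a DDL statement, the following sketch declares Flink columns for a hypothetical MySQL table. The table name, the column names, and the MySQL column types noted in the comments are assumptions for illustration.

```sql
-- Each comment names the assumed MySQL column type that the Flink type is mapped from.
CREATE TABLE mysqlcdc_typed_source (
  id         DECIMAL(20, 0),  -- MySQL: BIGINT UNSIGNED
  amount     DECIMAL(10, 2),  -- MySQL: NUMERIC(10, 2)
  open_time  TIME,            -- MySQL: TIME
  created_at TIMESTAMP(3),    -- MySQL: DATETIME(3)
  updated_at TIMESTAMP(0)     -- MySQL: TIMESTAMP
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);
```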