This topic provides the DDL syntax that is used to create a MySQL Change Data Capture (CDC) source table, describes the parameters in the WITH clause, and provides data type mappings and sample code.
What is a MySQL CDC source table?
A MySQL CDC source table is a streaming source table for MySQL databases. It first reads full historical data from a database and then seamlessly switches to reading binary log (binlog) files, so that no data is missed or read twice. If an error occurs, exactly-once semantics ensure data accuracy. You can use the MySQL CDC connector to read full data from a MySQL CDC source table in multiple parallel jobs at the same time. While a job is running, the incremental snapshot algorithm provides lock-free reading and resumable reads. A MySQL CDC source table provides the following features:
- Integrates stream processing and batch processing to support full and incremental data reading. This way, you do not need to separately implement stream processing and batch processing.
- Allows you to run multiple jobs at the same time to read full data from a MySQL CDC source table. This improves the data reading performance.
- Seamlessly switches between full and incremental data reading and supports automatic scale-in. This reduces the computing resources that are consumed.
- Supports resumable reads during full data reading, which makes reading performance more stable.
- Reads full data without locks. This way, online business is not affected.
How it works
When a Flink job starts, the MySQL CDC connector scans the full table and splits it into multiple chunks based on the primary key. The connector then uses the incremental snapshot algorithm to read the data of each chunk. The job periodically generates checkpoints to record the chunks whose data has been read. If a failover occurs, the connector needs to re-read only the chunks that have not been fully read. After the data of all chunks is read, the connector reads incremental change records from the binary log position that was captured before the snapshot started. The job continues to periodically generate checkpoints that record the binary log position. If a failover occurs, the connector resumes processing from the last recorded binary log position. This way, exactly-once semantics are implemented. For more information about the incremental snapshot algorithm, see MySQL CDC Connector.
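As an illustration, the chunk reads of the snapshot phase can be pictured as plain range queries over the primary key. This is a simplified sketch, assuming a hypothetical table `orders` with primary key `order_id` and the default chunk size of 8096; the connector issues equivalent queries internally:

```sql
-- Hypothetical chunk queries during the full (snapshot) phase.
-- Chunk 1: order_id in [1, 8097)
SELECT * FROM orders WHERE order_id >= 1 AND order_id < 8097;
-- Chunk 2: order_id in [8097, 16193)
SELECT * FROM orders WHERE order_id >= 8097 AND order_id < 16193;
-- ...
-- After all chunks are read, the connector switches to the binlog
-- position captured before the snapshot and reads incremental changes.
```

Because each chunk is an independent range, chunks can be read by parallel readers without a global lock, and a checkpoint only needs to record which chunks are finished.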
Prerequisites
- The network between your MySQL database and Ververica Platform (VVP) is connected.
- The MySQL server meets the following version requirements:
- The MySQL version is 5.7 or 8.0.X.
- Binary logging is enabled.
- The binary log format is set to ROW.
- The binlog_row_image parameter is set to FULL.
- The interactive_timeout and wait_timeout parameters are configured in the MySQL configuration file.
- A MySQL user is created and granted the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
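The server-side prerequisites above can be verified and prepared with standard MySQL statements. A sketch; the user name `flink_cdc` and the password are placeholders:

```sql
-- Check that binary logging is enabled and configured as required.
SHOW VARIABLES LIKE 'log_bin';          -- expected: ON
SHOW VARIABLES LIKE 'binlog_format';    -- expected: ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- expected: FULL

-- Create a user for the connector and grant the required permissions.
-- The REPLICATION privileges are global, so they must be granted ON *.*.
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY '<yourPassword>';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;
```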
Limits
- Lock-free reading and parallel reading are supported only when the Flink version is vvr-4.0.8-flink-1.13 or later.
- The MySQL CDC connector supports MySQL V5.7 and MySQL V8.0.X.
- MySQL CDC source tables do not support watermarks. If you need to perform window aggregation on a MySQL CDC source table, you can use a different approach to perform time-based aggregation. For more information, see How do I perform window aggregation if watermarks are not supported?.
- Only MySQL users who have specific permissions are allowed to read full data and incremental data from a MySQL CDC source table. The permissions include SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT.
- Unique server IDs must be explicitly configured for each job.
Each client that synchronizes data from a database has a unique server ID, and the MySQL server maintains network connections and binary log positions based on these IDs. If a large number of clients with different server IDs connect to the MySQL server, the CPU utilization of the MySQL server may increase dramatically and affect the stability of online business. If multiple jobs share the same server ID, the checkpoints for binary log positions may be recorded out of order, and the data that is read may not meet your business requirements. Therefore, we recommend that you configure a unique server ID for each CDC job. We also recommend that you use dynamic hints to configure server IDs instead of configuring them in DDL statements. The following sample code shows how to configure a server ID.
For more information about dynamic hints, see Dynamic hints.
```sql
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */;
```

Note: If the incremental snapshot algorithm is enabled, you must set the server-id parameter to a server ID range that matches the parallelism of the job.
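When parallel reading is enabled, the same dynamic hint can carry a server ID range instead of a single ID. A sketch, assuming a job parallelism of 4:

```sql
-- The range 5401-5404 provides one unique server ID per parallel reader.
SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */;
```

The range must contain at least as many IDs as the job has parallel readers.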
- Only jobs that use Ververica Runtime (VVR) 4.0.8 or later support lock-free reading, parallel reading, and resumable reads during full data reading.
If your job uses a VVR version earlier than 4.0.8, you must grant the RELOAD permission to the MySQL user so that the connector can obtain a global read lock, which ensures consistent data reading. The global read lock blocks write operations, and the lock may be held for several seconds. This may affect online business.
In addition, if your job uses a VVR version earlier than 4.0.8, checkpoints cannot be generated during full data reading. If the job fails during this process, it reads the full data again, which reduces the stability of data reading. Therefore, we recommend that you update the VVR version that your job uses to 4.0.8 or later.
DDL syntax

```sql
CREATE TABLE mysqlcdc_source (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);
```
Parameters in the WITH clause
| Parameter | Description | Required | Data type | Remarks |
| --- | --- | --- | --- | --- |
| connector | The type of the source table. | Yes | STRING | Set the value to `mysql-cdc`. |
| hostname | The IP address or hostname of the MySQL database. | Yes | STRING | N/A. |
| username | The username that is used to access the MySQL database. | Yes | STRING | N/A. |
| password | The password that is used to access the MySQL database. | Yes | STRING | N/A. |
| database-name | The name of the MySQL database. | Yes | STRING | You can set this parameter to a regular expression to read data from multiple databases. |
| table-name | The name of the MySQL table. | Yes | STRING | You can set this parameter to a regular expression to read data from multiple tables. |
| port | The port number that is used to access the MySQL database. | No | INTEGER | Default value: 3306. |
| server-id | The ID that is allocated to a database client. | No | STRING | The ID must be unique in a MySQL cluster. We recommend that you configure a unique server ID for each job in the same database. By default, a value in the range of 5400 to 6400 is randomly generated. This parameter can also be set to an ID range, such as 5400-5408. If the incremental snapshot algorithm is enabled, parallel reading is supported. In this case, we recommend that you set this parameter to an ID range so that each parallel job uses a unique ID. |
| scan.incremental.snapshot.enabled | Specifies whether to enable the incremental snapshot algorithm. | No | BOOLEAN | Default value: true. The incremental snapshot algorithm is a new mechanism for reading snapshots of full data. Compared with the original snapshot reading algorithm, it has the following advantages: lock-free reading, parallel reading, and resumable reads during full data reading. If you want the MySQL CDC connector to support parallel reading, a unique server ID must be allocated to each parallel reader. Therefore, server-id must be set to a range, such as 5400-6400, and the range must contain at least as many IDs as the value of the Parallelism parameter. |
| scan.incremental.snapshot.chunk.size | The chunk size of the table, in rows. | No | INTEGER | Default value: 8096. If the incremental snapshot algorithm is enabled, the table is split into multiple chunks for data reading. The data of a chunk is cached in memory before it is read. Therefore, if the chunk size is too large, an out of memory (OOM) error may occur. If the chunk size is too small, fewer records need to be re-read after a failover, but data throughput decreases. |
| scan.snapshot.fetch.size | The maximum number of records that can be fetched each time full data of a table is read. | No | INTEGER | Default value: 1024. |
| scan.startup.mode | The startup mode when data is consumed. | No | STRING | Valid values: initial (default), which reads full data first and then reads incremental data from the binlog, and latest-offset, which skips the snapshot stage and reads only the binlog changes that are generated after the connector starts. |
| server-time-zone | The time zone of the session used by the database. | No | STRING | Example: Asia/Shanghai. This parameter determines how data of the TIMESTAMP type in the MySQL database is converted to the STRING type. For more information, see Debezium time types. |
| debezium.min.row.count.to.stream.results | The minimum number of rows in a table that triggers the batch data read mode. If the number of rows in a table is greater than the value of this parameter, the batch data read mode is used. | No | INTEGER | Default value: 1000. Flink reads data from a MySQL source table in one of the following modes: full read mode, in which all data of the table is read to memory at a time, which is fast but consumes memory, and batch read mode, in which data is read in multiple batches, which avoids OOM errors for tables that contain large amounts of data. |
| connect.timeout | The maximum duration that the connector waits before it times out when attempting to connect to the MySQL database server. | No | DURATION | Default value: 30. Unit: seconds. |
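For reference, the optional parameters above can be combined in a single DDL statement. The following sketch assumes a job parallelism of 4 and a database in the Asia/Shanghai time zone; hostnames and credentials are placeholders:

```sql
CREATE TABLE mysqlcdc_source (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Optional parameters:
  'server-id' = '5401-5404',                       -- one ID per parallel reader
  'scan.incremental.snapshot.enabled' = 'true',    -- default
  'scan.incremental.snapshot.chunk.size' = '8096', -- default
  'server-time-zone' = 'Asia/Shanghai'
);
```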
Parallelism control
The MySQL CDC connector can run multiple jobs at the same time to read full data, which improves data loading efficiency. If you use the MySQL CDC connector together with the Autopilot feature that is provided by the VVP of fully managed Flink, automatic scale-in can be performed during incremental data reading after parallel reading of the full data is complete. This saves computing resources. You can configure the parallelism in one of the following modes:
- In Basic mode, the Parallelism parameter specifies the global parallelism of all jobs.
- In Expert mode, you can configure the Parallelism parameter for a specific vertex node based on your business requirements.
Automatic scale-in by using Autopilot
A large amount of historical data is accumulated during full data reading. To improve the reading efficiency, Flink reads historical data in parallel in most cases. During incremental data reading, only a single job is required to read data because the amount of binary log data is small and global order must be ensured. The numbers of compute units (CUs) required during full data reading and incremental data reading are different. You can use the Autopilot feature to balance performance and resource consumption.
Data type mapping
| Data type of MySQL CDC | Data type of Flink |
| --- | --- |
| BIGINT UNSIGNED | DECIMAL(20, 0) |
| NUMERIC(p, s) | DECIMAL(p, s) |
| TIME [(p)] | TIME [(p)] [WITHOUT TIMEZONE] |
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
| TIMESTAMP [(p)] | TIMESTAMP [(p)] WITH LOCAL TIME ZONE |
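To illustrate the mappings, the following sketch pairs a hypothetical MySQL table definition with the corresponding Flink DDL column types; the table and column names are examples only:

```sql
-- MySQL table:
-- CREATE TABLE orders (
--   id BIGINT UNSIGNED PRIMARY KEY,
--   amount NUMERIC(10, 2),
--   created_at DATETIME(3),
--   updated_at TIMESTAMP(3)
-- );

-- Corresponding Flink DDL:
CREATE TABLE orders (
  id DECIMAL(20, 0),                             -- from BIGINT UNSIGNED
  amount DECIMAL(10, 2),                         -- from NUMERIC(10, 2)
  created_at TIMESTAMP(3),                       -- from DATETIME(3)
  updated_at TIMESTAMP(3) WITH LOCAL TIME ZONE,  -- from TIMESTAMP(3)
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc'
  -- connection parameters omitted
);
```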
References
- How do I perform window aggregation if watermarks are not supported?
- How do I enable the system to skip the snapshot stage and read only the change data?
- How does the system read data from a MySQL database on which sharding is performed?
- What do I do if the data reading efficiency is low and backpressure exists when full data is read from tables?
- What do I do if the error message "com.github.shyiko.mysql.binlog.network.ServerException" appears?
- What do I do if the error message "The primary key is necessary when enable 'scan.incremental.snapshot.enabled' to 'true'" appears?