This topic provides the DDL syntax that is used to create a MySQL Change Data Capture (CDC) source table, describes the parameters in the WITH clause, and provides data type mappings.

What is a MySQL CDC source table?

A MySQL CDC source table is a streaming source table backed by a MySQL database. A MySQL CDC source table first reads the full historical data of a database and then reads incremental data from the binary log files. This ensures data accuracy. If an error occurs, exactly-once semantics ensure that data is neither lost nor duplicated. The MySQL CDC connector can read the full data of a MySQL CDC source table in parallel with multiple subtasks. During reading, the incremental snapshot algorithm performs lock-free reading and supports resumable reading.

The MySQL CDC connector provides the following features:
  • Integrates stream processing and batch processing to support both full and incremental data reading, so you do not need to implement them separately.
  • Supports parallel reading of full data, which makes reading more efficient.
  • Seamlessly switches from full to incremental data reading and supports automatic scale-in, which reduces the computing resources consumed.
  • Supports resumable reading during the full data phase, which makes reading more stable.
  • Reads full data without locks, so online business is not affected.

Implementation

When a fully managed Flink job starts, the MySQL CDC connector scans the full table and splits it into multiple chunks based on the primary key. The connector then uses the incremental snapshot algorithm to read the data of each chunk. The job periodically generates checkpoints to record the chunks whose data has been read. If a failover occurs, the connector needs to resume reading only the chunks that have not been read. After all chunks are read, the connector reads incremental change records from the binary log file position that was captured before the full scan. The job continues to periodically generate checkpoints to record the binary log file position. If a failover occurs, the connector resumes processing from the recorded binary log file position. This implements exactly-once semantics. For more information about the incremental snapshot algorithm, see MySQL CDC Connector.
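
Conceptually, each chunk covers a bounded range of primary key values. The following queries are only an illustrative sketch of the snapshot phase; the chunk boundaries 0, 8096, and 16192 are hypothetical, and the actual statements that the connector issues differ.

-- Chunk 1: primary key values in [0, 8096)
SELECT * FROM orders WHERE order_id >= 0 AND order_id < 8096;
-- Chunk 2: primary key values in [8096, 16192)
SELECT * FROM orders WHERE order_id >= 8096 AND order_id < 16192;
-- ...one bounded query per chunk; chunks can be read in parallel,
-- and each completed chunk is recorded in the checkpoint.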

Prerequisites

  • A network connection is established between your MySQL database and Ververica Platform (VVP).
  • The MySQL server meets the following requirements:
    • The MySQL version is 5.7 or 8.0.X.
    • Binary logging is enabled.
    • The format of binary logs is set to ROW.
    • The binlog_row_image parameter is set to FULL.
  • The interactive_timeout and wait_timeout parameters are configured in the MySQL configuration file.
  • A MySQL user is created and granted the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
Note For more information about how to perform the preceding configurations for an ApsaraDB RDS for MySQL database, a PolarDB for MySQL database, or a self-managed MySQL database, see Configure a MySQL database.
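
The following statements are a minimal sketch of how to verify and complete these prerequisites on the MySQL side. The user name flinkuser and the host pattern % are example values; adjust them to your environment.

-- Verify the binary log configuration.
SHOW VARIABLES LIKE 'log_bin';           -- expected: ON
SHOW VARIABLES LIKE 'binlog_format';     -- expected: ROW
SHOW VARIABLES LIKE 'binlog_row_image';  -- expected: FULL

-- Create a MySQL user and grant the required permissions.
CREATE USER 'flinkuser'@'%' IDENTIFIED BY '<yourPassword>';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flinkuser'@'%';
FLUSH PRIVILEGES;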

Limits

  • Lock-free reading and parallel reading are supported only when the version of the Flink compute engine is vvr-4.0.8-flink-1.13 or later.
  • The MySQL CDC connector supports MySQL V5.7 and MySQL V8.0.X.
    Note In Ververica Runtime (VVR) 4.0.11 and later, data of MySQL V5.6 can be read.
  • MySQL CDC source tables do not support watermarks. If you want to perform window aggregation on a MySQL CDC source table, use a different approach to time-based aggregation, such as the sketch after this list. For more information, see How do I perform window aggregation if watermarks are not supported?.
  • Only MySQL users who are granted specific permissions can read full data and incremental data from a MySQL CDC source table. The permissions include SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT.
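
For example, instead of a time window, you can aggregate by a time string that is derived from a time column. The following query is a minimal sketch that assumes the mysqlcdc_source table from the DDL syntax section below; DATE_FORMAT is a built-in Flink SQL function. Unlike a window, this query continuously updates its results as new records arrive.

-- Hour-level aggregation without watermarks: derive a grouping key
-- from the order_date column instead of using a time window.
SELECT
  DATE_FORMAT(order_date, 'yyyy-MM-dd HH') AS window_hour,
  COUNT(*) AS order_cnt
FROM mysqlcdc_source
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd HH');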

Precautions

  • Unique server IDs must be explicitly configured for each job.

    Each client that synchronizes data from a database has a unique server ID. The MySQL server maintains network connections and binary log file positions based on the server ID. If a large number of clients that have different server IDs connect to the MySQL server, CPU utilization of the MySQL server may significantly increase. This affects the stability of online business.

    If multiple jobs share the same server ID, the binary log file positions that are recorded in checkpoints may be out of order. As a result, the data that is read may not meet your business requirements. Therefore, we recommend that you configure a unique server ID for each CDC job. We also recommend that you use dynamic hints to configure server IDs instead of configuring them in DDL statements. The following sample code shows how to configure a server ID.
    SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
    For more information about dynamic hints, see Dynamic hints.
    Note If the incremental snapshot algorithm is enabled, you must set the server-id parameter to a range whose size matches the parallelism of the job, as shown in the sketch after this list.
  • Only jobs that use VVR 4.0.8 or later support lock-free reading, parallel reading, and resumable reading during full data reading.

    If your job uses a VVR version that is earlier than 4.0.8, you must grant the RELOAD permission to the MySQL user to obtain the global read lock. This ensures data reading consistency. The global read lock may block write operations for a few seconds. This may affect online business.

    If your job uses a VVR version that is earlier than 4.0.8, checkpoints cannot be generated during full data reading. If your job fails in the process, the job reads full data again. This reduces the stability of data reading performance. Therefore, we recommend that you update the version of VVR that is used by your job to 4.0.8 or later.
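
The following hint is a sketch of the range-based server ID configuration that is mentioned in the note above. The range 5401-5404 is an example value that covers a parallelism of 4; choose a range that does not overlap with other jobs that read the same instance.

SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;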

DDL syntax

CREATE TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

Parameters in the WITH clause

  • connector: Required. STRING. The type of the source table. Set this parameter to mysql-cdc or mysql. The two values are equivalent.
  • hostname: Required. STRING. The IP address or hostname that is used to access the MySQL database.
  • username: Required. STRING. The username that is used to access the MySQL database.
  • password: Required. STRING. The password that is used to access the MySQL database.
  • database-name: Required. STRING. The name of the MySQL database. To read data from multiple databases, set this parameter to a regular expression.
  • table-name: Required. STRING. The name of the MySQL table. To read data from multiple tables, set this parameter to a regular expression.
  • port: Optional. INTEGER. The port number that is used to access the MySQL database. Default value: 3306.
  • server-id: Optional. STRING. The ID that is allocated to the database client. The ID must be unique in the MySQL cluster. We recommend that you configure a unique server ID for each job that reads from the same database. By default, a value in the range of 5400 to 6400 is randomly generated. This parameter can also be set to an ID range, such as 5400-5408. If the incremental snapshot algorithm is enabled, parallel reading is supported. In this case, we recommend that you set this parameter to an ID range so that each parallel reader uses a unique ID.

  • scan.incremental.snapshot.enabled: Optional. BOOLEAN. Specifies whether to enable the incremental snapshot algorithm. Default value: true. The incremental snapshot algorithm is a new mechanism for reading snapshots of full data. Compared with the original snapshot reading algorithm, it has the following advantages:
    • The MySQL CDC connector can read full data in parallel.
    • The MySQL CDC connector can generate checkpoints at the chunk level during full data reading.
    • The MySQL CDC connector does not need to execute the FLUSH TABLES WITH READ LOCK statement to obtain a global read lock during full data reading.
    If you want the MySQL CDC connector to support parallel reading, each parallel reader must have a unique server ID. Therefore, you must set server-id to a range whose size is greater than or equal to the value of the Parallelism parameter, for example, 5400-6400.

  • scan.incremental.snapshot.chunk.size: Optional. INTEGER. The chunk size of the table, in number of rows. Default value: 8096. If the incremental snapshot algorithm is enabled, the table is split into multiple chunks for data reading. The data of a chunk is cached in memory before it is read. Therefore, if the chunk size is too large, an out of memory (OOM) error may occur. If the chunk size is too small, fewer rows need to be re-read after a fault recovery, but the data throughput decreases.
  • scan.snapshot.fetch.size: Optional. INTEGER. The maximum number of records that can be fetched at a time when the full data of a table is read. Default value: 1024.
  • scan.startup.mode: Optional. STRING. The startup mode for data consumption. Valid values:
    • initial: When the MySQL CDC connector starts for the first time, fully managed Flink scans all historical data and then reads the most recent binary log data. This is the default value.
    • latest-offset: When the MySQL CDC connector starts for the first time, fully managed Flink reads binary log data from the most recent offset instead of scanning historical data. This way, only the incremental data that arrives after the connector starts is read.
  • server-time-zone: Optional. STRING. The time zone of the database session, for example, Asia/Shanghai. This parameter determines how TIMESTAMP values in the MySQL database are converted to the STRING type. For more information, see Debezium time types.
  • debezium.min.row.count.to.stream.results: Optional. INTEGER. The minimum number of rows in a table that triggers the batch data read mode. Default value: 1000. If a table contains more rows than this value, the batch data read mode is used. Fully managed Flink reads data from a MySQL source table in one of the following modes:
    • Full data read: reads all the data of the table into memory at a time. Reading is fast, but memory is consumed. If the source table contains a large amount of data, an OOM error may occur.
    • Batch data read: reads a specific number of rows at a time until all data is read. OOM errors are prevented even if the table contains a large amount of data, but reading is slower.
  • connect.timeout: Optional. Duration. The maximum duration that the connector waits when it attempts to connect to the MySQL database server before timing out. Default value: 30. Unit: seconds.
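
The following DDL is a sketch that combines several of these parameters. The table name orders_source and the parameter values, such as the server ID range 5401-5404, are example values.

CREATE TABLE orders_source (
  order_id INT,
  order_date TIMESTAMP(0),
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  'server-id' = '5401-5404',                    -- example range for a parallelism of 4
  'scan.incremental.snapshot.enabled' = 'true', -- default; enables lock-free, parallel reading
  'scan.startup.mode' = 'initial',              -- read full data first, then incremental data
  'server-time-zone' = 'Asia/Shanghai'          -- example session time zone
);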

Metadata

In most cases, access to metadata is required when you merge and synchronize tables in a sharded database. If you expect to identify data records by the source database names and table names after tables are merged, you can configure metadata columns in the data merging statement to read the source database name and table name of each data record. This way, you can identify the source of each data record after tables are merged.

MySQL CDC source tables that are used by fully managed Flink of vvr-4.0.11-flink-1.13 or later support the metadata column syntax. You can access the following metadata by using metadata columns:
  • database_name (STRING NOT NULL): The name of the source database to which the current data record belongs.
  • table_name (STRING NOT NULL): The name of the source table to which the current data record belongs.
  • op_ts (TIMESTAMP_LTZ(3) NOT NULL): The time when the current data record changed in the database. If the record is obtained from the historical data of the table instead of from the binary log file, the value is fixed to 0.
The following sample code shows how to merge multiple orders tables from the database shards of a MySQL instance into a Flink table named mysql_orders and synchronize the data to a Hologres table named holo_orders.
CREATE TABLE mysql_orders (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- Read the database name. 
  table_name STRING METADATA FROM 'table_name' VIRTUAL, -- Read the table name.
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Read the change time. 
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb_.*', -- Use a regular expression to match multiple database shards. 
  'table-name' = 'orders_.*'   -- Use a regular expression to match multiple tables in the sharded database. 
);

INSERT INTO holo_orders SELECT * FROM mysql_orders;

Parallelism control

The MySQL CDC connector can read full data in parallel, which improves data loading efficiency. If you use the MySQL CDC connector together with the Autopilot feature that is provided by the VVP of fully managed Flink, automatic scale-in can be performed during the incremental reading phase after parallel full reading is complete. This saves computing resources.

You can configure the Parallelism parameter in Basic mode or Expert mode in the Resource Configuration panel in the console of fully managed Flink. The setting of the Parallelism parameter varies based on the resource configuration mode.
  • In Basic mode, the Parallelism parameter specifies the global parallelism of the job.
  • In Expert mode, you can configure the Parallelism parameter for a specific vertex node based on your business requirements.
For more information about resource configurations, see Configure resources in expert mode.
Notice When you configure the Parallelism parameter in Basic mode or Expert mode, make sure that the size of the range specified by server-id in the table is greater than or equal to the value of the Parallelism parameter. For example, if the range of server-id is 5404-5412, nine unique server IDs are available, so you can configure a parallelism of at most nine. In addition, the server-id ranges that different jobs specify for the same MySQL instance cannot overlap. Each job must be explicitly configured with a unique server ID or server ID range.
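
For example, the following hints are a sketch of two deployments that read the same MySQL instance with non-overlapping server ID ranges. Both ranges are example values, and each covers a parallelism of 4.

-- Deployment A, parallelism 4:
SELECT * FROM mysqlcdc_source /*+ OPTIONS('server-id'='5404-5407') */ ;
-- Deployment B, parallelism 4, non-overlapping range:
SELECT * FROM mysqlcdc_source /*+ OPTIONS('server-id'='5408-5411') */ ;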

Automatic scale-in by using Autopilot

When full data is read, a large amount of historical data is accumulated. In most cases, fully managed Flink reads historical data in parallel to improve reading efficiency. When incremental data is read, only a single task is required because the amount of binary log data is small and the global order must be ensured. The numbers of compute units (CUs) that are required during full data reading and incremental data reading are different. You can use the Autopilot feature to balance performance and resource consumption.

Autopilot monitors the traffic for each task that is used by the MySQL CDC source table. If the binary log data is read in only one task and other tasks are idle during incremental data reading, Autopilot automatically reduces the number of CUs and the parallelism. To enable Autopilot, you need only to set the Mode parameter of Autopilot to Active on the Deployments page.
Note By default, the minimum interval at which the decrease of the parallelism is triggered is set to 24 hours. For more information about Autopilot and related parameters, see Configure Autopilot.

Data type mappings

The following table lists the data type mappings between MySQL CDC and Flink.
Data type of MySQL CDC                     Data type of Flink
TINYINT                                    TINYINT
SMALLINT, TINYINT UNSIGNED                 SMALLINT
INT, MEDIUMINT, SMALLINT UNSIGNED          INT
BIGINT, INT UNSIGNED                       BIGINT
BIGINT UNSIGNED                            DECIMAL(20, 0)
FLOAT                                      FLOAT
DOUBLE, DOUBLE PRECISION                   DOUBLE
NUMERIC(p, s), DECIMAL(p, s)               DECIMAL(p, s)
BOOLEAN, TINYINT(1)                        BOOLEAN
DATE                                       DATE
TIME [(p)]                                 TIME [(p)] [WITHOUT TIMEZONE]
DATETIME [(p)]                             TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)]                            TIMESTAMP [(p)] or TIMESTAMP [(p)] WITH LOCAL TIME ZONE
CHAR(n), VARCHAR(n), TEXT                  STRING
BINARY, VARBINARY, BLOB                    BYTES
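
The following sketch shows how these mappings apply in a DDL statement. The MySQL table definition in the comment and the Flink table name typed_source are hypothetical examples.

-- MySQL table:
--   CREATE TABLE t (id BIGINT UNSIGNED, name VARCHAR(255),
--                   created DATETIME(3), PRIMARY KEY (id));
-- Corresponding Flink DDL:
CREATE TABLE typed_source (
  id DECIMAL(20, 0),      -- BIGINT UNSIGNED maps to DECIMAL(20, 0)
  name STRING,            -- VARCHAR(n) maps to STRING
  created TIMESTAMP(3),   -- DATETIME(p) maps to TIMESTAMP(p)
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = 't'
);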

FAQ