The MySQL CDC DataStream connector supports seamless switching from full data reading to incremental data reading, which helps prevent data loss and data duplication. This topic describes how to use the MySQL CDC DataStream connector to read data from a MySQL database in the console of fully managed Flink.
Background information
Item | Description |
Overview | The MySQL CDC DataStream connector is a source connector supported by fully managed Flink. Fully managed Flink uses the connector to read full historical data from a MySQL database and then smoothly switches to reading incremental data from binary log (binlog) files. This prevents data loss and data duplication when an error occurs and implements exactly-once semantics. The connector uses the incremental snapshot algorithm to read full data with multiple source subtasks in parallel, which enables lock-free reading and allows reading to resume from checkpoints. |
Implementation | When the source starts to read data from a table, it scans the full table, splits the table into multiple chunks based on the primary key, and then uses the incremental snapshot algorithm to read each chunk. The deployment periodically generates checkpoints that record which chunks have been read, so after a failover the source only needs to read the chunks that have not been read yet. After all chunks are read, the source starts to read incremental change records from the binlog position that was captured earlier. The deployment continues to periodically generate checkpoints that record the current binlog position; if a failover occurs, the connector resumes processing from the last recorded position. This way, exactly-once semantics is implemented. For more information about the incremental snapshot algorithm, see MySQL CDC Connector. |
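The chunk-based full scan described above can be illustrated with a simplified, dependency-free sketch. The splitChunks method and the Chunk record below are hypothetical and only mirror the idea of dividing a primary-key range into fixed-size chunks that can be read and checkpointed independently; the real connector derives chunk boundaries from table statistics rather than a fixed size.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkSplitSketch {
    // A half-open primary-key range [low, high) representing one chunk.
    record Chunk(long low, long high) {}

    // Hypothetical helper: split the key range [minKey, maxKey) into
    // chunks of at most chunkSize keys, as the incremental snapshot
    // algorithm does before distributing chunks to source subtasks.
    static List<Chunk> splitChunks(long minKey, long maxKey, long chunkSize) {
        List<Chunk> chunks = new ArrayList<>();
        for (long low = minKey; low < maxKey; low += chunkSize) {
            chunks.add(new Chunk(low, Math.min(low + chunkSize, maxKey)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Split keys [0, 10) into chunks of at most 4 keys each.
        for (Chunk c : splitChunks(0, 10, 4)) {
            System.out.println("chunk [" + c.low() + ", " + c.high() + ")");
        }
    }
}
```

Because each chunk is an independent unit of work, a checkpoint only needs to record which chunks are done, which is what makes resumable, lock-free full scans possible.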
Prerequisites
A network connection is established between your MySQL database and Ververica Platform (VVP).
The MySQL server meets the following requirements:
The MySQL version is 5.7 or 8.0.X.
Binary logging is enabled.
The binary log format is set to ROW.
The binlog_row_image parameter is set to FULL.
The interactive_timeout and wait_timeout parameters are configured in the MySQL configuration file.
A MySQL user is created and granted the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
For more information about how to perform the preceding configurations for an ApsaraDB RDS for MySQL database, a PolarDB for MySQL database, or a self-managed MySQL database, see Configure a MySQL database.
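As an illustration, the server-side requirements above typically map to settings like the following in the MySQL configuration file (my.cnf). The specific values, in particular the timeouts and the server-id, are placeholders that you should adapt to your environment:

```ini
[mysqld]
# Enable binary logging with row-level detail.
log-bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
# Each MySQL server needs a unique ID for replication.
server-id=1
# Keep client connections alive long enough for the full scan
# (placeholder values; tune for your workload).
interactive_timeout=28800
wait_timeout=28800
```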
Limits
The MySQL CDC DataStream connector that runs on VVR 4.0.8 or later can read data from MySQL databases of versions 5.7 and 8.0.X. The MySQL CDC DataStream connector that runs on VVR 4.0.11 or later can read data from MySQL databases of versions 5.6, 5.7, and 8.0.X.
Precautions
Each MySQL client that reads data from binlog files must have a unique server ID. The MySQL server maintains network connections and binlog positions based on this ID. If different deployments share the same server ID, data may be read from an incorrect binlog position. This can cause data loss, data duplication, or a surge in CPU utilization on the MySQL server, which affects the stability of online business. Therefore, we recommend that you explicitly configure a unique server ID for each MySQL data source of each DataStream deployment. The following sample code shows how to configure a server ID.
MySqlSource.builder().serverId("56000")
If you enable the incremental snapshot algorithm to read data, you must set the serverId parameter to an ID range whose size matches the deployment parallelism. For example, if the deployment parallelism is 4, configure serverId("56001-56004").
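The mapping from parallelism to an ID range can be made explicit with a small helper. The serverIdRange method below is hypothetical and only shows how a range string such as "56001-56004" relates to the parallelism; each of the source subtasks then uses one ID from the range.

```java
public class ServerIdRange {
    // Hypothetical helper: build a serverId range string for a given
    // deployment parallelism. Each source subtask consumes one ID,
    // so the range must contain exactly `parallelism` IDs.
    static String serverIdRange(int firstId, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        return firstId + "-" + (firstId + parallelism - 1);
    }

    public static void main(String[] args) {
        // A deployment with parallelism 4 starting at ID 56001.
        System.out.println(serverIdRange(56001, 4)); // prints "56001-56004"
    }
}
```

Keep the ranges of different deployments disjoint so that no two clients ever present the same server ID to the MySQL server.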
Procedure
The Ververica Runtime (VVR) connector is stored in the Maven central repository for you to use during draft development. If you want to use the MySQL CDC DataStream connector, perform the following steps:
Step 1: Prepare the development environment for a DataStream draft
We recommend that you package the connector as a project dependency into the JAR file of your draft. Perform the following steps:
Add the following configurations to the POM file of the Maven project to reference SNAPSHOT repositories:
<repositories>
    <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
Check whether your settings.xml configuration file contains a <mirrorOf>*</mirrorOf> configuration. If it does, the mirror matches all repositories, so Maven routes all requests to the mirror and cannot download SNAPSHOT dependencies from the two repositories above. To avoid this issue, exclude the SNAPSHOT repositories from the mirror based on your scenario:
If the file contains <mirrorOf>*</mirrorOf>, change it to <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.
If the file contains <mirrorOf>external:*</mirrorOf>, change it to <mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.
If the file contains <mirrorOf>external:http:*</mirrorOf>, change it to <mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.
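For example, a typical mirror entry in settings.xml, adjusted so that the two SNAPSHOT repositories bypass the mirror, could look like the following. The mirror id, name, and URL are placeholders for your own mirror:

```xml
<mirrors>
    <mirror>
        <id>internal-mirror</id> <!-- placeholder id -->
        <name>Internal Maven mirror</name>
        <url>https://mirror.example.com/maven2</url> <!-- placeholder URL -->
        <!-- Mirror everything except the two SNAPSHOT repositories. -->
        <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
    </mirror>
</mirrors>
```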
Add the connector that you want to use to the Maven POM file as a project dependency.
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <!-- vvr-4.0.X-flink-1.13 corresponds to V2.2.1 of the MySQL CDC DataStream connector -->
    <!-- vvr-6.0.X-flink-1.15 corresponds to V2.3.0 of the MySQL CDC DataStream connector -->
    <version>2.2.1</version>
</dependency>
Different connector versions may correspond to different connector types. We recommend that you use the latest version for the type of the connector that you use.
Important: You must search for connector versions that contain the SNAPSHOT keyword in the SNAPSHOT repository oss.sonatype.org. These versions are not available in the Maven central repository search.maven.org.
If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following code to the POM file:
<transformers>
    <!-- The ServicesResourceTransformer merges META-INF/services files. -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
        <projectName>Apache Flink</projectName>
        <encoding>UTF-8</encoding>
    </transformer>
</transformers>
Step 2: Develop a DataStream draft
Create a DataStream API program and use the MySqlSource class. Sample statements:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .databaseList("yourDatabaseName") // set the database to capture
            .tableList("yourDatabaseName.yourTableName") // set the table to capture
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON string
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing so that chunk progress and binlog positions are recorded.
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            // Use 4 parallel source tasks.
            .setParallelism(4)
            // Use parallelism 1 for the sink to keep message ordering.
            .print().setParallelism(1);

        env.execute("Print MySQL Snapshot + Binlog");
    }
}
The following table describes the parameters that you must configure for the MySqlSource class.
Parameter | Description |
hostname | The IP address or hostname that is used to access the MySQL database. |
port | The port that is used to access the MySQL database. |
databaseList | The name of the MySQL database that you want to access. Note: To read data from multiple databases, you can set this parameter to a regular expression. |
username | The username that is used to access the MySQL database. |
password | The password that is used to access the MySQL database. |
deserializer | The deserializer, which deserializes a SourceRecord into a specified type. Common implementations include JsonDebeziumDeserializationSchema, which converts a SourceRecord into a JSON string, and StringDebeziumDeserializationSchema, which converts a SourceRecord into a string by calling its toString method. You can also implement the DebeziumDeserializationSchema interface to customize deserialization. |
Step 3: Package the program and publish the DataStream draft
Use Maven to package the program and upload the generated JAR file to the console of fully managed Flink. For more information, see Create a JAR deployment.