The MySQL CDC DataStream connector supports seamless switching from full data reading to incremental data reading, which prevents data duplication and data loss. This topic describes how to use the MySQL CDC DataStream connector to read data from a MySQL database in the console of fully managed Flink.
Background information
| Item | Description |
| --- | --- |
| Overview | The MySQL CDC DataStream connector is a source connector that is supported by fully managed Flink. Fully managed Flink uses the MySQL CDC DataStream connector to read full historical data from a MySQL database and then smoothly switch to reading data from binary log (binlog) files. This way, data is neither lost nor duplicated even if an error occurs, which implements the exactly-once semantics. You can run multiple jobs at the same time to read full data from a MySQL CDC source table by using the MySQL CDC DataStream connector. When the jobs are running, the incremental snapshot algorithm is used to perform lock-free reading and resumable reading. |
| Implementation | When the source starts to read data from a table, it scans the full table, splits the table into multiple chunks based on the primary key, and then uses the incremental snapshot algorithm to read data from each chunk. The job periodically generates checkpoints that record which chunks have been read, so after a failover the source only needs to read the chunks whose data has not been read. After the data of all chunks is read, the source starts to read the incremental change records from the previously obtained binlog file position, and the job continues to periodically checkpoint the current binlog position. If a failover occurs, the MySQL CDC DataStream connector resumes processing from the last checkpointed binlog position. This way, the exactly-once semantics is implemented. A conceptual sketch of chunk splitting follows this table. For more information about the incremental snapshot algorithm, see MySQL CDC Connector. |
| Features | Exactly-once processing, lock-free reading of full data, parallel reading of full data, and resumable reading based on the incremental snapshot algorithm. |
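The implementation above centers on splitting a table into primary-key chunks. The following Java sketch illustrates that idea conceptually only; all names (Chunk, splitIntoChunks, the chunk size) are invented for illustration and are not the connector's internal API, and the real algorithm also handles non-numeric and composite primary keys.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkSplitSketch {

    // Illustrative only: a half-open range [start, end) of primary-key values.
    static class Chunk {
        final long start;
        final long end;
        Chunk(long start, long end) { this.start = start; this.end = end; }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    // Split the primary-key range of a table into fixed-size chunks. Each chunk
    // can be read independently, so a checkpoint only needs to record which
    // chunks have already been finished.
    static List<Chunk> splitIntoChunks(long minKey, long maxKey, long chunkSize) {
        List<Chunk> chunks = new ArrayList<>();
        for (long start = minKey; start <= maxKey; start += chunkSize) {
            chunks.add(new Chunk(start, Math.min(start + chunkSize, maxKey + 1)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // A table whose primary key ranges from 1 to 10000, split into chunks of 4096 keys.
        splitIntoChunks(1, 10_000, 4_096).forEach(System.out::println);
    }
}
```

Because each chunk can be read in parallel and finished chunks are recorded in checkpoints, full reading is lock-free and resumable after a failover.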
Prerequisites
- A network connection is established between your MySQL database and Ververica Platform (VVP).
- The MySQL server meets the following requirements:
- The MySQL version is 5.7 or 8.0.X.
- Binary logging is enabled.
- The binary log format is set to ROW.
- The binlog_row_image parameter is set to FULL. You can verify these binary log settings with the sketch after this list.
- The interactive_timeout and wait_timeout parameters are configured in the MySQL configuration file.
- A MySQL user is created and granted the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
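Before you deploy a job, you can check the binary log settings above directly against the server. The following is a minimal sketch, assuming the MySQL Connector/J driver is on the classpath; the hostname, port, and credentials are placeholders.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogPrerequisiteCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:mysql://yourHostname:3306", "yourUsername", "yourPassword");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SHOW VARIABLES WHERE Variable_name IN "
                 + "('log_bin', 'binlog_format', 'binlog_row_image')")) {
            // Expected output: log_bin = ON, binlog_format = ROW, binlog_row_image = FULL
            while (rs.next()) {
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}
```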
Limits
The MySQL CDC DataStream connector supports only MySQL 5.7 and MySQL 8.0.X.
Usage notes
Every client that reads the binary log of a MySQL server must use a server ID that is unique on that server. We recommend that you explicitly specify a server ID for each job:

```java
MySqlSource.builder().serverId("56000")
```

If you set the source parallelism to a value greater than 1, specify a range of server IDs whose size is not smaller than the parallelism, as shown in the sketch that follows.
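The following sketch (with placeholders in the same style as the job example in Step 2) reserves one server ID per parallel source task by specifying a range:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class ServerIdRangeExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(3306)
            .serverId("56000-56003") // 4 IDs: one per parallel source task
            .databaseList("yourDatabaseName")
            .tableList("yourDatabaseName.yourTableName")
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
           .setParallelism(4) // must not exceed the number of reserved server IDs
           .print();
        env.execute("Server ID range example");
    }
}
```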
Procedure
Step 1: Prepare the development environment for a DataStream job
- Add the following configuration to the POM file of the Maven project to reference the SNAPSHOT repositories:

```xml
<repositories>
  <repository>
    <id>oss.sonatype.org-snapshot</id>
    <name>OSS Sonatype Snapshot Repository</name>
    <url>http://oss.sonatype.org/content/repositories/snapshots</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>
```
- Check whether the `<mirrorOf>*</mirrorOf>` configuration is contained in your settings.xml configuration file. If it is, change the configuration to `<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>`. This change prevents the two SNAPSHOT repositories that you configured in the previous step from being overwritten: if the mirrorOf element contains only an asterisk (*), the mirror also intercepts requests to the SNAPSHOT repositories.
- Add the connector that you want to use to the Maven POM file as a project dependency:

```xml
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>2.1.0</version>
</dependency>
```

Different connector versions may correspond to different connector types. We recommend that you use the latest version for the type of the connector that you use. For more information about the mappings among connector versions, VVR or Flink versions, and connector types, see DataStream connectors.
Notice: Connector versions that contain the SNAPSHOT keyword are available only in the SNAPSHOT repository oss.sonatype.org. You cannot find them in the Maven central repository search.maven.org.
- If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following code to the POM file:

```xml
<transformers>
  <!-- The services transformer is needed to merge META-INF/services files -->
  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
  <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
    <projectName>Apache Flink</projectName>
    <encoding>UTF-8</encoding>
  </transformer>
</transformers>
```
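The transformers element is not a top-level POM element; it belongs inside the configuration of the maven-shade-plugin. The following skeleton shows where it fits (the plugin version 3.2.4 is an assumption; use the version that matches your project):

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.4</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <!-- the transformers shown above go here -->
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```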
Step 2: Develop a DataStream job
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .databaseList("yourDatabaseName") // set the captured database
            .tableList("yourDatabaseName.yourTableName") // set the captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpointing so that read progress is recorded
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            // set 4 parallel source tasks
            .setParallelism(4)
            // use parallelism 1 for the sink to keep message ordering
            .print().setParallelism(1);

        env.execute("Print MySQL Snapshot + Binlog");
    }
}
```
The following table describes the parameters that you must configure for the MySqlSource class.

| Parameter | Description |
| --- | --- |
| hostname | The IP address or hostname that is used to access the MySQL database. |
| port | The port that is used to access the MySQL database. The default MySQL port is 3306. |
| databaseList | The name of the MySQL database that you want to access. Note: You can set this parameter to a regular expression to read data from multiple databases, as shown in the sketch after this table. |
| tableList | The name of the MySQL table that you want to access, in the format databaseName.tableName. You can also set this parameter to a regular expression to read data from multiple tables. |
| username | The username that is used to access the MySQL database. |
| password | The password that is used to access the MySQL database. |
| deserializer | A deserializer, which deserializes SourceRecord to a specified type. Valid values include JsonDebeziumDeserializationSchema, which deserializes a SourceRecord to a JSON string (used in the example above), and StringDebeziumDeserializationSchema, which deserializes a SourceRecord to a plain string. |
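The following sketch shows the regular-expression usage mentioned for databaseList and tableList. The database and table names (app_db_*, orders_*) are hypothetical and are used only to illustrate the pattern syntax:

```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class RegexCaptureExample {
    public static void main(String[] args) {
        // One source can capture many sharded databases and tables via regex.
        MySqlSource<String> source = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(3306)
            .databaseList("app_db_.*")            // all databases named app_db_*
            .tableList("app_db_.*\\.orders_\\d+") // tables such as app_db_1.orders_01
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();
        System.out.println("Configured source: " + source);
    }
}
```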
Step 3: Package the program and publish a DataStream job
Use Maven to package the program and upload the generated JAR file to the console of fully managed Flink. For more information, see Publish a job.
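For example, you can run the following standard Maven command from the root directory of your project. The JAR file is generated in the target directory; its exact name depends on your artifactId and version.

```
mvn clean package
```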