In the console of fully managed Flink, the MySQL CDC DataStream connector supports seamless switching from full data reading to incremental data reading, which helps prevent data duplication and data loss. This topic describes how to use the MySQL CDC DataStream connector to read data from a MySQL database in the console of fully managed Flink.

Background information

Item Description
Overview The MySQL CDC DataStream connector is a source connector that is supported by fully managed Flink. Fully managed Flink uses the MySQL CDC DataStream connector to read full historical data from a MySQL database and then smoothly switches to reading data from binary log files. This way, no data is missed or duplicated even if a failure occurs, which implements exactly-once semantics. You can run multiple jobs at the same time to read full data from a MySQL CDC source table by using the MySQL CDC DataStream connector. While the jobs are running, the incremental snapshot algorithm is used to perform lock-free and resumable reading.
Implementation When the source starts to read data from a table, it scans the full table, splits the table into multiple chunks based on the primary key, and then uses the incremental snapshot algorithm to read data from each chunk. The job periodically generates checkpoints to record which chunks have been read. This way, if a failover occurs, the source only needs to read the chunks that have not been read yet. After all chunks are read, the source starts to read incremental change records from the previously recorded binary log file position. The job continues to periodically generate checkpoints to record the binary log file position. If a failover occurs, the MySQL CDC DataStream connector resumes processing from the recorded binary log file position. This way, exactly-once semantics is implemented. For more information about the incremental snapshot algorithm, see MySQL CDC Connector. A conceptual sketch of the chunk splitting follows the feature list below.
Features
  • Supports full data reading and incremental binary log reading. This allows you to initialize full data and obtain the change data.
  • Allows you to run multiple jobs at the same time to read full data from a MySQL CDC source table. This improves data reading performance.
  • Supports seamless switching from full data reading to incremental data reading. This implements automatic scale-in and saves computing resources.
  • Supports full data reading without locks. This way, online business is not affected.
  • Supports resumable reading (checkpoint-based recovery) during full data reading. This improves fault tolerance.
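
The chunk-based reading described above can be pictured with the following minimal, connector-independent sketch. It only illustrates the idea of splitting a numeric primary-key range into chunks; the key range and chunk size are made-up values, and the real splitter inside the connector is more sophisticated (for example, it also handles non-numeric and unevenly distributed keys).

import java.util.ArrayList;
import java.util.List;

/** Conceptual illustration only: split a numeric primary-key range into fixed-size chunks. */
public class ChunkSplitSketch {

  /** Returns [low, high) key ranges that together cover [minKey, maxKey]. */
  static List<long[]> splitIntoChunks(long minKey, long maxKey, long chunkSize) {
    List<long[]> chunks = new ArrayList<>();
    for (long start = minKey; start <= maxKey; start += chunkSize) {
      long end = Math.min(start + chunkSize, maxKey + 1);
      chunks.add(new long[] {start, end});
    }
    return chunks;
  }

  public static void main(String[] args) {
    // A table whose primary key ranges from 1 to 10,000, read in chunks of 4,096 keys.
    for (long[] chunk : splitIntoChunks(1, 10_000, 4_096)) {
      System.out.println("chunk [" + chunk[0] + ", " + chunk[1] + ")");
    }
  }
}

Each chunk is read independently, and the set of finished chunks is recorded in checkpoints. This is what makes parallel, lock-free, and resumable full data reading possible.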

Prerequisites

  • A network connection is established between your MySQL database and Ververica Platform (VVP).
  • The MySQL server meets the following requirements:
    • The MySQL version is 5.7 or 8.0.X.
    • Binary logging is enabled.
    • The binary log format is set to ROW.
    • The binlog_row_image parameter is set to FULL.
  • The interactive_timeout and wait_timeout parameters are configured in the MySQL configuration file. You can verify these server-side settings with the sketch after the following note.
  • A MySQL user is created and granted the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions.
Note For more information about how to perform the preceding configurations for an ApsaraDB RDS for MySQL database, a PolarDB for MySQL database, or a self-managed MySQL database, see Configure a MySQL database.
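
The following is a minimal sketch that verifies the server-side prerequisites over JDBC. The hostname, port, and credentials are placeholders, and the MySQL JDBC driver (mysql-connector-java) is assumed to be on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MySqlPrerequisiteCheck {

  public static void main(String[] args) throws Exception {
    // Placeholders: replace with your own endpoint and credentials.
    String url = "jdbc:mysql://yourHostname:3306/";
    try (Connection conn = DriverManager.getConnection(url, "yourUsername", "yourPassword");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(
             "SHOW VARIABLES WHERE Variable_name IN "
                 + "('log_bin', 'binlog_format', 'binlog_row_image', "
                 + "'interactive_timeout', 'wait_timeout')")) {
      // Expected values: log_bin = ON, binlog_format = ROW, binlog_row_image = FULL.
      while (rs.next()) {
        System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
      }
    }
  }
}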

Limits

The MySQL CDC DataStream connector supports only MySQL 5.7 and MySQL 8.0.X.

Usage notes

Each MySQL database client that is used to read data from binary log files must have a unique server ID. The MySQL server maintains network connections and binary log file positions based on the server ID. Therefore, if different jobs share the same server ID, data may be read from an invalid binary log file position. In this case, data loss, data duplication, or a surge in the CPU utilization of the MySQL server may occur, which affects the stability of online business. To address these issues, we recommend that you explicitly configure a unique server ID for each MySQL data source of each DataStream job. The following sample code shows how to configure a server ID.
MySqlSource.builder().serverId("56000")
Note If you enable the incremental snapshot algorithm to read data, you must set the serverId parameter to a valid ID range that corresponds to the job parallelism. For example, if the job parallelism is 4, configure serverId("56001-56004").
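
For example, the following fragment configures a server ID range for a source parallelism of 4. It fits into the job skeleton shown in Step 2 below; the hostname, credentials, and table names are placeholders, and only the serverId and setParallelism calls are the point of the example.

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname("yourHostname")
    .port(3306)
    .databaseList("yourDatabaseName")
    .tableList("yourDatabaseName.yourTableName")
    .username("yourUsername")
    .password("yourPassword")
    // One server ID per parallel source subtask: four IDs for a parallelism of 4.
    .serverId("56001-56004")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
    .setParallelism(4);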

Procedure

The Ververica Runtime (VVR) connector is stored in the Maven central repository for you to use during job development. If you want to use the MySQL CDC DataStream connector, perform the following steps:
  1. Step 1: Prepare the development environment for a DataStream job
  2. Step 2: Develop a DataStream job
  3. Step 3: Package the program and publish a DataStream job

Step 1: Prepare the development environment for a DataStream job

We recommend that you package the connector as a project dependency into the JAR file of your job. Perform the following steps:
  1. Add the following configurations to the POM file of the Maven project to reference SNAPSHOT repositories:
    <repositories>
      <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
      <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>
  2. Check whether the <mirrorOf>*</mirrorOf> configuration is contained in your settings.xml configuration file.

    If the <mirrorOf>*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

    This change prevents the two SNAPSHOT repositories that you added in the previous step from being overridden. If the mirrorOf element contains only an asterisk (*), the mirror also covers the two SNAPSHOT repositories and overrides them.

  3. Add the connector that you want to use to the Maven POM file as a project dependency.
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.1.0</version>
    </dependency>
    Different connector versions may correspond to different connector types. We recommend that you use the latest version for the type of the connector that you use. For more information about the mappings among connector versions, VVR or Flink versions, and connector types, see DataStream connectors.
    Notice
    • You must search for the connector versions that contain the SNAPSHOT keyword in the SNAPSHOT repository oss.sonatype.org. You cannot find the versions in the Maven central repository search.maven.org.
    • If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following code to the maven-shade-plugin configuration in the POM file:
      <transformers>
          <!-- The service transformer is needed to merge META-INF/services files -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
              <projectName>Apache Flink</projectName>
              <encoding>UTF-8</encoding>
          </transformer>
      </transformers>

Step 2: Develop a DataStream job

Create a DataStream API program and use the MySqlSource class. Sample code:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {

  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set captured database
        .tableList("yourDatabaseName.yourTableName") // set captured table
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);

    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute("Print MySQL Snapshot + Binlog");
  }
}
The following table describes the parameters that you must configure for the MySqlSource class.
Parameter Description
hostname The IP address or hostname that is used to access the MySQL database.
port The port that is used to access the MySQL database.
databaseList The name of the MySQL database that you want to access.
Note You can set this parameter to a regular expression to read data from multiple databases.
username The username that is used to access the MySQL database.
password The password that is used to access the MySQL database.
deserializer A deserializer, which deserializes SourceRecord to a specified type. Valid values:
  • RowDataDebeziumDeserializeSchema: deserializes SourceRecords to the internal data structure RowData of the Flink Table API or Flink SQL API.
  • JsonDebeziumDeserializationSchema: deserializes SourceRecords to JSON strings.
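
Besides the two built-in options, the builder also accepts any implementation of the DebeziumDeserializationSchema interface. The following is a minimal sketch of such a custom deserializer; the class name TopicOnlyDeserializationSchema is made up for illustration, and it emits only the topic of each change record, which encodes the source database and table.

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;

/** Hypothetical example: emits only the topic of each change record. */
public class TopicOnlyDeserializationSchema implements DebeziumDeserializationSchema<String> {

  @Override
  public void deserialize(SourceRecord record, Collector<String> out) {
    // The topic has the form <serverName>.<database>.<table>.
    out.collect(record.topic());
  }

  @Override
  public TypeInformation<String> getProducedType() {
    return BasicTypeInfo.STRING_TYPE_INFO;
  }
}

Pass an instance of this class to the deserializer(...) call of the builder instead of JsonDebeziumDeserializationSchema.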

Step 3: Package the program and publish a DataStream job

Use Maven to package the program and upload the generated JAR file to the console of fully managed Flink. For more information, see Publish a job.