MySQL CDC DataStream Connector支持从全量读取到增量读取的无缝切换,保证数据不重不丢。本文为您介绍如何在Flink全托管控制台上使用MySQL CDC DataStream Connector来读取MySQL数据。

背景信息

类别 详情
功能介绍 MySQL CDC Connector是一个Flink的Source Connector,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证无论是否发生异常,数据不重不丢,实现Exactly Once语义。MySQL CDC Connector支持并发地读取全量数据,通过增量快照算法实现了全程无锁和断点续传。
实现原理 Source在启动时会扫描全表,将表按照主键分成多个chunk,并使用增量快照算法逐个读取每个chunk的数据。作业会周期性执行Checkpoint,记录下已经完成的chunk,以保证在发生Failover时,只需要继续读取未完成的chunk。当chunk全部读取完后,会从之前获取的Binlog位点读取增量的变更记录。Flink作业会继续周期性执行Checkpoint,记录下Binlog位点,当作业发生Failover,便会从之前记录的Binlog位点继续处理,从而实现Exactly Once语义。更详细的增量快照算法,请参见MySQL CDC Connector
核心特性
  • 支持读取全量和增量Binlog数据,方便您一站式初始化全量数据和获取变化数据。
  • 支持并发读取全量数据,扩展性能水平。
  • 支持从全量读取到增量读取的无缝切换,自动缩容,节省计算资源。
  • 支持无锁读取全量数据,不影响在线业务。
  • 全量阶段读取支持断点续传,提升数据容错能力。

前提条件

  • MySQL和VVP的网络连通。
  • MySQL服务器配置如下:
    • MySQL版本为5.7或者8.0.X。
    • 已开启了Binlog。
    • Binlog格式已设置为ROW。
    • binlog_row_image已设置为FULL。
  • 已在MySQL配置文件中配置了交互超时或等待超时参数。
  • 已创建MySQL用户,并授予了SELECT、 SHOW DATABASES 、REPLICATION SLAVE和REPLICATION CLIENT权限。
说明 以上配置需要在RDS MySQL、PolarDB MySQL或者自建MySQL上操作,详情请参见配置MySQL

使用限制

MySQL CDC Connector支持读取的MySQL版本为5.7和8.0.X。

注意事项

每个用于读取Binlog的MySQL数据库客户端都应该具有唯一的ID,称为server-id。MySQL服务器将使用此ID来维护网络连接和Binlog位点。因此,如果不同的作业共享相同的server-id,则可能导致从错误的Binlog位点读取数据,造成数据缺失、重复和MySQL服务器的CPU陡增,影响线上业务稳定性等问题。因此建议您在每个DataStream作业的每个MySqlSource数据源,都需要显式配置上不同的server-id。配置不同的Server ID代码示例如下。
MySqlSource.builder().serverId("56000")
说明 如果开启增量快照读取数据,则server-id配置项需要配置成与作业并发匹配的Server ID范围。例如作业并发度为4,则需要配置为serverId("56001-56004")。

操作流程

Maven中央库中已经内置了VVR Connector,以供您在作业开发时直接使用。您可以通过以下步骤来使用MySQL CDC DataStream Connector:
  1. 步骤一:准备DataStream作业开发环境
  2. 步骤二:开发DataStream作业
  3. 步骤三:打包并提交DataStream作业

步骤一:准备DataStream作业开发环境

推荐您直接将Connector作为项目依赖打进作业JAR包中。您可以按照如下步骤操作:
  1. 在Maven项目的pom.xml文件中,添加以下配置以引用SNAPSHOT仓库。
    <repositories>
      <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
      <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>
  2. 检查您的settings.xml配置文件中是否存在<mirrorOf>*</mirrorOf>配置。

    如果存在<mirrorOf>*</mirrorOf>配置,则需要将此配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

    修改的目的是为了避免SNAPSHOT仓库被覆盖,因为mirrorOf中只使用星号(*)会导致第一步中配置的两个repository被覆盖。

  3. 在作业的Maven项目的pom.xml文件中,添加您需要的Connector作为项目依赖。
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.1.0</version>
    </dependency>
    每个Connector版本对应的Connector类型可能不同,建议您使用最新版本。Connector版本、VVR/Flink版本和Connector类型的对应关系请参见Connector列表
    注意
    • 您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本,在Maven中央库(search.maven.org)上会查找不到。
    • 在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。
      <transformers>
          <!-- The service transformer is needed to merge META-INF/services files -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
              <projectName>Apache Flink</projectName>
              <encoding>UTF-8</encoding>
          </transformer>
      </transformers>

步骤二:开发DataStream作业

创建DataStream API程序并使用MySqlSource。代码示例如下。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {

  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set captured database
        .tableList("yourDatabaseName.yourTableName") // set captured table
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);

    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute("Print MySQL Snapshot + Binlog");
  }
}
           
在构建MySqlSource时,必须指定以下参数。
参数 说明
hostname MySQL数据库的IP地址或者Hostname。
port MySQL数据库服务的端口号。
databaseList MySQL数据库名称。
说明 数据库名称支持正则表达式以读取多个数据库的数据。
username MySQL数据库服务的用户名。
password MySQL数据库服务的密码。
deserializer 反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下:
  • RowDataDebeziumDeserializeSchema:将SourceRecord转成Flink Table或SQL内部数据结构RowData。
  • JsonDebeziumDeserializationSchema:将SourceRecord转成JSON格式的String。

步骤三:打包并提交DataStream作业

使用Maven工具打包工程项目,并将生成的JAR包上传和提交到Flink全托管平台上,详细请参见作业提交