MySQL CDC DataStream Connector支持从全量读取到增量读取的无缝切换,保证数据不重不丢。本文为您介绍如何在Flink全托管控制台上使用MySQL CDC DataStream Connector来读取MySQL数据。
背景信息
类别 | 详情 |
---|---|
功能介绍 | MySQL CDC Connector是一个Flink的Source Connector,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证无论是否发生异常,数据不重不丢,实现Exactly Once语义。MySQL CDC Connector支持并发地读取全量数据,通过增量快照算法实现了全程无锁和断点续传。 |
实现原理 | Source在启动时会扫描全表,将表按照主键分成多个chunk,并使用增量快照算法逐个读取每个chunk的数据。作业会周期性执行Checkpoint,记录下已经完成的chunk,以保证在发生Failover时,只需要继续读取未完成的chunk。当chunk全部读取完后,会从之前获取的Binlog位点读取增量的变更记录。Flink作业会继续周期性执行Checkpoint,记录下Binlog位点,当作业发生Failover,便会从之前记录的Binlog位点继续处理,从而实现Exactly Once语义。更详细的增量快照算法,请参见MySQL CDC Connector。 |
核心特性 |
|
前提条件
- MySQL和VVP的网络连通。
- MySQL服务器配置如下:
- MySQL版本为5.7或者8.0.X。
- 已开启了Binlog。
- Binlog格式已设置为ROW。
- binlog_row_image已设置为FULL。
- 已在MySQL配置文件中配置了交互超时或等待超时参数。
- 已创建MySQL用户,并授予了SELECT、 SHOW DATABASES 、REPLICATION SLAVE和REPLICATION CLIENT权限。
说明 以上配置需要在RDS MySQL、PolarDB MySQL或者自建MySQL上操作,详情请参见配置MySQL。
使用限制
MySQL CDC Connector支持读取的MySQL版本为5.7和8.0.X。
注意事项
每个用于读取Binlog的MySQL数据库客户端都应该具有唯一的ID,称为server-id。MySQL服务器将使用此ID来维护网络连接和Binlog位点。因此,如果不同的作业共享相同的server-id,则可能导致从错误的Binlog位点读取数据,造成数据缺失、重复和MySQL服务器的CPU陡增,影响线上业务稳定性等问题。因此建议您在每个DataStream作业的每个MySqlSource数据源,都需要显式配置上不同的server-id。配置不同的Server
ID代码示例如下。
MySqlSource.builder().serverId("56000")
说明 如果开启增量快照读取数据,则server-id配置项需要配置成与作业并发匹配的Server ID范围。例如作业并发度为4,则需要配置为serverId("56001-56004")。
操作流程
Maven中央库中已经内置了VVR Connector,以供您在作业开发时直接使用。您可以通过以下步骤来使用MySQL CDC DataStream Connector:
步骤一:准备DataStream作业开发环境
推荐您直接将Connector作为项目依赖打进作业JAR包中。您可以按照如下步骤操作:
- 在Maven项目的pom.xml文件中,添加以下配置以引用SNAPSHOT仓库。
<repositories> <repository> <id>oss.sonatype.org-snapshot</id> <name>OSS Sonatype Snapshot Repository</name> <url>http://oss.sonatype.org/content/repositories/snapshots</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>apache.snapshots</id> <name>Apache Development Snapshot Repository</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories>
- 检查您的settings.xml配置文件中是否存在
<mirrorOf>*</mirrorOf>
配置。如果存在
<mirrorOf>*</mirrorOf>
配置,则需要将此配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。修改的目的是为了避免SNAPSHOT仓库被覆盖,因为mirrorOf中只使用星号(*)会导致第一步中配置的两个repository被覆盖。
- 在作业的Maven项目的pom.xml文件中,添加您需要的Connector作为项目依赖。
<dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.1.0</version> </dependency>
每个Connector版本对应的Connector类型可能不同,建议您使用最新版本。Connector版本、VVR/Flink版本和Connector类型的对应关系请参见Connector列表。注意- 您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本,在Maven中央库(search.maven.org)上会查找不到。
- 在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。
<transformers> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> <projectName>Apache Flink</projectName> <encoding>UTF-8</encoding> </transformer> </transformers>
步骤二:开发DataStream作业
创建DataStream API程序并使用MySqlSource。代码示例如下。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print MySQL Snapshot + Binlog");
}
}
在构建MySqlSource时,必须指定以下参数。参数 | 说明 |
---|---|
hostname | MySQL数据库的IP地址或者Hostname。 |
port | MySQL数据库服务的端口号。 |
databaseList | MySQL数据库名称。
说明 数据库名称支持正则表达式以读取多个数据库的数据。
|
username | MySQL数据库服务的用户名。 |
password | MySQL数据库服务的密码。 |
deserializer | 反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下:
|
步骤三:打包并提交DataStream作业
使用Maven工具打包工程项目,并将生成的JAR包上传和提交到Flink全托管平台上,详细请参见作业提交。