本文為您介紹如何調試和運行使用MySQL連接器的 DataStream 作業。
DataStream調試方案
建立DataStream API程式並使用MySqlSource。程式碼範例如下:
本地調試需要下載相關jar並配置依賴,詳情請參見本地運行和調試包含連接器的作業。本文已提供Maven樣本專案,詳情請參考MysqlCDCDemo.zip。
package com.alibaba.realtimecompute;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;
public class MysqlCDCDemo {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setString("pipeline.classpaths", "file://" + "mysql uber jar絕對路徑"); // 配置依賴
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("hostname")
.port(3306)
.databaseList("test_db") // 設定捕獲的資料庫
.tableList("test_db.test_table") // 設定捕獲的表
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.setParallelism(4)
.print().setParallelism(1);
env.execute("Print MySQL Snapshot + Binlog");
}
}在構建MySqlSource時,代碼中必須指定以下參數:
參數 | 說明 |
hostname | MySQL資料庫的IP地址或者主機名稱。 |
port | MySQL資料庫服務的連接埠號碼。 |
databaseList | MySQL資料庫名稱。 說明 資料庫名稱支援Regex以讀取多個資料庫的資料,您可以使用 |
username | MySQL資料庫服務的使用者名稱。 |
password | MySQL資料庫服務的密碼。 |
deserializer | 還原序列化器,將SourceRecord類型記錄還原序列化到指定類型。參數取值如下:
|
pom依賴
本地調試
阿里雲Realtime ComputeFlink版連接器包含商業化的額外內容,與Apache Flink版本有許多差異,本地調試時需對pom做如下修改:
${flink.version}必須為1.19.0。
連接器${vvr.version}推薦版本如下:
使用vvr 8.x版本引擎時,推薦1.17-vvr-8.0.11-6版本。
使用vvr 11.x版本引擎時,推薦使用最新的對應引擎版本的連接器,例如如果使用vvr 11.1版本引擎,推薦使用最新版本1.20-vvr-11.1.4-jdk11。其他版本可見Maven Repository。
需要添加kafka連接器依賴。
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>8</java.version>
<flink.version>1.19.0</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<!-- VVR 11.x 版本使用此 Group ID -->
<groupId>com.alibaba.ververica</groupId>
<!-- VVR 8.x 版本使用此 Group ID -->
<!-- <groupId>org.apache.flink</groupId> -->
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
</dependencies>Realtime ComputeFlink版部署調試
在Realtime ComputeFlink版啟動作業調試時,連接器無版本限制,${flink.version}和${vvr.version}僅需和作業引擎版本相對應即可,對應關係請參見引擎。參考pom如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>