This topic describes how to debug and run a DataStream job that uses the MySQL connector.
Debug a DataStream job
To debug a job locally, you must download the required JAR files and configure the dependencies. For more information, see Run and debug jobs that contain connectors locally. For a sample Maven project, see MysqlCDCDemo.zip.
You can create a DataStream API program that uses MySqlSource. The following code provides an example:
package com.alibaba.realtimecompute;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;

public class MysqlCDCDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Add the MySQL connector uber JAR to the classpath for local debugging.
        conf.setString("pipeline.classpaths", "file://" + "The absolute path of the MySQL uber JAR file");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hostname")
                .port(3306)
                .databaseList("test_db")            // Set the database to capture.
                .tableList("test_db.test_table")    // Set the table to capture.
                .username("username")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema()) // Convert SourceRecord records to JSON strings.
                .build();

        // Enable checkpointing at a 3-second interval.
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print()
                .setParallelism(1);

        env.execute("Print MySQL Snapshot + Binary Log");
    }
}

When you build MySqlSource, specify the following parameters in the code:
Parameter | Description |
--- | --- |
hostname | The IP address or hostname of the MySQL database. |
port | The port number of the MySQL database service. |
databaseList | The name of the MySQL database. Note: This parameter supports regular expressions so that you can read data from multiple databases. |
username | The username for the MySQL database service. |
password | The password for the MySQL database service. |
deserializer | The deserializer that converts records of the SourceRecord type to a specified type, such as the JsonDebeziumDeserializationSchema used in the preceding example. |
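Because databaseList accepts regular expressions, a single source can capture multiple sharded databases at once. The following sketch is illustrative and reuses the imports from the preceding example; the database pattern test_db_[0-9]+ and the table name test_table are hypothetical:

// Illustrative sketch: the database pattern and table name below are hypothetical.
MySqlSource<String> shardedSource = MySqlSource.<String>builder()
        .hostname("hostname")
        .port(3306)
        .databaseList("test_db_[0-9]+")          // Regular expression that matches test_db_0, test_db_1, and so on.
        .tableList("test_db_[0-9]+.test_table")  // Fully qualified table names in the matched databases.
        .username("username")
        .password("password")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();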
POM dependencies
Local debugging
The connectors for Realtime Compute for Apache Flink contain commercial features and differ from their open-source Apache Flink counterparts. To debug a job locally, you must modify the pom file as follows:
Set ${flink.version} to 1.19.0.
The recommended versions for the connector, ${vvr.version}, are as follows:
If you use a Ververica Runtime (VVR) 8.x engine, version 1.17-vvr-8.0.11-6 is recommended.
If you use a VVR 11.x engine, use the latest connector version that matches your engine version. For example, if you use a VVR 11.1 engine, the latest recommended version is 1.20-vvr-11.1.4-jdk11. For information about other versions, see Maven Repository.
Add the Kafka connector dependency.
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>8</java.version>
    <flink.version>1.19.0</flink.version>
    <!-- Use the recommended connector version for your engine, as described above. -->
    <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <!-- Use this group ID for VVR 11.x versions. -->
        <groupId>com.alibaba.ververica</groupId>
        <!-- Use this group ID for VVR 8.x versions. -->
        <!-- <groupId>org.apache.flink</groupId> -->
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr.version}</version>
    </dependency>
</dependencies>

Debug a deployment in Realtime Compute for Apache Flink
When you debug a deployment in Realtime Compute for Apache Flink, no version restrictions apply to the connector. However, make sure that the ${flink.version} and ${vvr.version} values match the engine version that the deployment uses. For more information about version mappings, see DPI engine. The following code provides a sample pom file. The Flink dependencies use the provided scope because the Realtime Compute for Apache Flink runtime supplies them:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>
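For example, if the deployment runs on a VVR 11.1 engine, the version properties could be set as shown in the following sketch. The connector version comes from the recommendation above; the exact ${flink.version} value shown here is an assumption and should be taken from the version mapping for your engine:

<properties>
    <!-- Illustrative values only; confirm both versions against the engine version mapping. -->
    <!-- Assumed Flink version for a VVR 11.x engine. Replace it with the value that matches your engine. -->
    <flink.version>1.20.0</flink.version>
    <!-- Connector version recommended above for a VVR 11.1 engine. -->
    <vvr.version>1.20-vvr-11.1.4-jdk11</vvr.version>
</properties>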