This topic explains how to write and debug a DataStream job that reads from MySQL using the MySQL connector, covering both local debugging and deployment on Realtime Compute for Apache Flink.
Prerequisites
Before you begin, ensure that you have:
- A MySQL database accessible from your development environment
- The MySQL connector JAR and its dependencies (required for local debugging only). For setup instructions, see Run and debug jobs that contain connectors locally.
Write a DataStream job with MySqlSource
The following example shows a complete DataStream job that captures changes from a MySQL table using MySqlSource and prints them as JSON.
To debug this job locally, download the required JAR files and configure the dependencies before running. For a sample Maven project, see MysqlCDCDemo.zip.
```java
package com.alibaba.realtimecompute;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MysqlCDCDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Configure the dependency: provide the absolute path to the MySQL uber JAR file.
        conf.setString("pipeline.classpaths", "file://" + "<absolute-path-to-mysql-uber-jar>");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("<mysql-hostname>")    // IP address or hostname of the MySQL database
                .port(3306)                      // Port of the MySQL database service
                .databaseList("test_db")         // Database to capture. Supports regex; use .* to match all databases.
                .tableList("test_db.test_table") // Table to capture, in the format <database>.<table>
                .username("<username>")          // MySQL username
                .password("<password>")          // MySQL password
                .deserializer(new JsonDebeziumDeserializationSchema()) // Converts SourceRecord to a JSON string
                .build();

        // Checkpointing is required for the source to commit progress through the binlog.
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)          // 4 parallel source tasks for reading
                .print().setParallelism(1); // Parallelism 1 on the sink preserves message ordering

        env.execute("Print MySQL Snapshot + Binary Log");
    }
}
```
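With JsonDebeziumDeserializationSchema, each printed record is a Debezium-style change-event JSON string carrying the row images and an operation code. As an illustration only (the sample payload below is hypothetical and simplified; real events also include a source block and other metadata), the top-level op field can be pulled out with plain-Java string matching:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OpCodeDemo {
    // Extracts the top-level "op" field ("c" insert, "u" update, "d" delete,
    // "r" snapshot read) from a Debezium-style JSON string with a simple regex.
    static String extractOp(String json) {
        Matcher m = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Hypothetical insert event for test_db.test_table
        String event = "{\"before\":null,\"after\":{\"id\":1,\"name\":\"a\"},\"op\":\"c\",\"ts_ms\":0}";
        System.out.println(extractOp(event)); // prints "c"
    }
}
```

For production use, parse the string with a real JSON library rather than a regex; this sketch only shows the envelope shape to expect from the print sink.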
MySqlSource parameters
| Parameter | Description |
|---|---|
| hostname | IP address or hostname of the MySQL database |
| port | Port number of the MySQL database service |
| databaseList | Database to capture. Supports regular expressions; use `.*` to match all databases. |
| tableList | Table to capture, in the format `<database>.<table>` |
| username | Username for the MySQL database service |
| password | Password for the MySQL database service |
| deserializer | Converts records of the SourceRecord type to a specified type. Valid values: RowDataDebeziumDeserializeSchema (converts to Flink's internal RowData structure) or JsonDebeziumDeserializationSchema (converts to a JSON string) |
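The databaseList and tableList values are treated as regular expressions matched against the whole identifier (Debezium-style anchored matching, so a pattern must cover the full name, not just a prefix). A minimal sketch of these matching semantics using the JDK's own regex engine; the patterns and names below are illustrative, not taken from a real deployment:

```java
import java.util.regex.Pattern;

public class PatternDemo {
    // Returns true when the pattern matches the ENTIRE identifier,
    // mirroring the anchored matching applied to databaseList/tableList.
    static boolean matches(String pattern, String identifier) {
        return Pattern.matches(pattern, identifier);
    }

    public static void main(String[] args) {
        System.out.println(matches("test_db", "test_db"));    // true: literal name
        System.out.println(matches(".*", "any_db"));          // true: .* matches all databases
        System.out.println(matches("test_db\\.test_.*", "test_db.test_table")); // true: escaped dot, table prefix
        System.out.println(matches("test_db", "test_db_2"));  // false: anchored, no partial match
    }
}
```

Note the escaped dot in the tableList pattern: an unescaped `.` is a regex wildcard, so `test_db.test_table` also happens to match names like `test_dbXtest_table`.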
Configure pom dependencies
The pom configuration differs between local debugging and deployment on Realtime Compute for Apache Flink. For local debugging, all dependencies are bundled into the JAR. For platform deployment, most Flink dependencies are marked as provided because the platform already supplies them.
Local debugging
Set ${flink.version} to 1.19.0 and choose ${vvr.version} based on your Ververica Runtime (VVR) engine version:
- VVR 8.x: use version 1.17-vvr-8.0.11-6.
- VVR 11.x: use the latest version that matches your engine. For VVR 11.1, the latest recommended version is 1.20-vvr-11.1.4-jdk11. For other versions, see Maven Repository.
```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>8</java.version>
    <flink.version>1.19.0</flink.version>
    <vvr.version>1.17-vvr-8.0.11-6</vvr.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <!-- Use com.alibaba.ververica for VVR 11.x; use org.apache.flink for VVR 8.x -->
        <groupId>com.alibaba.ververica</groupId>
        <!-- <groupId>org.apache.flink</groupId> -->
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr.version}</version>
    </dependency>
</dependencies>
```
Deployment on Realtime Compute for Apache Flink
When you deploy to Realtime Compute for Apache Flink, no connector version restrictions apply. Match ${flink.version} and ${vvr.version} to your deployment's engine version. For version mappings, see DPI engine.
Mark all standard Flink dependencies as provided, because the platform supplies them at runtime. Only ververica-connector-mysql is bundled without the provided scope.
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
    </dependency>
</dependencies>
```
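One common way to package the non-provided connector dependency into the deployment JAR is the maven-shade-plugin. The following is a minimal sketch, not a mandated configuration; the plugin version shown is an assumption, and the ServicesResourceTransformer is included because Flink connectors are discovered via META-INF/services files, which must be merged when shading:

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <!-- Version is an assumption; use your project's standard -->
            <version>3.5.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <!-- Merges META-INF/services entries so connector factories remain discoverable -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

Dependencies marked provided are automatically excluded from the shaded JAR, so only ververica-connector-mysql and your own classes end up in the artifact you upload.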