Realtime Compute for Apache Flink: Debug a DataStream program that includes the MySQL connector

Last Updated: Nov 10, 2025

This topic describes how to debug a DataStream program that uses the MySQL connector.

Solution for debugging a DataStream program

Create a DataStream API program that uses the MySqlSource class. Sample code:

Important

To locally debug a Flink program, you must download the related JAR file and configure dependencies. For more information, see Run and debug connectors locally. Click MysqlCDCDemo.zip to download our sample Maven project.

package com.alibaba.realtimecompute;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;

public class MysqlCDCDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Add the uber JAR of the MySQL connector to the pipeline classpath so that the local run can load it.
        conf.setString("pipeline.classpaths", "file://" + "Absolute path of the uber JAR file of the MySQL connector");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hostname")
                .port(3306)
                .databaseList("test_db") // Specify the database to capture.
                .tableList("test_db.test_table") // Specify the table to capture.
                .username("username")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
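        env.enableCheckpointing(3000); // Trigger a checkpoint every 3,000 ms.
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4) // Read with four parallel source subtasks.
                .print().setParallelism(1); // Print with a single sink subtask to keep the message order.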
        env.execute("Print MySQL Snapshot + Binlog");
    }
}
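The value of pipeline.classpaths in the sample is built by prepending "file://" to an absolute path. As a minimal sketch, and only as one possible alternative, the standard java.nio.file API can derive the file URL from a local path, which may be less error-prone on operating systems whose paths do not start with a slash. The class name ClasspathUrls, the method toFileUrl, and the JAR path are hypothetical and are not part of Flink or the MySQL connector.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration; not part of Flink or the MySQL connector.
final class ClasspathUrls {

    // Converts a local JAR path into a file URL string that can be passed
    // as the value of the "pipeline.classpaths" option.
    static String toFileUrl(String jarPath) {
        Path absolute = Paths.get(jarPath).toAbsolutePath().normalize();
        return absolute.toUri().toString();
    }

    public static void main(String[] args) {
        // Placeholder path (assumption); replace it with the location of your connector uber JAR.
        System.out.println(toFileUrl("/path/to/mysql-connector-uber.jar"));
    }
}
```

The returned string could then be used in place of the concatenated value, for example conf.setString("pipeline.classpaths", ClasspathUrls.toFileUrl(jarPath)).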

In the preceding code, replace the placeholder values that are passed to the MySqlSource builder with your actual values:

Parameter

Description

hostname

The IP address or hostname that is used to access your MySQL database.

port

The port that is used to access your MySQL database.

databaseList

The name of the MySQL database from which you want to read data.

Note

If you want to read data from multiple databases, you can set this parameter to a regular expression. For example, the regular expression .* (a period followed by an asterisk) matches all databases.

username

The username that is used to access your MySQL database.

password

The password that is used to access your MySQL database.

deserializer

A deserializer, which deserializes SourceRecords to a specified type. Valid values:

  • RowDataDebeziumDeserializeSchema: deserializes SourceRecords to RowData, the internal data structure of Flink Table/SQL.

  • JsonDebeziumDeserializationSchema: deserializes SourceRecords to JSON strings.
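The note on databaseList says that the parameter accepts a regular expression. The following sketch previews which database names a pattern would select before you pass it to the builder. It assumes that the pattern must match the whole database name, which is how java.util.regex matches() behaves; the class DatabasePatternPreview and the database names are hypothetical and only serve as an illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of the MySQL connector.
final class DatabasePatternPreview {

    // Returns the names that the regular expression matches in full.
    static List<String> matching(String regex, List<String> names) {
        Pattern pattern = Pattern.compile(regex);
        return names.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up database names (assumption).
        List<String> names = Arrays.asList("test_db", "test_db_2", "orders");
        System.out.println(matching(".*", names));        // prints [test_db, test_db_2, orders]
        System.out.println(matching("test_db.*", names)); // prints [test_db, test_db_2]
    }
}
```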

Project dependencies in the POM

Local debugging

The connectors of Realtime Compute for Apache Flink are commercial connectors and differ in many ways from the connectors of Apache Flink. To locally debug a program that uses the MySQL connector of Realtime Compute for Apache Flink, modify the POM file as follows:

  1. Set ${flink.version} to 1.19.0.

  2. Set ${vvr.version} of the MySQL connector to a recommended value, such as 1.17-vvr-8.0.11-6 or 1.20-vvr-11.1.3-jdk11. For more versions, see Maven Repository.

  3. Add a Kafka connector dependency.

Example code:

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>8</java.version>
    <flink.version>1.19.0</flink.version>
    <vvr.version>1.17-vvr-8.0.11-6</vvr.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <!-- VVR 11.x versions use this group ID. -->
        <groupId>com.alibaba.ververica</groupId>
        <!-- VVR 8.x versions use this group ID instead. -->
        <!-- <groupId>org.apache.flink</groupId> -->
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr.version}</version>
    </dependency>
</dependencies>

Cloud debugging

Before you use your program in production, you can create a deployment from the program JAR in the Realtime Compute for Apache Flink console and run the deployment to inspect its behavior. In this case, there are no restrictions on the MySQL connector version that you can use. However, make sure that both ${flink.version} and ${vvr.version} match the engine version of the deployment. For information about the version mappings, see Engine updates. Example POM dependencies for cloud debugging:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>