All Products
Search
Document Center

Realtime Compute for Apache Flink:Debug a DataStream job that uses the MySQL connector

Last Updated:Jan 12, 2026

This topic describes how to debug and run a DataStream job that uses the MySQL connector.

Debug a DataStream job

You can create a DataStream API program that uses MySqlSource. The following code provides an example:

Important

To debug a job locally, you must download the required JAR files and configure the dependencies. For more information, see Run and debug jobs that contain connectors locally. For a sample Maven project, see MysqlCDCDemo.zip.

package com.alibaba.realtimecompute;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;

public class MysqlCDCDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("pipeline.classpaths", "file://" + "The absolute path of the MySQL uber JAR file");  // Configure the dependency.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hostname")
                .port(3306)
                .databaseList("test_db") // Set the database to capture.
                .tableList("test_db.test_table") // Set the table to capture.
                .username("username")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        env.enableCheckpointing(3000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print().setParallelism(1);
        env.execute("Print MySQL Snapshot + Binary Log");
    }
}

When you build MySqlSource, specify the following parameters in the code:

Parameter

Description

hostname

The IP address or hostname of the MySQL database.

port

The port number of the MySQL database service.

databaseList

The name of the MySQL database.

Note

This parameter supports regular expressions to read data from multiple databases. You can use .* to match all databases.

username

The username for the MySQL database service.

password

The password for the MySQL database service.

deserializer

The deserializer that converts records of the SourceRecord type to a specified type. Valid values:

  • RowDataDebeziumDeserializeSchema: Converts SourceRecord to the Flink Table or SQL internal data structure RowData.

  • JsonDebeziumDeserializationSchema: Converts SourceRecord to a string in JSON format.

pom dependencies

Local debugging

The connectors for Realtime Compute for Apache Flink contain commercial features and differ from the open-source Apache Flink versions. To debug a job locally, you must modify the pom file as follows:

  1. Set ${flink.version} to 1.19.0.

  2. The recommended versions for the connector, ${vvr.version}, are as follows:

    1. If you use a Ververica Runtime (VVR) 8.x engine, version 1.17-vvr-8.0.11-6 is recommended.

    2. If you use a VVR 11.x engine, use the latest connector version that matches your engine version. For example, if you use a VVR 11.1 engine, the latest recommended version is 1.20-vvr-11.1.4-jdk11. For information about other versions, see Maven Repository.

  3. Add the Kafka connector dependency.

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>8</java.version>
    <flink.version>1.19.0</flink.version>
    <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <!-- Use this Group ID for VVR 11.x versions -->
        <groupId>com.alibaba.ververica</groupId>

       <!-- Use this Group ID for VVR 8.x versions -->
        <!-- <groupId>org.apache.flink</groupId> -->
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr.version}</version>
    </dependency>
</dependencies>

Debug a deployment in Realtime Compute for Apache Flink

When you debug a job in Realtime Compute for Apache Flink, no version restrictions apply to the connector. Ensure that the ${flink.version} and ${vvr.version} values match the database engine version of the job. For more information about version mappings, see DPI engine. The following code provides a sample pom file:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>