All Products
Search
Document Center

Realtime Compute for Apache Flink:Debug MySQL DataStream

Last Updated: Mar 26, 2026

This topic explains how to write and debug a DataStream job that reads from MySQL using the MySQL connector, covering both local debugging and deployment on Realtime Compute for Apache Flink.

Prerequisites

Before you begin, ensure that you have a MySQL instance that is reachable from your development environment and a development environment with JDK and Maven installed.

Write a DataStream job with MySqlSource

The following example shows a complete DataStream job that captures changes from a MySQL table using MySqlSource and prints them as JSON.

Important

To debug this job locally, download the required JAR files and configure the dependencies before running. For a sample Maven project, see MysqlCDCDemo.zip.

package com.alibaba.realtimecompute;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;

/**
 * Demo job that captures the snapshot and binlog changes of a MySQL table
 * with the MySQL CDC connector and prints every change record as a JSON string.
 */
public class MysqlCDCDemo {

    public static void main(String[] args) throws Exception {
        // Point the pipeline classpath at the MySQL uber JAR so the connector
        // classes can be loaded when the job is debugged locally.
        Configuration config = new Configuration();
        config.setString("pipeline.classpaths", "file://" + "<absolute-path-to-mysql-uber-jar>");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

        // Checkpoint every 3 seconds; the CDC source relies on checkpoints
        // to commit its reading position.
        env.enableCheckpointing(3000);

        // Build a CDC source that deserializes each SourceRecord to JSON.
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("<mysql-hostname>")    // IP address or hostname of the MySQL database
                        .port(3306)                      // Port of the MySQL database service
                        .databaseList("test_db")         // Database to capture; supports regex (.* matches all)
                        .tableList("test_db.test_table") // Table to capture, as <database>.<table>
                        .username("<username>")          // MySQL username
                        .password("<password>")          // MySQL password
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)          // 4 parallel source tasks for reading
                .print().setParallelism(1); // a single sink task preserves message ordering

        env.execute("Print MySQL Snapshot + Binary Log");
    }
}

MySqlSource parameters

Parameter Description
hostname IP address or hostname of the MySQL database
port Port number of the MySQL database service
databaseList Database to capture. Supports regular expressions — use .* to match all databases.
tableList Table to capture, in the format <database>.<table>
username Username for the MySQL database service
password Password for the MySQL database service
deserializer Converts records of the SourceRecord type to a specified type. Valid values: RowDataDebeziumDeserializeSchema (converts to Flink's internal RowData structure) or JsonDebeziumDeserializationSchema (converts to a JSON string)

Configure pom dependencies

The pom configuration differs between local debugging and deployment on Realtime Compute for Apache Flink. For local debugging, all dependencies are bundled into the JAR. For platform deployment, most Flink dependencies are marked as provided because the platform already supplies them.

Local debugging

Set ${flink.version} to 1.19.0 and choose ${vvr.version} based on your Ververica Runtime (VVR) engine version:

  • VVR 8.x: use version 1.17-vvr-8.0.11-6

  • VVR 11.x: use the latest version that matches your engine. For VVR 11.1, the latest recommended version is 1.20-vvr-11.1.4-jdk11. For other versions, see Maven Repository.

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>8</java.version>
    <flink.version>1.19.0</flink.version>
    <vvr.version>1.17-vvr-8.0.11-6</vvr.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <!-- Use com.alibaba.ververica for VVR 11.x; use org.apache.flink for VVR 8.x -->
        <groupId>com.alibaba.ververica</groupId>
        <!-- <groupId>org.apache.flink</groupId> -->
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr.version}</version>
    </dependency>
</dependencies>

Deployment on Realtime Compute for Apache Flink

When deploying to Realtime Compute for Apache Flink, no connector version restrictions apply. Match ${flink.version} and ${vvr.version} to the Flink (VVR) engine version of your deployment. For the version mappings, see the engine version mapping documentation.

Mark all standard Flink dependencies as provided — the platform supplies them at runtime. Only ververica-connector-mysql is bundled without provided.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

What's next