全部產品
Search
文件中心

Realtime Compute for Apache Flink:MySQL連接器DataStream調試

更新時間:Jan 13, 2026

本文為您介紹如何調試和運行使用MySQL連接器的 DataStream 作業。

DataStream調試方案

建立DataStream API程式並使用MySqlSource。程式碼範例如下:

重要

本地調試需要下載相關jar並配置依賴,詳情請參見本地運行和調試包含連接器的作業。本文已提供Maven樣本專案,詳情請參考MysqlCDCDemo.zip

package com.alibaba.realtimecompute;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;

public class MysqlCDCDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("pipeline.classpaths", "file://" + "mysql uber jar絕對路徑");  // 配置依賴
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hostname")
                .port(3306)
                .databaseList("test_db") // 設定捕獲的資料庫
                .tableList("test_db.test_table") // 設定捕獲的表
                .username("username")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        env.enableCheckpointing(3000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print().setParallelism(1);
        env.execute("Print MySQL Snapshot + Binlog");
    }
}

在構建MySqlSource時,代碼中必須指定以下參數:

參數

說明

hostname

MySQL資料庫的IP地址或者主機名稱。

port

MySQL資料庫服務的連接埠號碼。

databaseList

MySQL資料庫名稱。

說明

資料庫名稱支援Regex以讀取多個資料庫的資料,您可以使用.*匹配所有資料庫。

username

MySQL資料庫服務的使用者名稱。

password

MySQL資料庫服務的密碼。

deserializer

還原序列化器,將SourceRecord類型記錄還原序列化到指定類型。參數取值如下:

  • RowDataDebeziumDeserializeSchema:將SourceRecord轉成Flink Table或SQL內部資料結構RowData。

  • JsonDebeziumDeserializationSchema:將SourceRecord轉成JSON格式的String。

pom依賴

本地調試

阿里雲Realtime ComputeFlink版連接器包含商業化的額外內容,與Apache Flink版本有許多差異,本地調試時需對pom做如下修改:

  1. ${flink.version}必須為1.19.0。

  2. 連接器${vvr.version}推薦版本如下:

    1. 使用vvr 8.x版本引擎時,推薦1.17-vvr-8.0.11-6版本。

    2. 使用vvr 11.x版本引擎時,推薦使用最新的對應引擎版本的連接器,例如如果使用vvr 11.1版本引擎,推薦使用最新版本1.20-vvr-11.1.4-jdk11。其他版本可見Maven Repository

  3. 需要添加kafka連接器依賴。

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>8</java.version>
    <flink.version>1.19.0</flink.version>
    <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <!-- VVR 11.x 版本使用此 Group ID -->
        <groupId>com.alibaba.ververica</groupId>

       <!-- VVR 8.x 版本使用此 Group ID -->
        <!-- <groupId>org.apache.flink</groupId> -->
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr.version}</version>
    </dependency>
</dependencies>

Realtime ComputeFlink版部署調試

在Realtime ComputeFlink版啟動作業調試時,連接器無版本限制,${flink.version}和${vvr.version}僅需和作業引擎版本相對應即可,對應關係請參見引擎。參考pom如下:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>