全部产品
Search
文档中心

Realtime Compute for Apache Flink:Debug program DataStream yang mencakup konektor MySQL

更新时间:Nov 10, 2025

Topik ini menjelaskan cara men-debug program DataStream yang menggunakan konektor MySQL.

Solusi untuk men-debug program DataStream

Buat program API DataStream dan gunakan kelas MySqlSource. Contoh kode:

Penting

Untuk men-debug program Flink secara lokal, Anda harus mengunduh file JAR terkait dan mengonfigurasi dependensi. Untuk informasi lebih lanjut, lihat Men-debug program Flink secara lokal yang mencakup konektor. Klik MysqlCDCDemo.zip untuk mengunduh proyek Maven contoh kami.

package com.alibaba.realtimecompute;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;

public class MysqlCDCDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("pipeline.classpaths", "file://" + "Jalur absolut file uber JAR dari konektor MySQL");  // Konfigurasikan dependensi.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hostname")
                .port(3306)
                .databaseList("test_db") // Tentukan database yang akan ditangkap.
                .tableList("test_db.test_table") // Tentukan tabel yang akan ditangkap.
                .username("username")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        env.enableCheckpointing(3000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print().setParallelism(1);
        env.execute("Print MySQL Snapshot + Binlog");
    }
}

Ganti nilai placeholder dalam kode di atas untuk mengonfigurasi kelas MySqlSource dengan nilai sebenarnya Anda:

Parameter

Deskripsi

hostname

Alamat IP atau nama host yang digunakan untuk mengakses database MySQL Anda.

port

Port yang digunakan untuk mengakses database MySQL Anda.

databaseList

Nama database MySQL Anda.

Catatan

Jika Anda ingin membaca data dari beberapa database, Anda dapat mengatur parameter ini menjadi ekspresi reguler. Anda dapat menggunakan kombinasi titik dan tanda bintang (.*) untuk mencocokkan semua database.

username

Nama pengguna yang digunakan untuk mengakses database MySQL Anda.

password

Kata sandi yang digunakan untuk mengakses database MySQL Anda.

deserializer

Deserializer, yang mendeserialisasi SourceRecords ke tipe tertentu. Nilai valid:

  • RowDataDebeziumDeserializeSchema: mendeserialisasi SourceRecords ke struktur data internal RowData Flink Table/SQL.

  • JsonDebeziumDeserializationSchema: mendeserialisasi SourceRecords ke string JSON.

Dependensi proyek dalam POM

Pen-debug-an lokal

Konektor Realtime Compute for Apache Flink tersedia secara komersial dan berbeda dalam banyak hal dari konektor Apache Flink. Untuk men-debug secara lokal program yang menggunakan konektor MySQL dari Realtime Compute for Apache Flink, modifikasi file POM sebagai berikut:

  1. Tetapkan ${flink.version} ke 1.19.0.

  2. Atur ${vvr.version} konektor MySQL ke nilai yang direkomendasikan, seperti 1.17-vvr-8.0.11-6 atau 1.20-vvr-11.1.3-jdk11. Untuk versi lainnya, lihat Maven Repository.

  3. Tambahkan dependensi konektor Kafka.

Contoh kode:

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>8</java.version>
    <flink.version>1.19.0</flink.version>
    <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <!-- VVR 11.x versions use this Group ID -->
        <groupId>com.alibaba.ververica</groupId>

       <!-- VVR 8.x versions use this Group ID -->
        <!-- <groupId>org.apache.flink</groupId> -->
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr.version}</version>
    </dependency>
</dependencies>

Pen-debug-an cloud

Sebelum program Anda digunakan dalam produksi, Anda dapat membuat penyebaran dari program JAR di konsol Realtime Compute for Apache Flink dan menjalankannya untuk inspeksi. Dalam hal ini, tidak ada batasan pada versi konektor MySQL yang dapat Anda gunakan. Namun, pastikan baik ${flink.version} maupun ${vvr.version} selaras dengan versi mesin penyebaran. Untuk informasi tentang pemetaan versi, lihat Pembaruan mesin. Berikut adalah contoh POM untuk pen-debug-an cloud:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>