すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:MySQL コネクタを使用する DataStream ジョブのデバッグ

最終更新日:Jan 13, 2026

このトピックでは、MySQL コネクタを使用する DataStream ジョブをデバッグして実行する方法について説明します。

DataStream ジョブのデバッグ

MySqlSource を使用する DataStream API プログラムを作成できます。次のコードに例を示します。

重要

ジョブをローカルでデバッグするには、必要な JAR ファイルをダウンロードし、依存関係を構成する必要があります。詳細については、「コネクタを含むジョブのローカルでの実行とデバッグ」をご参照ください。Maven プロジェクトのサンプルについては、「MysqlCDCDemo.zip」をご参照ください。

package com.alibaba.realtimecompute;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;

public class MysqlCDCDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("pipeline.classpaths", "file://" + "The absolute path of the MySQL uber JAR file");  // 依存関係を構成します。
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hostname")
                .port(3306)
                .databaseList("test_db") // キャプチャするデータベースを設定します。
                .tableList("test_db.test_table") // キャプチャするテーブルを設定します。
                .username("username")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        env.enableCheckpointing(3000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print().setParallelism(1);
        env.execute("Print MySQL Snapshot + Binary Log");
    }
}

MySqlSource をビルドする際、コード内で次のパラメーターを指定します。

パラメーター

説明

hostname

MySQL データベースの IP アドレスまたはホスト名。

port

MySQL データベースサービスのポート番号。

databaseList

MySQL データベースの名前。

説明

このパラメーターは正規表現をサポートしており、複数のデータベースからデータを読み取ることができます。.* を使用して、すべてのデータベースを照合できます。

username

MySQL データベースサービスのユーザー名。

password

MySQL データベースサービスのパスワード。

deserializer

SourceRecord 型のレコードを指定された型に変換するデシリアライザー。有効な値は次のとおりです。

  • RowDataDebeziumDeserializeSchema: SourceRecord を Flink Table または SQL の内部データ構造である RowData に変換します。

  • JsonDebeziumDeserializationSchema: SourceRecord を JSON フォーマットの文字列に変換します。

pom 依存関係

ローカルデバッグ

Realtime Compute for Apache Flink のコネクタには商用機能が含まれており、オープンソースの Apache Flink バージョンとは異なります。ジョブをローカルでデバッグするには、次のように pom ファイルを変更する必要があります。

  1. ${flink.version} を 1.19.0 に設定します。

  2. コネクタの推奨バージョン ${vvr.version} は次のとおりです。

    1. Ververica Runtime (VVR) 8.x エンジンを使用する場合、バージョン 1.17-vvr-8.0.11-6 が推奨されます。

    2. VVR 11.x エンジンを使用する場合、お使いのエンジンバージョンに一致する最新のコネクタバージョンを使用します。たとえば、VVR 11.1 エンジンを使用する場合、最新の推奨バージョンは 1.20-vvr-11.1.4-jdk11 です。その他のバージョンについては、「Maven Repository」をご参照ください。

  3. Kafka コネクタの依存関係を追加します。

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>8</java.version>
    <flink.version>1.19.0</flink.version>
    <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <!-- VVR 11.x バージョンではこのグループ ID を使用します -->
        <groupId>com.alibaba.ververica</groupId>

       <!-- VVR 8.x バージョンではこのグループ ID を使用します -->
        <!-- <groupId>org.apache.flink</groupId> -->
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr.version}</version>
    </dependency>
</dependencies>

Realtime Compute for Apache Flink でのデプロイメントのデバッグ

Realtime Compute for Apache Flink でジョブをデバッグする場合、コネクタにバージョンの制限は適用されません。${flink.version} と ${vvr.version} の値が、ジョブのデータベースエンジンバージョンと一致していることを確認してください。バージョンのマッピングに関する詳細については、「DPI エンジン」をご参照ください。次のコードに pom ファイルのサンプルを示します。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>