このトピックでは、MySQL コネクタを使用する DataStream ジョブをデバッグして実行する方法について説明します。
DataStream ジョブのデバッグ
MySqlSource を使用する DataStream API プログラムを作成できます。次のコードに例を示します。
ジョブをローカルでデバッグするには、必要な JAR ファイルをダウンロードし、依存関係を構成する必要があります。詳細については、「コネクタを含むジョブのローカルでの実行とデバッグ」をご参照ください。Maven プロジェクトのサンプルについては、「MysqlCDCDemo.zip」をご参照ください。
package com.alibaba.realtimecompute;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;
public class MysqlCDCDemo {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setString("pipeline.classpaths", "file://" + "The absolute path of the MySQL uber JAR file"); // 依存関係を構成します。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("hostname")
.port(3306)
.databaseList("test_db") // キャプチャするデータベースを設定します。
.tableList("test_db.test_table") // キャプチャするテーブルを設定します。
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.setParallelism(4)
.print().setParallelism(1);
env.execute("Print MySQL Snapshot + Binary Log");
}
}MySqlSource をビルドする際、コード内で次のパラメーターを指定します。
パラメーター | 説明 |
hostname | MySQL データベースの IP アドレスまたはホスト名。 |
port | MySQL データベースサービスのポート番号。 |
databaseList | MySQL データベースの名前。 説明 このパラメーターは正規表現をサポートしており、複数のデータベースからデータを読み取ることができます。 |
username | MySQL データベースサービスのユーザー名。 |
password | MySQL データベースサービスのパスワード。 |
deserializer | SourceRecord 型のレコードを指定された型に変換するデシリアライザー。有効な値は次のとおりです。
|
pom 依存関係
ローカルデバッグ
Realtime Compute for Apache Flink のコネクタには商用機能が含まれており、オープンソースの Apache Flink バージョンとは異なります。ジョブをローカルでデバッグするには、次のように pom ファイルを変更する必要があります。
${flink.version} を 1.19.0 に設定します。
コネクタの推奨バージョン ${vvr.version} は次のとおりです。
Ververica Runtime (VVR) 8.x エンジンを使用する場合、バージョン 1.17-vvr-8.0.11-6 が推奨されます。
VVR 11.x エンジンを使用する場合、お使いのエンジンバージョンに一致する最新のコネクタバージョンを使用します。たとえば、VVR 11.1 エンジンを使用する場合、最新の推奨バージョンは 1.20-vvr-11.1.4-jdk11 です。その他のバージョンについては、「Maven Repository」をご参照ください。
Kafka コネクタの依存関係を追加します。
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>8</java.version>
<flink.version>1.19.0</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<!-- VVR 11.x バージョンではこのグループ ID を使用します -->
<groupId>com.alibaba.ververica</groupId>
<!-- VVR 8.x バージョンではこのグループ ID を使用します -->
<!-- <groupId>org.apache.flink</groupId> -->
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
</dependencies>Realtime Compute for Apache Flink でのデプロイメントのデバッグ
Realtime Compute for Apache Flink でジョブをデバッグする場合、コネクタにバージョンの制限は適用されません。${flink.version} と ${vvr.version} の値が、ジョブのデータベースエンジンバージョンと一致していることを確認してください。バージョンのマッピングに関する詳細については、「DPI エンジン」をご参照ください。次のコードに pom ファイルのサンプルを示します。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>