Topik ini menjelaskan cara mendebug dan menjalankan pekerjaan DataStream yang menggunakan konektor MySQL.
Debug Pekerjaan DataStream
Anda dapat membuat program API DataStream yang menggunakan MySqlSource. Berikut contoh kodenya:
Untuk mendebug pekerjaan secara lokal, Anda harus mengunduh file JAR yang diperlukan dan mengonfigurasi dependensinya. Untuk informasi selengkapnya, lihat Jalankan dan debug pekerjaan yang berisi konektor secara lokal. Contoh proyek Maven tersedia di MysqlCDCDemo.zip.
package com.alibaba.realtimecompute;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;
public class MysqlCDCDemo {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setString("pipeline.classpaths", "file://" + "The absolute path of the MySQL uber JAR file"); // Konfigurasikan dependensi.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("hostname")
.port(3306)
.databaseList("test_db") // Tetapkan database yang akan di-capture.
.tableList("test_db.test_table") // Tetapkan tabel yang akan di-capture.
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.setParallelism(4)
.print().setParallelism(1);
env.execute("Print MySQL Snapshot + Binary Log");
}
}Saat membangun MySqlSource, tentukan parameter berikut dalam kode:
Parameter | Deskripsi |
hostname | Alamat IP atau hostname dari database MySQL. |
port | Nomor port layanan database MySQL. |
databaseList | Nama database MySQL. Catatan Parameter ini mendukung ekspresi reguler untuk membaca data dari multiple database. Anda dapat menggunakan |
username | Username untuk layanan database MySQL. |
password | Password untuk layanan database MySQL. |
deserializer | Deserializer yang mengonversi catatan bertipe SourceRecord ke tipe tertentu. Nilai yang valid:
|
dependensi pom
Pen-debug-an lokal
Konektor untuk Realtime Compute for Apache Flink mengandung fitur komersial dan berbeda dari versi open-source Apache Flink. Untuk mendebug pekerjaan secara lokal, modifikasi file pom.xml sebagai berikut:
Tetapkan ${flink.version} ke 1.19.0.
Gunakan versi konektor yang direkomendasikan, ${vvr.version}, sebagai berikut:
Jika Anda menggunakan engine Ververica Runtime (VVR) 8.x, disarankan menggunakan versi 1.17-vvr-8.0.11-6.
Jika Anda menggunakan engine VVR 11.x, gunakan versi konektor terbaru yang sesuai dengan versi engine tersebut. Misalnya, untuk engine VVR 11.1, versi terbaru yang direkomendasikan adalah 1.20-vvr-11.1.4-jdk11. Informasi selengkapnya mengenai versi lainnya tersedia di Maven Repository.
Tambahkan dependensi konektor Kafka.
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>8</java.version>
<flink.version>1.19.0</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<!-- Use this Group ID for VVR 11.x versions -->
<groupId>com.alibaba.ververica</groupId>
<!-- Use this Group ID for VVR 8.x versions -->
<!-- <groupId>org.apache.flink</groupId> -->
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
</dependencies>Debug Penerapan di Realtime Compute for Apache Flink
Saat mendebug pekerjaan di Realtime Compute for Apache Flink, tidak ada batasan versi pada konektor. Pastikan nilai ${flink.version} dan ${vvr.version} sesuai dengan versi engine database yang digunakan oleh pekerjaan tersebut. Untuk informasi lebih lanjut mengenai pemetaan versi, lihat Mesin DPI. Berikut contoh file pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>