Topik ini menjelaskan cara men-debug program DataStream yang menggunakan konektor MySQL.
Solusi untuk men-debug program DataStream
Buat program API DataStream dan gunakan kelas MySqlSource. Contoh kode:
Untuk men-debug program Flink secara lokal, Anda harus mengunduh file JAR terkait dan mengonfigurasi dependensi. Untuk informasi lebih lanjut, lihat Men-debug program Flink secara lokal yang mencakup konektor. Klik MysqlCDCDemo.zip untuk mengunduh proyek Maven contoh kami.
package com.alibaba.realtimecompute;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;
public class MysqlCDCDemo {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setString("pipeline.classpaths", "file://" + "Jalur absolut file uber JAR dari konektor MySQL"); // Konfigurasikan dependensi.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("hostname")
.port(3306)
.databaseList("test_db") // Tentukan database yang akan ditangkap.
.tableList("test_db.test_table") // Tentukan tabel yang akan ditangkap.
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.setParallelism(4)
.print().setParallelism(1);
env.execute("Print MySQL Snapshot + Binlog");
}
}Ganti nilai placeholder dalam kode di atas untuk mengonfigurasi kelas MySqlSource dengan nilai sebenarnya Anda:
Parameter | Deskripsi |
hostname | Alamat IP atau nama host yang digunakan untuk mengakses database MySQL Anda. |
port | Port yang digunakan untuk mengakses database MySQL Anda. |
databaseList | Nama database MySQL Anda. Catatan Jika Anda ingin membaca data dari beberapa database, Anda dapat mengatur parameter ini menjadi ekspresi reguler. Anda dapat menggunakan kombinasi titik dan tanda bintang |
username | Nama pengguna yang digunakan untuk mengakses database MySQL Anda. |
password | Kata sandi yang digunakan untuk mengakses database MySQL Anda. |
deserializer | Deserializer, yang mendeserialisasi SourceRecords ke tipe tertentu. Nilai valid:
|
Dependensi proyek dalam POM
Pen-debug-an lokal
Konektor Realtime Compute for Apache Flink tersedia secara komersial dan berbeda dalam banyak hal dari konektor Apache Flink. Untuk men-debug secara lokal program yang menggunakan konektor MySQL dari Realtime Compute for Apache Flink, modifikasi file POM sebagai berikut:
Tetapkan ${flink.version} ke 1.19.0.
Atur ${vvr.version} konektor MySQL ke nilai yang direkomendasikan, seperti 1.17-vvr-8.0.11-6 atau 1.20-vvr-11.1.3-jdk11. Untuk versi lainnya, lihat Maven Repository.
Tambahkan dependensi konektor Kafka.
Contoh kode:
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>8</java.version>
<flink.version>1.19.0</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<!-- VVR 11.x versions use this Group ID -->
<groupId>com.alibaba.ververica</groupId>
<!-- VVR 8.x versions use this Group ID -->
<!-- <groupId>org.apache.flink</groupId> -->
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
</dependencies>Pen-debug-an cloud
Sebelum program Anda digunakan dalam produksi, Anda dapat membuat penyebaran dari program JAR di konsol Realtime Compute for Apache Flink dan menjalankannya untuk inspeksi. Dalam hal ini, tidak ada batasan pada versi konektor MySQL yang dapat Anda gunakan. Namun, pastikan baik ${flink.version} maupun ${vvr.version} selaras dengan versi mesin penyebaran. Untuk informasi tentang pemetaan versi, lihat Pembaruan mesin. Berikut adalah contoh POM untuk pen-debug-an cloud:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>