Setelah mengonfigurasi instans pelacakan perubahan untuk Data Transmission Service (DTS), Anda dapat menggunakan flink-dts-connector untuk membuat program Flink yang mengonsumsi data tersebut.
Catatan penggunaan
-
Konektor ini mendukung program Flink yang menggunakan DataStream API, Table API, maupun SQL.
-
Jika program Flink Anda menggunakan Table API dan SQL, Anda hanya dapat mengonsumsi data dari satu tabel dalam satu waktu. Untuk mengonsumsi data dari beberapa tabel, Anda harus mengonfigurasi dan menjalankan tugas terpisah untuk masing-masing tabel.
Prosedur
Topik ini menggunakan IntelliJ IDEA Community Edition 2020.1 untuk Windows sebagai contoh untuk menunjukkan cara menggunakan flink-dts-connector guna mengonsumsi data yang dilacak dari instans pelacakan perubahan.
-
Buat instans pelacakan perubahan. Untuk informasi selengkapnya, lihat Buat instans pelacakan perubahan untuk instans ApsaraDB RDS for MySQL, Buat instans pelacakan perubahan untuk kluster PolarDB for MySQL, atau Buat instans pelacakan perubahan untuk instans ApsaraDB for Oracle.
-
Buat satu atau beberapa kelompok konsumen. Untuk informasi selengkapnya, lihat Buat kelompok konsumen.
-
Unduh file flink-dts-connector dan ekstrak file tersebut.
-
Jalankan IntelliJ IDEA dan klik Open or Import.
-
Pada kotak dialog yang muncul, arahkan ke direktori tempat Anda mengekstrak file flink-dts-connector. Buka folder-folder tersebut hingga menemukan file Project Object Model (POM): pom.xml.
-
Pada kotak dialog yang muncul, pilih Open as Project.
-
Tambahkan dependensi berikut ke file pom.xml:
<dependency> <groupId>com.alibaba.flink</groupId> <artifactId>flink-dts-connector</artifactId> <version>1.1.1-SNAPSHOT</version> <classifier>jar-with-dependencies</classifier> </dependency> -
Di IntelliJ IDEA, buka folder proyek dan pilih file Java yang sesuai berdasarkan jenis Flink API yang digunakan oleh program Anda.
-
Jika program Flink Anda menggunakan DataStream API, klik ganda file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java dan lakukan langkah-langkah berikut:
-
Dari bilah menu atas, pilih Run > Run....
-
Pada jendela pop-up, klik .
-
Pada bidang Program arguments, masukkan parameter beserta nilainya seperti pada contoh berikut, lalu klik Run untuk menjalankan flink-dts-connector.
CatatanUntuk detail tentang parameter dan cara menemukan nilainya, lihat Parameter.
--broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043 -
Output menunjukkan bahwa program berhasil melacak perubahan data dari database sumber. Setelah startup, catatan perubahan data dalam DataStream tampak seperti contoh berikut.
LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006303@289540@2688@1625045211000]} LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006307@290047@2688@1625045212000]} LazyParseRecord {operationType [UPDATE], checkpoint [0@12006305@290016@2688@1625045212000]} LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006308@290047@2688@1625045214000]} LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006309@290047@2688@1625045215000]} LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006310@290047@2688@1625045216000]} LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006311@290047@2688@1625045217000]}CatatanUntuk melihat detail catatan perubahan data, login ke UI Task Manager program Flink Anda.
-
-
Jika program Flink Anda menggunakan Table API dan SQL, klik ganda file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java dan lakukan langkah-langkah berikut:
CatatanSatu file
DtsTableISelectTCaseTest.javahanya mendukung konfigurasi dan konsumsi data yang dilacak dari satu tabel. Untuk mengonsumsi data dari beberapa tabel, Anda harus mengonfigurasi dan menjalankan tugas terpisah untuk masing-masing tabel.Beri komentar pada baris
properties.loaddengan menambahkan awalan//./*Memuat nilai parameter yang Anda atur di halaman platform ke objek Properties.*/ //properties.load(new StringReader(new String(Files.readAllBytes(Paths.get(configFilePath)), StandardCharsets.UTF_8)));Tentukan satu tabel dari mana Anda ingin mengonsumsi data.
Atur parameter untuk instance pelacakan perubahan. Untuk detail mengenai parameter dan cara menemukan nilainya, lihat Parameter.
public static void main(String[] args) throws Exception { setup(args); final String createTable = "create table `dts` (\n" + " `ts` TIMESTAMP(3) METADATA FROM 'timestamp',\n" + " `id` bigint,\n" + " `name` varchar,\n" + " `age` bigint,\n" + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" + ") with (\n" + "'connector' = 'dts'," + "'dts.server' = 'dts-cn-xxx:18001'," + "'topic' = 'cn_hangzhou_rm_xxx_dtstest_version2'," + "'dts.sid' = 'dtsxxx', " + "'dts.user' = 'dtstest', " + "'dts.password' = 'xxx'," + "'dts.checkpoint' = '1624440043', " + "'dts-cdc.table.name' = 'dtstestdata.order'," + "'format' = 'dts-cdc')"; }Di bagian atas antarmuka IntelliJ IDEA, klik Run 'DtsTableISelectTCaseTest' untuk menjalankan flink-dts-connector.
Output menunjukkan bahwa program berhasil melacak perubahan data dari database sumber. Setelah startup, terminal menampilkan catatan changelog, dengan event pembaruan yang muncul berpasangan: nilai sebelum pembaruan (-U) dan nilai setelah pembaruan (+U).
######> (false,-U(2021-06-23T20:32:17.391,null,null,null)) ######> (true,+U(2021-06-23T20:32:17.391,null,null,null)) ######> (false,-U(2021-06-23T20:32:45.604,null,null,null)) ######> (true,+U(2021-06-23T20:32:45.604,null,null,null)) ######> (false,-U(2021-06-30T17:26:52.201,null,null,null)) ######> (true,+U(2021-06-30T17:26:52.201,null,null,null)) ######> (false,-U(2021-06-30T19:19:26.975,null,null,null)) ######> (true,+U(2021-06-30T19:19:26.975,null,null,null))CatatanUntuk melihat detail catatan perubahan data, masuk ke UI Task Manager program Flink Anda.
-
Parameter
|
Parameter DataStream API |
Parameter Table API |
Deskripsi |
Cara memperoleh |
|
|
|
Titik akhir dan port instans pelacakan perubahan. Catatan
|
Di Konsol Data Transmission Service (DTS), klik ID instans pelacakan perubahan target. Di halaman View Task Settings, Anda dapat menemukan Topic, titik akhir, dan port. |
|
|
|
Topik yang dilacak dari instans pelacakan perubahan. |
|
|
|
|
ID kelompok konsumen. |
Di Konsol DTS, klik ID instans pelacakan perubahan target, lalu klik Consume Data. Anda dapat menemukan ID dan Account kelompok konsumen. Catatan
Password untuk username kelompok konsumen adalah password yang Anda tentukan saat membuat kelompok konsumen. |
|
|
|
Username untuk kelompok konsumen. Peringatan
flink-dts-connector yang disediakan dalam topik ini secara otomatis menangani format username yang diperlukan. Jika Anda menggunakan client lain, Anda harus memformat username secara manual menjadi |
|
|
|
|
Password untuk username tersebut. |
|
|
|
|
Offset konsumen. Ini adalah timestamp tempat flink-dts-connector mulai mengonsumsi data. Nilainya harus berupa Unix timestamp, misalnya, 1624440043. Catatan
Offset konsumen berguna dalam skenario berikut:
|
Offset konsumen harus berada dalam rentang data instans pelacakan perubahan. Di halaman View Task Settings instans pelacakan perubahan DTS, Anda dapat menemukan waktu mulai dan akhir di bidang Data Range. Anda harus mengatur offset konsumen ke waktu dalam rentang tersebut dan mengonversinya ke Unix timestamp. Catatan
Gunakan mesin pencari daring untuk menemukan konverter Unix timestamp. |
|
N/A |
|
Parameter ini menentukan tabel untuk pelacakan perubahan. Hanya satu tabel yang didukung per tugas. Gunakan format berikut:
|
Di Konsol DTS, klik ID instans pelacakan perubahan target. Di halaman View Task Settings, klik View Objects di pojok kanan atas untuk menemukan nama database dan tabel objek tersebut. |
FAQ
|
Pesan error |
Kemungkinan penyebab |
Solusi |
|
Modul DStore, yang digunakan DTS untuk membaca data inkremental, telah dialihkan. Hal ini menyebabkan offset konsumen program Flink hilang. |
Jangan restart program. Sebagai gantinya, temukan offset konsumen terakhir yang diketahui dan berikan sebagai parameter |