Setelah mengonfigurasi tugas pelacakan perubahan, Anda dapat menggunakan file flink-dts-connector untuk mengonsumsi data yang dilacak. Topik ini menjelaskan cara menggunakan file flink-dts-connector untuk mengonsumsi data tersebut.
Batasan
Data Transmission Service (DTS) mendukung jenis program Flink berikut: DataStream API dan Table API & SQL.
Jika Anda menggunakan program Table API & SQL, Anda hanya dapat mengonsumsi data dari satu tabel setiap kali mengonfigurasi tugas pelacakan perubahan. Untuk mengonsumsi data dari beberapa tabel, Anda harus mengonfigurasi tugas terpisah untuk setiap tabel.
Prosedur
Contoh ini menggunakan IntelliJ IDEA Community Edition 2020.1 untuk Windows.
Buat tugas pelacakan perubahan. Untuk informasi lebih lanjut, lihat Lacak Perubahan Data dari Instance ApsaraDB RDS for MySQL, Lacak Perubahan Data dari Kluster PolarDB for MySQL, atau Lacak Perubahan Data dari Database Oracle yang Dikelola Sendiri.
Buat satu atau lebih grup konsumen. Untuk informasi lebih lanjut, lihat Buat Grup Konsumen.
Unduh file flink-dts-connector dan ekstrak file tersebut.
Buka IntelliJ IDEA. Di jendela yang muncul, klik Open or Import.

Di kotak dialog yang muncul, navigasikan ke direktori tempat file flink-dts-connector diekstraksi, lalu perluas folder untuk menemukan file pom.xml.

Di kotak dialog yang muncul, pilih Open as Project.
Tambahkan dependensi berikut ke file pom.xml:
<dependency> <groupId>com.alibaba.flink</groupId> <artifactId>flink-dts-connector</artifactId> <version>1.1.1-SNAPSHOT</version> <classifier>jar-with-dependencies</classifier> </dependency>Di IntelliJ IDEA, perluas folder untuk menemukan file Java. Klik dua kali file Java sesuai dengan jenis konektor Flink yang digunakan.
Jika menggunakan konektor DataStream API, klik dua kali file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java dan lakukan langkah-langkah berikut:
Di bilah menu atas IntelliJ IDEA, klik ikon Run.

Di kotak dialog yang muncul, pilih .

Di bidang Program arguments, masukkan parameter dan nilai yang sesuai, lalu klik Run untuk menjalankan flink-dts-connector.
CatatanUntuk informasi lebih lanjut tentang parameter dan metode untuk mendapatkan nilai parameter, lihat bagian Parameter dari topik ini.
--broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043Gambar berikut menunjukkan bahwa program Flink dapat melacak perubahan data dari database sumber.
CatatanUntuk memeriksa catatan tertentu dari perubahan data, Anda dapat membuka halaman Task Manager program Flink.
Jika menggunakan konektor Table API & SQL, klik dua kali file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java dan lakukan langkah-langkah berikut:
CatatanFile
DtsTableISelectTCaseTest.javatunggal hanya dapat digunakan untuk mengonfigurasi satu tugas pelacakan perubahan dan mengonsumsi data dari satu tabel. Jika ingin mengonsumsi data dari beberapa tabel, Anda harus mengonfigurasi tugas terpisah untuk setiap tabel.Masukkan dua garis miring (
//) dan tambahkan komentar, seperti yang ditunjukkan pada gambar berikut.
Tentukan informasi tentang tabel dari mana Anda ingin melacak perubahan data. Pernyataan SQL didukung.
Konfigurasikan parameter yang diperlukan untuk instance pelacakan perubahan. Untuk informasi lebih lanjut, lihat bagian Parameter dari topik ini.

Di bilah menu atas IntelliJ IDEA, klik Run'DtsTableISelectTCaseTest' untuk menjalankan flink-dts-connector.
Gambar berikut menunjukkan bahwa program Flink dapat melacak perubahan data dari database sumber.
CatatanUntuk memeriksa catatan tertentu dari perubahan data, Anda dapat membuka halaman Task Manager program Flink.
Parameter
Parameter dalam file DstExample | Parameter dalam file DtsTableISelectTCaseTest | Deskripsi | Metode untuk mendapatkan nilai parameter |
|
| Alamat jaringan dan nomor port dari instance pelacakan perubahan. Catatan Jika Anda melacak perubahan data melalui jaringan internal, latensi jaringan minimal. Ini berlaku jika instance Elastic Compute Service (ECS) tempat Anda menyebarkan program Flink berada di jaringan klasik atau di virtual private cloud (VPC) yang sama dengan instance pelacakan perubahan. | Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di halaman View Task Settings, Anda dapat melihat topik yang dilacak, alamat jaringan, dan nomor port. |
|
| Nama topik dari instance pelacakan perubahan. | |
|
| ID grup konsumen. | Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di panel navigasi di sebelah kiri, klik Consume Data. Anda dapat melihat ID dan nama pengguna grup konsumen. Catatan Kata sandi nama pengguna grup konsumen ditentukan saat Anda membuat grup konsumen.
|
|
| Nama pengguna grup konsumen. Peringatan Jika Anda tidak menggunakan file flink-dts-connector yang dijelaskan dalam topik ini, Anda harus menentukan parameter ini dalam format berikut: | |
|
| Kata sandi nama pengguna grup konsumen. | |
|
| Offset konsumen. Ini adalah timestamp yang dihasilkan ketika flink-dts-connector mengonsumsi data pertama kali. Nilainya adalah timestamp UNIX. Contoh: 1624440043. Catatan Offset konsumen dapat digunakan dalam skenario berikut:
| Offset konsumen harus berada dalam rentang data instance pelacakan perubahan, seperti yang ditunjukkan pada gambar berikut. Offset konsumen harus dikonversi ke timestamp UNIX. Catatan Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX. |
Tidak ada |
| Objek untuk pelacakan perubahan. Anda hanya dapat menentukan satu tabel. Anda harus mematuhi persyaratan berikut saat menentukan nama tabel:
| Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di halaman View Task Settings, klik View Objects for Change tracking di pojok kanan atas. Di kotak dialog yang muncul, Anda dapat melihat database dan tabel tempat objek untuk pelacakan perubahan berada. |
FAQ
Pesan kesalahan | Penyebab yang mungkin | Solusi |
| Modul DStore yang digunakan oleh DTS untuk membaca data inkremental beralih. Akibatnya, offset konsumen program Flink hilang. | Anda tidak perlu memulai ulang program Flink. Anda hanya perlu memeriksa offset konsumen program Flink dan menyetel parameter |


