Setelah mengonfigurasi tugas pelacakan perubahan, Anda dapat menggunakan file flink-dts-connector untuk mengonsumsi data yang dilacak. Topik ini menjelaskan cara menggunakan file flink-dts-connector untuk mengonsumsi data tersebut.
Catatan penggunaan
Data Transmission Service (DTS) mendukung jenis program Flink berikut: DataStream API dan Table API & SQL.
Jika menggunakan program Table API & SQL, Anda hanya dapat mengonsumsi data dari satu tabel setiap kali mengonfigurasi tugas pelacakan perubahan. Untuk mengonsumsi data dari beberapa tabel, konfigurasikan tugas terpisah untuk setiap tabel.
Prosedur
Contoh ini menggunakan IntelliJ IDEA Community Edition 2020.1 di Windows.
- Buat tugas pelacakan perubahan. Untuk informasi lebih lanjut, lihat topik terkait di Ikhtisar Skenario Pelacakan Perubahan.
- Buat satu atau beberapa grup konsumen. Untuk informasi lebih lanjut, lihat Buat Grup Konsumen.
Unduh file flink-dts-connector dan ekstrak file tersebut.
Buka IntelliJ IDEA. Di jendela yang muncul, klik Open or Import.

Di kotak dialog yang muncul, buka direktori tempat file flink-dts-connector diekstrak, lalu perluas folder untuk menemukan file pom.xml.

- Di kotak dialog yang muncul, pilih Open as Project.
Tambahkan dependensi berikut ke file pom.xml:
<dependency> <groupId>com.alibaba.flink</groupId> <artifactId>flink-dts-connector</artifactId> <version>1.1.1-SNAPSHOT</version> <classifier>jar-with-dependencies</classifier> </dependency>Di IntelliJ IDEA, perluas folder untuk menemukan file Java. Lalu, klik dua kali file Java sesuai dengan jenis konektor Flink yang digunakan.
Jika menggunakan konektor DataStream API, klik dua kali file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java dan lakukan langkah-langkah berikut:
Di bilah menu atas IntelliJ IDEA, klik ikon Run.

Di kotak dialog yang muncul, pilih .

Di bidang Program arguments, masukkan parameter dan nilai yang sesuai, lalu klik Run untuk menjalankan flink-dts-connector.
CatatanUntuk informasi lebih lanjut tentang parameter dan metode untuk mendapatkan nilai parameter, lihat bagian Parameter dari topik ini.
--broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043Gambar berikut menunjukkan bahwa program Flink dapat melacak perubahan data dari database sumber.
CatatanUntuk memeriksa catatan tertentu dari perubahan data, Anda dapat membuka halaman Task Manager dari program Flink.
Jika menggunakan konektor Table API & SQL, klik dua kali file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java dan lakukan langkah-langkah berikut:
CatatanFile
DtsTableISelectTCaseTest.javatunggal hanya dapat digunakan untuk mengonfigurasi satu tugas pelacakan perubahan dan mengonsumsi data dari satu tabel saja. Jika ingin mengonsumsi data dari beberapa tabel, konfigurasikan tugas terpisah untuk setiap tabel.Masukkan dua garis miring (
//) dan tambahkan komentar, seperti ditunjukkan pada gambar berikut.
Tentukan informasi tabel dari mana Anda ingin melacak perubahan data. Pernyataan SQL didukung.
Atur parameter yang diperlukan untuk instance pelacakan perubahan. Untuk informasi lebih lanjut, lihat bagian Parameter dari topik ini.

Di bilah menu atas IntelliJ IDEA, klik Run'DtsTableISelectTCaseTest' untuk menjalankan flink-dts-connector.
Gambar berikut menunjukkan bahwa program Flink dapat melacak perubahan data dari database sumber.
CatatanUntuk memeriksa catatan tertentu dari perubahan data, Anda dapat membuka halaman Task Manager dari program Flink.
Parameter
Parameter dalam file DstExample | Parameter dalam file DtsTableISelectTCaseTest | Deskripsi | Metode untuk mendapatkan nilai parameter |
|
| Titik akhir dan nomor port dari instance pelacakan perubahan. Catatan Jika Anda melacak perubahan data melalui jaringan internal, latensi jaringan minimal. Ini berlaku jika instance Elastic Compute Service (ECS) tempat Anda menyebarkan program Flink berada di jaringan klasik atau di virtual private cloud (VPC) yang sama dengan instance pelacakan perubahan. | Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instansinya. Di halaman Basic Information, Anda dapat melihat Topic dan Network dari instansinya. |
|
| Nama topik dari instance pelacakan perubahan. | |
|
| ID grup konsumen. | Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instansinya. Di panel navigasi di sebelah kiri, klik Consume Data. Anda dapat melihat Consumer Group ID/Name dari instansinya dan Account dari grup konsumen. Catatan Kata sandi akun grup konsumen ditentukan saat Anda membuat grup konsumen. |
|
| Nama pengguna grup konsumen. Peringatan Jika Anda tidak menggunakan file flink-dts-connector yang dijelaskan dalam topik ini, Anda harus menentukan parameter ini dalam format berikut: | |
|
| Kata sandi grup konsumen. | |
|
| Offset konsumen. Ini adalah timestamp yang dihasilkan ketika flink-dts-connector mengonsumsi data pertama kali. Nilainya adalah timestamp UNIX. Contoh: 1624440043. Catatan Offset konsumen dapat digunakan dalam skenario berikut:
| Offset konsumen dari data yang dikonsumsi harus berada dalam rentang data dari instance pelacakan perubahan. Offset konsumen harus dikonversi menjadi timestamp UNIX. Catatan
|
N/A |
| Objek untuk pelacakan perubahan. Anda hanya dapat menentukan satu tabel dalam format | Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instansinya. Di bagian atas halaman Basic Information atau Task Management, klik View Objects untuk melihat database dan tabel tempat objek untuk pelacakan perubahan berada. |
FAQ
Pesan kesalahan | Penyebab yang mungkin | Solusi |
| Modul DStore yang digunakan oleh DTS untuk membaca data inkremental beralih. Akibatnya, offset konsumen dari program Flink hilang. | Anda tidak perlu me-restart program Flink. Anda hanya perlu memeriksa offset konsumen dari program Flink dan menyetel parameter |