Setelah mengonfigurasi instans subscription, Anda dapat menggunakan flink-dts-connector untuk mengonsumsi data dari instans tersebut melalui client Flink. Topik ini menjelaskan cara menggunakan flink-dts-connector.
Catatan penggunaan
-
Connector ini mendukung client Flink yang menggunakan DataStream API, Table API, atau SQL.
-
Jika client Flink Anda menggunakan Table API atau SQL, Anda hanya dapat mengonsumsi data dari satu tabel per konfigurasi. Untuk mengonsumsi data dari beberapa tabel, Anda harus mengonfigurasi dan menjalankan beberapa task independen.
Prosedur
Topik ini menggunakan IntelliJ IDEA Community Edition 2020.1 untuk Windows sebagai contoh cara menggunakan flink-dts-connector untuk mengonsumsi data dari instans subscription.
- Buat tugas pelacakan perubahan. Untuk informasi selengkapnya, lihat topik terkait di Ikhtisar skenario pelacakan perubahan.
- Buat satu atau beberapa kelompok konsumen. Untuk informasi selengkapnya, lihat Tambahkan kelompok konsumen.
-
Unduh dan ekstrak flink-dts-connector.
-
Buka IntelliJ IDEA dan klik Open or Import.

-
Pada kotak dialog yang muncul, arahkan ke direktori flink-dts-connector yang telah diekstrak, buka folder-folder tersebut, lalu cari file Project Object Model (pom.xml).

- Pada kotak dialog yang muncul, pilih Open as Project.
-
Tambahkan dependensi berikut ke file pom.xml:
<dependency> <groupId>com.alibaba.flink</groupId> <artifactId>flink-dts-connector</artifactId> <version>1.1.1-SNAPSHOT</version> <classifier>jar-with-dependencies</classifier> </dependency> -
Di IntelliJ IDEA, buka folder-folder tersebut dan pilih file Java yang sesuai dengan jenis program connector Flink Anda.
-
Jika client Flink Anda menggunakan DataStream API, klik ganda file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java dan lakukan langkah-langkah berikut:
-
Di bagian atas jendela IntelliJ IDEA, klik ikon Run.

-
Pada menu yang muncul, klik .

-
Pada bidang Program arguments, masukkan parameter beserta nilainya berdasarkan contoh berikut. Kemudian, klik Run untuk memulai flink-dts-connector.
CatatanUntuk informasi selengkapnya tentang parameter dan cara memperoleh nilainya, lihat Parameter.
--broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043 -
Output menunjukkan bahwa client berhasil berlangganan perubahan data dari database sumber.
CatatanUntuk mengkueri catatan perubahan data tertentu, login ke UI Task Manager client Flink Anda.
-
-
Jika client Flink Anda menggunakan Table API atau SQL, klik ganda file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java dan lakukan langkah-langkah berikut:
CatatanSetiap file
DtsTableISelectTCaseTest.javahanya dapat dikonfigurasi untuk mengonsumsi data dari satu tabel. Untuk mengonsumsi data dari beberapa tabel, Anda harus mengonfigurasi dan menjalankan beberapa task independen.-
Tambahkan dua garis miring (
//) untuk memberi komentar pada baris kode tertentu.
-
Konfigurasikan informasi untuk satu tabel tempat Anda ingin mengonsumsi data. Pernyataan SQL didukung.
-
Konfigurasikan parameter untuk instans subscription. Untuk informasi selengkapnya tentang parameter dan cara memperoleh nilainya, lihat Parameter.

-
Di bagian atas jendela IntelliJ IDEA, klik Run'DtsTableISelectTCaseTest' untuk memulai flink-dts-connector.
-
Output menunjukkan bahwa client berhasil berlangganan perubahan data dari database sumber.
CatatanUntuk mengkueri catatan perubahan data tertentu, login ke UI Task Manager client Flink Anda.
-
-
Parameter
|
DtsExample.java |
DtsTableISelectTCaseTest.java |
Deskripsi |
Metode kueri |
|
|
|
Titik akhir jaringan dan Port instans subscription. Catatan
|
Di Konsol DTS, klik ID instans subscription target. Di halaman Basic Information, Anda dapat memperoleh informasi Topic dan Network. |
|
|
|
Topik instans subscription. |
|
|
|
|
ID kelompok konsumen. |
Di Konsol DTS, klik ID instans subscription target. Di panel navigasi sebelah kiri, klik Consume Data. Anda dapat memperoleh Consumer Group ID/Name dan Account kelompok konsumen. Catatan
Password untuk akun kelompok konsumen ditentukan saat Anda membuat kelompok konsumen. |
|
|
|
Akun kelompok konsumen. Peringatan
Jika Anda tidak menggunakan flink-dts-connector yang disediakan dalam topik ini, Anda harus mengatur username dalam format |
|
|
|
|
Password akun tersebut. |
|
|
|
|
Offset konsumen. Parameter ini menentukan timestamp Unix dari catatan data pertama yang akan dikonsumsi. Contoh: 1624440043. Catatan
Anda dapat menggunakan offset konsumen untuk:
|
Waktu mulai konsumsi harus berada dalam rentang data instans subscription dan harus dikonversi ke timestamp Unix. Catatan
|
|
N/A |
|
Objek yang dilanggankan. Anda hanya dapat menentukan satu tabel. Persyaratan formatnya adalah sebagai berikut:
|
Di Konsol DTS, klik ID instans subscription target. Di bagian atas halaman Basic Information atau Task Management, klik View Objects untuk menemukan database dan tabel yang dilanggankan. |
FAQ
|
Pesan error |
Kemungkinan penyebab |
Solusi |
|
Alih bencana pada modul DStore, yang digunakan Data Transmission Service (DTS) untuk membaca data inkremental, menyebabkan offset konsumen client Flink hilang. |
Jangan restart client. Sebagai gantinya, kueri offset konsumen client tersebut dan berikan kembali nilainya ke parameter |