All Products
Search
Document Center

Data Transmission Service:Gunakan flink-dts-connector untuk mengonsumsi data yang dilacak

Last Updated:Mar 29, 2026

Setelah Anda mengonfigurasi tugas pelacakan perubahan, gunakan flink-dts-connector untuk mengonsumsi data yang dilacak dalam program Flink. Data Transmission Service (DTS) mendukung dua model pemrograman Flink: DataStream API dan Table API & SQL.

Catatan penggunaan

  • Table API & SQL: Setiap konfigurasi tugas pelacakan perubahan hanya mengonsumsi data dari satu tabel. Untuk mengonsumsi data dari beberapa tabel, konfigurasikan tugas terpisah untuk masing-masing tabel.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

Siapkan flink-dts-connector

Pada contoh ini, digunakan IntelliJ IDEA Community Edition 2020.1 untuk Windows.

  1. Unduh repositori flink-dts-connector dan ekstrak.

  2. Buka proyek di IntelliJ IDEA:

    • Pada layar selamat datang, klik Open or Import.

      Open a project

    • Arahkan ke direktori hasil ekstraksi, lalu buka folder-folder hingga menemukan pom.xml.

      Find the pom.xml file

    • Klik Open as Project.

  3. Tambahkan dependensi berikut ke pom.xml Anda:

    <dependency>
      <groupId>com.alibaba.flink</groupId>
      <artifactId>flink-dts-connector</artifactId>
      <version>1.1.1-SNAPSHOT</version>
      <classifier>jar-with-dependencies</classifier>
    </dependency>

Jalankan dengan DataStream API

Buka flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java.

Untuk menjalankan konektor:

  1. Pada bilah menu atas, klik ikon Run.

    Run icon

  2. Pilih DtsExample > Edit.

    edit

  3. Pada bidang Program arguments, masukkan parameter koneksi, lalu klik Run. Ganti nilai placeholder dengan nilai aktual Anda:

    --broker-url <endpoint>:<port> --topic <topic-name> --sid <consumer-group-id> --user <username> --password <password> --checkpoint <unix-timestamp>

    Contohnya:

    --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043

    Untuk deskripsi parameter dan cara menemukan masing-masing nilai, lihat Parameter.

Saat konektor berhasil dijalankan, program Flink akan melacak dan menampilkan perubahan data dari database sumber.

Data changes (DataStream API)
Untuk mengkueri catatan perubahan tertentu, buka halaman Task Manager program Flink.

Jalankan dengan Table API & SQL

Satu file DtsTableISelectTCaseTest.java hanya mendukung satu tugas pelacakan perubahan dan satu tabel. Untuk mengonsumsi data dari beberapa tabel, konfigurasikan tugas terpisah untuk masing-masing tabel.

Buka flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java.

Untuk menjalankan konektor:

  1. Beri komentar pada baris konfigurasi yang ada dengan menambahkan // di awal setiap baris.

    Add comments

  2. Tentukan tabel yang akan dilacak menggunakan pernyataan SQL.

  3. Atur parameter koneksi untuk instans pelacakan perubahan. Untuk deskripsi parameter, lihat Parameter.

    Parameters for Table API & SQL

  4. Pada bilah menu atas, klik Run 'DtsTableISelectTCaseTest'.

Saat konektor berhasil dijalankan, program Flink akan melacak dan menampilkan perubahan data dari database sumber.

Data changes (Table API & SQL)
Untuk mengkueri catatan perubahan tertentu, buka halaman Task Manager program Flink.

Parameter

Parameter (DataStream API)Parameter (Table API & SQL)DeskripsiCara mendapatkan nilainya
broker-urldts.serverTitik akhir dan nomor port instans pelacakan perubahan. Untuk latensi jaringan minimal, deploy program Flink pada instans Elastic Compute Service (ECS) di jaringan klasik atau di virtual private cloud (VPC) yang sama dengan instans pelacakan perubahan.Di Konsol DTS, klik ID instans. Pada halaman Basic Information, lihat bagian Topic dan Network.
topictopicNama topik instans pelacakan perubahan.Sama seperti di atas.
siddts.sidID kelompok konsumen.Di Konsol DTS, klik ID instans. Di panel navigasi kiri, klik Consume Data. Lihat bagian Consumer Group ID/Name dan Account.
userdts.userUsername kelompok konsumen.
Peringatan

Jika Anda menggunakan client lain (bukan flink-dts-connector), format username sebagai <Username>-<Consumer group ID>, misalnya, dtstest-dtsae******bpv. Jika tidak, koneksi akan gagal.

Sama seperti di atas.
passworddts.passwordPassword kelompok konsumen.Ditentukan saat Anda membuat kelompok konsumen.
checkpointdts.checkpointTitik pemeriksaan konsumsi — stempel waktu UNIX tempat konektor mulai mengonsumsi data. Lihat Atur titik pemeriksaan konsumsi.Stempel waktu harus berada dalam rentang data instans pelacakan perubahan. Lihat bagian Data Range di halaman Tugas Pelacakan Perubahan. Gunakan konverter online untuk mendapatkan stempel waktu UNIX.
N/Adts-cdc.table.nameTabel yang akan dilacak, dalam format <Database name>.<Table name>, misalnya, dtstestdata.order. Satu tabel per konfigurasi.Di Konsol DTS, klik ID instans. Pada halaman Basic Information atau Task Management, klik View Objects.

Atur titik pemeriksaan konsumsi

Parameter checkpoint / dts.checkpoint mengontrol posisi awal pembacaan data oleh konektor. Gunakan dalam dua skenario:

  • Melanjutkan setelah gangguan: Setelah konsumsi terganggu, atur checkpoint ke offset terakhir yang dikonsumsi agar dapat dilanjutkan tanpa kehilangan data.

  • Mulai dari titik tertentu: Atur checkpoint ke stempel waktu apa pun dalam rentang data instans pelacakan perubahan untuk mengonsumsi data mulai dari titik tersebut.

Nilainya harus berupa stempel waktu UNIX dan berada dalam rentang data instans pelacakan perubahan. Lihat rentang data di kolom Data Range pada halaman Tugas Pelacakan Perubahan.

Pemecahan masalah

Pesan errorPenyebabSolusi
Cluster changed from * to *, consumer require restart.Modul DStore yang digunakan DTS untuk membaca data inkremental beralih, menyebabkan program Flink kehilangan offset konsumennya.Jangan restart program Flink. Kueri offset konsumen saat ini dan atur checkpoint (DataStream API) atau dts.checkpoint (Table API & SQL) ke nilai tersebut untuk melanjutkan konsumsi.

Langkah selanjutnya