All Products
Search
Document Center

Data Transmission Service:Mengonsumsi data yang dilacak menggunakan flink-dts-connector

Last Updated:Jun 21, 2026

Setelah mengonfigurasi instans pelacakan perubahan untuk Data Transmission Service (DTS), Anda dapat menggunakan flink-dts-connector untuk membuat program Flink yang mengonsumsi data tersebut.

Catatan penggunaan

  • Konektor ini mendukung program Flink yang menggunakan DataStream API, Table API, maupun SQL.

  • Jika program Flink Anda menggunakan Table API dan SQL, Anda hanya dapat mengonsumsi data dari satu tabel dalam satu waktu. Untuk mengonsumsi data dari beberapa tabel, Anda harus mengonfigurasi dan menjalankan tugas terpisah untuk masing-masing tabel.

Prosedur

Topik ini menggunakan IntelliJ IDEA Community Edition 2020.1 untuk Windows sebagai contoh untuk menunjukkan cara menggunakan flink-dts-connector guna mengonsumsi data yang dilacak dari instans pelacakan perubahan.

  1. Buat instans pelacakan perubahan. Untuk informasi selengkapnya, lihat Buat instans pelacakan perubahan untuk instans ApsaraDB RDS for MySQL, Buat instans pelacakan perubahan untuk kluster PolarDB for MySQL, atau Buat instans pelacakan perubahan untuk instans ApsaraDB for Oracle.

  2. Buat satu atau beberapa kelompok konsumen. Untuk informasi selengkapnya, lihat Buat kelompok konsumen.

  3. Unduh file flink-dts-connector dan ekstrak file tersebut.

  4. Jalankan IntelliJ IDEA dan klik Open or Import.

  5. Pada kotak dialog yang muncul, arahkan ke direktori tempat Anda mengekstrak file flink-dts-connector. Buka folder-folder tersebut hingga menemukan file Project Object Model (POM): pom.xml.

  6. Pada kotak dialog yang muncul, pilih Open as Project.

  7. Tambahkan dependensi berikut ke file pom.xml:

    <dependency>
          <groupId>com.alibaba.flink</groupId>
          <artifactId>flink-dts-connector</artifactId>
          <version>1.1.1-SNAPSHOT</version>
          <classifier>jar-with-dependencies</classifier>
    </dependency>
  8. Di IntelliJ IDEA, buka folder proyek dan pilih file Java yang sesuai berdasarkan jenis Flink API yang digunakan oleh program Anda.

    • Jika program Flink Anda menggunakan DataStream API, klik ganda file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java dan lakukan langkah-langkah berikut:

      1. Dari bilah menu atas, pilih Run > Run....

      2. Pada jendela pop-up, klik Edit > DtsExample.

      3. Pada bidang Program arguments, masukkan parameter beserta nilainya seperti pada contoh berikut, lalu klik Run untuk menjalankan flink-dts-connector.

        Catatan

        Untuk detail tentang parameter dan cara menemukan nilainya, lihat Parameter.

        --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
      4. Output menunjukkan bahwa program berhasil melacak perubahan data dari database sumber. Setelah startup, catatan perubahan data dalam DataStream tampak seperti contoh berikut.

        LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006303@289540@2688@1625045211000]}
        LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006307@290047@2688@1625045212000]}
        LazyParseRecord {operationType [UPDATE], checkpoint [0@12006305@290016@2688@1625045212000]}
        LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006308@290047@2688@1625045214000]}
        LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006309@290047@2688@1625045215000]}
        LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006310@290047@2688@1625045216000]}
        LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006311@290047@2688@1625045217000]}
        Catatan

        Untuk melihat detail catatan perubahan data, login ke UI Task Manager program Flink Anda.

    • Jika program Flink Anda menggunakan Table API dan SQL, klik ganda file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java dan lakukan langkah-langkah berikut:

      Catatan

      Satu file DtsTableISelectTCaseTest.java hanya mendukung konfigurasi dan konsumsi data yang dilacak dari satu tabel. Untuk mengonsumsi data dari beberapa tabel, Anda harus mengonfigurasi dan menjalankan tugas terpisah untuk masing-masing tabel.

      1. Beri komentar pada baris properties.load dengan menambahkan awalan //.

        /*Memuat nilai parameter yang Anda atur di halaman platform ke objek Properties.*/
        //properties.load(new StringReader(new String(Files.readAllBytes(Paths.get(configFilePath)), StandardCharsets.UTF_8)));
      2. Tentukan satu tabel dari mana Anda ingin mengonsumsi data.

      3. Atur parameter untuk instance pelacakan perubahan. Untuk detail mengenai parameter dan cara menemukan nilainya, lihat Parameter.

        public static void main(String[] args) throws Exception {
            setup(args);
            final String createTable =
                    "create table `dts` (\n"
                    + " `ts` TIMESTAMP(3) METADATA FROM 'timestamp',\n"
                    + " `id` bigint,\n"
                    + " `name` varchar,\n"
                    + " `age` bigint,\n"
                    + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                    + ") with (\n"
                    + "'connector' = 'dts',"
                    + "'dts.server' = 'dts-cn-xxx:18001',"
                    + "'topic' = 'cn_hangzhou_rm_xxx_dtstest_version2',"
                    + "'dts.sid' = 'dtsxxx', "
                    + "'dts.user' = 'dtstest', "
                    + "'dts.password' = 'xxx',"
                    + "'dts.checkpoint' = '1624440043', "
                    + "'dts-cdc.table.name' = 'dtstestdata.order',"
                    + "'format' = 'dts-cdc')";
        }
      4. Di bagian atas antarmuka IntelliJ IDEA, klik Run 'DtsTableISelectTCaseTest' untuk menjalankan flink-dts-connector.

      5. Output menunjukkan bahwa program berhasil melacak perubahan data dari database sumber. Setelah startup, terminal menampilkan catatan changelog, dengan event pembaruan yang muncul berpasangan: nilai sebelum pembaruan (-U) dan nilai setelah pembaruan (+U).

        ######> (false,-U(2021-06-23T20:32:17.391,null,null,null))
        ######> (true,+U(2021-06-23T20:32:17.391,null,null,null))
        ######> (false,-U(2021-06-23T20:32:45.604,null,null,null))
        ######> (true,+U(2021-06-23T20:32:45.604,null,null,null))
        ######> (false,-U(2021-06-30T17:26:52.201,null,null,null))
        ######> (true,+U(2021-06-30T17:26:52.201,null,null,null))
        ######> (false,-U(2021-06-30T19:19:26.975,null,null,null))
        ######> (true,+U(2021-06-30T19:19:26.975,null,null,null))
        Catatan

        Untuk melihat detail catatan perubahan data, masuk ke UI Task Manager program Flink Anda.

Parameter

Parameter DataStream API

Parameter Table API

Deskripsi

Cara memperoleh

broker-url

dts.server

Titik akhir dan port instans pelacakan perubahan.

Catatan
  • Untuk meminimalkan latensi jaringan, gunakan titik akhir internal jika instans ECS program Flink Anda dan instans pelacakan perubahan berada dalam virtual private cloud (VPC) atau jaringan klasik yang sama.

  • Kami tidak menyarankan penggunaan titik akhir publik karena potensi ketidakstabilan jaringan.

Di Konsol Data Transmission Service (DTS), klik ID instans pelacakan perubahan target. Di halaman View Task Settings, Anda dapat menemukan Topic, titik akhir, dan port.

topic

topic

Topik yang dilacak dari instans pelacakan perubahan.

sid

dts.sid

ID kelompok konsumen.

Di Konsol DTS, klik ID instans pelacakan perubahan target, lalu klik Consume Data. Anda dapat menemukan ID dan Account kelompok konsumen.

Catatan

Password untuk username kelompok konsumen adalah password yang Anda tentukan saat membuat kelompok konsumen.

user

dts.user

Username untuk kelompok konsumen.

Peringatan

flink-dts-connector yang disediakan dalam topik ini secara otomatis menangani format username yang diperlukan. Jika Anda menggunakan client lain, Anda harus memformat username secara manual menjadi <username>-<consumer group ID> (misalnya, dtstest-dtsae******bpv) untuk mencegah kegagalan koneksi.

password

dts.password

Password untuk username tersebut.

checkpoint

dts.checkpoint

Offset konsumen. Ini adalah timestamp tempat flink-dts-connector mulai mengonsumsi data. Nilainya harus berupa Unix timestamp, misalnya, 1624440043.

Catatan

Offset konsumen berguna dalam skenario berikut:

  • Jika program konsumen terganggu, gunakan offset terakhir yang diproses untuk melanjutkan konsumsi dan mencegah kehilangan data.

  • Saat program konsumen dimulai, berikan offset tertentu untuk mulai mengonsumsi data dari titik waktu yang diinginkan.

Offset konsumen harus berada dalam rentang data instans pelacakan perubahan. Di halaman View Task Settings instans pelacakan perubahan DTS, Anda dapat menemukan waktu mulai dan akhir di bidang Data Range. Anda harus mengatur offset konsumen ke waktu dalam rentang tersebut dan mengonversinya ke Unix timestamp.

Catatan

Gunakan mesin pencari daring untuk menemukan konverter Unix timestamp.

N/A

dts-cdc.table.name

Parameter ini menentukan tabel untuk pelacakan perubahan. Hanya satu tabel yang didukung per tugas. Gunakan format berikut:

  • Untuk database MySQL, PolarDB for MySQL, PolarDB-X 1.0, dan PolarDB-X 2.0, gunakan format <Nama Database>.<Nama Tabel>.

  • Untuk jenis database lain, gunakan format <Nama Skema>.<Nama Tabel>.

Di Konsol DTS, klik ID instans pelacakan perubahan target. Di halaman View Task Settings, klik View Objects di pojok kanan atas untuk menemukan nama database dan tabel objek tersebut.

FAQ

Pesan error

Kemungkinan penyebab

Solusi

Cluster changed from *** to ***, consumer require restart.

Modul DStore, yang digunakan DTS untuk membaca data inkremental, telah dialihkan. Hal ini menyebabkan offset konsumen program Flink hilang.

Jangan restart program. Sebagai gantinya, temukan offset konsumen terakhir yang diketahui dan berikan sebagai parameter checkpoint atau dts.checkpoint dalam file DtsExample.java atau DtsTableISelectTCaseTest.java Anda untuk melanjutkan konsumsi.