全部产品
Search
文档中心

Data Transmission Service:Gunakan flink-dts-connector untuk mengonsumsi data yang dilacak

更新时间:Jul 02, 2025

Setelah mengonfigurasi tugas pelacakan perubahan, Anda dapat menggunakan file flink-dts-connector untuk mengonsumsi data yang dilacak. Topik ini menjelaskan cara menggunakan file flink-dts-connector untuk mengonsumsi data tersebut.

Catatan penggunaan

  • Data Transmission Service (DTS) mendukung jenis program Flink berikut: DataStream API dan Table API & SQL.

  • Jika menggunakan program Table API & SQL, Anda hanya dapat mengonsumsi data dari satu tabel setiap kali mengonfigurasi tugas pelacakan perubahan. Untuk mengonsumsi data dari beberapa tabel, konfigurasikan tugas terpisah untuk setiap tabel.

Prosedur

Contoh ini menggunakan IntelliJ IDEA Community Edition 2020.1 di Windows.

  1. Buat tugas pelacakan perubahan. Untuk informasi lebih lanjut, lihat topik terkait di Ikhtisar Skenario Pelacakan Perubahan.
  2. Buat satu atau beberapa grup konsumen. Untuk informasi lebih lanjut, lihat Buat Grup Konsumen.
  3. Unduh file flink-dts-connector dan ekstrak file tersebut.

  4. Buka IntelliJ IDEA. Di jendela yang muncul, klik Open or Import.

    Open a project

  5. Di kotak dialog yang muncul, buka direktori tempat file flink-dts-connector diekstrak, lalu perluas folder untuk menemukan file pom.xml.

    Find the pom.xml file

  6. Di kotak dialog yang muncul, pilih Open as Project.
  7. Tambahkan dependensi berikut ke file pom.xml:

    <dependency>
          <groupId>com.alibaba.flink</groupId>
          <artifactId>flink-dts-connector</artifactId>
          <version>1.1.1-SNAPSHOT</version>
          <classifier>jar-with-dependencies</classifier>
    </dependency>
  8. Di IntelliJ IDEA, perluas folder untuk menemukan file Java. Lalu, klik dua kali file Java sesuai dengan jenis konektor Flink yang digunakan.

    • Jika menggunakan konektor DataStream API, klik dua kali file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java dan lakukan langkah-langkah berikut:

      1. Di bilah menu atas IntelliJ IDEA, klik ikon Run. Run icon

      2. Di kotak dialog yang muncul, pilih DtsExample > Edit. edit

      3. Di bidang Program arguments, masukkan parameter dan nilai yang sesuai, lalu klik Run untuk menjalankan flink-dts-connector.

        Catatan

        Untuk informasi lebih lanjut tentang parameter dan metode untuk mendapatkan nilai parameter, lihat bagian Parameter dari topik ini.

        --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
      4. Gambar berikut menunjukkan bahwa program Flink dapat melacak perubahan data dari database sumber. Data changes (DataStream API)

        Catatan

        Untuk memeriksa catatan tertentu dari perubahan data, Anda dapat membuka halaman Task Manager dari program Flink.

    • Jika menggunakan konektor Table API & SQL, klik dua kali file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java dan lakukan langkah-langkah berikut:

      Catatan

      File DtsTableISelectTCaseTest.java tunggal hanya dapat digunakan untuk mengonfigurasi satu tugas pelacakan perubahan dan mengonsumsi data dari satu tabel saja. Jika ingin mengonsumsi data dari beberapa tabel, konfigurasikan tugas terpisah untuk setiap tabel.

      1. Masukkan dua garis miring (//) dan tambahkan komentar, seperti ditunjukkan pada gambar berikut. Add comments

      2. Tentukan informasi tabel dari mana Anda ingin melacak perubahan data. Pernyataan SQL didukung.

      3. Atur parameter yang diperlukan untuk instance pelacakan perubahan. Untuk informasi lebih lanjut, lihat bagian Parameter dari topik ini. Parameters for Table API & SQL

      4. Di bilah menu atas IntelliJ IDEA, klik Run'DtsTableISelectTCaseTest' untuk menjalankan flink-dts-connector.

      5. Gambar berikut menunjukkan bahwa program Flink dapat melacak perubahan data dari database sumber. Data changes (Table API & SQL)

        Catatan

        Untuk memeriksa catatan tertentu dari perubahan data, Anda dapat membuka halaman Task Manager dari program Flink.

Parameter

Parameter dalam file DstExample

Parameter dalam file DtsTableISelectTCaseTest

Deskripsi

Metode untuk mendapatkan nilai parameter

broker-url

dts.server

Titik akhir dan nomor port dari instance pelacakan perubahan.

Catatan

Jika Anda melacak perubahan data melalui jaringan internal, latensi jaringan minimal. Ini berlaku jika instance Elastic Compute Service (ECS) tempat Anda menyebarkan program Flink berada di jaringan klasik atau di virtual private cloud (VPC) yang sama dengan instance pelacakan perubahan.

Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instansinya. Di halaman Basic Information, Anda dapat melihat Topic dan Network dari instansinya.

topic

topic

Nama topik dari instance pelacakan perubahan.

sid

dts.sid

ID grup konsumen.

Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instansinya. Di panel navigasi di sebelah kiri, klik Consume Data. Anda dapat melihat Consumer Group ID/Name dari instansinya dan Account dari grup konsumen.

Catatan

Kata sandi akun grup konsumen ditentukan saat Anda membuat grup konsumen.

user

dts.user

Nama pengguna grup konsumen.

Peringatan

Jika Anda tidak menggunakan file flink-dts-connector yang dijelaskan dalam topik ini, Anda harus menentukan parameter ini dalam format berikut: <Nama Pengguna>-<ID Grup Konsumen>. Jika tidak, koneksi akan gagal. Contoh: dtstest-dtsae******bpv.

password

dts.password

Kata sandi grup konsumen.

checkpoint

dts.checkpoint

Offset konsumen. Ini adalah timestamp yang dihasilkan ketika flink-dts-connector mengonsumsi data pertama kali. Nilainya adalah timestamp UNIX. Contoh: 1624440043.

Catatan

Offset konsumen dapat digunakan dalam skenario berikut:

  • Setelah proses konsumsi terganggu, Anda dapat menentukan offset konsumen untuk melanjutkan konsumsi data. Ini memungkinkan Anda mencegah kehilangan data.

  • Saat Anda memulai klien pelacakan perubahan, Anda dapat menentukan offset konsumen untuk mengonsumsi data berdasarkan kebutuhan bisnis Anda.

Offset konsumen dari data yang dikonsumsi harus berada dalam rentang data dari instance pelacakan perubahan. Offset konsumen harus dikonversi menjadi timestamp UNIX.

Catatan
  • Anda dapat melihat rentang data dari instance pelacakan perubahan di kolom Data Range di halaman Tugas Pelacakan Perubahan.

  • Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX.

N/A

dts-cdc.table.name

Objek untuk pelacakan perubahan. Anda hanya dapat menentukan satu tabel dalam format <Nama Database>.<Nama Tabel>. Contoh: dtstestdata.order.

Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instansinya. Di bagian atas halaman Basic Information atau Task Management, klik View Objects untuk melihat database dan tabel tempat objek untuk pelacakan perubahan berada.

FAQ

Pesan kesalahan

Penyebab yang mungkin

Solusi

Cluster changed from *** to ***, consumer require restart.

Modul DStore yang digunakan oleh DTS untuk membaca data inkremental beralih. Akibatnya, offset konsumen dari program Flink hilang.

Anda tidak perlu me-restart program Flink. Anda hanya perlu memeriksa offset konsumen dari program Flink dan menyetel parameter checkpoint atau dts.checkpoint di file DtsExample.java dan DtsTableISelectTCaseTest.java lagi untuk melanjutkan konsumsi data.