全部产品
Search
文档中心

Data Transmission Service:Gunakan flink-dts-connector untuk mengonsumsi data yang dilacak

更新时间:Jul 02, 2025

Setelah mengonfigurasi tugas pelacakan perubahan, Anda dapat menggunakan file flink-dts-connector untuk mengonsumsi data yang dilacak. Topik ini menjelaskan cara menggunakan file flink-dts-connector untuk mengonsumsi data tersebut.

Batasan

  • Data Transmission Service (DTS) mendukung jenis program Flink berikut: DataStream API dan Table API & SQL.

  • Jika Anda menggunakan program Table API & SQL, Anda hanya dapat mengonsumsi data dari satu tabel setiap kali mengonfigurasi tugas pelacakan perubahan. Untuk mengonsumsi data dari beberapa tabel, Anda harus mengonfigurasi tugas terpisah untuk setiap tabel.

Prosedur

Contoh ini menggunakan IntelliJ IDEA Community Edition 2020.1 untuk Windows.

  1. Buat tugas pelacakan perubahan. Untuk informasi lebih lanjut, lihat Lacak Perubahan Data dari Instance ApsaraDB RDS for MySQL, Lacak Perubahan Data dari Kluster PolarDB for MySQL, atau Lacak Perubahan Data dari Database Oracle yang Dikelola Sendiri.

  2. Buat satu atau lebih grup konsumen. Untuk informasi lebih lanjut, lihat Buat Grup Konsumen.

  3. Unduh file flink-dts-connector dan ekstrak file tersebut.

  4. Buka IntelliJ IDEA. Di jendela yang muncul, klik Open or Import.

    Open a project

  5. Di kotak dialog yang muncul, navigasikan ke direktori tempat file flink-dts-connector diekstraksi, lalu perluas folder untuk menemukan file pom.xml.

    Find the pom.xml file

  6. Di kotak dialog yang muncul, pilih Open as Project.

  7. Tambahkan dependensi berikut ke file pom.xml:

    <dependency>
          <groupId>com.alibaba.flink</groupId>
          <artifactId>flink-dts-connector</artifactId>
          <version>1.1.1-SNAPSHOT</version>
          <classifier>jar-with-dependencies</classifier>
    </dependency>
  8. Di IntelliJ IDEA, perluas folder untuk menemukan file Java. Klik dua kali file Java sesuai dengan jenis konektor Flink yang digunakan.

    • Jika menggunakan konektor DataStream API, klik dua kali file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java dan lakukan langkah-langkah berikut:

      1. Di bilah menu atas IntelliJ IDEA, klik ikon Run. Run icon

      2. Di kotak dialog yang muncul, pilih DtsExample > Edit. edit

      3. Di bidang Program arguments, masukkan parameter dan nilai yang sesuai, lalu klik Run untuk menjalankan flink-dts-connector.

        Catatan

        Untuk informasi lebih lanjut tentang parameter dan metode untuk mendapatkan nilai parameter, lihat bagian Parameter dari topik ini.

        --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
      4. Gambar berikut menunjukkan bahwa program Flink dapat melacak perubahan data dari database sumber. Data changes (DataStream API)

        Catatan

        Untuk memeriksa catatan tertentu dari perubahan data, Anda dapat membuka halaman Task Manager program Flink.

    • Jika menggunakan konektor Table API & SQL, klik dua kali file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java dan lakukan langkah-langkah berikut:

      Catatan

      File DtsTableISelectTCaseTest.java tunggal hanya dapat digunakan untuk mengonfigurasi satu tugas pelacakan perubahan dan mengonsumsi data dari satu tabel. Jika ingin mengonsumsi data dari beberapa tabel, Anda harus mengonfigurasi tugas terpisah untuk setiap tabel.

      1. Masukkan dua garis miring (//) dan tambahkan komentar, seperti yang ditunjukkan pada gambar berikut. Add comments

      2. Tentukan informasi tentang tabel dari mana Anda ingin melacak perubahan data. Pernyataan SQL didukung.

      3. Konfigurasikan parameter yang diperlukan untuk instance pelacakan perubahan. Untuk informasi lebih lanjut, lihat bagian Parameter dari topik ini. Parameters for Table API & SQL

      4. Di bilah menu atas IntelliJ IDEA, klik Run'DtsTableISelectTCaseTest' untuk menjalankan flink-dts-connector.

      5. Gambar berikut menunjukkan bahwa program Flink dapat melacak perubahan data dari database sumber. Data changes (Table API & SQL)

        Catatan

        Untuk memeriksa catatan tertentu dari perubahan data, Anda dapat membuka halaman Task Manager program Flink.

Parameter

Parameter dalam file DstExample

Parameter dalam file DtsTableISelectTCaseTest

Deskripsi

Metode untuk mendapatkan nilai parameter

broker-url

dts.server

Alamat jaringan dan nomor port dari instance pelacakan perubahan.

Catatan

Jika Anda melacak perubahan data melalui jaringan internal, latensi jaringan minimal. Ini berlaku jika instance Elastic Compute Service (ECS) tempat Anda menyebarkan program Flink berada di jaringan klasik atau di virtual private cloud (VPC) yang sama dengan instance pelacakan perubahan.

Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di halaman View Task Settings, Anda dapat melihat topik yang dilacak, alamat jaringan, dan nomor port. Topic

topic

topic

Nama topik dari instance pelacakan perubahan.

sid

dts.sid

ID grup konsumen.

Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di panel navigasi di sebelah kiri, klik Consume Data. Anda dapat melihat ID dan nama pengguna grup konsumen.

Catatan

Kata sandi nama pengguna grup konsumen ditentukan saat Anda membuat grup konsumen.

Data consumption

user

dts.user

Nama pengguna grup konsumen.

Peringatan

Jika Anda tidak menggunakan file flink-dts-connector yang dijelaskan dalam topik ini, Anda harus menentukan parameter ini dalam format berikut: <Nama Pengguna>-<ID Grup Konsumen>. Jika tidak, koneksi gagal. Contoh: dtstest-dtsae******bpv.

password

dts.password

Kata sandi nama pengguna grup konsumen.

checkpoint

dts.checkpoint

Offset konsumen. Ini adalah timestamp yang dihasilkan ketika flink-dts-connector mengonsumsi data pertama kali. Nilainya adalah timestamp UNIX. Contoh: 1624440043.

Catatan

Offset konsumen dapat digunakan dalam skenario berikut:

  • Jika proses konsumsi terganggu, Anda dapat menentukan offset konsumen untuk melanjutkan konsumsi data. Ini memungkinkan Anda mencegah kehilangan data.

  • Saat Anda memulai klien pelacakan perubahan, Anda dapat menentukan offset konsumen untuk mengonsumsi data berdasarkan kebutuhan bisnis Anda.

Offset konsumen harus berada dalam rentang data instance pelacakan perubahan, seperti yang ditunjukkan pada gambar berikut. Offset konsumen harus dikonversi ke timestamp UNIX. Timestamp range

Catatan

Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX.

Tidak ada

dts-cdc.table.name

Objek untuk pelacakan perubahan. Anda hanya dapat menentukan satu tabel. Anda harus mematuhi persyaratan berikut saat menentukan nama tabel:

  • Jika basis data bertipe MySQL, PolarDB for MySQL, PolarDB-X 1.0, atau PolarDB-X 2.0, tentukan nama tabel dalam format <Nama Basis Data>.<Nama Tabel>.

  • Jika database adalah tipe lain, tentukan nama tabel dalam format <Nama Skema>.<Nama Tabel>.

Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di halaman View Task Settings, klik View Objects for Change tracking di pojok kanan atas. Di kotak dialog yang muncul, Anda dapat melihat database dan tabel tempat objek untuk pelacakan perubahan berada.

FAQ

Pesan kesalahan

Penyebab yang mungkin

Solusi

Cluster changed from *** to ***, consumer require restart.

Modul DStore yang digunakan oleh DTS untuk membaca data inkremental beralih. Akibatnya, offset konsumen program Flink hilang.

Anda tidak perlu memulai ulang program Flink. Anda hanya perlu memeriksa offset konsumen program Flink dan menyetel parameter checkpoint atau dts.checkpoint di file DtsExample.java dan DtsTableISelectTCaseTest.java lagi untuk melanjutkan konsumsi data.