All Products
Search
Document Center

Data Transmission Service:Konsumsi data dengan flink-dts-connector

Last Updated:Jun 05, 2026

Setelah mengonfigurasi instans subscription, Anda dapat menggunakan flink-dts-connector untuk mengonsumsi data dari instans tersebut melalui client Flink. Topik ini menjelaskan cara menggunakan flink-dts-connector.

Catatan penggunaan

  • Connector ini mendukung client Flink yang menggunakan DataStream API, Table API, atau SQL.

  • Jika client Flink Anda menggunakan Table API atau SQL, Anda hanya dapat mengonsumsi data dari satu tabel per konfigurasi. Untuk mengonsumsi data dari beberapa tabel, Anda harus mengonfigurasi dan menjalankan beberapa task independen.

Prosedur

Topik ini menggunakan IntelliJ IDEA Community Edition 2020.1 untuk Windows sebagai contoh cara menggunakan flink-dts-connector untuk mengonsumsi data dari instans subscription.

  1. Buat tugas pelacakan perubahan. Untuk informasi selengkapnya, lihat topik terkait di Ikhtisar skenario pelacakan perubahan.
  2. Buat satu atau beberapa kelompok konsumen. Untuk informasi selengkapnya, lihat Tambahkan kelompok konsumen.
  3. Unduh dan ekstrak flink-dts-connector.

  4. Buka IntelliJ IDEA dan klik Open or Import.

    打开工程

  5. Pada kotak dialog yang muncul, arahkan ke direktori flink-dts-connector yang telah diekstrak, buka folder-folder tersebut, lalu cari file Project Object Model (pom.xml).

    pom模型

  6. Pada kotak dialog yang muncul, pilih Open as Project.
  7. Tambahkan dependensi berikut ke file pom.xml:

    <dependency>
          <groupId>com.alibaba.flink</groupId>
          <artifactId>flink-dts-connector</artifactId>
          <version>1.1.1-SNAPSHOT</version>
          <classifier>jar-with-dependencies</classifier>
    </dependency>
  8. Di IntelliJ IDEA, buka folder-folder tersebut dan pilih file Java yang sesuai dengan jenis program connector Flink Anda.

    • Jika client Flink Anda menggunakan DataStream API, klik ganda file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java dan lakukan langkah-langkah berikut:

      1. Di bagian atas jendela IntelliJ IDEA, klik ikon Run.run图标

      2. Pada menu yang muncul, klik DtsExample > Edit.edit

      3. Pada bidang Program arguments, masukkan parameter beserta nilainya berdasarkan contoh berikut. Kemudian, klik Run untuk memulai flink-dts-connector.

        Catatan

        Untuk informasi selengkapnya tentang parameter dan cara memperoleh nilainya, lihat Parameter.

        --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
      4. Output menunjukkan bahwa client berhasil berlangganan perubahan data dari database sumber.数据变更信息(DataStream)

        Catatan

        Untuk mengkueri catatan perubahan data tertentu, login ke UI Task Manager client Flink Anda.

    • Jika client Flink Anda menggunakan Table API atau SQL, klik ganda file flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java dan lakukan langkah-langkah berikut:

      Catatan

      Setiap file DtsTableISelectTCaseTest.java hanya dapat dikonfigurasi untuk mengonsumsi data dari satu tabel. Untuk mengonsumsi data dari beberapa tabel, Anda harus mengonfigurasi dan menjalankan beberapa task independen.

      1. Tambahkan dua garis miring (//) untuk memberi komentar pada baris kode tertentu.注释掉一行

      2. Konfigurasikan informasi untuk satu tabel tempat Anda ingin mengonsumsi data. Pernyataan SQL didukung.

      3. Konfigurasikan parameter untuk instans subscription. Untuk informasi selengkapnya tentang parameter dan cara memperoleh nilainya, lihat Parameter.table api和sql的参数配置

      4. Di bagian atas jendela IntelliJ IDEA, klik Run'DtsTableISelectTCaseTest' untuk memulai flink-dts-connector.

      5. Output menunjukkan bahwa client berhasil berlangganan perubahan data dari database sumber.tableapi和sql-数据变更信息

        Catatan

        Untuk mengkueri catatan perubahan data tertentu, login ke UI Task Manager client Flink Anda.

Parameter

DtsExample.java

DtsTableISelectTCaseTest.java

Deskripsi

Metode kueri

broker-url

dts.server

Titik akhir jaringan dan Port instans subscription.

Catatan
  • Jika instans ECS tempat client Flink Anda dideploy dan instans subscription berada dalam jaringan klasik atau Virtual Private Cloud (VPC) yang sama, kami menyarankan Anda menggunakan titik akhir internal untuk berlangganan data guna meminimalkan latensi jaringan.

  • Kami tidak menyarankan penggunaan titik akhir publik karena potensi ketidakstabilan jaringan.

Di Konsol DTS, klik ID instans subscription target. Di halaman Basic Information, Anda dapat memperoleh informasi Topic dan Network.

topic

topic

Topik instans subscription.

sid

dts.sid

ID kelompok konsumen.

Di Konsol DTS, klik ID instans subscription target. Di panel navigasi sebelah kiri, klik Consume Data. Anda dapat memperoleh Consumer Group ID/Name dan Account kelompok konsumen.

Catatan

Password untuk akun kelompok konsumen ditentukan saat Anda membuat kelompok konsumen.

user

dts.user

Akun kelompok konsumen.

Peringatan

Jika Anda tidak menggunakan flink-dts-connector yang disediakan dalam topik ini, Anda harus mengatur username dalam format <consumer_group_account>-<consumer_group_ID>, seperti dtstest-dtsae******bpv. Jika tidak, koneksi akan gagal.

password

dts.password

Password akun tersebut.

checkpoint

dts.checkpoint

Offset konsumen. Parameter ini menentukan timestamp Unix dari catatan data pertama yang akan dikonsumsi. Contoh: 1624440043.

Catatan

Anda dapat menggunakan offset konsumen untuk:

  • Melanjutkan konsumsi dari offset terakhir setelah aplikasi terganggu guna mencegah kehilangan data.

  • Menentukan offset untuk mulai mengonsumsi data dari titik waktu tertentu.

Waktu mulai konsumsi harus berada dalam rentang data instans subscription dan harus dikonversi ke timestamp Unix.

Catatan
  • Anda dapat melihat rentang data instans subscription di kolom Data Range daftar tugas subscription.

  • Anda dapat menggunakan mesin pencari untuk menemukan alat konversi timestamp Unix.

N/A

dts-cdc.table.name

Objek yang dilanggankan. Anda hanya dapat menentukan satu tabel. Persyaratan formatnya adalah sebagai berikut:

  • Jika Database Type adalah MySQL, PolarDB for MySQL, PolarDB-X 1.0, atau PolarDB-X 2.0, gunakan format <database_name>.<table_name>.

  • Untuk jenis database lainnya, gunakan format <schema_name>.<table_name>.

Di Konsol DTS, klik ID instans subscription target. Di bagian atas halaman Basic Information atau Task Management, klik View Objects untuk menemukan database dan tabel yang dilanggankan.

FAQ

Pesan error

Kemungkinan penyebab

Solusi

Cluster changed from *** to ***, consumer require restart.

Alih bencana pada modul DStore, yang digunakan Data Transmission Service (DTS) untuk membaca data inkremental, menyebabkan offset konsumen client Flink hilang.

Jangan restart client. Sebagai gantinya, kueri offset konsumen client tersebut dan berikan kembali nilainya ke parameter checkpoint atau dts.checkpoint dalam file DtsExample.java atau DtsTableISelectTCaseTest.java untuk melanjutkan konsumsi data.