全部产品
Search
文档中心

Data Transmission Service:Gunakan demo SDK untuk mengonsumsi data yang dilacak

更新时间:Jul 02, 2025

Setelah mengonfigurasi tugas pelacakan perubahan, Anda dapat menggunakan demo SDK yang disediakan oleh Data Transmission Service (DTS) untuk melacak dan mengonsumsi data. Topik ini menjelaskan cara menggunakan demo SDK untuk mengonsumsi data yang dilacak.

Prosedur

Penting

Topik ini menjelaskan cara menggunakan demo SDK untuk mengonsumsi data yang dilacak. Contoh ini menggunakan IntelliJ IDEA Community Edition 2020.1 untuk Windows.

  1. Buat tugas pelacakan perubahan. Untuk detail lebih lanjut, lihat Lacak Perubahan Data dari Instance ApsaraDB RDS for MySQL, Lacak Perubahan Data dari Kluster PolarDB for MySQL, atau Lacak Perubahan Data dari Database Oracle yang Dikelola Sendiri.

  2. Buat satu atau beberapa kelompok konsumen. Untuk informasi lebih lanjut, lihat Buat Kelompok Konsumen.

  3. Gunakan demo SDK sesuai dengan kebutuhan bisnis Anda.

    • (Direkomendasikan) Gunakan SDK Pelacakan Perubahan Baru yang Dikemas

      1. Buka IntelliJ IDEA dan klik Create New Project.

      2. Di proyek yang dibuat, temukan file pom.xml.

      3. Tambahkan dependensi berikut ke file pom.xml:

        <dependency>
            <groupId>com.aliyun.dts</groupId>
            <artifactId>dts-new-subscribe-sdk</artifactId>
            <version>{dts_new_sdk_version}</version>
        </dependency>
        Catatan

        Anda dapat menemukan versi terbaru dari SDK Pelacakan Perubahan di halaman dts-new-subscribe-sdk.

      4. Gunakan SDK Pelacakan Perubahan Baru. Untuk detail lebih lanjut tentang kode demo, lihat Kode Demo SDK.

    • Gunakan SDK Pelacakan Perubahan Baru dengan Menyesuaikan Kode

      1. Unduh Paket Demo SDK dan ekstrak paket tersebut.

        Catatan

        Untuk mengunduh paket, pilih code > Download ZIP.

      2. Buka direktori tempat paket diekstraksi. Kemudian, buka file pom.xml menggunakan editor teks dan ubah versi SDK ke versi terbaru. Set the SDK version

        Penting

        Versi terbaru dari SDK Pelacakan Perubahan dapat ditemukan di situs Maven. Untuk informasi lebih lanjut, kunjungi Halaman Maven dari SDK Pelacakan Perubahan.

      3. Buka IntelliJ IDEA. Di jendela yang muncul, klik Open or Import. Open a project

      4. Di kotak dialog yang muncul, navigasikan ke direktori tempat paket diekstraksi, temukan file pom.xml, lalu klik OK. Find the pom.xml file

      5. Di kotak dialog yang muncul, pilih Open as Project.

      6. Di IntelliJ IDEA, perluas folder untuk menemukan file Java. Lalu, klik dua kali file Java berdasarkan mode penggunaan klien SDK. File Java DTSConsumerAssignDemo.java dan DTSConsumerSubscribeDemo.java tersedia. Java files of the client

        Catatan

        DTS mendukung mode berikut dalam menggunakan klien SDK:

        • Mode ASSIGN: Untuk memastikan urutan global pesan, DTS hanya menetapkan satu partisi (Partition 0) ke setiap topik yang dilacak. Jika Anda menggunakan klien SDK dalam mode ASSIGN, kami sarankan untuk memulai hanya satu klien SDK.

        • Mode SUBSCRIBE: Untuk memastikan urutan global pesan, DTS hanya menetapkan satu partisi (Partition 0) ke setiap topik yang dilacak. Dalam mode SUBSCRIBE, Anda dapat memulai beberapa klien SDK dalam kelompok konsumen secara bersamaan untuk mengimplementasikan pemulihan bencana. Jika klien SDK dalam kelompok konsumen gagal, klien SDK lainnya akan dialokasikan secara acak dan otomatis ke Partition 0 untuk melanjutkan konsumsi data.

  4. Atur parameter yang diperlukan dalam kode file Java.

    assigndemo

    Tabel 1. Parameter yang Diperlukan

    Parameter

    Deskripsi

    Cara mendapatkan

    brokerUrl

    Titik akhir dan nomor port dari instance pelacakan perubahan.

    Catatan

    Jika Anda melacak perubahan data melalui jaringan internal, latensi jaringan minimal. Ini berlaku jika instance Elastic Compute Service (ECS) tempat Anda menyebarkan klien SDK termasuk dalam jaringan klasik atau VPC yang sama dengan instance pelacakan perubahan.

    Di konsol DTS baru, klik ID instance. Di halaman Basic Information, Anda dapat memperoleh titik akhir dan nomor port di bagian Network. Network

    topic

    Nama topik dari instance pelacakan perubahan.

    Di konsol DTS, klik ID instance. Di halaman Basic Information, Anda dapat memperoleh topik yang dilacak di bagian Basic Information. topic

    sid

    ID kelompok konsumen.

    Di konsol DTS, klik ID instance. Di panel navigasi di sebelah kiri, klik Consume Data. Anda dapat memperoleh ID dan akun kelompok konsumen.

    Catatan

    Kata sandi akun kelompok konsumen ditentukan secara otomatis saat Anda membuat kelompok konsumen.

    Consumer group account

    userName

    Akun kelompok konsumen.

    Peringatan

    Jika Anda tidak menggunakan klien SDK yang dijelaskan dalam topik ini, Anda harus menentukan parameter ini dalam format berikut: <Username>-<Consumer group ID>. Contoh: dtstest-dtsae******bpv. Jika tidak, koneksi gagal.

    password

    Kata sandi akun.

    initCheckpoint

    Checkpoint konsumsi. Ini adalah timestamp ketika klien SDK mengonsumsi catatan data pertama. Nilainya adalah timestamp UNIX. Contoh: 1620962769.

    Catatan

    Checkpoint konsumsi dapat digunakan dalam skenario berikut:

    • Jika proses konsumsi terganggu, Anda dapat menentukan checkpoint konsumsi untuk melanjutkan konsumsi data. Ini memungkinkan Anda mencegah kehilangan data.

    • Saat Anda memulai klien pelacakan perubahan, Anda dapat menentukan checkpoint konsumsi untuk mengonsumsi data sesuai permintaan.

    Checkpoint konsumsi harus berada dalam rentang data instance pelacakan perubahan, seperti yang ditunjukkan pada gambar berikut. Checkpoint konsumsi harus dikonversi ke timestamp UNIX. Data range

    Catatan

    Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX.

    ConsumerContext.ConsumerSubscribeMode subscribeMode

    Mode Anda menggunakan klien SDK. Nilai valid:

    • ConsumerContext.ConsumerSubscribeMode.ASSIGN: Dalam mode ASSIGN, hanya satu klien SDK dalam kelompok konsumen yang dapat mengonsumsi data yang dilacak.

    • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: Dalam mode SUBSCRIBE, Anda dapat memulai beberapa klien SDK dalam kelompok konsumen secara bersamaan untuk mengimplementasikan pemulihan bencana.

    Tidak tersedia

  5. Di bilah navigasi atas IntelliJ IDEA, pilih Run > Run untuk menjalankan klien.

    Catatan

    Saat menjalankan IntelliJ IDEA untuk pertama kalinya, diperlukan waktu untuk memuat dan menginstal dependensi yang relevan.

    • Gambar berikut menunjukkan hasilnya. Hasil ini menunjukkan bahwa klien SDK dapat melacak perubahan data dari database sumber. Consume data

    • Klien SDK menghitung dan menampilkan informasi tentang data yang dikonsumsi pada interval reguler. Informasi tersebut mencakup jumlah total catatan data yang dikirim dan diterima, jumlah total data, serta jumlah permintaan per detik (RPS). Information about the consumed data

      Tabel 2. Parameter dalam Informasi Tersebut

      Parameter

      Deskripsi

      outCounts

      Jumlah total catatan data yang dikonsumsi oleh klien SDK.

      outBytes

      Jumlah total data yang dikonsumsi oleh klien SDK. Satuan: byte.

      outRps

      RPS di mana klien SDK mengonsumsi data.

      outBps

      Jumlah bit yang ditransmisikan per detik di mana klien SDK mengonsumsi data.

      inBytes

      Jumlah total data yang dikirim oleh server DTS. Satuan: byte.

      DStoreRecordQueue

      Ukuran antrian cache data saat ini ketika server DTS mengirim data.

      inCounts

      Jumlah total catatan data yang dikirim oleh server DTS.

      inRps

      RPS di mana server DTS mengirim data.

      __dt

      Timestamp yang dihasilkan ketika klien SDK menerima data. Satuan: milidetik.

      DefaultUserRecordQueue

      Ukuran antrian cache data saat ini setelah serialisasi.

Simpan dan kueri checkpoint konsumsi

Saat klien SDK dijalankan untuk pertama kali atau dimulai ulang, atau terjadi percobaan ulang internal, Anda harus mengkueri dan menentukan checkpoint konsumsi untuk memulai atau melanjutkan konsumsi data. Tabel berikut menjelaskan cara mengelola dan mengkueri checkpoint konsumsi dalam skenario berbeda. Ini mencegah kehilangan data atau data duplikat dan memungkinkan Anda mengonsumsi data sesuai permintaan.

Skenario

Mode penggunaan klien SDK

Metode kueri

Kueri checkpoint konsumsi

ASSIGN dan SUBSCRIBE

  • Klien SDK menyimpan checkpoint konsumsi setiap 5 detik dan mengirimkan checkpoint konsumsi ke server DTS. Untuk mengkueri checkpoint konsumsi terakhir, Anda dapat menggunakan metode berikut:

    • Temukan file localCheckpointStore server tempat klien SDK berada.

    • Pergi ke halaman Consume Data instance pelacakan perubahan.

  • Jika Anda mengonfigurasi medium penyimpanan bersama persisten eksternal seperti database di parameter setUserRegisteredStore(newUserMetaStore()) dalam file consumerContext.java, medium penyimpanan menyimpan checkpoint konsumsi setiap 5 detik. Anda dapat mengkueri checkpoint konsumsi menggunakan medium penyimpanan.

Saat Anda memulai klien SDK untuk pertama kali, Anda harus menentukan checkpoint konsumsi untuk mengonsumsi data.

ASSIGN dan SUBSCRIBE

Pilih file DTSConsumerAssignDemo.java atau DTSConsumerSubscribeDemo.java berdasarkan mode Anda menggunakan klien SDK. Lalu, tentukan parameter initCheckpoint untuk mengonsumsi data. Untuk informasi lebih lanjut, lihat 3 dan 4.

Saat terjadi percobaan ulang internal, Anda harus menentukan checkpoint konsumsi dari catatan data sebelumnya untuk melanjutkan konsumsi data.

ASSIGN

Lakukan langkah-langkah berikut untuk menemukan checkpoint konsumsi dari catatan data sebelumnya:

  1. Temukan medium penyimpanan eksternal yang Anda konfigurasikan di parameter setUserRegisteredStore(newUserMetaStore()) dalam file consumerContext.java.

  2. Temukan file localCheckpointStore server tempat klien SDK berada.

  3. Temukan timestamp awal yang Anda tentukan di parameter initCheckpoint dalam file DTSConsumerSubscribeDemo.java.

SUBSCRIBE

Lakukan langkah-langkah berikut untuk menemukan checkpoint konsumsi dari catatan data sebelumnya:

  1. Temukan medium penyimpanan eksternal yang Anda konfigurasikan di parameter setUserRegisteredStore(newUserMetaStore()) dalam file consumerContext.java.

  2. Temukan checkpoint konsumsi yang disimpan dari server DTS (DStore).

  3. Temukan timestamp awal yang Anda tentukan di parameter initCheckpoint dalam file DTSConsumerSubscribeDemo.java.

  4. Gunakan checkpoint konsumsi awal dari server DTS (new DStore).

Setelah klien SDK dimulai ulang, Anda harus menentukan checkpoint konsumsi dari catatan data terakhir untuk melanjutkan konsumsi data.

ASSIGN

Periksa pengaturan parameter setForceUseCheckpoint dalam file consumerContext.java dan kueri checkpoint konsumsi.

  • Jika parameter diatur ke true, nilai parameter initCheckpoint digunakan sebagai checkpoint konsumsi setiap kali klien SDK dimulai ulang.

  • Jika parameter diatur ke false atau tidak ditentukan, lakukan langkah-langkah berikut untuk menemukan checkpoint konsumsi dari catatan data sebelumnya:

    1. Temukan file localCheckpointStore server tempat klien SDK berada.

    2. Temukan checkpoint konsumsi yang disimpan dari server DTS (DStore).

    3. Temukan medium penyimpanan eksternal yang Anda konfigurasikan di parameter setUserRegisteredStore(newUserMetaStore()) dalam file consumerContext.java.

SUBSCRIBE

Dalam mode ini, pengaturan parameter setForceUseCheckpoint dalam file consumerContext.java tidak berlaku. Lakukan langkah-langkah berikut untuk menemukan checkpoint konsumsi dari catatan data sebelumnya:

  1. Temukan medium penyimpanan eksternal yang Anda konfigurasikan di parameter setUserRegisteredStore(newUserMetaStore()) dalam file consumerContext.java.

  2. Temukan checkpoint konsumsi yang disimpan dari server DTS (DStore).

  3. Temukan timestamp awal yang Anda tentukan di parameter initCheckpoint dalam file DTSConsumerSubscribeDemo.java.

  4. Gunakan checkpoint konsumsi awal dari server DTS (new DStore).

Pemecahan Masalah

Masalah

Pesan kesalahan

Penyebab

Solusi

Koneksi gagal.

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

Nilai parameter brokerUrl tidak valid.

Masukkan nilai valid untuk parameter brokerUrl, userName, dan password. Untuk informasi lebih lanjut, lihat Tabel berikut menjelaskan parameter yang diperlukan..

telnet real node *** failed, please check the network

Alamat broker tidak dapat dialihkan ke alamat IP asli.

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

Nama pengguna atau kata sandi yang ditentukan tidak valid.

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

Parameter setUseCheckpoint dalam file consumerContext.java diatur ke true, tetapi checkpoint konsumsi tidak berada dalam rentang data instance pelacakan perubahan.

Tentukan checkpoint konsumsi dalam rentang data instance pelacakan perubahan. Untuk informasi lebih lanjut, lihat Tabel berikut menjelaskan parameter yang diperlukan..

Waktu respons konsumsi data meningkat.

Tidak tersedia

Anda dapat menganalisis penyebab dengan mengkueri parameter DStoreRecordQueue dan DefaultUserRecordQueue. Untuk informasi lebih lanjut, lihat Tabel berikut menjelaskan parameter dalam informasi tersebut..

  • Jika nilai parameter DStoreRecordQueue adalah 0, laju server DTS menarik data berkurang.

  • Jika nilai parameter DefaultUserRecordQueue adalah nilai default 512, laju klien SDK mengonsumsi data berkurang.