All Products
Search
Document Center

Data Transmission Service:Mengonsumsi data langganan menggunakan kode contoh SDK

Last Updated:Jun 17, 2026

Setelah mengonfigurasi instans Data Subscription, gunakan kode contoh SDK yang disediakan oleh Data Transmission Service (DTS) untuk mengonsumsi data perubahan.

Prosedur

Penting

Prosedur berikut menjelaskan cara menjalankan kode contoh SDK untuk mengonsumsi data Langganan Data di IntelliJ IDEA (Community Edition 2020.1 untuk Windows).

  1. Buat instans Data Subscription. Untuk informasi lebih lanjut, lihat Membuat saluran Langganan Data untuk instans ApsaraDB RDS for MySQL, Membuat saluran Langganan Data untuk kluster PolarDB for MySQL, atau Membuat saluran Langganan Data untuk database Oracle.

  2. Buat satu atau beberapa kelompok konsumen. Untuk informasi lebih lanjut, lihat Membuat kelompok konsumen.

    Penting

    Saat mengonsumsi data Langganan Data, Anda harus memanggil metode commit dari DefaultUserRecord untuk melakukan commit checkpoint. Jika tidak, hal ini dapat menyebabkan konsumsi data duplikat.

  3. Gunakan kode contoh SDK sesuai kebutuhan bisnis Anda.

    • Gunakan paket SDK Langganan Data versi baru (disarankan)

      1. Buka IntelliJ IDEA, lalu klik Create New Project untuk membuat proyek bagi aplikasi Anda.

      2. Dalam proyek tersebut, temukan file project object model (POM): pom.xml.

      3. Tambahkan dependensi berikut ke file pom.xml:

        <dependency>
            <groupId>com.aliyun.dts</groupId>
            <artifactId>dts-new-subscribe-sdk</artifactId>
            <version>{dts_new_sdk_version}</version>
        </dependency>
        Catatan

        Anda dapat menemukan dependensi Maven terbaru di halaman dts-new-subscribe-sdk.

      4. Untuk informasi lebih lanjut tentang cara menggunakan SDK langganan versi baru, lihat Gunakan kode contoh.

    • Gunakan versi kustom SDK Langganan Data versi baru

      1. Unduh paket kode contoh SDK, lalu ekstrak.

        Catatan

        Klik code dan pilih Download ZIP untuk mengunduh paket tersebut.

      2. Buka direktori kode contoh SDK yang telah diekstrak. Gunakan editor teks untuk membuka file pom.xml dan perbarui SDK Langganan Data ke versi terbaru.设置SDK版本

        Penting

        Anda dapat memperoleh versi terbaru SDK Langganan Data dari situs web Maven. Untuk informasi lebih lanjut, lihat halaman Maven untuk SDK Langganan Data.

      3. Buka IntelliJ IDEA dan klik Open or Import.打开工程

      4. Pada kotak dialog yang muncul, buka direktori kode contoh SDK yang telah diekstrak, buka folder-folder tersebut, lalu temukan file pom.xml.找到项目对象模型文件

      5. Pada kotak dialog yang muncul, pilih Open as Project.

      6. Di IntelliJ IDEA, buka folder-folder tersebut. Berdasarkan mode penggunaan klien SDK, pilih dan klik ganda file Java yang sesuai: DTSConsumerAssignDemo.java atau DTSConsumerSubscribeDemo.java.java客户端文件

        Catatan

        DTS mendukung mode penggunaan klien SDK berikut:

        • Mode ASSIGN: Untuk memastikan urutan global pesan, DTS hanya menetapkan satu partisi (partisi 0) untuk setiap topik langganan. Saat menggunakan klien SDK dalam mode ASSIGN, kami menyarankan hanya menjalankan satu klien.

        • Mode SUBSCRIBE: Untuk memastikan urutan global pesan, DTS hanya menetapkan satu partisi (partisi 0) untuk setiap topik langganan. Jika Anda menggunakan klien SDK dalam mode SUBSCRIBE, Anda dapat menjalankan beberapa klien SDK dalam satu kelompok konsumen untuk pemulihan bencana. Jika klien aktif gagal, klien SDK lain akan secara otomatis ditetapkan ke partisi 0 untuk melanjutkan konsumsi.

  4. Atur parameter yang diperlukan dalam file Java.

    assigndemo

    Tabel 1. Parameter yang diperlukan

    Parameter

    Deskripsi

    Sumber

    brokerUrl

    Titik akhir dan nomor port instans Data Subscription.

    Catatan
    • Jika instans ECS yang menjalankan klien SDK dan instans Data Subscription berada dalam jaringan klasik atau Virtual Private Cloud (VPC) yang sama, kami menyarankan Anda menggunakan titik akhir internal untuk meminimalkan latensi jaringan.

    • Kami tidak menyarankan menggunakan titik akhir publik karena potensi ketidakstabilan jaringan.

    Di Konsol DTS, klik ID instans Data Subscription yang dituju. Pada halaman Basic Information, Anda dapat memperoleh titik akhir dan nomor port di bagian Network.

    topic

    Topik langganan untuk instans tersebut.

    Di Konsol DTS, klik ID instans Data Subscription yang dituju. Pada halaman Basic Information, Anda dapat memperoleh Topic di bagian Basic Information.

    sid

    ID kelompok konsumen.

    Di Konsol DTS, klik ID instans Data Subscription yang dituju, lalu klik Consume Data. Anda dapat memperoleh Consumer Group ID dan Account kelompok konsumen.

    Catatan

    Kata sandi untuk username kelompok konsumen ditentukan saat Anda membuat kelompok konsumen tersebut.

    userName

    Username untuk kelompok konsumen.

    Peringatan

    Jika Anda tidak menggunakan klien yang disediakan dalam topik ini, Anda harus mengatur username dalam format <Username>-<Consumer Group ID>. Contoh: dtstest-dtsae******bpv. Jika tidak, koneksi akan gagal.

    password

    Kata sandi untuk username tersebut.

    initCheckpoint

    Titik pemeriksaan konsumsi, yang ditentukan sebagai Stempel waktu UNIX, tempat klien SDK mulai mengonsumsi data. Contoh: 1620962769.

    Catatan

    Anda dapat menggunakan informasi titik pemeriksaan konsumsi dalam skenario berikut:

    • Untuk melanjutkan konsumsi dan mencegah kehilangan data setelah gangguan aplikasi, berikan titik pemeriksaan konsumsi terakhir yang diketahui.

    • Saat memulai klien, Anda dapat memberikan titik pemeriksaan konsumsi tertentu untuk mengonsumsi data dari posisi yang diinginkan.

    Titik pemeriksaan konsumsi harus berada dalam rentang data instans Data Subscription (seperti yang ditunjukkan pada gambar) dan harus dikonversi ke Stempel waktu UNIX.

    Catatan

    Anda dapat menggunakan mesin pencari untuk menemukan konverter Stempel waktu UNIX.

    ConsumerContext.ConsumerSubscribeMode subscribeMode

    Mode penggunaan klien SDK. Nilai yang valid:

    • ConsumerContext.ConsumerSubscribeMode.ASSIGN: Mode ASSIGN. Hanya satu klien SDK dalam satu kelompok konsumen yang dapat mengonsumsi data Langganan Data.

    • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: Mode SUBSCRIBE. Anda dapat menjalankan beberapa klien SDK dalam kelompok konsumen yang sama untuk disaster recovery.

    N/A

  5. Di bilah navigasi atas IntelliJ IDEA, pilih Run > Run untuk menjalankan klien.

    Catatan

    Pertama kali menjalankan klien, mungkin memerlukan waktu untuk memuat dan menginstal dependensi yang diperlukan secara otomatis.

    • Gambar berikut menunjukkan hasilnya. Klien berhasil mengonsumsi data perubahan dari database sumber.消费数据

    • Klien SDK secara berkala mengumpulkan dan menampilkan statistik tentang konsumsi data, termasuk jumlah total dan volume catatan yang dikirim serta diterima, dan requests per second (RPS).统计信息

      Tabel 2. Statistik konsumsi data

      Parameter

      Deskripsi

      outCounts

      Jumlah total catatan data yang dikonsumsi oleh klien SDK.

      outBytes

      Volume total data yang dikonsumsi oleh klien SDK. Satuan: byte.

      outRps

      Jumlah requests per second (RPS) saat klien SDK mengonsumsi data.

      outBps

      Laju konsumsi data klien SDK, dalam bit per detik (bps).

      inBytes

      Volume total data yang dikirim oleh server DTS. Satuan: byte.

      DStoreRecordQueue

      Ukuran antrean cache data internal untuk catatan masuk dari server DTS.

      inCounts

      Jumlah total catatan data yang dikirim oleh server DTS.

      inRps

      RPS saat server DTS mengirim data.

      __dt

      Stempel waktu saat klien SDK menerima data. Satuan: milidetik.

      DefaultUserRecordQueue

      Ukuran antrean data yang menyimpan catatan siap diproses oleh aplikasi konsumen.

Menyimpan dan mengkueri titik pemeriksaan konsumsi

Untuk memulai atau melanjutkan konsumsi data (misalnya, saat peluncuran pertama, restart, atau retry internal), klien SDK memerlukan titik pemeriksaan konsumsi. Tabel berikut menjelaskan cara mengelola dan mengkueri checkpoint dalam berbagai skenario untuk mencegah kehilangan data, meminimalkan konsumsi duplikat, dan memungkinkan konsumsi sesuai permintaan.

Skenario

Mode penggunaan SDK

Metode kueri

Mengkueri titik pemeriksaan konsumsi

Mode ASSIGN, mode SUBSCRIBE

  • Klien SDK menyimpan titik pemeriksaan konsumsi setiap 5 detik dan mengirimkannya ke server DTS. Untuk mengkueri titik pemeriksaan konsumsi terbaru, gunakan salah satu metode berikut:

    • Periksa file localCheckpointStore di server tempat klien SDK berjalan.

    • Periksa halaman Consume Data instans Langganan Data.

  • Jika Anda telah mengonfigurasi media penyimpanan bersama persisten eksternal (seperti database) dengan menggunakan setUserRegisteredStore(newUserMetaStore()) dalam file consumerContext.java, media penyimpanan ini menyimpan checkpoint pesan setiap 5 detik untuk Anda kueri.

Startup awal: Memberikan checkpoint untuk memulai konsumsi.

Mode ASSIGN, mode SUBSCRIBE

Berdasarkan mode penggunaan klien SDK, pilih file DTSConsumerAssignDemo.java atau DTSConsumerSubscribeDemo.java, lalu konfigurasikan parameter initCheckpoint untuk memulai konsumsi. Untuk instruksi konfigurasi, lihat langkah 3 dan 4.

Klien SDK perlu memberikan kembali titik pemeriksaan konsumsi terakhir yang dicatat untuk melanjutkan konsumsi setelah retry internal.

Mode ASSIGN

Cari titik pemeriksaan konsumsi terakhir yang dicatat dalam urutan berikut. Pencarian berhenti dan mengembalikan informasi checkpoint segera setelah ditemukan:

  1. Media penyimpanan eksternal yang Anda konfigurasi dengan menggunakan setUserRegisteredStore(newUserMetaStore()) dalam file consumerContext.java.

  2. File localCheckpointStore di server tempat klien SDK berjalan.

  3. Stempel waktu mulai yang Anda berikan melalui parameter initCheckpoint dalam file DTSConsumerSubscribeDemo.java.

Mode SUBSCRIBE

Cari titik pemeriksaan konsumsi terakhir yang dicatat dalam urutan berikut. Pencarian berhenti dan mengembalikan informasi checkpoint segera setelah ditemukan:

  1. Media penyimpanan eksternal yang Anda konfigurasi dengan menggunakan setUserRegisteredStore(newUserMetaStore()) dalam file consumerContext.java.

  2. Checkpoint yang disimpan di server DTS (modul pengumpulan data inkremental).

    Catatan

    Offset ini hanya diperbarui setelah klien SDK memanggil metode commit untuk memperbarui offset konsumen.

  3. Stempel waktu mulai yang Anda berikan melalui parameter initCheckpoint dalam file DTSConsumerSubscribeDemo.java.

  4. Checkpoint awal server DTS (modul pengumpulan data inkremental baru).

    Penting

    Jika terjadi alih bencana modul pengumpulan data inkremental, modul baru tidak menyimpan titik pemeriksaan konsumsi terakhir dari klien. Hal ini dapat menyebabkan klien mulai mengonsumsi data dari checkpoint yang lebih lama. Kami menyarankan agar Anda menyimpan titik pemeriksaan konsumsi secara persisten di sisi klien. Untuk informasi lebih lanjut, lihat Menyimpan titik pemeriksaan konsumsi secara persisten.

Klien SDK telah direstart dan perlu memberikan kembali titik pemeriksaan konsumsi terakhir yang dicatat untuk melanjutkan konsumsi.

Mode ASSIGN

Kueri titik pemeriksaan konsumsi berdasarkan konfigurasi setForceUseCheckpoint dalam file consumerContext.java. Pencarian berhenti dan mengembalikan informasi checkpoint segera setelah ditemukan:

  • Jika diatur ke true, initCheckpoint yang diberikan dipaksakan digunakan sebagai titik pemeriksaan konsumsi setiap kali klien SDK direstart.

  • Jika diatur ke false atau tidak dikonfigurasi, cari titik pemeriksaan konsumsi terakhir yang dicatat dalam urutan berikut:

    1. File localCheckpointStore di server tempat klien SDK berjalan.

    2. Checkpoint yang disimpan di server DTS (modul pengumpulan data inkremental).

      Catatan

      Offset ini hanya diperbarui setelah klien SDK memanggil metode commit untuk memperbarui offset konsumen.

    3. Media penyimpanan eksternal yang Anda konfigurasi dengan menggunakan setUserRegisteredStore(newUserMetaStore()) dalam file consumerContext.java.

Mode SUBSCRIBE

Dalam mode ini, konfigurasi setForceUseCheckpoint dalam file consumerContext.java tidak berpengaruh. Cari titik pemeriksaan konsumsi terakhir yang dicatat dalam urutan berikut:

  1. Media penyimpanan eksternal yang Anda konfigurasi dengan menggunakan setUserRegisteredStore(newUserMetaStore()) dalam file consumerContext.java.

  2. Checkpoint yang disimpan di server DTS (modul pengumpulan data inkremental).

    Catatan

    Offset ini hanya diperbarui setelah klien SDK memanggil metode commit untuk memperbarui offset konsumen.

  3. Stempel waktu mulai yang Anda berikan melalui parameter initCheckpoint dalam file DTSConsumerSubscribeDemo.java.

  4. Checkpoint awal server DTS (modul pengumpulan data inkremental baru).

Menyimpan titik pemeriksaan konsumsi secara persisten

Saat terjadi kejadian pemulihan bencana pada modul pengumpulan data inkremental (terutama dalam mode SUBSCRIBE), modul baru tidak menyimpan titik pemeriksaan konsumsi terbaru dari klien. Klien mungkin melanjutkan dari checkpoint yang lebih lama, sehingga menyebabkan konsumsi duplikat data historis. Misalnya, sebelum alih bencana, rentang checkpoint modul lama adalah dari 08:00:00 pada 11 November 2023 hingga 08:00:00 pada 12 November 2023, dan checkpoint klien adalah 08:00:00 pada 12 November 2023. Setelah alih bencana, rentang checkpoint modul baru adalah dari 10:00:00 pada 8 November 2023 hingga 08:01:00 pada 12 November 2023. Klien mulai dari checkpoint awal modul baru (10:00:00 pada 8 November 2023), sehingga menyebabkan konsumsi duplikat.

Untuk menghindari konsumsi duplikat dalam skenario ini, konfigurasikan penyimpanan checkpoint persisten di sisi klien. Contoh berikut memberikan salah satu implementasi yang mungkin yang dapat Anda sesuaikan dengan kebutuhan Anda.

  1. Buat kelas UserMetaStore yang memperluas AbstractUserMetaStore.

    Sebagai contoh, untuk menyimpan informasi checkpoint dalam database MySQL, gunakan kode Java berikut:

    public class UserMetaStore extends AbstractUserMetaStore {
    
        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
    			  String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    
        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
    			  String sql = "select checkpoint from dts_checkpoint where group_id = ?";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
    						ResultSet rs = pres.executeQuery()
                              
                String checkpoint = rs.getString("checkpoint");
              
                return checkpoint;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    }
    
  2. Dalam file consumerContext.java, konfigurasikan media penyimpanan eksternal dengan menggunakan metode setUserRegisteredStore(new UserMetaStore()).

FAQ

  • Bagaimana cara mengatasi masalah koneksi dengan instans Langganan Data?

    Atasi masalah tersebut berdasarkan pesan error. Untuk informasi lebih lanjut, lihat Pemecahan Masalah.

  • Dalam format apa titik pemeriksaan konsumsi disimpan secara persisten?

    Data titik pemeriksaan konsumsi yang disimpan secara persisten disimpan dalam format JSON. Titik pemeriksaan yang disimpan adalah Stempel waktu UNIX yang dapat langsung diberikan ke SDK. Dalam contoh tanggapan berikut, nilai 1700709977 untuk kunci "timestamp" adalah titik pemeriksaan konsumsi yang disimpan secara persisten.

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}

Pemecahan Masalah

Masalah

Pesan error

Penyebab

Solusi

Tidak dapat terhubung

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

brokerUrl yang ditentukan salah.

Masukkan nilai yang benar untuk parameter brokerUrl, userName, dan password. Untuk informasi lebih lanjut, lihat Parameter yang diperlukan.

telnet real node *** failed, please check the network

Alamat broker tidak dapat terhubung ke alamat IP aktual.

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

Username atau kata sandi salah.

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

Dalam file consumerContext.java, parameter setUseCheckpoint diatur ke true, tetapi titik pemeriksaan konsumsi tidak berada dalam rentang data instans Langganan Data (seperti yang ditunjukkan pada gambar).

Berikan titik pemeriksaan konsumsi yang berada dalam rentang data instans Langganan Data. Untuk informasi lebih lanjut, lihat Parameter yang diperlukan.

Konsumsi melambat

N/A

  • Periksa ukuran DStoreRecordQueue dan DefaultUserRecordQueue dalam statistik untuk mengidentifikasi bottleneck. Untuk informasi lebih lanjut, lihat Statistik konsumsi data.

    • Jika DStoreRecordQueue tetap 0, hal ini menunjukkan bahwa server DTS menarik data secara lambat.

    • Jika DefaultUserRecordQueue secara konsisten mendekati kapasitasnya (default adalah 512), klien SDK mengonsumsi data terlalu lambat.

  • Berdasarkan kebutuhan bisnis Anda, ubah titik pemeriksaan konsumsi (initCheckpoint) dalam kode untuk mengatur ulang checkpoint.