全部产品
Search
文档中心

Data Transmission Service:Mengonsumsi data yang telah berlangganan menggunakan SDK

更新时间:Jan 24, 2026

Setelah mengonfigurasi saluran pelacakan perubahan dengan membuat tugas pelacakan dan kelompok konsumen, Anda dapat menggunakan kit pengembangan perangkat lunak (SDK) yang disediakan oleh DTS untuk mengonsumsi data yang telah berlangganan. Topik ini menjelaskan cara menggunakan kode contoh.

Catatan

Prasyarat

Perhatian

  • Saat mengonsumsi data yang telah berlangganan, Anda harus memanggil metode commit dari DefaultUserRecord untuk melakukan commit informasi offset. Jika tidak, data mungkin dikonsumsi berulang kali.

  • Proses konsumsi yang berbeda bersifat independen satu sama lain.

  • Di konsol, Current Offset menunjukkan offset yang dilanggan oleh pelacakan tugas, bukan offset yang dikomit oleh klien.

Prosedur

  1. Unduh file kode SDK contoh dan ekstrak paketnya.

  2. Verifikasi versi kode SDK.

    1. Buka direktori tempat Anda mengekstrak kode SDK contoh.

    2. Gunakan editor teks untuk membuka file pom.xml di direktori tersebut.

    3. Perbarui SDK pelacakan perubahan ke versi terbaru.

      Catatan

      Anda dapat menemukan dependensi Maven terbaru di halaman dts-new-subscribe-sdk.

      Lokasi parameter versi SDK (klik untuk memperluas)

      <name>dts-new-subscribe-sdk</name>
      <url>https://www.aliyun.com/product/dts</url>
      <description>The Aliyun new Subscribe SDK for Java used for accessing Data Transmission Service</description>
      <packaging>jar</packaging>
      <groupId>com.aliyun.dts</groupId>
      <artifactId>dts-new-subscribe-sdk</artifactId>
      <version>2.1.4</version>
  3. Edit kode SDK.

    1. Gunakan integrated development environment (IDE) untuk membuka file yang telah diekstrak.

    2. Buka file Java yang sesuai dengan mode penggunaan klien SDK Anda.

      Catatan

      Jalur file Java adalah aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/.

      Mode penggunaan

      File Java

      Deskripsi

      Skenario

      ASSIGN mode

      DTSConsumerAssignDemo.java

      Untuk memastikan urutan pesan global, DTS hanya menetapkan satu partisi (partisi 0) untuk setiap topik pelacakan. Jika Anda menggunakan klien SDK dalam ASSIGN mode, jalankan hanya satu klien SDK.

      Hanya satu klien SDK dalam kelompok konsumen yang mengonsumsi data yang telah berlangganan.

      SUBSCRIBE mode

      DTSConsumerSubscribeDemo.java

      Untuk memastikan urutan pesan global, DTS hanya menetapkan satu partisi (partisi 0) untuk setiap topik pelacakan. Jika Anda menggunakan klien SDK dalam SUBSCRIBE mode, Anda dapat menjalankan beberapa klien SDK dalam kelompok konsumen yang sama untuk disaster recovery. Jika klien yang sedang mengonsumsi data gagal, klien SDK lain akan secara acak dan otomatis ditetapkan ke partisi 0 untuk melanjutkan konsumsi.

      Beberapa klien SDK dalam kelompok konsumen yang sama mengonsumsi data yang telah berlangganan. Ini merupakan skenario pemulihan bencana data.

    3. Atur parameter dalam kode Java.

      Kode contoh

      ******        
          public static void main(String[] args) {
              // URL broker Kafka.
              String brokerUrl = "dts-cn-***.com:18001";
              // Topik tempat mengonsumsi data. Partisinya adalah 0.
              String topic = "cn_***_version2";
              // Username, password, dan SID untuk autentikasi.
              String sid = "dts***";
              String userName = "dts***";
              String password = "DTS***";
              // Checkpoint awal untuk pencarian pertama. Ini adalah stempel waktu UNIX. Misalnya, atur parameter ini ke 1566180200 jika Anda ingin memulai konsumsi dari pukul 10:03:21 (CST) pada Senin, 19 Agustus 2019.
              String initCheckpoint = "1740472***";
              // Jika Anda menggunakan mode SUBSCRIBE, Anda harus mengonfigurasi group. Grup konsumen Kafka diaktifkan.
              ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE;
        
              DTSConsumerSubscribeDemo consumerDemo = new DTSConsumerSubscribeDemo(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
              consumerDemo.start();
          }
      ******

      Parameter

      Deskripsi

      Cara mendapatkan

      brokerUrl

      Menentukan alamat jaringan dan nomor port saluran pelacakan perubahan.

      Catatan
      • Jika server tempat Anda men-deploy klien SDK, seperti Instance ECS, dan instansi pelacakan perubahan berada dalam virtual private cloud (VPC) yang sama, Anda dapat mengonsumsi data melalui VPC untuk mengurangi latensi jaringan.

      • Kami tidak merekomendasikan penggunaan titik akhir publik karena potensi ketidakstabilan jaringan.

      Di Konsol DTS, klik ID instansi langganan target. Di halaman Basic Information, dapatkan alamat jaringan dan nomor port dari bagian Network.

      topic

      Topik saluran pelacakan perubahan.

      Di Konsol DTS, klik ID instansi langganan target. Di halaman Basic Information, buka bagian Basic Information dan dapatkan Topic.

      sid

      ID kelompok konsumen.

      Di Konsol DTS, klik ID instansi langganan target. Di halaman Consume Data, dapatkan Consumer Group ID/Name dan Account.

      userName

      Username kelompok konsumen.

      Peringatan

      Jika Anda tidak menggunakan klien yang disediakan dalam topik ini, atur username dalam format <consumer group username>-<consumer group ID> (misalnya, dtstest-dtsae******bpv). Jika tidak, koneksi akan gagal.

      password

      Password untuk akun tersebut.

      Password yang Anda tetapkan untuk username kelompok konsumen saat membuat kelompok konsumen.

      initCheckpoint

      Offset konsumen. Ini adalah stempel waktu UNIX dari catatan data pertama yang akan dikonsumsi oleh klien SDK, misalnya 1620962769.

      Catatan

      Anda dapat menggunakan offset konsumen untuk:

      • Melanjutkan konsumsi data dari offset tertentu setelah aplikasi Anda terganggu guna mencegah kehilangan data.

      • Menyesuaikan offset awal untuk konsumsi data sesuai kebutuhan.

      Offset konsumen harus berada dalam rentang waktu instansi pelacakan dan harus dikonversi ke stempel waktu UNIX.

      Catatan

      Kolom Data Range dalam daftar tugas pelacakan menunjukkan rentang waktu instansi langganan target.

      subscribeMode

      Mode penggunaan klien SDK. Anda tidak perlu mengubah parameter ini.

      • ConsumerContext.ConsumerSubscribeMode.ASSIGN: ASSIGN mode.

      • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: SUBSCRIBE mode.

      N/A

  4. Buka struktur proyek di IDE Anda dan pastikan versi OpenJDK untuk proyek tersebut adalah 1.8.

  5. Jalankan kode klien.

    Catatan

    Saat menjalankan kode untuk pertama kalinya, IDE memerlukan waktu untuk memuat plugin dan dependensi yang diperlukan secara otomatis.

    Hasil contoh (klik untuk memperluas)

    Hasil eksekusi normal

    Jika hasil berikut dikembalikan, klien berjalan dengan benar dan siap berlangganan perubahan data dari database sumber.

    ******
    [2025-02-25 18:47:22.991] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.consumer.KafkaConsumer:1587] - [Consumer clientId=consumer-dtsl5vy2ao5250****-1, groupId=dtsl5vy2ao5250****] Seeking to offset 8200 for partition cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0
    [2025-02-25 18:47:22.993] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap:116] - RecordFetcher consumer:  subscribe for [cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0] with checkpoint [Checkpoint[ topicPartition: cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0timestamp: 174048****, offset: 8200, info: 174048****]] start
    [2025-02-25 18:47:23.011] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174048044****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8200]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8201]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    ******

    Hasil langganan normal

    Jika hasil berikut dikembalikan, klien berhasil berlangganan perubahan data (operasi UPDATE) dari database sumber.

    ******
    [2025-02-25 18:48:24.905] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8413]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [UPDATE]
    Schema info [{, 
    recordFields= [{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}, {fieldName='name', rawDataTypeNum=253, isPrimaryKey=false, isUniqueKey=false, fieldPosition=1}], 
    databaseName='dtsdb', 
    tableName='person', 
    primaryIndexInfo [[indexType=PrimaryKey, indexFields=[{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}], cardinality=0, nullable=true, isFirstUniqueIndex=false, name=null]], 
    uniqueIndexInfo [[]], 
    partitionFields = null}]
    Before image {[Field [id] [3]
    Field [name] [test1]
    ]}
    After image {[Field [id] [3]
    Field [name] [test2]
    ]}
    ******

    Hasil eksekusi abnormal

    Jika hasil berikut dikembalikan, klien tidak dapat terhubung ke database sumber.

    ******
    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:22.002] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****-1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:22.509] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    [2025-02-25 18:22:23.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":1740478943160,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:27.192] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:27.618] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    ******

    Klien SDK secara berkala mengumpulkan dan menampilkan statistik tentang konsumsi data. Statistik ini mencakup jumlah total catatan data yang dikirim dan diterima, volume data total, serta jumlah catatan per detik (RPS).

    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}

    Parameter

    Deskripsi

    outCounts

    Jumlah total catatan data yang dikonsumsi oleh klien SDK.

    outBytes

    Volume total data yang dikonsumsi oleh klien SDK, dalam byte.

    outRps

    Jumlah permintaan per detik untuk konsumsi data oleh klien SDK.

    outBps

    Jumlah bit yang ditransmisikan per detik untuk konsumsi data oleh klien SDK.

    count

    Jumlah total parameter dalam informasi konsumsi data (metrik).

    Catatan

    Tidak termasuk count itu sendiri.

    inBytes

    Volume total data yang dikirim oleh server DTS, dalam byte.

    DStoreRecordQueue

    Ukuran antrian cache data saat server DTS mengirim data.

    inCounts

    Jumlah total catatan data yang dikirim oleh server DTS.

    inBps

    Jumlah bit yang ditransmisikan per detik saat server DTS mengirim data.

    inRps

    Jumlah permintaan per detik saat server DTS mengirim data.

    __dt

    Stempel waktu saat klien SDK menerima data, dalam milidetik.

    DefaultUserRecordQueue

    Ukuran antrian cache data setelah serialisasi.

  6. Edit kode sesuai kebutuhan untuk mengonsumsi data yang telah berlangganan.

    Saat mengonsumsi data yang telah berlangganan, Anda perlu mengelola offset konsumen guna mencegah kehilangan data, meminimalkan duplikasi data, dan memungkinkan konsumsi sesuai permintaan.

FAQ

  • Apa yang harus saya lakukan jika tidak dapat terhubung ke instansi langganan?

    Pecahkan masalah berdasarkan pesan error. Untuk informasi selengkapnya, lihat Pemecahan Masalah.

  • Apa format data offset konsumen setelah dipersistensi?

    Setelah offset konsumen dipersistensi, data dikembalikan dalam format JSON. Offset konsumen yang dipersistensi adalah stempel waktu Unix yang dapat Anda teruskan langsung ke SDK. Misalnya, dalam data yang dikembalikan, nilai 1700709977 untuk kunci "timestamp" adalah offset konsumen yang dipersistensi.

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}
  • Apakah tugas pelacakan dapat dikonsumsi oleh beberapa klien secara paralel?

    Tidak. Meskipun mode SUBSCRIBE memungkinkan beberapa klien berjalan secara paralel, hanya satu klien yang dapat mengonsumsi data dalam satu waktu.

  • Versi klien Kafka mana yang dienkapsulasi dalam kode SDK?

    Versi 2.0.0 dan yang lebih baru dari dts-new-subscribe-sdk mengenkapsulasi klien Kafka (kafka-clients) 2.7.0. Versi sebelum 2.0.0 mengenkapsulasi klien Kafka 1.0.0.

    Catatan

    Jika Anda menggunakan tool deteksi kerentanan paket dependensi dalam proses pengembangan aplikasi Anda dan menemukan bahwa klien Kafka (kafka-clients) yang dienkapsulasi oleh dts-new-subscribe-sdk memiliki kerentanan keamanan, Anda dapat mengatasi kerentanan ini dengan mengganti klien tersebut menggunakan versi 2.1.4-shaded.

    <dependency>
        <groupId>com.aliyun.dts</groupId>
        <artifactId>dts-new-subscribe-sdk</artifactId>
        <version>2.1.4-shaded</version>
    </dependency>

Lampiran

Mengelola offset konsumen

Saat klien SDK pertama kali dijalankan, di-restart, atau melakukan retry internal, Anda harus menanyakan dan meneruskan offset konsumen untuk memulai atau melanjutkan konsumsi data. Offset konsumen adalah stempel waktu UNIX dari catatan data pertama yang akan dikonsumsi oleh klien SDK.

Untuk mengatur ulang offset konsumen klien, Anda dapat menanyakan dan memodifikasi offset konsumen berdasarkan mode konsumsi (mode penggunaan SDK) seperti yang dijelaskan dalam tabel berikut.

Skenario

Mode penggunaan SDK

Metode pengelolaan offset

Menanyakan offset konsumen

ASSIGN mode, SUBSCRIBE mode

  • Karena klien SDK menyimpan offset pesan setiap 5 detik dan melakukan commit ke server DTS, Anda dapat menanyakan offset konsumen terakhir dari salah satu lokasi berikut:

    • File localCheckpointStore di server tempat klien SDK berada.

    • Halaman Data Consumption saluran pelacakan perubahan.

  • Jika Anda telah mengonfigurasi media penyimpanan bersama persisten eksternal, seperti database, dalam file consumerContext.java menggunakan setUserRegisteredStore(new UserMetaStore()), media penyimpanan ini menyimpan offset pesan setiap 5 detik untuk Anda tanyakan.

Klien SDK dijalankan pertama kali. Anda harus meneruskan offset konsumen untuk mengonsumsi data.

ASSIGN mode, SUBSCRIBE mode

Bergantung pada pola penggunaan klien SDK, pilih file Java DTSConsumerAssignDemo.java atau DTSConsumerSubscribeDemo.java, dan konfigurasikan offset konsumen (initCheckpoint) untuk mengonsumsi data.

Klien SDK melakukan retry internal. Anda harus meneruskan offset konsumen terakhir yang dicatat untuk melanjutkan konsumsi data.

ASSIGN mode

Cari offset konsumen terakhir yang dicatat dalam urutan berikut. Jika offset ditemukan, informasi offset tersebut dikembalikan:

  1. Media penyimpanan eksternal yang Anda konfigurasikan menggunakan setUserRegisteredStore(new UserMetaStore()) dalam file consumerContext.java.

  2. File localCheckpointStore di server tempat klien SDK berada.

  3. Stempel waktu mulai yang Anda teruskan ke initCheckpoint dalam file DTSConsumerSubscribeDemo.java.

SUBSCRIBE mode

Cari offset konsumen terakhir yang dicatat dalam urutan berikut. Jika offset ditemukan, informasi offset tersebut dikembalikan:

  1. Media penyimpanan eksternal yang Anda konfigurasikan dalam file consumerContext.java menggunakan setUserRegisteredStore(new UserMetaStore()).

  2. Offset yang disimpan di Server DTS (modul ingestion data inkremental).

    Catatan

    Offset ini hanya diperbarui setelah klien SDK memanggil metode commit untuk memperbarui offset konsumen.

  3. Stempel waktu mulai yang Anda teruskan ke initCheckpoint dalam file DTSConsumerSubscribeDemo.java.

  4. Offset awal Server DTS (modul ingestion data inkremental baru).

    Penting

    Jika modul ingestion data inkremental dialihkan, modul baru tidak dapat menyimpan offset konsumen terakhir klien. Hal ini dapat menyebabkan konsumsi data dimulai dari offset yang lebih lama. Kami merekomendasikan agar Anda menyimpan offset konsumen secara persisten di klien.

Klien SDK di-restart. Anda harus meneruskan offset konsumen terakhir yang dicatat untuk melanjutkan konsumsi data.

ASSIGN mode

Berdasarkan konfigurasi setForceUseCheckpoint dalam file consumerContext.java, offset konsumen ditanyakan, dan jika ditemukan, informasi offset tersebut dikembalikan:

  • Jika diatur ke true, klien SDK akan menggunakan initCheckpoint yang diteruskan sebagai offset konsumen setiap kali di-restart.

  • Jika dikonfigurasi sebagai false atau tidak dikonfigurasi, cari offset konsumen catatan sebelumnya dalam urutan berikut:

    1. File localCheckpointStore di server tempat klien SDK berada.

    2. Offset yang disimpan di Server DTS (modul ingestion data inkremental).

      Catatan

      Offset ini hanya diperbarui setelah klien SDK memanggil metode commit untuk memperbarui offset konsumen.

    3. Media penyimpanan eksternal yang Anda konfigurasikan dalam file consumerContext.java menggunakan setUserRegisteredStore(new UserMetaStore()).

SUBSCRIBE mode

Dalam mode ini, dalam file consumerContext.java, konfigurasi setForceUseCheckpoint tidak berlaku. Cari offset konsumen catatan sebelumnya dalam urutan berikut:

  1. Media penyimpanan eksternal yang dikonfigurasikan dalam file consumerContext.java dengan setUserRegisteredStore(new UserMetaStore()).

  2. Offset yang disimpan di Server DTS (modul ingestion data inkremental).

    Catatan

    Offset ini hanya diperbarui setelah klien SDK memanggil metode commit untuk memperbarui offset konsumen.

  3. Stempel waktu mulai yang Anda teruskan ke initCheckpoint dalam file DTSConsumerSubscribeDemo.java.

  4. Offset awal Server DTS (modul ingestion data inkremental baru).

Menyimpan offset konsumen dengan persistensi

Jika terjadi alih bencana pada modul ingestion data inkremental, modul baru tidak dapat menyimpan offset konsumen terakhir klien. Hal ini terutama berlaku dalam mode SUBSCRIBE. Akibatnya, klien mungkin mulai mengonsumsi data dari offset yang lebih awal, menyebabkan konsumsi berulang data historis. Misalnya, asumsikan bahwa sebelum alih layanan, rentang offset modul lama adalah dari pukul 08:00:00 pada 11 November 2023 hingga 08:00:00 pada 12 November 2023, dan offset konsumen klien adalah 08:00:00 pada 12 November 2023. Setelah alih layanan, rentang offset modul baru adalah dari pukul 10:00:00 pada 8 November 2023 hingga 08:01:00 pada 12 November 2023. Dalam skenario ini, klien memulai konsumsi dari offset awal modul baru (10:00:00 pada 8 November 2023), yang mengakibatkan konsumsi data berulang.

Untuk mencegah konsumsi berulang data historis dalam skenario alih layanan ini, kami merekomendasikan agar Anda mengonfigurasi metode penyimpanan persisten untuk offset konsumen di klien. Metode contoh berikut disediakan sebagai referensi. Anda dapat memodifikasinya sesuai kebutuhan.

  1. Buat metode UserMetaStore() yang mewarisi dan mengimplementasikan metode AbstractUserMetaStore().

    Sebagai contoh, Anda dapat menggunakan database MySQL untuk menyimpan informasi offset. Kode Java contoh berikut menunjukkan cara melakukannya:

    public class UserMetaStore extends AbstractUserMetaStore {
    
        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
            String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    
        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
            String sql = "select checkpoint from dts_checkpoint where group_id = ?";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                rs = pres.executeQuery();
                              
                if (rs.next()) {
                    String checkpoint = rs.getString("checkpoint");
                    return checkpoint;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
            return null;
        }
    }
    
  2. Dalam file consumerContext.java, panggil metode setUserRegisteredStore(new UserMetaStore()) untuk mengonfigurasi media penyimpanan eksternal.

Pemecahan Masalah

Pengecualian

Pesan error

Penyebab

Solusi

Kegagalan koneksi

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

brokerUrl salah.

Masukkan brokerUrl, userName, dan password yang benar. Untuk informasi selengkapnya, lihat Deskripsi parameter.

telnet real node *** failed, please check the network

Anda tidak dapat terhubung ke alamat IP nyata menggunakan alamat broker.

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

Username atau password salah.

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

Dalam file consumerContext.java, setUseCheckpoint diatur ke true, tetapi offset konsumen tidak berada dalam rentang waktu instansi langganan.

Masukkan offset konsumen yang berada dalam rentang waktu instansi langganan. Untuk metode penanyaan, lihat Deskripsi parameter.

Perlambatan konsumsi langganan

N/A

  • Analisis penyebab perlambatan konsumsi data dengan menanyakan parameter dalam informasi statistik untuk ukuran antrian DStoreRecordQueue dan DefaultUserRecordQueue.

    • Jika nilai DStoreRecordQueue tetap 0, kecepatan server DTS menarik data lambat.

    • Jika nilai DefaultUserRecordQueue tetap pada nilai default 512, kecepatan klien SDK mengonsumsi data lambat.

  • Atur ulang offset dengan memodifikasi offset konsumen (initCheckpoint) dalam kode sesuai kebutuhan.