All Products
Search
Document Center

Data Transmission Service:Gunakan demo SDK untuk mengonsumsi data perubahan dari PolarDB-X 1.0

Last Updated:Jun 21, 2026

Setelah membuat tugas pelacakan perubahan, gunakan demo SDK yang disediakan oleh Data Transmission Service (DTS) untuk mengonsumsi data perubahan yang dihasilkan. Topik ini menjelaskan cara menggunakan demo SDK dengan sumber data terdistribusi seperti PolarDB-X 1.0 dan DMS LogicDB.

Prasyarat

  • JDK 1.8 telah diinstal.

  • IntelliJ IDEA telah diinstal.

Peringatan

Untuk mengonsumsi data perubahan sebagai pengguna RAM, Anda harus memberikan izin AliyunDTSFullAccess dan izin akses ke objek sumber kepada pengguna tersebut. Untuk informasi selengkapnya, lihat Otorisasi pengguna RAM untuk mengelola DTS dan Mengelola izin pengguna RAM.

Prosedur

Topik ini menunjukkan cara menjalankan demo SDK untuk mengonsumsi data perubahan dari instans PolarDB-X 1.0, dengan menggunakan IntelliJ IDEA Community Edition 2020.1 untuk Windows sebagai contoh.

  1. Buat instans pelacakan perubahan. Untuk informasi selengkapnya, lihat Buat tugas pelacakan perubahan untuk PolarDB-X 1.0.

  2. Buat satu atau beberapa kelompok konsumen. Untuk informasi selengkapnya, lihat Buat kelompok konsumen.

  3. Unduh dan ekstrak demo SDK. Untuk tautan unduhan, lihat Kode Demo SDK.

    Penting

    Saat mengonsumsi data perubahan, Anda harus memanggil metode commit dari DefaultUserRecord untuk mengirimkan offset konsumen. Jika tidak, Anda berisiko mengonsumsi data secara berulang.

  4. Buka proyek di IntelliJ IDEA.

    1. Buka IntelliJ IDEA dan klik Open or Import.

    2. Di kotak dialog, arahkan ke direktori tempat Anda mengekstrak demo SDK, buka folder-folder tersebut, lalu klik ganda file pom.xml.

    3. Di kotak dialog yang muncul, pilih Open as Project.

  5. Di IntelliJ IDEA, buka folder-folder tersebut. Berdasarkan mode penggunaan SDK client, pilih dan klik ganda untuk membuka file Java yang sesuai: DistributedDTSConsumerDemo.

    aliyun-dts-subscribe-sdk-java-master [dts-new-subscr...]
      .idea
      src
        main
        test
          java
            com.aliyun.dts.subscribe.clients
              DBMapperTest
              DistributedDTSConsumerDemo
              DTSConsumerAssignDemo
              DTSConsumerSubscribeDemo
              UserMetaStore
      .gitignore
      dts-new-subscribe-sdk.iml
      LICENSE
      pom.xml
      README.md
    External Libraries
    Scratches and Consoles
  6. Atur parameter yang diperlukan dalam file Java.

    public static void main(String[] args) throws ClientException {
            // Konfigurasikan pengaturan pelacakan perubahan untuk sumber data terdistribusi, seperti instans PolarDB-X 1.0. Atur parameter seperti pasangan AccessKey, ID instans, ID tugas, dan kelompok konsumen.
            String accessKeyId = "LTA***********99reZ";
            String accessKeySecret = "****************";
            String regionId = "cn-hangzhou";
            String dtsInstanceId = "dtse5212sed162****";
            String jobId = "l791216x16d****";
            String sid = "dtsip412t13160****";
            String userName = "xftest";
            String password = "******";
            String proxyUrl = "dts-cn-****.com:18001";
            // Offset konsumen awal, berupa timestamp Unix dalam satuan detik. Contoh: 1566180200 untuk Mon Aug 19 10:03:21 CST 2019.
            String checkpoint = "1639620090";
            // Konversi nama database/tabel fisik menjadi nama database/tabel logis
            boolean mapping = true;
            // Atur ke true untuk memaksa klien memulai dari checkpoint yang ditentukan. Pengaturan ulang checkpoint hanya berlaku dalam mode ASSIGN.
            boolean isForceUseInitCheckpoint = false;
            ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
            DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId,
                    jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl,
                    checkpoint, isForceUseInitCheckpoint, mapping);
            demo.start();
        }

    Parameter

    Deskripsi

    Cara mendapatkan

    accessKeyId

    ID AccessKey.

    Untuk informasi selengkapnya, lihat Dapatkan pasangan AccessKey.

    accessKeySecret

    Rahasia AccessKey.

    regionId

    ID wilayah dari instans pelacakan perubahan.

    Di Konsol DTS, klik ID instans pelacakan perubahan yang dituju. Di halaman Task Management, temukan informasi wilayahnya. Misalnya, jika wilayahnya China (Hangzhou), atur parameter ini menjadi cn-hangzhou. Untuk daftar wilayah, lihat Daftar wilayah yang didukung.

    dtsInstanceId

    ID instans pelacakan perubahan.

    Di Konsol DTS, klik ID instans pelacakan perubahan yang dituju. Di halaman Task Management, Anda dapat menemukan ID instans dan ID tugas.

    jobId

    ID tugas pelacakan perubahan.

    sid

    ID kelompok konsumen.

    Di Konsol DTS, klik ID instans pelacakan perubahan yang dituju, lalu klik Consume Data. Anda dapat menemukan SID dan Account dari kelompok konsumen.

    Catatan

    Kata sandi untuk akun kelompok konsumen ditentukan saat Anda membuat kelompok konsumen.

    userName

    Akun kelompok konsumen.

    password

    Kata sandi akun kelompok konsumen.

    proxyUrl

    Titik akhir dan port dari instans pelacakan perubahan.

    Catatan
    • Untuk meminimalkan latensi jaringan, gunakan titik akhir internal jika Instance ECS yang menjalankan SDK client berada dalam jaringan klasik atau VPC yang sama dengan instans pelacakan perubahan.

    • Kami tidak merekomendasikan penggunaan titik akhir publik karena potensi ketidakstabilan jaringan.

    Di Konsol DTS, klik ID instans pelacakan perubahan yang dituju. Di halaman Task Management, Anda dapat menemukan titik akhir dan nomor port-nya.

    checkpoint

    Offset konsumen, ditentukan sebagai timestamp Unix dalam satuan detik. SDK client mulai mengonsumsi data dari titik waktu ini.

    Catatan

    Offset konsumen digunakan dalam skenario berikut:

    • Jika aplikasi Anda terganggu, berikan offset konsumen terakhir untuk melanjutkan konsumsi tanpa kehilangan data.

    • Saat klien dimulai, Anda dapat memberikan offset konsumen tertentu untuk mengonsumsi data sesuai kebutuhan.

    Offset konsumen harus berada dalam rentang data instans pelacakan perubahan dan harus berupa timestamp Unix.

    Catatan

    Anda dapat menggunakan mesin pencari untuk menemukan konverter timestamp Unix.

  7. Di bilah menu atas IntelliJ IDEA, pilih Run > Run untuk menjalankan klien.

    Catatan

    Pada jalankan pertama, IntelliJ IDEA secara otomatis menginstal dependensi yang diperlukan, yang mungkin memerlukan waktu.

    • Output menunjukkan bahwa klien dapat mengonsumsi perubahan data dari instans sumber.

    • SDK client secara berkala menampilkan statistik konsumsi, yang mencakup jumlah total dan volume catatan data yang dikirim serta diterima, dan requests per second (RPS).

      Tabel 1. Statistik konsumsi

      Parameter

      Deskripsi

      outCounts

      Jumlah total catatan data yang dikonsumsi oleh SDK client.

      outBytes

      Volume total data yang dikonsumsi oleh SDK client, dalam satuan byte.

      outRps

      Laju konsumsi data SDK client, dalam requests per second (RPS).

      outBps

      Laju konsumsi data SDK client, dalam bits per second (bps).

      count

      Parameter ini dicadangkan.

      inBytes

      Volume total data yang dikirim oleh server DTS, dalam satuan byte.

      DStoreRecordQueue

      Ukuran saat ini dari antrian cache data di server DTS.

      inCounts

      Jumlah total catatan data yang dikirim oleh server DTS.

      inRps

      Laju pengiriman data server DTS, dalam requests per second (RPS).

      inBps

      Laju pengiriman data server DTS, dalam bits per second (bps).

      __dt

      Timestamp saat SDK client menerima data, dalam milidetik.

      DefaultUserRecordQueue

      Ukuran saat ini dari antrian cache data setelah serialisasi.

  8. Opsi: Untuk menyesuaikan cara pemrosesan data yang dikonsumsi, Anda dapat memodifikasi metode buildRecordListener() atau menggunakan kelas kustom.

    public static Map<String, RecordListener> buildRecordListener() {
            // Anda dapat mengimplementasikan listener sendiri.
            RecordListener mysqlRecordPrintListener = new RecordListener() {
                @Override
                public void consume(DefaultUserRecord record) {
                    OperationType operationType = record.getOperationType();
                    if (operationType.equals(OperationType.INSERT)
                            || operationType.equals(OperationType.UPDATE)
                            || operationType.equals(OperationType.DELETE)
                            || operationType.equals(OperationType.HEARTBEAT)) {
                        // Konsumsi catatan tersebut.
                        RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL);
                        recordPrintListener.consume(record);
                        // Metode commit mendorong pembaruan checkpoint.
                        record.commit("");
                    }
                }
            };
            return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener);
        }