全部产品
Search
文档中心

Data Transmission Service:Gunakan kode demo SDK untuk mengonsumsi data terlacak dari instance PolarDB-X 1.0

更新时间:Jul 02, 2025

Setelah mengonfigurasi tugas pelacakan perubahan, Anda dapat menggunakan kode demo SDK yang disediakan oleh Data Transmission Service (DTS) untuk mengonsumsi data terlacak. Topik ini menjelaskan cara menggunakan kode demo SDK untuk mengonsumsi data terlacak dari database terdistribusi seperti PolarDB-X 1.0 dan Data Management (DMS) LogicDB.

Prasyarat

  • Java Development Kit (JDK) versi 1.8 telah diinstal.
  • IntelliJ IDEA telah diinstal.

Perhatian

Jika Anda menggunakan pengguna Resource Access Management (RAM) untuk melacak data, pengguna RAM harus memiliki izin AliyunDTSFullAccess dan izin untuk mengakses objek sumber. Untuk informasi lebih lanjut, lihat Gunakan kebijakan sistem untuk memberi otorisasi kepada pengguna RAM mengelola instance DTS dan Berikan izin kepada pengguna RAM.

Prosedur

Topik ini menjelaskan cara menggunakan kode demo SDK untuk mengonsumsi data terlacak dari instance PolarDB-X 1.0. IntelliJ IDEA (Community Edition 2020.1 untuk Windows) digunakan dalam contoh ini.

  1. Buat instance pelacakan perubahan. Untuk informasi lebih lanjut, lihat Lacak perubahan data dari instance PolarDB-X 1.0.
  2. Buat satu atau beberapa grup konsumen. Untuk informasi lebih lanjut, lihat Buat grup konsumen.
  3. Unduh paket kode demo SDK dan ekstrak paket tersebut.
  4. Buka proyek tujuan di IntelliJ IDEA.
    1. Buka IntelliJ IDEA. Di jendela yang muncul, klik Open or Import.
      Open a project
    2. Di kotak dialog yang ditampilkan, masuk ke direktori tempat paket diekstraksi. Kemudian, buka folder dan klik dua kali file pom.xml.
      pom.xml
    3. Di kotak dialog yang muncul, pilih Open as Project.
  5. Di jendela IntelliJ IDEA, perluas folder untuk menemukan file Java. Kemudian, klik dua kali file Java berdasarkan mode penggunaan SDK client.Dalam skenario ini, pilih DistributedDTSConsumerDemo.
    java
  6. Atur parameter yang diperlukan di dalam kode file Java.
    public static void main(String[] args) throws ClientException {
            // Konfigurasikan tugas pelacakan perubahan untuk database terdistribusi seperti instance PolarDB-X 1.0. Tentukan parameter terkait AccessKey, instance DTS, pekerjaan DTS, dan grup konsumen.
            String accessKeyId = "LTA***********99reZ";
            String accessKeySecret = "****************";
            String regionId = "cn-hangzhou";
            String dtsInstanceId = "dtse5212sed162****";
            String jobId = "l791216x16d****";
            String sid = "dtsip412t13160****";
            String userName = "xftest";
            String password = "******";
            String proxyUrl = "dts-cn-****.com:18001";
            // checkpoint awal untuk pencarian pertama (timestamp yang ditentukan, misalnya 1566180200 jika Anda ingin (Sen, 19 Agustus 10:03:21 CST 2019))
            String checkpoint = "1639620090";
    
            // Ubah nama database/tabel fisik menjadi nama database/tabel logis
            boolean mapping = true;
            // jika memaksa menggunakan checkpoint konfigurasi saat mulai. untuk reset checkpoint, hanya mode assign yang bekerja
            boolean isForceUseInitCheckpoint = false;
    
            ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
            DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId,
                    jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl,
                    checkpoint, isForceUseInitCheckpoint, mapping);
            demo.start();
        }
    ParameterDeskripsiCara mendapatkan
    accessKeyIdID AccessKey.Untuk informasi lebih lanjut tentang cara mendapatkan pasangan AccessKey, lihat Buat dan dapatkan pasangan AccessKey.
    accessKeySecretRahasia AccessKey.
    regionIdID wilayah tempat instance pelacakan perubahan berada.Di konsol DTS baru, klik ID instance. Di halaman Task Management, Anda bisa mendapatkan informasi wilayah instance. Misalnya, jika instance berada di wilayah China (Hangzhou), atur parameter ke cn-hangzhou. Untuk informasi lebih lanjut, lihat Daftar wilayah yang didukung.
    dtsInstanceIdID instance pelacakan perubahan.Di konsol DTS baru, klik ID instance. Di halaman Task Management, Anda bisa mendapatkan ID instance dan ID tugas.
    jobIdID tugas pelacakan perubahan.
    sidID grup konsumen.Di konsol DTS baru, klik ID instance. Di panel navigasi di sebelah kiri, klik Consume Data. Anda bisa mendapatkan Sid dan Account yang sesuai.
    Catatan Kata sandi akun grup konsumen secara otomatis ditentukan saat Anda membuat grup konsumen.
    userNameAkun grup konsumen.
    passwordKata sandi yang sesuai dengan akun grup konsumen.
    proxyUrlTitik akhir dan nomor port instance pelacakan perubahan.
    Catatan Jika Anda melacak perubahan data melalui jaringan internal, latensi jaringan minimal. Ini berlaku jika instance ECS tempat Anda menyebarkan klien SDK termasuk dalam jaringan klasik atau virtual private cloud (VPC) yang sama dengan instance pelacakan perubahan.
    Di konsol DTS baru, klik ID instance. Di halaman Task Management, Anda bisa mendapatkan titik akhir dan nomor port.
    checkpointOffset konsumen. Ini adalah timestamp ketika klien SDK mengonsumsi catatan data pertama. Nilainya adalah timestamp UNIX dalam detik.
    Catatan Offset konsumen dapat digunakan dalam skenario berikut:
    • Setelah proses konsumsi terganggu, Anda dapat menentukan offset konsumen untuk melanjutkan konsumsi data. Ini memungkinkan Anda mencegah kehilangan data.
    • Saat Anda memulai klien pelacakan perubahan, Anda dapat menentukan offset konsumen untuk mengonsumsi data sesuai permintaan.
    Offset konsumen harus berada dalam rentang data instance pelacakan perubahan. Offset konsumen harus dikonversi menjadi timestamp UNIX.
    Catatan Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX.
  7. Di bilah menu atas IntelliJ IDEA, pilih Run > Run untuk menjalankan klien.
    Catatan Saat Anda menjalankan IntelliJ IDEA untuk pertama kalinya, dibutuhkan waktu untuk memuat dan menginstal dependensi yang relevan.
    • Hasil yang ditampilkan menunjukkan bahwa klien SDK dapat melacak perubahan data dari instance sumber.
    • Klien SDK mengumpulkan dan menampilkan statistik tentang data yang dikonsumsi pada interval reguler. Informasi statistik mencakup jumlah total catatan data yang dikirim dan diterima, jumlah total data, dan jumlah permintaan per detik (RPS).
      Tabel 1. Tabel berikut menjelaskan parameter dalam informasi.
      ParameterDeskripsi
      outCountsJumlah total catatan data yang dikonsumsi oleh klien SDK.
      outBytesJumlah total data yang dikonsumsi oleh klien SDK. Unit: byte.
      outRpsJumlah RPS ketika klien SDK mengonsumsi data.
      outBpsJumlah bit yang ditransmisikan per detik ketika klien SDK mengonsumsi data.
      countTidak ada
      inBytesJumlah total data yang dikirim oleh server DTS. Unit: byte.
      DStoreRecordQueueUkuran antrian cache data saat ini ketika server DTS mengirim data.
      inCountsJumlah total catatan data yang dikirim oleh server DTS.
      inRpsJumlah RPS ketika server DTS mengirim data.
      inBpsJumlah bit yang ditransmisikan per detik ketika server DTS mengirim data.
      __dtTimestamp saat ini ketika klien SDK menerima data. Unit: milidetik.
      DefaultUserRecordQueueUkuran antrian cache data saat ini setelah serialisasi.
  8. Opsional: Untuk memodifikasi tipe data dari data yang dilacak, Anda dapat memodifikasi kode di metode buildRecordListener() atau menggunakan kelas kustom.
    public static Map<String, RecordListener> buildRecordListener() {
            // pengguna dapat mengimplementasikan pendengar mereka sendiri
            RecordListener mysqlRecordPrintListener = new RecordListener() {
                @Override
                public void consume(DefaultUserRecord record) {
    
                    OperationType operationType = record.getOperationType();
    
                    if (operationType.equals(OperationType.INSERT)
                            || operationType.equals(OperationType.UPDATE)
                            || operationType.equals(OperationType.DELETE)
                            || operationType.equals(OperationType.HEARTBEAT)) {
    
                        // konsumsi rekaman
                        RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL);
    
                        recordPrintListener.consume(record);
    
                        // metode commit mendorong pembaruan checkpoint
                        record.commit("");
                    }
                }
            };
            return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener);
        }