全部产品
Search
文档中心

Data Transmission Service:Gunakan demo SDK untuk mengonsumsi data yang dilacak dari instance PolarDB-X 1.0

更新时间:Jul 06, 2025

Setelah mengonfigurasi tugas pelacakan perubahan, Anda dapat menggunakan demo SDK yang disediakan oleh Data Transmission Service (DTS) untuk melacak dan mengonsumsi data. Topik ini menjelaskan cara menggunakan demo SDK untuk mengonsumsi data yang dilacak dari database terdistribusi. Database sumber yang didukung adalah instance PolarDB for Xscale (PolarDB-X) 1.0 dan database logis Data Management (DMS).

Prasyarat

  • Java Development Kit (JDK) V1.8 telah diinstal.
  • IntelliJ IDEA telah diinstal.

Catatan penggunaan

Jika Anda ingin melacak dan mengonsumsi data sebagai Pengguna Resource Access Management (RAM), pengguna RAM harus memiliki izin AliyunDTSFullAccess dan izin untuk mengakses objek sumber. Untuk informasi lebih lanjut tentang cara memberikan izin, lihat Gunakan Kebijakan Sistem untuk Mengotorisasi Pengguna RAM Mengelola Instance DTS dan Berikan Izin kepada Pengguna RAM.

Prosedur

Topik ini menjelaskan cara menggunakan demo SDK untuk mengonsumsi data yang dilacak dari instance PolarDB-X 1.0. Contoh ini menggunakan IntelliJ IDEA Community Edition 2020.1 untuk Windows.

  1. Buat tugas pelacakan perubahan. Untuk informasi lebih lanjut, lihat Lacak Perubahan Data dari Instance ApsaraDB RDS for MySQL.
  2. Buat satu atau beberapa grup konsumen. Untuk informasi lebih lanjut, lihat Buat Grup Konsumen.
  3. Unduh paket demo SDK dan ekstrak paket tersebut.
  4. Buka file yang ingin digunakan sebagai proyek di IntelliJ IDEA.
    1. Buka IntelliJ IDEA. Di jendela yang muncul, klik Open or Import.
      Open a project
    2. Di kotak dialog yang muncul, buka direktori tempat paket diekstraksi. Kemudian, buka folder dan klik dua kali file pom.xml.
      1
    3. Di kotak dialog yang muncul, pilih Open as Project.
  5. Di IntelliJ IDEA, perluas folder untuk menemukan file Java. Kemudian, klik dua kali file Java berdasarkan mode penggunaan klien SDK. Dalam skenario ini, pilih DistributedDTSConsumerDemo.
    Find the Java file
  6. Atur parameter yang diperlukan dalam kode file Java.
    public static void main(String[] args) throws ClientException {
            // Konfigurasikan tugas pelacakan perubahan untuk database terdistribusi seperti instance PolarDB-X 1.0. Atur parameter yang digunakan untuk menentukan pasangan AccessKey, ID instance, ID tugas, dan grup konsumen.
            String accessKeyId = "LTA***********99reZ";
            String accessKeySecret = "****************";
            String regionId = "cn-hangzhou";
            String dtsInstanceId = "dtse5212sed162****";
            String jobId = "l791216x16d****";
            String sid = "dtsip412t13160****";
            String userName = "xftest";
            String password = "******";
            String proxyUrl = "dts-cn-****.com:18001";
            // checkpoint awal untuk pencarian pertama (timestamp yang ditetapkan, misalnya 1566180200 jika Anda ingin (Sen, 19 Agustus 10:03:21 CST 2019))
            String checkpoint = "1639620090";
    
            // Ubah nama database/tabel fisik menjadi nama database/tabel logis
            boolean mapping = true;
            // jika memaksa menggunakan konfigurasi checkpoint saat mulai. untuk reset checkpoint, hanya mode assign yang bekerja
            boolean isForceUseInitCheckpoint = false;
    
            ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
            DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId,
                    jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl,
                    checkpoint, isForceUseInitCheckpoint, mapping);
            demo.start();
        }
    ParameterDeskripsiCara Mendapatkan Nilai Parameter
    accessKeyIdID AccessKey.Untuk informasi lebih lanjut tentang cara mendapatkan pasangan AccessKey, lihat Buat dan Dapatkan Pasangan AccessKey.
    accessKeySecretRahasia AccessKey.
    regionIdID wilayah tempat tugas pelacakan perubahan berada.Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di halaman Basic Information, Anda bisa mendapatkan wilayah tempat instance berada. Misalnya, jika instance berada di wilayah China (Hangzhou), atur parameter ke cn-hangzhou. Untuk informasi lebih lanjut, lihat Wilayah yang Didukung.
    dtsInstanceIdID instance pelacakan perubahan.Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di halaman Basic Information, Anda dapat melihat DTS Instance ID.
    jobIdID tugas pelacakan perubahan.Anda dapat memanggil operasi DescribeDtsJobs untuk menanyakan ID tugas.
    sidID grup konsumen.Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di panel navigasi di sebelah kiri, klik Consume Data. Anda dapat melihat Consumer Group ID/Name dari instance dan Account dari grup konsumen.
    Catatan Kata sandi akun grup konsumen ditentukan saat Anda membuat grup konsumen.
    userNameAkun grup konsumen.
    passwordKata sandi grup konsumen.
    proxyUrlTitik akhir dan nomor port instance pelacakan perubahan.
    Catatan Jika Anda melacak perubahan data melalui jaringan internal, latensi jaringan minimal. Ini berlaku jika instance Elastic Compute Service (ECS) tempat Anda menyebarkan klien SDK berada di jaringan klasik atau di virtual private cloud (VPC) yang sama dengan instance pelacakan perubahan.
    Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di halaman Basic Information, Anda dapat melihat Network dari instance.
    checkpointOffset konsumen. Ini adalah timestamp yang dihasilkan ketika klien SDK mengonsumsi catatan data pertama. Nilainya adalah timestamp UNIX dalam detik.
    Catatan Offset konsumen dapat digunakan dalam skenario berikut:
    • Jika proses konsumsi terganggu, Anda dapat menentukan offset konsumen untuk melanjutkan konsumsi data. Ini memungkinkan Anda mencegah kehilangan data.
    • Saat Anda memulai klien pelacakan perubahan, Anda dapat menentukan offset konsumen untuk mengonsumsi data berdasarkan kebutuhan bisnis Anda.
    Offset konsumen dari data yang dikonsumsi harus berada dalam rentang data instance pelacakan perubahan. Offset konsumen harus dikonversi ke timestamp UNIX.
    Catatan
    • Anda dapat melihat rentang data instance pelacakan perubahan di kolom Data Range di halaman Tugas Pelacakan Perubahan.
    • Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX.
  7. Di bilah menu atas IntelliJ IDEA, pilih Run > Run untuk menjalankan klien.
    Catatan Saat Anda menjalankan IntelliJ IDEA untuk pertama kali, dibutuhkan waktu untuk memuat dan menginstal dependensi yang relevan.
    • Hasil menunjukkan bahwa klien SDK dapat melacak perubahan data dari instance sumber.
    • Klien SDK menghitung dan menampilkan informasi tentang data yang dikonsumsi secara berkala. Informasi tersebut mencakup jumlah total catatan data yang dikirim dan diterima, jumlah total data, dan jumlah permintaan per detik (RPS).
      Tabel 1. Tabel berikut menjelaskan parameter dalam informasi.
      ParameterDeskripsi
      outCountsJumlah total catatan data yang dikonsumsi oleh klien SDK.
      outBytesJumlah total data yang dikonsumsi oleh klien SDK. Unit: byte.
      outRpsRPS di mana klien SDK mengonsumsi data.
      outBpsJumlah bit yang ditransmisikan per detik di mana klien SDK mengonsumsi data.
      countTidak ada.
      inBytesJumlah total data yang dikirim oleh server DTS. Unit: byte.
      DStoreRecordQueueUkuran antrian cache data saat ini ketika server DTS mengirim data.
      inCountsJumlah total catatan data yang dikirim oleh server DTS.
      inRpsRPS di mana server DTS mengirim data.
      inBpsJumlah bit yang ditransmisikan per detik ketika server DTS mengirim data.
      __dtTimestamp yang dihasilkan ketika klien SDK menerima data. Unit: milidetik.
      DefaultUserRecordQueueUkuran antrian cache data saat ini setelah serialisasi.
  8. Opsional:Untuk memodifikasi tipe data dari data yang dilacak, Anda dapat memodifikasi kode di metode buildRecordListener() atau menggunakan kelas kustom.
    public static Map<String, RecordListener> buildRecordListener() {
            // pengguna dapat mengimplementasikan pendengar mereka sendiri
            RecordListener mysqlRecordPrintListener = new RecordListener() {
                @Override
                public void consume(DefaultUserRecord record) {
    
                    OperationType operationType = record.getOperationType();
    
                    if (operationType.equals(OperationType.INSERT)
                            || operationType.equals(OperationType.UPDATE)
                            || operationType.equals(OperationType.DELETE)
                            || operationType.equals(OperationType.HEARTBEAT)) {
    
                        // konsumsi rekaman
                        RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL);
    
                        recordPrintListener.consume(record);
    
                        // metode commit mendorong pembaruan checkpoint
                        record.commit("");
                    }
                }
            };
            return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener);
        }