Setelah mengonfigurasi tugas pelacakan perubahan, Anda dapat menggunakan kode demo SDK yang disediakan oleh Data Transmission Service (DTS) untuk mengonsumsi data terlacak. Topik ini menjelaskan cara menggunakan kode demo SDK untuk mengonsumsi data terlacak dari database terdistribusi seperti PolarDB-X 1.0 dan Data Management (DMS) LogicDB.
Prasyarat
- Java Development Kit (JDK) versi 1.8 telah diinstal.
- IntelliJ IDEA telah diinstal.
Perhatian
Jika Anda menggunakan pengguna Resource Access Management (RAM) untuk melacak data, pengguna RAM harus memiliki izin AliyunDTSFullAccess dan izin untuk mengakses objek sumber. Untuk informasi lebih lanjut, lihat Gunakan kebijakan sistem untuk memberi otorisasi kepada pengguna RAM mengelola instance DTS dan Berikan izin kepada pengguna RAM.
Prosedur
Topik ini menjelaskan cara menggunakan kode demo SDK untuk mengonsumsi data terlacak dari instance PolarDB-X 1.0. IntelliJ IDEA (Community Edition 2020.1 untuk Windows) digunakan dalam contoh ini.
- Buat instance pelacakan perubahan. Untuk informasi lebih lanjut, lihat Lacak perubahan data dari instance PolarDB-X 1.0.
- Buat satu atau beberapa grup konsumen. Untuk informasi lebih lanjut, lihat Buat grup konsumen.
- Unduh paket kode demo SDK dan ekstrak paket tersebut.
- Buka proyek tujuan di IntelliJ IDEA.
- Buka IntelliJ IDEA. Di jendela yang muncul, klik Open or Import.

- Di kotak dialog yang ditampilkan, masuk ke direktori tempat paket diekstraksi. Kemudian, buka folder dan klik dua kali file pom.xml.

- Di kotak dialog yang muncul, pilih Open as Project.
- Buka IntelliJ IDEA. Di jendela yang muncul, klik Open or Import.
- Di jendela IntelliJ IDEA, perluas folder untuk menemukan file Java. Kemudian, klik dua kali file Java berdasarkan mode penggunaan SDK client.Dalam skenario ini, pilih DistributedDTSConsumerDemo.

- Atur parameter yang diperlukan di dalam kode file Java.
public static void main(String[] args) throws ClientException { // Konfigurasikan tugas pelacakan perubahan untuk database terdistribusi seperti instance PolarDB-X 1.0. Tentukan parameter terkait AccessKey, instance DTS, pekerjaan DTS, dan grup konsumen. String accessKeyId = "LTA***********99reZ"; String accessKeySecret = "****************"; String regionId = "cn-hangzhou"; String dtsInstanceId = "dtse5212sed162****"; String jobId = "l791216x16d****"; String sid = "dtsip412t13160****"; String userName = "xftest"; String password = "******"; String proxyUrl = "dts-cn-****.com:18001"; // checkpoint awal untuk pencarian pertama (timestamp yang ditentukan, misalnya 1566180200 jika Anda ingin (Sen, 19 Agustus 10:03:21 CST 2019)) String checkpoint = "1639620090"; // Ubah nama database/tabel fisik menjadi nama database/tabel logis boolean mapping = true; // jika memaksa menggunakan checkpoint konfigurasi saat mulai. untuk reset checkpoint, hanya mode assign yang bekerja boolean isForceUseInitCheckpoint = false; ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN; DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId, jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl, checkpoint, isForceUseInitCheckpoint, mapping); demo.start(); }Parameter Deskripsi Cara mendapatkan accessKeyId ID AccessKey. Untuk informasi lebih lanjut tentang cara mendapatkan pasangan AccessKey, lihat Buat dan dapatkan pasangan AccessKey. accessKeySecret Rahasia AccessKey. regionId ID wilayah tempat instance pelacakan perubahan berada. Di konsol DTS baru, klik ID instance. Di halaman Task Management, Anda bisa mendapatkan informasi wilayah instance. Misalnya, jika instance berada di wilayah China (Hangzhou), atur parameter ke cn-hangzhou. Untuk informasi lebih lanjut, lihat Daftar wilayah yang didukung.dtsInstanceId ID instance pelacakan perubahan. Di konsol DTS baru, klik ID instance. Di halaman Task Management, Anda bisa mendapatkan ID instance dan ID tugas. jobId ID tugas pelacakan perubahan. sid ID grup konsumen. Di konsol DTS baru, klik ID instance. Di panel navigasi di sebelah kiri, klik Consume Data. Anda bisa mendapatkan Sid dan Account yang sesuai. Catatan Kata sandi akun grup konsumen secara otomatis ditentukan saat Anda membuat grup konsumen.userName Akun grup konsumen. password Kata sandi yang sesuai dengan akun grup konsumen. proxyUrl Titik akhir dan nomor port instance pelacakan perubahan. Catatan Jika Anda melacak perubahan data melalui jaringan internal, latensi jaringan minimal. Ini berlaku jika instance ECS tempat Anda menyebarkan klien SDK termasuk dalam jaringan klasik atau virtual private cloud (VPC) yang sama dengan instance pelacakan perubahan.Di konsol DTS baru, klik ID instance. Di halaman Task Management, Anda bisa mendapatkan titik akhir dan nomor port. checkpoint Offset konsumen. Ini adalah timestamp ketika klien SDK mengonsumsi catatan data pertama. Nilainya adalah timestamp UNIX dalam detik. Catatan Offset konsumen dapat digunakan dalam skenario berikut:- Setelah proses konsumsi terganggu, Anda dapat menentukan offset konsumen untuk melanjutkan konsumsi data. Ini memungkinkan Anda mencegah kehilangan data.
- Saat Anda memulai klien pelacakan perubahan, Anda dapat menentukan offset konsumen untuk mengonsumsi data sesuai permintaan.
Offset konsumen harus berada dalam rentang data instance pelacakan perubahan. Offset konsumen harus dikonversi menjadi timestamp UNIX. Catatan Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX. - Di bilah menu atas IntelliJ IDEA, pilih untuk menjalankan klien.Catatan Saat Anda menjalankan IntelliJ IDEA untuk pertama kalinya, dibutuhkan waktu untuk memuat dan menginstal dependensi yang relevan.
- Hasil yang ditampilkan menunjukkan bahwa klien SDK dapat melacak perubahan data dari instance sumber.
- Klien SDK mengumpulkan dan menampilkan statistik tentang data yang dikonsumsi pada interval reguler. Informasi statistik mencakup jumlah total catatan data yang dikirim dan diterima, jumlah total data, dan jumlah permintaan per detik (RPS).
Tabel 1. Tabel berikut menjelaskan parameter dalam informasi. Parameter Deskripsi outCountsJumlah total catatan data yang dikonsumsi oleh klien SDK. outBytesJumlah total data yang dikonsumsi oleh klien SDK. Unit: byte. outRpsJumlah RPS ketika klien SDK mengonsumsi data. outBpsJumlah bit yang ditransmisikan per detik ketika klien SDK mengonsumsi data. countTidak ada inBytesJumlah total data yang dikirim oleh server DTS. Unit: byte. DStoreRecordQueueUkuran antrian cache data saat ini ketika server DTS mengirim data. inCountsJumlah total catatan data yang dikirim oleh server DTS. inRpsJumlah RPS ketika server DTS mengirim data. inBpsJumlah bit yang ditransmisikan per detik ketika server DTS mengirim data. __dtTimestamp saat ini ketika klien SDK menerima data. Unit: milidetik. DefaultUserRecordQueueUkuran antrian cache data saat ini setelah serialisasi.
- Opsional: Untuk memodifikasi tipe data dari data yang dilacak, Anda dapat memodifikasi kode di metode
buildRecordListener()atau menggunakan kelas kustom.public static Map<String, RecordListener> buildRecordListener() { // pengguna dapat mengimplementasikan pendengar mereka sendiri RecordListener mysqlRecordPrintListener = new RecordListener() { @Override public void consume(DefaultUserRecord record) { OperationType operationType = record.getOperationType(); if (operationType.equals(OperationType.INSERT) || operationType.equals(OperationType.UPDATE) || operationType.equals(OperationType.DELETE) || operationType.equals(OperationType.HEARTBEAT)) { // konsumsi rekaman RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL); recordPrintListener.consume(record); // metode commit mendorong pembaruan checkpoint record.commit(""); } } }; return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener); }