Setelah membuat tugas pelacakan perubahan, gunakan demo SDK yang disediakan oleh Data Transmission Service (DTS) untuk mengonsumsi data perubahan yang dihasilkan. Topik ini menjelaskan cara menggunakan demo SDK dengan sumber data terdistribusi seperti PolarDB-X 1.0 dan DMS LogicDB.
Prasyarat
-
JDK 1.8 telah diinstal.
-
IntelliJ IDEA telah diinstal.
Peringatan
Untuk mengonsumsi data perubahan sebagai pengguna RAM, Anda harus memberikan izin AliyunDTSFullAccess dan izin akses ke objek sumber kepada pengguna tersebut. Untuk informasi selengkapnya, lihat Otorisasi pengguna RAM untuk mengelola DTS dan Mengelola izin pengguna RAM.
Prosedur
Topik ini menunjukkan cara menjalankan demo SDK untuk mengonsumsi data perubahan dari instans PolarDB-X 1.0, dengan menggunakan IntelliJ IDEA Community Edition 2020.1 untuk Windows sebagai contoh.
-
Buat instans pelacakan perubahan. Untuk informasi selengkapnya, lihat Buat tugas pelacakan perubahan untuk PolarDB-X 1.0.
-
Buat satu atau beberapa kelompok konsumen. Untuk informasi selengkapnya, lihat Buat kelompok konsumen.
-
Unduh dan ekstrak demo SDK. Untuk tautan unduhan, lihat Kode Demo SDK.
PentingSaat mengonsumsi data perubahan, Anda harus memanggil metode commit dari DefaultUserRecord untuk mengirimkan offset konsumen. Jika tidak, Anda berisiko mengonsumsi data secara berulang.
-
Buka proyek di IntelliJ IDEA.
-
Buka IntelliJ IDEA dan klik Open or Import.
-
Di kotak dialog, arahkan ke direktori tempat Anda mengekstrak demo SDK, buka folder-folder tersebut, lalu klik ganda file pom.xml.
-
Di kotak dialog yang muncul, pilih Open as Project.
-
-
Di IntelliJ IDEA, buka folder-folder tersebut. Berdasarkan mode penggunaan SDK client, pilih dan klik ganda untuk membuka file Java yang sesuai: DistributedDTSConsumerDemo.
aliyun-dts-subscribe-sdk-java-master [dts-new-subscr...] .idea src main test java com.aliyun.dts.subscribe.clients DBMapperTest DistributedDTSConsumerDemo DTSConsumerAssignDemo DTSConsumerSubscribeDemo UserMetaStore .gitignore dts-new-subscribe-sdk.iml LICENSE pom.xml README.md External Libraries Scratches and Consoles -
Atur parameter yang diperlukan dalam file Java.
public static void main(String[] args) throws ClientException { // Konfigurasikan pengaturan pelacakan perubahan untuk sumber data terdistribusi, seperti instans PolarDB-X 1.0. Atur parameter seperti pasangan AccessKey, ID instans, ID tugas, dan kelompok konsumen. String accessKeyId = "LTA***********99reZ"; String accessKeySecret = "****************"; String regionId = "cn-hangzhou"; String dtsInstanceId = "dtse5212sed162****"; String jobId = "l791216x16d****"; String sid = "dtsip412t13160****"; String userName = "xftest"; String password = "******"; String proxyUrl = "dts-cn-****.com:18001"; // Offset konsumen awal, berupa timestamp Unix dalam satuan detik. Contoh: 1566180200 untuk Mon Aug 19 10:03:21 CST 2019. String checkpoint = "1639620090"; // Konversi nama database/tabel fisik menjadi nama database/tabel logis boolean mapping = true; // Atur ke true untuk memaksa klien memulai dari checkpoint yang ditentukan. Pengaturan ulang checkpoint hanya berlaku dalam mode ASSIGN. boolean isForceUseInitCheckpoint = false; ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN; DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId, jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl, checkpoint, isForceUseInitCheckpoint, mapping); demo.start(); }Parameter
Deskripsi
Cara mendapatkan
accessKeyId
ID AccessKey.
Untuk informasi selengkapnya, lihat Dapatkan pasangan AccessKey.
accessKeySecret
Rahasia AccessKey.
regionId
ID wilayah dari instans pelacakan perubahan.
Di Konsol DTS, klik ID instans pelacakan perubahan yang dituju. Di halaman Task Management, temukan informasi wilayahnya. Misalnya, jika wilayahnya China (Hangzhou), atur parameter ini menjadi
cn-hangzhou. Untuk daftar wilayah, lihat Daftar wilayah yang didukung.dtsInstanceId
ID instans pelacakan perubahan.
Di Konsol DTS, klik ID instans pelacakan perubahan yang dituju. Di halaman Task Management, Anda dapat menemukan ID instans dan ID tugas.
jobId
ID tugas pelacakan perubahan.
sid
ID kelompok konsumen.
Di Konsol DTS, klik ID instans pelacakan perubahan yang dituju, lalu klik Consume Data. Anda dapat menemukan SID dan Account dari kelompok konsumen.
CatatanKata sandi untuk akun kelompok konsumen ditentukan saat Anda membuat kelompok konsumen.
userName
Akun kelompok konsumen.
password
Kata sandi akun kelompok konsumen.
proxyUrl
Titik akhir dan port dari instans pelacakan perubahan.
Catatan-
Untuk meminimalkan latensi jaringan, gunakan titik akhir internal jika Instance ECS yang menjalankan SDK client berada dalam jaringan klasik atau VPC yang sama dengan instans pelacakan perubahan.
-
Kami tidak merekomendasikan penggunaan titik akhir publik karena potensi ketidakstabilan jaringan.
Di Konsol DTS, klik ID instans pelacakan perubahan yang dituju. Di halaman Task Management, Anda dapat menemukan titik akhir dan nomor port-nya.
checkpoint
Offset konsumen, ditentukan sebagai timestamp Unix dalam satuan detik. SDK client mulai mengonsumsi data dari titik waktu ini.
CatatanOffset konsumen digunakan dalam skenario berikut:
-
Jika aplikasi Anda terganggu, berikan offset konsumen terakhir untuk melanjutkan konsumsi tanpa kehilangan data.
-
Saat klien dimulai, Anda dapat memberikan offset konsumen tertentu untuk mengonsumsi data sesuai kebutuhan.
Offset konsumen harus berada dalam rentang data instans pelacakan perubahan dan harus berupa timestamp Unix.
CatatanAnda dapat menggunakan mesin pencari untuk menemukan konverter timestamp Unix.
-
-
Di bilah menu atas IntelliJ IDEA, pilih untuk menjalankan klien.
CatatanPada jalankan pertama, IntelliJ IDEA secara otomatis menginstal dependensi yang diperlukan, yang mungkin memerlukan waktu.
-
Output menunjukkan bahwa klien dapat mengonsumsi perubahan data dari instans sumber.
-
SDK client secara berkala menampilkan statistik konsumsi, yang mencakup jumlah total dan volume catatan data yang dikirim serta diterima, dan requests per second (RPS).
Tabel 1. Statistik konsumsi
Parameter
Deskripsi
outCountsJumlah total catatan data yang dikonsumsi oleh SDK client.
outBytesVolume total data yang dikonsumsi oleh SDK client, dalam satuan byte.
outRpsLaju konsumsi data SDK client, dalam requests per second (RPS).
outBpsLaju konsumsi data SDK client, dalam bits per second (bps).
countParameter ini dicadangkan.
inBytesVolume total data yang dikirim oleh server DTS, dalam satuan byte.
DStoreRecordQueueUkuran saat ini dari antrian cache data di server DTS.
inCountsJumlah total catatan data yang dikirim oleh server DTS.
inRpsLaju pengiriman data server DTS, dalam requests per second (RPS).
inBpsLaju pengiriman data server DTS, dalam bits per second (bps).
__dtTimestamp saat SDK client menerima data, dalam milidetik.
DefaultUserRecordQueueUkuran saat ini dari antrian cache data setelah serialisasi.
-
-
Opsi: Untuk menyesuaikan cara pemrosesan data yang dikonsumsi, Anda dapat memodifikasi metode
buildRecordListener()atau menggunakan kelas kustom.public static Map<String, RecordListener> buildRecordListener() { // Anda dapat mengimplementasikan listener sendiri. RecordListener mysqlRecordPrintListener = new RecordListener() { @Override public void consume(DefaultUserRecord record) { OperationType operationType = record.getOperationType(); if (operationType.equals(OperationType.INSERT) || operationType.equals(OperationType.UPDATE) || operationType.equals(OperationType.DELETE) || operationType.equals(OperationType.HEARTBEAT)) { // Konsumsi catatan tersebut. RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL); recordPrintListener.consume(record); // Metode commit mendorong pembaruan checkpoint. record.commit(""); } } }; return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener); }