Setelah mengonfigurasi saluran pelacakan perubahan dengan membuat tugas pelacakan dan kelompok konsumen, Anda dapat menggunakan kit pengembangan perangkat lunak (SDK) yang disediakan oleh DTS untuk mengonsumsi data yang telah berlangganan. Topik ini menjelaskan cara menggunakan kode contoh.
Untuk mengonsumsi data yang telah berlangganan dari sumber data PolarDB-X 1.0 atau DMS LogicDB, lihat Menggunakan SDK untuk mengonsumsi data yang telah berlangganan dari PolarDB-X 1.0.
Topik ini menyediakan kode contoh untuk klien SDK dalam Java. Untuk kode contoh dalam Python dan Go, lihat dts-subscribe-demo.
Prasyarat
Instansi langganan telah dibuat dan berjalan dalam status Normal.
CatatanUntuk petunjuk membuat instansi langganan, lihat Ikhtisar Rencana Langganan.
Anda telah membuat kelompok konsumen untuk instansi langganan Anda.
Jika Anda menggunakan RAM user untuk mengonsumsi data yang telah berlangganan, RAM user tersebut harus memiliki izin AliyunDTSFullAccess dan izin akses ke objek yang telah berlangganan. Untuk informasi selengkapnya, lihat Memberikan izin kepada RAM user untuk mengelola DTS menggunakan kebijakan sistem dan Mengelola izin RAM user.
Perhatian
Saat mengonsumsi data yang telah berlangganan, Anda harus memanggil metode commit dari DefaultUserRecord untuk melakukan commit informasi offset. Jika tidak, data mungkin dikonsumsi berulang kali.
Proses konsumsi yang berbeda bersifat independen satu sama lain.
Di konsol, Current Offset menunjukkan offset yang dilanggan oleh pelacakan tugas, bukan offset yang dikomit oleh klien.
Prosedur
Unduh file kode SDK contoh dan ekstrak paketnya.
Verifikasi versi kode SDK.
Buka direktori tempat Anda mengekstrak kode SDK contoh.
Gunakan editor teks untuk membuka file pom.xml di direktori tersebut.
Perbarui SDK pelacakan perubahan ke versi terbaru.
CatatanAnda dapat menemukan dependensi Maven terbaru di halaman dts-new-subscribe-sdk.
Edit kode SDK.
Gunakan integrated development environment (IDE) untuk membuka file yang telah diekstrak.
Buka file Java yang sesuai dengan mode penggunaan klien SDK Anda.
CatatanJalur file Java adalah
aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/.Mode penggunaan
File Java
Deskripsi
Skenario
ASSIGN mode
DTSConsumerAssignDemo.java
Untuk memastikan urutan pesan global, DTS hanya menetapkan satu partisi (partisi 0) untuk setiap topik pelacakan. Jika Anda menggunakan klien SDK dalam ASSIGN mode, jalankan hanya satu klien SDK.
Hanya satu klien SDK dalam kelompok konsumen yang mengonsumsi data yang telah berlangganan.
SUBSCRIBE mode
DTSConsumerSubscribeDemo.java
Untuk memastikan urutan pesan global, DTS hanya menetapkan satu partisi (partisi 0) untuk setiap topik pelacakan. Jika Anda menggunakan klien SDK dalam SUBSCRIBE mode, Anda dapat menjalankan beberapa klien SDK dalam kelompok konsumen yang sama untuk disaster recovery. Jika klien yang sedang mengonsumsi data gagal, klien SDK lain akan secara acak dan otomatis ditetapkan ke partisi 0 untuk melanjutkan konsumsi.
Beberapa klien SDK dalam kelompok konsumen yang sama mengonsumsi data yang telah berlangganan. Ini merupakan skenario pemulihan bencana data.
Atur parameter dalam kode Java.
Parameter
Deskripsi
Cara mendapatkan
brokerUrlMenentukan alamat jaringan dan nomor port saluran pelacakan perubahan.
CatatanJika server tempat Anda men-deploy klien SDK, seperti Instance ECS, dan instansi pelacakan perubahan berada dalam virtual private cloud (VPC) yang sama, Anda dapat mengonsumsi data melalui VPC untuk mengurangi latensi jaringan.
Kami tidak merekomendasikan penggunaan titik akhir publik karena potensi ketidakstabilan jaringan.
Di Konsol DTS, klik ID instansi langganan target. Di halaman Basic Information, dapatkan alamat jaringan dan nomor port dari bagian Network.
topicTopik saluran pelacakan perubahan.
Di Konsol DTS, klik ID instansi langganan target. Di halaman Basic Information, buka bagian Basic Information dan dapatkan Topic.
sidID kelompok konsumen.
Di Konsol DTS, klik ID instansi langganan target. Di halaman Consume Data, dapatkan Consumer Group ID/Name dan Account.
userNameUsername kelompok konsumen.
PeringatanJika Anda tidak menggunakan klien yang disediakan dalam topik ini, atur username dalam format
<consumer group username>-<consumer group ID>(misalnya,dtstest-dtsae******bpv). Jika tidak, koneksi akan gagal.passwordPassword untuk akun tersebut.
Password yang Anda tetapkan untuk username kelompok konsumen saat membuat kelompok konsumen.
initCheckpointOffset konsumen. Ini adalah stempel waktu UNIX dari catatan data pertama yang akan dikonsumsi oleh klien SDK, misalnya 1620962769.
CatatanAnda dapat menggunakan offset konsumen untuk:
Melanjutkan konsumsi data dari offset tertentu setelah aplikasi Anda terganggu guna mencegah kehilangan data.
Menyesuaikan offset awal untuk konsumsi data sesuai kebutuhan.
Offset konsumen harus berada dalam rentang waktu instansi pelacakan dan harus dikonversi ke stempel waktu UNIX.
CatatanKolom Data Range dalam daftar tugas pelacakan menunjukkan rentang waktu instansi langganan target.
subscribeModeMode penggunaan klien SDK. Anda tidak perlu mengubah parameter ini.
ConsumerContext.ConsumerSubscribeMode.ASSIGN: ASSIGN mode.ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: SUBSCRIBE mode.
N/A
Buka struktur proyek di IDE Anda dan pastikan versi OpenJDK untuk proyek tersebut adalah 1.8.
Jalankan kode klien.
CatatanSaat menjalankan kode untuk pertama kalinya, IDE memerlukan waktu untuk memuat plugin dan dependensi yang diperlukan secara otomatis.
Klien SDK secara berkala mengumpulkan dan menampilkan statistik tentang konsumsi data. Statistik ini mencakup jumlah total catatan data yang dikirim dan diterima, volume data total, serta jumlah catatan per detik (RPS).
[2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}Parameter
Deskripsi
outCountsJumlah total catatan data yang dikonsumsi oleh klien SDK.
outBytesVolume total data yang dikonsumsi oleh klien SDK, dalam byte.
outRpsJumlah permintaan per detik untuk konsumsi data oleh klien SDK.
outBpsJumlah bit yang ditransmisikan per detik untuk konsumsi data oleh klien SDK.
countJumlah total parameter dalam informasi konsumsi data (metrik).
CatatanTidak termasuk
countitu sendiri.inBytesVolume total data yang dikirim oleh server DTS, dalam byte.
DStoreRecordQueueUkuran antrian cache data saat server DTS mengirim data.
inCountsJumlah total catatan data yang dikirim oleh server DTS.
inBpsJumlah bit yang ditransmisikan per detik saat server DTS mengirim data.
inRpsJumlah permintaan per detik saat server DTS mengirim data.
__dtStempel waktu saat klien SDK menerima data, dalam milidetik.
DefaultUserRecordQueueUkuran antrian cache data setelah serialisasi.
Edit kode sesuai kebutuhan untuk mengonsumsi data yang telah berlangganan.
Saat mengonsumsi data yang telah berlangganan, Anda perlu mengelola offset konsumen guna mencegah kehilangan data, meminimalkan duplikasi data, dan memungkinkan konsumsi sesuai permintaan.
FAQ
Apa yang harus saya lakukan jika tidak dapat terhubung ke instansi langganan?
Pecahkan masalah berdasarkan pesan error. Untuk informasi selengkapnya, lihat Pemecahan Masalah.
Apa format data offset konsumen setelah dipersistensi?
Setelah offset konsumen dipersistensi, data dikembalikan dalam format JSON. Offset konsumen yang dipersistensi adalah stempel waktu Unix yang dapat Anda teruskan langsung ke SDK. Misalnya, dalam data yang dikembalikan, nilai
1700709977untuk kunci"timestamp"adalah offset konsumen yang dipersistensi.{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}Apakah tugas pelacakan dapat dikonsumsi oleh beberapa klien secara paralel?
Tidak. Meskipun mode SUBSCRIBE memungkinkan beberapa klien berjalan secara paralel, hanya satu klien yang dapat mengonsumsi data dalam satu waktu.
Versi klien Kafka mana yang dienkapsulasi dalam kode SDK?
Versi 2.0.0 dan yang lebih baru dari dts-new-subscribe-sdk mengenkapsulasi klien Kafka (kafka-clients) 2.7.0. Versi sebelum 2.0.0 mengenkapsulasi klien Kafka 1.0.0.
CatatanJika Anda menggunakan tool deteksi kerentanan paket dependensi dalam proses pengembangan aplikasi Anda dan menemukan bahwa klien Kafka (kafka-clients) yang dienkapsulasi oleh dts-new-subscribe-sdk memiliki kerentanan keamanan, Anda dapat mengatasi kerentanan ini dengan mengganti klien tersebut menggunakan versi
2.1.4-shaded.<dependency> <groupId>com.aliyun.dts</groupId> <artifactId>dts-new-subscribe-sdk</artifactId> <version>2.1.4-shaded</version> </dependency>
Lampiran
Mengelola offset konsumen
Saat klien SDK pertama kali dijalankan, di-restart, atau melakukan retry internal, Anda harus menanyakan dan meneruskan offset konsumen untuk memulai atau melanjutkan konsumsi data. Offset konsumen adalah stempel waktu UNIX dari catatan data pertama yang akan dikonsumsi oleh klien SDK.
Untuk mengatur ulang offset konsumen klien, Anda dapat menanyakan dan memodifikasi offset konsumen berdasarkan mode konsumsi (mode penggunaan SDK) seperti yang dijelaskan dalam tabel berikut.
Skenario | Mode penggunaan SDK | Metode pengelolaan offset |
Menanyakan offset konsumen | ASSIGN mode, SUBSCRIBE mode |
|
Klien SDK dijalankan pertama kali. Anda harus meneruskan offset konsumen untuk mengonsumsi data. | ASSIGN mode, SUBSCRIBE mode | Bergantung pada pola penggunaan klien SDK, pilih file Java DTSConsumerAssignDemo.java atau DTSConsumerSubscribeDemo.java, dan konfigurasikan offset konsumen ( |
Klien SDK melakukan retry internal. Anda harus meneruskan offset konsumen terakhir yang dicatat untuk melanjutkan konsumsi data. | ASSIGN mode | Cari offset konsumen terakhir yang dicatat dalam urutan berikut. Jika offset ditemukan, informasi offset tersebut dikembalikan:
|
SUBSCRIBE mode | Cari offset konsumen terakhir yang dicatat dalam urutan berikut. Jika offset ditemukan, informasi offset tersebut dikembalikan:
| |
Klien SDK di-restart. Anda harus meneruskan offset konsumen terakhir yang dicatat untuk melanjutkan konsumsi data. | ASSIGN mode | Berdasarkan konfigurasi
|
SUBSCRIBE mode | Dalam mode ini, dalam file consumerContext.java, konfigurasi
|
Menyimpan offset konsumen dengan persistensi
Jika terjadi alih bencana pada modul ingestion data inkremental, modul baru tidak dapat menyimpan offset konsumen terakhir klien. Hal ini terutama berlaku dalam mode SUBSCRIBE. Akibatnya, klien mungkin mulai mengonsumsi data dari offset yang lebih awal, menyebabkan konsumsi berulang data historis. Misalnya, asumsikan bahwa sebelum alih layanan, rentang offset modul lama adalah dari pukul 08:00:00 pada 11 November 2023 hingga 08:00:00 pada 12 November 2023, dan offset konsumen klien adalah 08:00:00 pada 12 November 2023. Setelah alih layanan, rentang offset modul baru adalah dari pukul 10:00:00 pada 8 November 2023 hingga 08:01:00 pada 12 November 2023. Dalam skenario ini, klien memulai konsumsi dari offset awal modul baru (10:00:00 pada 8 November 2023), yang mengakibatkan konsumsi data berulang.
Untuk mencegah konsumsi berulang data historis dalam skenario alih layanan ini, kami merekomendasikan agar Anda mengonfigurasi metode penyimpanan persisten untuk offset konsumen di klien. Metode contoh berikut disediakan sebagai referensi. Anda dapat memodifikasinya sesuai kebutuhan.
Buat metode
UserMetaStore()yang mewarisi dan mengimplementasikan metodeAbstractUserMetaStore().Sebagai contoh, Anda dapat menggunakan database MySQL untuk menyimpan informasi offset. Kode Java contoh berikut menunjukkan cara melakukannya:
public class UserMetaStore extends AbstractUserMetaStore { @Override protected void saveData(String groupID, String toStoreJson) { Connection con = getConnection(); String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); pres.setString(2, toStoreJson); pres.execute(); } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } @Override protected String getData(String groupID) { Connection con = getConnection(); String sql = "select checkpoint from dts_checkpoint where group_id = ?"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); rs = pres.executeQuery(); if (rs.next()) { String checkpoint = rs.getString("checkpoint"); return checkpoint; } } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } return null; } }Dalam file consumerContext.java, panggil metode
setUserRegisteredStore(new UserMetaStore())untuk mengonfigurasi media penyimpanan eksternal.
Pemecahan Masalah
Pengecualian | Pesan error | Penyebab | Solusi |
Kegagalan koneksi | |
| Masukkan |
| Anda tidak dapat terhubung ke alamat IP nyata menggunakan alamat broker. | ||
| Username atau password salah. | ||
| Dalam file consumerContext.java, | Masukkan offset konsumen yang berada dalam rentang waktu instansi langganan. Untuk metode penanyaan, lihat Deskripsi parameter. | |
Perlambatan konsumsi langganan | N/A |
| |