Setelah mengonfigurasi instans Data Subscription, gunakan kode contoh SDK yang disediakan oleh Data Transmission Service (DTS) untuk mengonsumsi data perubahan.
Prosedur
-
Jika sumber data adalah instans PolarDB-X 1.0 atau database logis DMS, lihat Mengonsumsi data Langganan Data dari PolarDB-X 1.0 menggunakan kode contoh SDK.
-
Jika Anda menggunakan pengguna RAM untuk mengonsumsi data, pengguna RAM tersebut harus memiliki izin AliyunDTSFullAccess serta izin untuk mengakses objek langganan. Untuk informasi lebih lanjut tentang cara memberikan izin, lihat Memberikan otorisasi kepada pengguna RAM untuk mengelola instans DTS menggunakan kebijakan sistem dan Mengelola izin pengguna RAM.
-
Setiap konsumen beroperasi secara independen.
-
Topik ini menyediakan klien SDK contoh dalam Java. Untuk kode contoh dalam Python dan Go, lihat dts-subscribe-demo.
Prosedur berikut menjelaskan cara menjalankan kode contoh SDK untuk mengonsumsi data Langganan Data di IntelliJ IDEA (Community Edition 2020.1 untuk Windows).
-
Buat instans Data Subscription. Untuk informasi lebih lanjut, lihat Membuat saluran Langganan Data untuk instans ApsaraDB RDS for MySQL, Membuat saluran Langganan Data untuk kluster PolarDB for MySQL, atau Membuat saluran Langganan Data untuk database Oracle.
-
Buat satu atau beberapa kelompok konsumen. Untuk informasi lebih lanjut, lihat Membuat kelompok konsumen.
PentingSaat mengonsumsi data Langganan Data, Anda harus memanggil metode
commitdariDefaultUserRecorduntuk melakukan commit checkpoint. Jika tidak, hal ini dapat menyebabkan konsumsi data duplikat. -
Gunakan kode contoh SDK sesuai kebutuhan bisnis Anda.
-
Gunakan paket SDK Langganan Data versi baru (disarankan)
-
Buka IntelliJ IDEA, lalu klik Create New Project untuk membuat proyek bagi aplikasi Anda.
-
Dalam proyek tersebut, temukan file project object model (POM): pom.xml.
-
Tambahkan dependensi berikut ke file pom.xml:
<dependency> <groupId>com.aliyun.dts</groupId> <artifactId>dts-new-subscribe-sdk</artifactId> <version>{dts_new_sdk_version}</version> </dependency>CatatanAnda dapat menemukan dependensi Maven terbaru di halaman dts-new-subscribe-sdk.
-
Untuk informasi lebih lanjut tentang cara menggunakan SDK langganan versi baru, lihat Gunakan kode contoh.
-
-
Gunakan versi kustom SDK Langganan Data versi baru
-
Unduh paket kode contoh SDK, lalu ekstrak.
CatatanKlik
dan pilih Download ZIP untuk mengunduh paket tersebut. -
Buka direktori kode contoh SDK yang telah diekstrak. Gunakan editor teks untuk membuka file pom.xml dan perbarui SDK Langganan Data ke versi terbaru.
PentingAnda dapat memperoleh versi terbaru SDK Langganan Data dari situs web Maven. Untuk informasi lebih lanjut, lihat halaman Maven untuk SDK Langganan Data.
-
Buka IntelliJ IDEA dan klik Open or Import.

-
Pada kotak dialog yang muncul, buka direktori kode contoh SDK yang telah diekstrak, buka folder-folder tersebut, lalu temukan file pom.xml.

-
Pada kotak dialog yang muncul, pilih Open as Project.
-
Di IntelliJ IDEA, buka folder-folder tersebut. Berdasarkan mode penggunaan klien SDK, pilih dan klik ganda file Java yang sesuai: DTSConsumerAssignDemo.java atau DTSConsumerSubscribeDemo.java.
CatatanDTS mendukung mode penggunaan klien SDK berikut:
-
Mode ASSIGN: Untuk memastikan urutan global pesan, DTS hanya menetapkan satu partisi (partisi 0) untuk setiap topik langganan. Saat menggunakan klien SDK dalam mode ASSIGN, kami menyarankan hanya menjalankan satu klien.
-
Mode SUBSCRIBE: Untuk memastikan urutan global pesan, DTS hanya menetapkan satu partisi (partisi 0) untuk setiap topik langganan. Jika Anda menggunakan klien SDK dalam mode SUBSCRIBE, Anda dapat menjalankan beberapa klien SDK dalam satu kelompok konsumen untuk pemulihan bencana. Jika klien aktif gagal, klien SDK lain akan secara otomatis ditetapkan ke partisi 0 untuk melanjutkan konsumsi.
-
-
-
-
Atur parameter yang diperlukan dalam file Java.

Tabel 1. Parameter yang diperlukan
Parameter
Deskripsi
Sumber
brokerUrlTitik akhir dan nomor port instans Data Subscription.
Catatan-
Jika instans ECS yang menjalankan klien SDK dan instans Data Subscription berada dalam jaringan klasik atau Virtual Private Cloud (VPC) yang sama, kami menyarankan Anda menggunakan titik akhir internal untuk meminimalkan latensi jaringan.
-
Kami tidak menyarankan menggunakan titik akhir publik karena potensi ketidakstabilan jaringan.
Di Konsol DTS, klik ID instans Data Subscription yang dituju. Pada halaman Basic Information, Anda dapat memperoleh titik akhir dan nomor port di bagian Network.
topicTopik langganan untuk instans tersebut.
Di Konsol DTS, klik ID instans Data Subscription yang dituju. Pada halaman Basic Information, Anda dapat memperoleh Topic di bagian Basic Information.
sidID kelompok konsumen.
Di Konsol DTS, klik ID instans Data Subscription yang dituju, lalu klik Consume Data. Anda dapat memperoleh Consumer Group ID dan Account kelompok konsumen.
CatatanKata sandi untuk username kelompok konsumen ditentukan saat Anda membuat kelompok konsumen tersebut.
userNameUsername untuk kelompok konsumen.
PeringatanJika Anda tidak menggunakan klien yang disediakan dalam topik ini, Anda harus mengatur username dalam format
<Username>-<Consumer Group ID>. Contoh:dtstest-dtsae******bpv. Jika tidak, koneksi akan gagal.passwordKata sandi untuk username tersebut.
initCheckpointTitik pemeriksaan konsumsi, yang ditentukan sebagai Stempel waktu UNIX, tempat klien SDK mulai mengonsumsi data. Contoh: 1620962769.
CatatanAnda dapat menggunakan informasi titik pemeriksaan konsumsi dalam skenario berikut:
-
Untuk melanjutkan konsumsi dan mencegah kehilangan data setelah gangguan aplikasi, berikan titik pemeriksaan konsumsi terakhir yang diketahui.
-
Saat memulai klien, Anda dapat memberikan titik pemeriksaan konsumsi tertentu untuk mengonsumsi data dari posisi yang diinginkan.
Titik pemeriksaan konsumsi harus berada dalam rentang data instans Data Subscription (seperti yang ditunjukkan pada gambar) dan harus dikonversi ke Stempel waktu UNIX.
CatatanAnda dapat menggunakan mesin pencari untuk menemukan konverter Stempel waktu UNIX.
ConsumerContext.ConsumerSubscribeMode subscribeModeMode penggunaan klien SDK. Nilai yang valid:
-
ConsumerContext.ConsumerSubscribeMode.ASSIGN: Mode ASSIGN. Hanya satu klien SDK dalam satu kelompok konsumen yang dapat mengonsumsi data Langganan Data. -
ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: Mode SUBSCRIBE. Anda dapat menjalankan beberapa klien SDK dalam kelompok konsumen yang sama untuk disaster recovery.
N/A
-
-
Di bilah navigasi atas IntelliJ IDEA, pilih untuk menjalankan klien.
CatatanPertama kali menjalankan klien, mungkin memerlukan waktu untuk memuat dan menginstal dependensi yang diperlukan secara otomatis.
-
Gambar berikut menunjukkan hasilnya. Klien berhasil mengonsumsi data perubahan dari database sumber.

-
Klien SDK secara berkala mengumpulkan dan menampilkan statistik tentang konsumsi data, termasuk jumlah total dan volume catatan yang dikirim serta diterima, dan requests per second (RPS).

Tabel 2. Statistik konsumsi data
Parameter
Deskripsi
outCountsJumlah total catatan data yang dikonsumsi oleh klien SDK.
outBytesVolume total data yang dikonsumsi oleh klien SDK. Satuan: byte.
outRpsJumlah requests per second (RPS) saat klien SDK mengonsumsi data.
outBpsLaju konsumsi data klien SDK, dalam bit per detik (bps).
inBytesVolume total data yang dikirim oleh server DTS. Satuan: byte.
DStoreRecordQueueUkuran antrean cache data internal untuk catatan masuk dari server DTS.
inCountsJumlah total catatan data yang dikirim oleh server DTS.
inRpsRPS saat server DTS mengirim data.
__dtStempel waktu saat klien SDK menerima data. Satuan: milidetik.
DefaultUserRecordQueueUkuran antrean data yang menyimpan catatan siap diproses oleh aplikasi konsumen.
-
Menyimpan dan mengkueri titik pemeriksaan konsumsi
Untuk memulai atau melanjutkan konsumsi data (misalnya, saat peluncuran pertama, restart, atau retry internal), klien SDK memerlukan titik pemeriksaan konsumsi. Tabel berikut menjelaskan cara mengelola dan mengkueri checkpoint dalam berbagai skenario untuk mencegah kehilangan data, meminimalkan konsumsi duplikat, dan memungkinkan konsumsi sesuai permintaan.
|
Skenario |
Mode penggunaan SDK |
Metode kueri |
|
Mengkueri titik pemeriksaan konsumsi |
Mode ASSIGN, mode SUBSCRIBE |
|
|
Startup awal: Memberikan checkpoint untuk memulai konsumsi. |
Mode ASSIGN, mode SUBSCRIBE |
Berdasarkan mode penggunaan klien SDK, pilih file DTSConsumerAssignDemo.java atau DTSConsumerSubscribeDemo.java, lalu konfigurasikan parameter |
|
Klien SDK perlu memberikan kembali titik pemeriksaan konsumsi terakhir yang dicatat untuk melanjutkan konsumsi setelah retry internal. |
Mode ASSIGN |
Cari titik pemeriksaan konsumsi terakhir yang dicatat dalam urutan berikut. Pencarian berhenti dan mengembalikan informasi checkpoint segera setelah ditemukan:
|
|
Mode SUBSCRIBE |
Cari titik pemeriksaan konsumsi terakhir yang dicatat dalam urutan berikut. Pencarian berhenti dan mengembalikan informasi checkpoint segera setelah ditemukan:
|
|
|
Klien SDK telah direstart dan perlu memberikan kembali titik pemeriksaan konsumsi terakhir yang dicatat untuk melanjutkan konsumsi. |
Mode ASSIGN |
Kueri titik pemeriksaan konsumsi berdasarkan konfigurasi
|
|
Mode SUBSCRIBE |
Dalam mode ini, konfigurasi
|
Menyimpan titik pemeriksaan konsumsi secara persisten
Saat terjadi kejadian pemulihan bencana pada modul pengumpulan data inkremental (terutama dalam mode SUBSCRIBE), modul baru tidak menyimpan titik pemeriksaan konsumsi terbaru dari klien. Klien mungkin melanjutkan dari checkpoint yang lebih lama, sehingga menyebabkan konsumsi duplikat data historis. Misalnya, sebelum alih bencana, rentang checkpoint modul lama adalah dari 08:00:00 pada 11 November 2023 hingga 08:00:00 pada 12 November 2023, dan checkpoint klien adalah 08:00:00 pada 12 November 2023. Setelah alih bencana, rentang checkpoint modul baru adalah dari 10:00:00 pada 8 November 2023 hingga 08:01:00 pada 12 November 2023. Klien mulai dari checkpoint awal modul baru (10:00:00 pada 8 November 2023), sehingga menyebabkan konsumsi duplikat.
Untuk menghindari konsumsi duplikat dalam skenario ini, konfigurasikan penyimpanan checkpoint persisten di sisi klien. Contoh berikut memberikan salah satu implementasi yang mungkin yang dapat Anda sesuaikan dengan kebutuhan Anda.
-
Buat kelas
UserMetaStoreyang memperluasAbstractUserMetaStore.Sebagai contoh, untuk menyimpan informasi checkpoint dalam database MySQL, gunakan kode Java berikut:
public class UserMetaStore extends AbstractUserMetaStore { @Override protected void saveData(String groupID, String toStoreJson) { Connection con = getConnection(); String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); pres.setString(2, toStoreJson); pres.execute(); } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } @Override protected String getData(String groupID) { Connection con = getConnection(); String sql = "select checkpoint from dts_checkpoint where group_id = ?"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); ResultSet rs = pres.executeQuery() String checkpoint = rs.getString("checkpoint"); return checkpoint; } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } } -
Dalam file consumerContext.java, konfigurasikan media penyimpanan eksternal dengan menggunakan metode
setUserRegisteredStore(new UserMetaStore()).
FAQ
-
Bagaimana cara mengatasi masalah koneksi dengan instans Langganan Data?
Atasi masalah tersebut berdasarkan pesan error. Untuk informasi lebih lanjut, lihat Pemecahan Masalah.
-
Dalam format apa titik pemeriksaan konsumsi disimpan secara persisten?
Data titik pemeriksaan konsumsi yang disimpan secara persisten disimpan dalam format JSON. Titik pemeriksaan yang disimpan adalah Stempel waktu UNIX yang dapat langsung diberikan ke SDK. Dalam contoh tanggapan berikut, nilai
1700709977untuk kunci"timestamp"adalah titik pemeriksaan konsumsi yang disimpan secara persisten.{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}
Pemecahan Masalah
|
Masalah |
Pesan error |
Penyebab |
Solusi |
|
Tidak dapat terhubung |
|
|
Masukkan nilai yang benar untuk parameter |
|
Alamat broker tidak dapat terhubung ke alamat IP aktual. |
||
|
Username atau kata sandi salah. |
||
|
Dalam file consumerContext.java, parameter |
Berikan titik pemeriksaan konsumsi yang berada dalam rentang data instans Langganan Data. Untuk informasi lebih lanjut, lihat Parameter yang diperlukan. |
|
|
Konsumsi melambat |
N/A |
|
|