Layanan Transmisi Data (DTS) menangkap perubahan inkremental dari instans kluster sharded ApsaraDB for MongoDB dan mengirimkan setiap event perubahan ke fungsi Function Compute dalam format Canal JSON. Tulis kode fungsi Anda untuk memproses, mentransformasi, atau meneruskan event tersebut ke sistem downstream.
Apa yang diterima fungsi Anda
Setiap pemanggilan meneruskan objek dengan array Records. Setiap elemen dalam array merepresentasikan satu event perubahan:
| Field | Type | Description |
|---|---|---|
isDdl | Boolean | True untuk operasi DDL; False untuk operasi DML |
type | String | DML: INSERT, UPDATE, atau DELETE. DDL: DDL |
database | String | Nama database MongoDB |
table | String | Nama koleksi |
pkNames | String | Nama kunci primer. Selalu _id untuk MongoDB |
es | Long | Stempel waktu UNIX 13 digit (ms) — saat operasi terjadi di sumber |
ts | Long | Stempel waktu UNIX 13 digit (ms) — saat DTS mulai menulis ke tujuan |
data | Object Array | Array berisi satu elemen. Elemen tersebut memiliki kunci doc yang nilainya adalah string JSON yang merepresentasikan dokumen. Deserialisasi nilai tersebut untuk membaca catatan |
old | Object Array | Formatnya sama seperti data. Hanya tersedia ketika type bernilai UPDATE; berisi status dokumen sebelum pembaruan |
id | Int | Nomor seri operasi |
Fungsi menerima dua kategori operasi:
DDL — perubahan skema:
CreateIndex,CreateCollection,DropIndex,DropCollectionDML — perubahan data:
INSERT,UPDATE,DELETE
Contoh muatan
Membuat koleksi (DDL)
db.createCollection("testCollection"){
"Records": [{
"data": [{"doc": "{\"create\": \"testCollection\", \"idIndex\": {\"v\": 2, \"key\": {\"_id\": 1}, \"name\": \"_id_\"}}"}],
"pkNames": ["_id"],
"type": "DDL",
"es": 1694056437000,
"database": "MongoDBTest",
"id": 0,
"isDdl": true,
"table": "testCollection",
"ts": 1694056437510
}]
}Menghapus koleksi (DDL)
db.testCollection.drop(){
"Records": [{
"data": [{"doc": "{\"drop\": \"testCollection\"}"}],
"pkNames": ["_id"],
"type": "DDL",
"es": 1694056577000,
"database": "MongoDBTest",
"id": 0,
"isDdl": true,
"table": "testCollection",
"ts": 1694056577789
}]
}Membuat indeks (DDL)
db.testCollection.createIndex({name: 1}){
"Records": [{
"data": [{"doc": "{\"createIndexes\": \"testCollection\", \"v\": 2, \"key\": {\"name\": 1}, \"name\": \"name_1\"}"}],
"pkNames": ["_id"],
"type": "DDL",
"es": 1694056670000,
"database": "MongoDBTest",
"id": 0,
"isDdl": true,
"table": "testCollection",
"ts": 1694056670719
}]
}Menghapus indeks (DDL)
db.testCollection.dropIndex({name: 1}){
"Records": [{
"data": [{"doc": "{\"dropIndexes\": \"testCollection\", \"index\": \"name_1\"}"}],
"pkNames": ["_id"],
"type": "DDL",
"es": 1694056817000,
"database": "MongoDBTest",
"id": 0,
"isDdl": true,
"table": "$cmd",
"ts": 1694056818035
}]
}Menyisipkan dokumen (DML)
// Batch insert
db.runCommand({insert: "user", documents: [{"name": "jack", "age": 20}, {"name": "lili", "age": 20}]})
// Single insert
db.user.insert({"name": "jack", "age": 20}){
"Records": [
{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"jack\", \"age\": 20}"}],
"pkNames": ["_id"],
"type": "INSERT",
"es": 1694054783000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054784427
},
{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"lili\", \"age\": 20}"}],
"pkNames": ["_id"],
"type": "INSERT",
"es": 1694054783000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054784428
}
]
}Memperbarui dokumen (DML)
db.user.update({"name": "jack"}, {$set: {"age": 30}}){
"Records": [{
"data": [{"doc": "{\"$set\": {\"age\": 30}}"}],
"pkNames": ["_id"],
"old": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
"type": "UPDATE",
"es": 1694054989000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054990555
}]
}Untuk operasi UPDATE, hanya perintah $set yang disinkronkan secara sinkron oleh DTS saat menyinkronkan Data inkremental.Menghapus dokumen (DML)
db.user.remove({"name": "jack"}){
"Records": [{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
"pkNames": ["_id"],
"type": "DELETE",
"es": 1694055452000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694055452852
}]
}Batasan
Tinjau batasan berikut sebelum membuat tugas sinkronisasi Anda.
Batasan cakupan:
Hanya sinkronisasi data inkremental yang didukung. Sinkronisasi data penuh tidak didukung.
Sinkronisasi lintas wilayah tidak didukung.
DTS tidak dapat menyinkronkan data dari database
admin,config, ataulocal.Pemetaan objek tidak didukung.
Informasi transaksi tidak dipertahankan. Transaksi dikonversi menjadi catatan individual di tujuan.
Batasan database sumber:
Sumber harus menggunakan arsitektur kluster sharded dengan tidak lebih dari 10 node Mongos.
Sumber tidak boleh merupakan kluster Azure Cosmos DB for MongoDB atau kluster elastis Amazon DocumentDB.
Koleksi yang akan disinkronkan harus memiliki kendala PRIMARY KEY atau UNIQUE, dan semua field harus unik. Jika tidak, database tujuan mungkin berisi catatan data duplikat.
Ukuran dokumen tunggal tidak boleh melebihi 16 MB. Dokumen yang melebihi batas ini tidak dapat ditulis ke fungsi tujuan dan akan memicu error. Gunakan fitur ekstrak, transformasi, dan muat (ETL) untuk memfilter field besar jika diperlukan.
Sinkronkan hingga 1.000 koleksi per tugas. Untuk jumlah koleksi yang lebih banyak, buat beberapa tugas atau lakukan sinkronisasi pada tingkat database.
Instans sumber tidak boleh diskalakan selama tugas sinkronisasi berjalan.
DTS tidak dapat terhubung ke database MongoDB melalui titik akhir SRV.
Jika penyeimbang (balancer) database sumber aktif, tugas mungkin mengalami penundaan.
Jika database sumber adalah database MongoDB yang dikelola sendiri dengan arsitektur kluster sharded, atur parameter Access Method ke Express Connect, VPN Gateway, atau Smart Access Gateway atau Cloud Enterprise Network (CEN).
Tulis kendala:
Untuk operasi INSERT, data yang dimasukkan harus berisi kunci shard.
Untuk operasi UPDATE, kunci shard tidak boleh dimodifikasi.
Konfigurasikan hanya satu tugas DTS per fungsi tujuan. Beberapa tugas yang menulis ke fungsi yang sama dapat menyebabkan kesalahan data.
Persyaratan oplog dan change stream:
Oplog harus diaktifkan dan menyimpan data log minimal selama 7 hari, ATAU change stream harus diaktifkan untuk mencakup perubahan minimal 7 hari terakhir. Jika kedua kondisi tersebut tidak terpenuhi, DTS mungkin gagal menangkap perubahan sumber, yang dapat menyebabkan inkonsistensi atau kehilangan data yang tidak dicakup oleh perjanjian tingkat layanan (SLA) DTS.
Pembatasan change stream (jika berlaku):
Change stream memerlukan MongoDB 4.0 atau versi lebih baru.
Sinkronisasi dua arah tidak didukung saat menggunakan change stream.
Untuk kluster Amazon DocumentDB non-elastis, gunakan change stream: atur Migration Method ke ChangeStream dan Architecture ke Sharded Cluster.
Database baru:
DTS tidak menyinkronkan data inkremental dari database yang dibuat setelah tugas sinkronisasi dimulai.
Operasi yang didukung
Operasi yang ditangkap bergantung pada metode migrasi.
Menggunakan oplog (direkomendasikan):
CREATE COLLECTION,CREATE INDEXDROP DATABASE,DROP COLLECTION,DROP INDEXRENAME COLLECTIONINSERT, UPDATE, dan DELETE tingkat dokumen
Menggunakan aliran perubahan:
DROP DATABASE,DROP COLLECTIONRENAME COLLECTIONINSERT, UPDATE, dan DELETE tingkat dokumen
Penagihan
Sinkronisasi data inkremental dikenai biaya. Untuk detail harga, lihat Ikhtisar penagihan.
| Metode penagihan | Deskripsi |
|---|---|
| Langganan | Bayar di muka untuk 1–9 bulan, atau 1, 2, 3, atau 5 tahun. Lebih hemat biaya untuk penggunaan jangka panjang |
| Pay-as-you-go | Ditagih per jam. Lepaskan instans saat tidak lagi digunakan untuk menghentikan biaya |
DTS menagih instans selama periode percobaan koneksi ulang.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Instans kluster sharded ApsaraDB for MongoDB yang sedang berjalan. Lihat Buat instans kluster sharded
Layanan dan fungsi Function Compute, dengan Handler Type diatur ke Event Handler. Lihat Buat fungsi dengan cepat
Akun database pada instans ApsaraDB for MongoDB sumber dengan izin baca pada database sumber,
admin, danlocal. Lihat Kelola izin pengguna database MongoDB
Buat tugas sinkronisasi
Langkah 1: Buka halaman Sinkronisasi Data
Buka halaman Data Synchronization menggunakan salah satu konsol berikut:
Konsol DTS
Login ke Konsol DTS.Konsol DTS
Di panel navigasi kiri, klik Data Synchronization.
Di pojok kiri atas, pilih wilayah tempat tugas sinkronisasi akan dijalankan.
Konsol DMS
Jalur navigasi tepatnya bergantung pada tata letak konsol DMS Anda. Lihat Mode simple dan Sesuaikan tata letak dan gaya konsol DMS.
Login ke Konsol DMS.Konsol DMS
Di bilah navigasi atas, arahkan pointer ke Data + AI lalu pilih DTS (DTS) > Data Synchronization.
Dari dropdown di sebelah kanan Data Synchronization Tasks, pilih wilayah target.
Langkah 2: Konfigurasi database sumber dan tujuan
Klik Create Task, lalu konfigurasi parameter berikut:
| Bagian | Parameter | Deskripsi |
|---|---|---|
| N/A | Task Name | DTS menghasilkan nama secara otomatis. Tentukan nama deskriptif agar tugas mudah diidentifikasi. Tidak perlu unik |
| Source Database | Select Existing Connection | Pilih instans database terdaftar untuk mengisi parameter di bawah secara otomatis, atau konfigurasi secara manual |
| Database Type | Pilih MongoDB | |
| Access Method | Pilih Alibaba Cloud Instance | |
| Instance Region | Wilayah instans MongoDB sumber | |
| Replicate Data Across Alibaba Cloud Accounts | Pilih No untuk sinkronisasi dalam akun yang sama | |
| Architecture | Pilih Sharded Cluster | |
| Migration Method | Cara DTS menangkap data inkremental. Opsi: Oplog (direkomendasikan) atau ChangeStream (lihat detail di bawah) | |
| Instance ID | ID instans MongoDB sumber | |
| Authentication Database | Database yang menyimpan kredensial akun. Default: admin | |
| Database Account | Akun database sumber dengan izin baca yang diperlukan | |
| Database Password | Kata sandi untuk akun database | |
| Shard account | Akun untuk mengakses shard di instans sumber | |
| Shard password | Kata sandi untuk akun shard | |
| Encryption | Mode enkripsi koneksi. Opsi: Non-encrypted, SSL-encrypted, atau Mongo Atlas SSL. Opsi yang tersedia bergantung pada pilihan Access Method dan Architecture. SSL-encrypted tidak tersedia saat Architecture adalah Sharded Cluster dan Migration Method adalah Oplog | |
| Destination Database | Select Existing Connection | Pilih instans Function Compute terdaftar untuk mengisi parameter di bawah secara otomatis, atau konfigurasi secara manual |
| Database Type | Pilih Function Compute | |
| Access Method | Pilih Alibaba Cloud Instance | |
| Instance Region | Sesuai dengan wilayah sumber. Tidak dapat diubah | |
| Service | Layanan Function Compute yang berisi fungsi tujuan | |
| Function | Fungsi yang menerima data yang disinkronkan | |
| Service Version and Alias | Versi atau alias layanan. Opsi: Default Version (tetap ke LATEST), Specified Version (memerlukan Service Version), atau Specified Alias (memerlukan Service Alias). Lihat Terms |
Pilih metode migrasi:
| Metode | Kapan digunakan |
|---|---|
| Oplog (direkomendasikan) | Oplog diaktifkan di sumber (default baik untuk MongoDB yang dikelola sendiri maupun ApsaraDB for MongoDB). Memberikan latensi sinkronisasi lebih rendah karena penarikan log lebih cepat |
| ChangeStream | Oplog dinonaktifkan, atau Anda menggunakan kluster Amazon DocumentDB non-elastis (change stream wajib). Memerlukan MongoDB 4.0 atau lebih baru. Sinkronisasi dua arah tidak didukung |
Saat Architecture adalah Sharded Cluster dan Migration Method adalah ChangeStream, parameter Shard account dan Shard password tidak diperlukan.
Langkah 3: Uji konektivitas
Klik Test Connectivity and Proceed.
Pastikan blok CIDR server DTS telah ditambahkan ke grup keamanan atau daftar izin (allowlist) baik sumber maupun tujuan. Lihat Tambahkan blok CIDR server DTS. Jika sumber atau tujuan menggunakan metode akses yang dikelola sendiri, klik Test Connectivity di dialog CIDR Blocks of DTS Servers terlebih dahulu.
Langkah 4: Pilih objek untuk disinkronkan
Pada langkah Configure Objects, konfigurasi hal berikut:
| Parameter | Deskripsi |
|---|---|
| Synchronization Types | Tetap ke Incremental Data Synchronization. Tidak dapat diubah |
| Data Format | Tetap ke Canal Json. Untuk deskripsi field, lihat bagian Canal Json dalam format data topik kluster Kafka |
| Source Objects | Pilih database atau koleksi untuk disinkronkan, lalu klik |
| Selected Objects | Tinjau objek yang dipilih. Klik |
Langkah 5: Konfigurasi pengaturan lanjutan
Klik Next: Advanced Settings, lalu konfigurasi:
| Parameter | Deskripsi |
|---|---|
| Dedicated Cluster for Task Scheduling | Secara default, DTS menjadwalkan tugas ke kluster bersama. Beli kluster khusus untuk stabilitas lebih tinggi. Lihat Apa itu kluster khusus DTS |
| Retry Time for Failed Connections | Durasi DTS mencoba ulang saat sumber atau tujuan tidak dapat dijangkau. Rentang: 10–1440 menit. Default: 720. Atur minimal 30 menit. Jika beberapa tugas berbagi sumber atau tujuan yang sama, waktu retry terpendek yang berlaku |
| Retry Time for Other Issues | Durasi DTS mencoba ulang operasi DDL atau DML yang gagal. Rentang: 1–1440 menit. Default: 10. Atur minimal 10 menit. Harus lebih pendek dari Retry Time for Failed Connections |
| Obtain the entire document after it is updated | Hanya untuk ChangeStream. Yes: kirim dokumen lengkap setelah pembaruan. No: kirim hanya field yang berubah |
| Enable Throttling for Incremental Data Synchronization | Batasi throughput sinkronisasi untuk mengurangi beban pada tujuan. Konfigurasi RPS of Incremental Data Synchronization dan Data synchronization speed for incremental synchronization (MB/s) |
| Environment Tag | Tag untuk mengidentifikasi lingkungan instans DTS. Opsional |
| Configure ETL | Aktifkan fitur ETL untuk mentransformasi data saat transit. Lihat Apa itu ETL? dan Konfigurasi ETL dalam tugas migrasi atau sinkronisasi data |
| Monitoring and Alerting | Kirim peringatan saat tugas gagal atau latensi sinkronisasi melebihi ambang batas. Lihat Konfigurasi pemantauan dan peringatan saat membuat tugas DTS |
Langkah 6: Jalankan pemeriksaan awal
Klik Next: Save Task Settings and Precheck.
Untuk melihat pratinjau parameter API untuk konfigurasi tugas ini, arahkan kursor ke Next: Save Task Settings and Precheck lalu klik Preview OpenAPI parameters.
DTS menjalankan pemeriksaan awal sebelum memulai tugas sinkronisasi. Tugas hanya dimulai setelah pemeriksaan awal berhasil.
Jika suatu item gagal: klik View Details di sebelah item yang gagal, perbaiki masalah yang dilaporkan, lalu klik Precheck Again.
Jika peringatan dipicu:
Jika peringatan tidak dapat diabaikan: klik View Details, perbaiki masalah, dan jalankan ulang pemeriksaan awal.
Jika peringatan dapat diabaikan: klik Confirm Alert Details, klik Ignore di dialog, konfirmasi dengan OK, lalu klik Precheck Again. Mengabaikan peringatan dapat menyebabkan inkonsistensi data.
Langkah 7: Beli instans
Tunggu hingga Success Rate mencapai 100%, lalu klik Next: Purchase Instance.
Di halaman buy, konfigurasi:
| Bagian | Parameter | Deskripsi |
|---|---|---|
| New Instance Class | Billing Method | Subscription atau Pay-as-you-go |
| Resource Group Settings | Kelompok sumber daya untuk instans. Default: default resource group. Lihat Apa itu Resource Management? | |
| Instance Class | Menentukan kecepatan sinkronisasi. Lihat Kelas instans untuk instansi sinkronisasi data | |
| Subscription Duration | Hanya tersedia untuk penagihan Subscription. Opsi: 1–9 bulan, atau 1, 2, 3, atau 5 tahun |
Baca dan pilih Data Transmission Service (Pay-as-you-go) Service Terms.
Klik Buy and Start, lalu klik OK di dialog konfirmasi.
Lacak progres tugas di daftar tugas.
Langkah selanjutnya
Tulis penanganan event Function Compute Anda untuk memproses muatan Canal JSON. Gunakan field
isDdluntuk membedakan logika DDL dan DML, serta deserialisasi stringdocdi setiap elemendatauntuk mengakses field dokumen.Untuk mengurangi beban dokumen dengan field besar pada fungsi Anda, konfigurasi filter ETL sebelum data mencapai fungsi. Lihat Konfigurasi ETL dalam tugas migrasi atau sinkronisasi data.
Untuk memantau kesehatan sinkronisasi, konfigurasi peringatan di Advanced Settings atau setelah tugas dibuat. Lihat Konfigurasi pemantauan dan peringatan saat membuat tugas DTS.
Jika tugas gagal, dukungan teknis DTS akan berusaha memulihkannya dalam waktu 8 jam. Selama pemulihan, tugas mungkin dimulai ulang dan parameter tugas (bukan parameter database) mungkin disesuaikan. Untuk daftar parameter yang mungkin dimodifikasi, lihat Modifikasi parameter instans.