Gunakan Data Transmission Service (DTS) untuk mengalirkan perubahan inkremental dari instans set replika ApsaraDB for MongoDB langsung ke fungsi Function Compute. Setiap event INSERT, UPDATE, DELETE, dan DDL dikirimkan dalam format Canal JSON, sehingga kode fungsi Anda dapat memproses, mentransformasi, atau meneruskan data tersebut ke sistem downstream tanpa perlu membangun lapisan konsumen terpisah.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Instans set replika ApsaraDB for MongoDB. Lihat Buat instans set replika.
Layanan dan fungsi Function Compute dengan Handler Type diatur ke Event Handler. Lihat Buat fungsi dengan cepat.
Penagihan
Sinkronisasi data inkremental adalah fitur berbayar. Lihat Ikhtisar penagihan.
Jenis sinkronisasi | Biaya konfigurasi tugas |
Sinkronisasi data inkremental | Dikenai biaya. Untuk informasi selengkapnya, lihat Ikhtisar penagihan. |
Batasan
Database sumber
| Kendala | Detail |
|---|---|
| Bandwidth | Server yang menghosting database sumber harus memiliki bandwidth keluar yang mencukupi; jika tidak, kecepatan sinkronisasi akan terpengaruh. |
| Kunci utama atau kunci unik | Koleksi yang akan disinkronkan harus memiliki kendala PRIMARY KEY atau kendala UNIQUE, dan semua field-nya harus unik. Tanpa ini, fungsi tujuan mungkin menerima catatan duplikat. |
| Ukuran catatan tunggal | Maksimum 16 MB per catatan. DTS melaporkan error untuk catatan yang lebih besar. Gunakan fitur ekstrak, transformasi, dan muat (ETL) untuk menyaring field besar. |
| Jumlah koleksi | Maksimal 1.000 koleksi per tugas. Untuk lebih dari 1.000 koleksi, konfigurasikan beberapa tugas atau lakukan sinkronisasi pada tingkat database. |
| Sumber yang tidak didukung | Klaster Azure Cosmos DB for MongoDB dan klaster elastis Amazon DocumentDB tidak didukung. |
| oplog / change streams | oplog harus diaktifkan pada database sumber dan menyimpan data log minimal selama tujuh hari, ATAU change streams harus diaktifkan untuk mencakup tujuh hari terakhir. Jika kedua kondisi ini tidak terpenuhi, DTS mungkin melewatkan perubahan, yang dapat menyebabkan inkonsistensi atau kehilangan data — hal ini tidak dicakup oleh perjanjian tingkat layanan (SLA) DTS. |
| Versi MongoDB untuk change streams | Change streams memerlukan MongoDB 4.0 atau versi yang lebih baru. Sinkronisasi dua arah tidak didukung saat menggunakan change streams. |
| Amazon DocumentDB (non-elastis) | Aktifkan change streams dan atur Migration Method ke ChangeStream dan Architecture ke Sharded Cluster. |
| Titik akhir SRV | DTS tidak dapat terhubung ke MongoDB melalui titik akhir SRV. |
Batasan lainnya
DTS tidak dapat menyinkronkan database
admin,config, ataulocal.Sinkronisasi data penuh tidak didukung — hanya inkremental.
Sinkronisasi lintas wilayah tidak didukung.
Fitur pemetaan nama objek tidak didukung.
Tetapkan hanya satu tugas DTS per fungsi tujuan untuk menghindari kesalahan data.
Konteks transaksi tidak dipertahankan. Setiap transaksi dikonversi menjadi catatan individual.
Jika tugas DTS gagal, dukungan teknis DTS berusaha memulihkannya dalam waktu 8 jam. Tugas mungkin dimulai ulang dan parameter tugas (bukan parameter database) dapat dimodifikasi selama pemulihan.
Kasus khusus (MongoDB yang dikelola sendiri)
Alih bencana primer/sekunder saat tugas sedang berjalan menyebabkan tugas gagal.
Latensi sinkronisasi dihitung dari stempel waktu catatan terbaru yang disinkronkan dibandingkan dengan stempel waktu sumber saat ini. Jika tidak ada operasi tulis pada sumber dalam periode yang lama, latensi yang dilaporkan mungkin tidak akurat. Tulis catatan ke database sumber untuk memperbarui pembacaan latensi.
Jika Anda menyinkronkan seluruh database, buat tabel heartbeat. DTS memperbarui tabel heartbeat setiap detik, sehingga menjaga akurasi pembacaan latensi.
Operasi yang didukung
Operasi yang disinkronkan DTS bergantung pada Migration Method yang Anda pilih.
| Operasi | Oplog | Change streams |
|---|---|---|
| INSERT | Didukung | Didukung |
| UPDATE | Didukung | Didukung |
| DELETE | Didukung | Didukung |
| CREATE COLLECTION | Didukung | Tidak didukung |
| CREATE INDEX | Didukung | Tidak didukung |
| DROP DATABASE | Didukung | Didukung |
| DROP COLLECTION | Didukung | Didukung |
| DROP INDEX | Didukung | Tidak didukung |
| RENAME COLLECTION | Didukung | Didukung |
Gunakan oplog
Tugas DTS tidak menyinkronkan data inkremental dari database yang dibuat setelah tugas mulai berjalan. DTS menyinkronkan data inkremental yang dihasilkan oleh operasi berikut:
CREATE COLLECTION dan INDEX
DROP DATABASE, COLLECTION, dan INDEX
RENAME COLLECTION
Operasi yang dilakukan untuk menyisipkan, memperbarui, dan menghapus dokumen dalam koleksi.
CatatanSaat menyinkronkan data inkremental dokumen, hanya operasi update yang menggunakan perintah
$setyang didukung.
Gunakan change streams
DTS menyinkronkan data inkremental yang dihasilkan oleh operasi berikut:
DROP DATABASE dan COLLECTION
RENAME COLLECTION
Operasi yang dilakukan untuk menyisipkan, memperbarui, dan menghapus dokumen dalam koleksi.
CatatanSaat menyinkronkan data inkremental dokumen, hanya operasi update yang menggunakan perintah
$setyang didukung.
Saat menyinkronkan data inkremental file, hanya perintah $set yang didukung. DTS tidak menyinkronkan data inkremental dari database yang dibuat setelah tugas dimulai (berlaku untuk mode Oplog).Izin akun database yang diperlukan
| Database | Izin yang diperlukan | Referensi |
|---|---|---|
| Instans MongoDB sumber | Izin baca pada database sumber, admin, dan local | Kelola izin pengguna database MongoDB |
Buat tugas sinkronisasi
Langkah 1: Buka halaman Sinkronisasi Data
Gunakan salah satu konsol berikut:
Konsol DTS
Masuk ke Konsol DTS.Konsol DTS
Di panel navigasi sebelah kiri, klik Data Synchronization.
Di pojok kiri atas, pilih wilayah tempat tugas sinkronisasi akan dijalankan.
Konsol DMS
Langkah navigasi yang tepat bervariasi tergantung pada mode dan tata letak konsol DMS. Lihat Mode simple dan Sesuaikan tata letak dan gaya konsol DMS.
Masuk ke Konsol DMS.Konsol DMS
Di bilah navigasi atas, arahkan pointer ke Data + AI lalu pilih DTS (DTS) > Data Synchronization.
Dari daftar drop-down di sebelah kanan Data Synchronization Tasks, pilih wilayah.
Langkah 2: Konfigurasi database sumber dan tujuan
Klik Create Task, lalu konfigurasikan parameter yang dijelaskan dalam tabel berikut.
Umum
| Parameter | Deskripsi |
|---|---|
| Task Name | Nama untuk tugas DTS. DTS menghasilkan nama default. Gunakan nama deskriptif untuk mengidentifikasi tugas — tidak perlu unik. |
Database sumber
| Parameter | Deskripsi |
|---|---|
| Select Existing Connection | Pilih instans database terdaftar dari daftar drop-down, atau konfigurasikan koneksi secara manual. |
| Database Type | Pilih MongoDB. |
| Access Method | Pilih Alibaba Cloud Instance. |
| Instance Region | Wilayah tempat instans MongoDB sumber berada. |
| Replicate Data Across Alibaba Cloud Accounts | Pilih No untuk sinkronisasi dalam akun yang sama. |
| Architecture | Pilih Replica Set. |
| Migration Method | Cara DTS membaca data inkremental dari sumber. Oplog (direkomendasikan): Tersedia ketika fitur oplog diaktifkan pada sumber. Oplog memberikan latensi rendah karena penarikan log yang cepat. ChangeStream: Tersedia ketika change streams diaktifkan. Lihat Change Streams. Jika Sharded Cluster dipilih untuk Architecture, parameter Shard account dan Shard password tidak diperlukan. |
| Instance ID | ID instans MongoDB sumber. |
| Authentication Database | Database yang menyimpan akun dan kata sandi. Default: admin. |
| Database Account | Akun dengan izin baca yang diperlukan. |
| Database Password | Kata sandi untuk akun database. |
| Encryption | Enkripsi koneksi: Non-encrypted, SSL-encrypted, atau Mongo Atlas SSL. Opsi yang tersedia bergantung pada nilai Access Method dan Architecture. Opsi SSL-encrypted tidak tersedia jika Architecture diatur ke Sharded Cluster dan Migration Method diatur ke Oplog. Untuk set replika MongoDB yang dikelola sendiri tanpa menggunakan akses Instance Alibaba Cloud, Anda dapat mengunggah sertifikat CA saat memilih opsi SSL-encrypted |
Database tujuan
| Parameter | Deskripsi |
|---|---|
| Select Existing Connection | Pilih instans terdaftar dari daftar drop-down, atau konfigurasikan secara manual. |
| Database Type | Pilih Function Compute. |
| Access Method | Pilih Alibaba Cloud Instance. |
| Instance Region | Default ke wilayah yang sama dengan sumber dan tidak dapat diubah. |
| Service | Layanan Function Compute yang berisi fungsi tujuan. |
| Function | Fungsi tujuan yang menerima data yang disinkronkan. |
| Service Version and Alias | Menentukan versi fungsi mana yang menerima data: Default Version memperbaiki versi layanan ke LATEST. Specified Version memerlukan konfigurasi parameter Service Version. Specified Alias memerlukan konfigurasi parameter Service Alias. Lihat Terms untuk terminologi Function Compute. |
Langkah 3: Uji konektivitas
Klik Test Connectivity and Proceed di bagian bawah halaman.
Blok CIDR server DTS harus ditambahkan ke pengaturan keamanan database sumber dan tujuan. Lihat Tambahkan blok CIDR server DTS. Jika sumber atau tujuan adalah database yang dikelola sendiri yang tidak menggunakan akses Alibaba Cloud Instance, klik Test Connectivity di kotak dialog CIDR Blocks of DTS Servers.
Langkah 4: Konfigurasi objek untuk disinkronkan
Pada langkah Configure Objects, atur parameter berikut.
| Parameter | Deskripsi |
|---|---|
| Synchronization Types | Ditetapkan ke Incremental Data Synchronization. |
| Data Format | Ditetapkan ke Canal Json. Lihat Format data klaster Kafka untuk deskripsi field. |
| Source Objects | Pilih database atau koleksi, lalu klik ikon panah kanan untuk memindahkannya ke Selected Objects. |
| Selected Objects | Tinjau objek yang akan disinkronkan. Untuk menghapus objek, pilih objek tersebut lalu klik ikon panah kiri. Untuk mengonfigurasi sinkronisasi tingkat database atau koleksi, klik kanan di Selected Objects. |
Klik Next: Advanced Settings.
Langkah 5: Konfigurasi pengaturan lanjutan
| Parameter | Deskripsi |
|---|---|
| Dedicated Cluster for Task Scheduling | Secara default, DTS menjadwalkan tugas ke klaster bersama. Beli klaster khusus untuk meningkatkan stabilitas. Lihat Apa itu klaster khusus DTS. |
| Retry Time for Failed Connections | Durasi DTS mencoba ulang saat database sumber atau tujuan tidak dapat dijangkau. Rentang valid: 10–1.440 menit. Default: 720. Atur nilai ini lebih dari 30 menit. Jika beberapa tugas berbagi database yang sama, waktu coba ulang terpendek yang berlaku. DTS menagih instance selama periode coba ulang. |
| Retry Time for Other Issues | Durasi DTS mencoba ulang saat operasi DDL atau DML gagal. Rentang valid: 1–1.440 menit. Default: 10. Atur nilai ini lebih dari 10 menit. Nilai ini harus kurang dari Retry Time for Failed Connections. |
| Obtain the entire document after it is updated | Hanya tersedia saat Migration Method adalah ChangeStream. Yes: Menyinkronkan dokumen lengkap setelah diperbarui. No: Hanya menyinkronkan field yang berubah. |
| Enable Throttling for Incremental Data Synchronization | Batasi throughput untuk mengurangi beban pada tujuan. Konfigurasikan RPS of Incremental Data Synchronization dan Data synchronization speed for incremental synchronization (MB/s). |
| Environment Tag | Tag opsional untuk mengidentifikasi instans DTS. |
| Configure ETL | Aktifkan fitur ETL untuk menyaring atau mentransformasi data sebelum mencapai tujuan. Lihat Apa itu ETL? dan Konfigurasi ETL dalam tugas migrasi data atau sinkronisasi data. |
| Monitoring and Alerting | Konfigurasikan peringatan untuk kegagalan tugas atau latensi yang melebihi ambang batas. Lihat Konfigurasi pemantauan dan peringatan saat membuat tugas DTS. |
Langkah 6: Jalankan pemeriksaan awal
Klik Next: Save Task Settings and Precheck.
Untuk melihat pratinjau parameter OpenAPI untuk konfigurasi tugas ini, arahkan pointer ke Next: Save Task Settings and Precheck lalu klik Preview OpenAPI parameters.
DTS menjalankan pemeriksaan awal sebelum memulai tugas. Jika pemeriksaan awal gagal:
Untuk setiap item yang gagal, klik View Details, selesaikan masalahnya, lalu klik Precheck Again.
Untuk item peringatan yang dapat diabaikan: klik Confirm Alert Details > Ignore > OK > Precheck Again. Mengabaikan peringatan dapat menyebabkan inkonsistensi data.
Langkah 7: Beli instans
Tunggu hingga Success Rate mencapai 100%, lalu klik Next: Purchase Instance.
Di halaman pembelian, konfigurasikan parameter berikut.
| Parameter | Deskripsi |
|---|---|
| Billing Method | Subscription: Prabayar; lebih hemat biaya untuk penggunaan jangka panjang. Pay-as-you-go: Ditagih per jam; cocok untuk penggunaan jangka pendek. Lepaskan instans saat tidak lagi digunakan untuk menghindari biaya. |
| Resource Group Settings | Kelompok sumber daya untuk instans. Default: default resource group. Lihat Apa itu Resource Management? |
| Instance Class | Kelas throughput sinkronisasi. Lihat Kelas instans instansi sinkronisasi data. |
| Subscription Duration | Tersedia saat Billing Method adalah Subscription. Opsi: 1–9 bulan, atau 1, 2, 3, atau 5 tahun. |
Setujui Ketentuan Layanan Data Transmission Service (Pay-as-you-go).
Klik Buy and Start, lalu klik OK di kotak dialog.
Tugas muncul di daftar tugas. Pantau perkembangannya dari sana.
Langkah berikutnya
Jika catatan melebihi 16 MB, tugas melaporkan error. Ubah objek yang disinkronkan atau gunakan ETL untuk menyaring catatan besar. Lihat Ubah konfigurasi ETL tugas sinkronisasi data yang sudah ada dan Ubah objek yang akan disinkronkan.
Tulis kode fungsi Anda untuk memproses data yang masuk. Lihat Ikhtisar.
Format data yang diterima fungsi
DTS mengirimkan data ke fungsi sebagai Object. Catatan inkremental berada di field Records sebagai array. Setiap elemen dalam array adalah Object dengan field berikut.
Fungsi menerima dua kategori operasi:
DDL: Perubahan skema — CreateIndex, CreateCollection, DropIndex, DropCollection.
DML: Perubahan data — INSERT, UPDATE, DELETE.
| Field | Tipe | Deskripsi |
|---|---|---|
isDdl | Boolean | True jika DDL; False jika DML. |
type | String | DML: DELETE, UPDATE, atau INSERT. DDL: DDL. |
database | String | Nama database MongoDB. |
table | String | Nama koleksi. |
pkNames | String | Nama kunci utama. Selalu _id untuk MongoDB. |
es | Long | Stempel waktu UNIX (milidetik) saat operasi dijalankan pada database sumber. |
ts | Long | Stempel waktu UNIX (milidetik) saat DTS mulai menulis ke tujuan. |
data | Object Array | Array satu elemen. Elemen tersebut memiliki kunci doc dan string JSON sebagai nilainya. Deserialisasi nilai tersebut untuk mendapatkan catatan. |
old | Object Array | Array tempat data asli disimpan. Format field-nya sama dengan field data. Field ini hanya tersedia saat nilai field type adalah UPDATE. |
id | Int | Nomor seri operasi. |
Contoh DDL
Buat koleksi
Hapus koleksi
Buat indeks
Hapus indeks
Contoh DML
Sisipkan data
Pernyataan SQL:
// Sisipkan beberapa catatan sekaligus
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})
// Sisipkan satu catatan per waktu
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})Data yang diterima fungsi:
{
"Records": [
{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"jack\", \"age\": 20}"}],
"pkNames": ["_id"],
"type": "INSERT",
"es": 1694054783000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054784427
},
{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"lili\", \"age\": 20}"}],
"pkNames": ["_id"],
"type": "INSERT",
"es": 1694054783000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054784428
}
]
}Perbarui data
Pernyataan SQL:
db.user.update({"name":"jack"},{$set:{"age":30}})Data yang diterima fungsi:
{
"Records": [{
"data": [{"doc": "{\"$set\": {\"age\": 30}}"}],
"pkNames": ["_id"],
"old": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
"type": "UPDATE",
"es": 1694054989000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054990555
}]
}Hapus data
Pernyataan SQL:
db.user.remove({"name":"jack"})Data yang diterima fungsi:
{
"Records": [{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
"pkNames": ["_id"],
"type": "DELETE",
"es": 1694055452000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694055452852
}]
}