All Products
Search
Document Center

Data Transmission Service:Sinkronisasi data dari instans kluster sharded ApsaraDB for MongoDB ke fungsi Function Compute

Last Updated:Mar 29, 2026

Layanan Transmisi Data (DTS) menangkap perubahan inkremental dari instans kluster sharded ApsaraDB for MongoDB dan mengirimkan setiap event perubahan ke fungsi Function Compute dalam format Canal JSON. Tulis kode fungsi Anda untuk memproses, mentransformasi, atau meneruskan event tersebut ke sistem downstream.

Apa yang diterima fungsi Anda

Setiap pemanggilan meneruskan objek dengan array Records. Setiap elemen dalam array merepresentasikan satu event perubahan:

FieldTypeDescription
isDdlBooleanTrue untuk operasi DDL; False untuk operasi DML
typeStringDML: INSERT, UPDATE, atau DELETE. DDL: DDL
databaseStringNama database MongoDB
tableStringNama koleksi
pkNamesStringNama kunci primer. Selalu _id untuk MongoDB
esLongStempel waktu UNIX 13 digit (ms) — saat operasi terjadi di sumber
tsLongStempel waktu UNIX 13 digit (ms) — saat DTS mulai menulis ke tujuan
dataObject ArrayArray berisi satu elemen. Elemen tersebut memiliki kunci doc yang nilainya adalah string JSON yang merepresentasikan dokumen. Deserialisasi nilai tersebut untuk membaca catatan
oldObject ArrayFormatnya sama seperti data. Hanya tersedia ketika type bernilai UPDATE; berisi status dokumen sebelum pembaruan
idIntNomor seri operasi

Fungsi menerima dua kategori operasi:

  • DDL — perubahan skema: CreateIndex, CreateCollection, DropIndex, DropCollection

  • DML — perubahan data: INSERT, UPDATE, DELETE

Contoh muatan

Membuat koleksi (DDL)

db.createCollection("testCollection")
{
  "Records": [{
    "data": [{"doc": "{\"create\": \"testCollection\", \"idIndex\": {\"v\": 2, \"key\": {\"_id\": 1}, \"name\": \"_id_\"}}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056437000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "testCollection",
    "ts": 1694056437510
  }]
}

Menghapus koleksi (DDL)

db.testCollection.drop()
{
  "Records": [{
    "data": [{"doc": "{\"drop\": \"testCollection\"}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056577000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "testCollection",
    "ts": 1694056577789
  }]
}

Membuat indeks (DDL)

db.testCollection.createIndex({name: 1})
{
  "Records": [{
    "data": [{"doc": "{\"createIndexes\": \"testCollection\", \"v\": 2, \"key\": {\"name\": 1}, \"name\": \"name_1\"}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056670000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "testCollection",
    "ts": 1694056670719
  }]
}

Menghapus indeks (DDL)

db.testCollection.dropIndex({name: 1})
{
  "Records": [{
    "data": [{"doc": "{\"dropIndexes\": \"testCollection\", \"index\": \"name_1\"}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056817000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "$cmd",
    "ts": 1694056818035
  }]
}

Menyisipkan dokumen (DML)

// Batch insert
db.runCommand({insert: "user", documents: [{"name": "jack", "age": 20}, {"name": "lili", "age": 20}]})

// Single insert
db.user.insert({"name": "jack", "age": 20})
{
  "Records": [
    {
      "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"jack\", \"age\": 20}"}],
      "pkNames": ["_id"],
      "type": "INSERT",
      "es": 1694054783000,
      "database": "MongoDBTest",
      "id": 0,
      "isDdl": false,
      "table": "user",
      "ts": 1694054784427
    },
    {
      "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"lili\", \"age\": 20}"}],
      "pkNames": ["_id"],
      "type": "INSERT",
      "es": 1694054783000,
      "database": "MongoDBTest",
      "id": 0,
      "isDdl": false,
      "table": "user",
      "ts": 1694054784428
    }
  ]
}

Memperbarui dokumen (DML)

db.user.update({"name": "jack"}, {$set: {"age": 30}})
{
  "Records": [{
    "data": [{"doc": "{\"$set\": {\"age\": 30}}"}],
    "pkNames": ["_id"],
    "old": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "type": "UPDATE",
    "es": 1694054989000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": false,
    "table": "user",
    "ts": 1694054990555
  }]
}
Untuk operasi UPDATE, hanya perintah $set yang disinkronkan secara sinkron oleh DTS saat menyinkronkan Data inkremental.

Menghapus dokumen (DML)

db.user.remove({"name": "jack"})
{
  "Records": [{
    "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "pkNames": ["_id"],
    "type": "DELETE",
    "es": 1694055452000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": false,
    "table": "user",
    "ts": 1694055452852
  }]
}

Batasan

Tinjau batasan berikut sebelum membuat tugas sinkronisasi Anda.

Batasan cakupan:

  • Hanya sinkronisasi data inkremental yang didukung. Sinkronisasi data penuh tidak didukung.

  • Sinkronisasi lintas wilayah tidak didukung.

  • DTS tidak dapat menyinkronkan data dari database admin, config, atau local.

  • Pemetaan objek tidak didukung.

  • Informasi transaksi tidak dipertahankan. Transaksi dikonversi menjadi catatan individual di tujuan.

Batasan database sumber:

  • Sumber harus menggunakan arsitektur kluster sharded dengan tidak lebih dari 10 node Mongos.

  • Sumber tidak boleh merupakan kluster Azure Cosmos DB for MongoDB atau kluster elastis Amazon DocumentDB.

  • Koleksi yang akan disinkronkan harus memiliki kendala PRIMARY KEY atau UNIQUE, dan semua field harus unik. Jika tidak, database tujuan mungkin berisi catatan data duplikat.

  • Ukuran dokumen tunggal tidak boleh melebihi 16 MB. Dokumen yang melebihi batas ini tidak dapat ditulis ke fungsi tujuan dan akan memicu error. Gunakan fitur ekstrak, transformasi, dan muat (ETL) untuk memfilter field besar jika diperlukan.

  • Sinkronkan hingga 1.000 koleksi per tugas. Untuk jumlah koleksi yang lebih banyak, buat beberapa tugas atau lakukan sinkronisasi pada tingkat database.

  • Instans sumber tidak boleh diskalakan selama tugas sinkronisasi berjalan.

  • DTS tidak dapat terhubung ke database MongoDB melalui titik akhir SRV.

  • Jika penyeimbang (balancer) database sumber aktif, tugas mungkin mengalami penundaan.

  • Jika database sumber adalah database MongoDB yang dikelola sendiri dengan arsitektur kluster sharded, atur parameter Access Method ke Express Connect, VPN Gateway, atau Smart Access Gateway atau Cloud Enterprise Network (CEN).

Tulis kendala:

  • Untuk operasi INSERT, data yang dimasukkan harus berisi kunci shard.

  • Untuk operasi UPDATE, kunci shard tidak boleh dimodifikasi.

  • Konfigurasikan hanya satu tugas DTS per fungsi tujuan. Beberapa tugas yang menulis ke fungsi yang sama dapat menyebabkan kesalahan data.

Persyaratan oplog dan change stream:

  • Oplog harus diaktifkan dan menyimpan data log minimal selama 7 hari, ATAU change stream harus diaktifkan untuk mencakup perubahan minimal 7 hari terakhir. Jika kedua kondisi tersebut tidak terpenuhi, DTS mungkin gagal menangkap perubahan sumber, yang dapat menyebabkan inkonsistensi atau kehilangan data yang tidak dicakup oleh perjanjian tingkat layanan (SLA) DTS.

Pembatasan change stream (jika berlaku):

  • Change stream memerlukan MongoDB 4.0 atau versi lebih baru.

  • Sinkronisasi dua arah tidak didukung saat menggunakan change stream.

  • Untuk kluster Amazon DocumentDB non-elastis, gunakan change stream: atur Migration Method ke ChangeStream dan Architecture ke Sharded Cluster.

Database baru:

  • DTS tidak menyinkronkan data inkremental dari database yang dibuat setelah tugas sinkronisasi dimulai.

Operasi yang didukung

Operasi yang ditangkap bergantung pada metode migrasi.

Menggunakan oplog (direkomendasikan):

  • CREATE COLLECTION, CREATE INDEX

  • DROP DATABASE, DROP COLLECTION, DROP INDEX

  • RENAME COLLECTION

  • INSERT, UPDATE, dan DELETE tingkat dokumen

Menggunakan aliran perubahan:

  • DROP DATABASE, DROP COLLECTION

  • RENAME COLLECTION

  • INSERT, UPDATE, dan DELETE tingkat dokumen

Penagihan

Sinkronisasi data inkremental dikenai biaya. Untuk detail harga, lihat Ikhtisar penagihan.

Metode penagihanDeskripsi
LanggananBayar di muka untuk 1–9 bulan, atau 1, 2, 3, atau 5 tahun. Lebih hemat biaya untuk penggunaan jangka panjang
Pay-as-you-goDitagih per jam. Lepaskan instans saat tidak lagi digunakan untuk menghentikan biaya
DTS menagih instans selama periode percobaan koneksi ulang.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

Buat tugas sinkronisasi

Langkah 1: Buka halaman Sinkronisasi Data

Buka halaman Data Synchronization menggunakan salah satu konsol berikut:

Konsol DTS

  1. Login ke Konsol DTS.Konsol DTS

  2. Di panel navigasi kiri, klik Data Synchronization.

  3. Di pojok kiri atas, pilih wilayah tempat tugas sinkronisasi akan dijalankan.

Konsol DMS

Jalur navigasi tepatnya bergantung pada tata letak konsol DMS Anda. Lihat Mode simple dan Sesuaikan tata letak dan gaya konsol DMS.

  1. Login ke Konsol DMS.Konsol DMS

  2. Di bilah navigasi atas, arahkan pointer ke Data + AI lalu pilih DTS (DTS) > Data Synchronization.

  3. Dari dropdown di sebelah kanan Data Synchronization Tasks, pilih wilayah target.

Langkah 2: Konfigurasi database sumber dan tujuan

Klik Create Task, lalu konfigurasi parameter berikut:

BagianParameterDeskripsi
N/ATask NameDTS menghasilkan nama secara otomatis. Tentukan nama deskriptif agar tugas mudah diidentifikasi. Tidak perlu unik
Source DatabaseSelect Existing ConnectionPilih instans database terdaftar untuk mengisi parameter di bawah secara otomatis, atau konfigurasi secara manual
Database TypePilih MongoDB
Access MethodPilih Alibaba Cloud Instance
Instance RegionWilayah instans MongoDB sumber
Replicate Data Across Alibaba Cloud AccountsPilih No untuk sinkronisasi dalam akun yang sama
ArchitecturePilih Sharded Cluster
Migration MethodCara DTS menangkap data inkremental. Opsi: Oplog (direkomendasikan) atau ChangeStream (lihat detail di bawah)
Instance IDID instans MongoDB sumber
Authentication DatabaseDatabase yang menyimpan kredensial akun. Default: admin
Database AccountAkun database sumber dengan izin baca yang diperlukan
Database PasswordKata sandi untuk akun database
Shard accountAkun untuk mengakses shard di instans sumber
Shard passwordKata sandi untuk akun shard
EncryptionMode enkripsi koneksi. Opsi: Non-encrypted, SSL-encrypted, atau Mongo Atlas SSL. Opsi yang tersedia bergantung pada pilihan Access Method dan Architecture. SSL-encrypted tidak tersedia saat Architecture adalah Sharded Cluster dan Migration Method adalah Oplog
Destination DatabaseSelect Existing ConnectionPilih instans Function Compute terdaftar untuk mengisi parameter di bawah secara otomatis, atau konfigurasi secara manual
Database TypePilih Function Compute
Access MethodPilih Alibaba Cloud Instance
Instance RegionSesuai dengan wilayah sumber. Tidak dapat diubah
ServiceLayanan Function Compute yang berisi fungsi tujuan
FunctionFungsi yang menerima data yang disinkronkan
Service Version and AliasVersi atau alias layanan. Opsi: Default Version (tetap ke LATEST), Specified Version (memerlukan Service Version), atau Specified Alias (memerlukan Service Alias). Lihat Terms

Pilih metode migrasi:

MetodeKapan digunakan
Oplog (direkomendasikan)Oplog diaktifkan di sumber (default baik untuk MongoDB yang dikelola sendiri maupun ApsaraDB for MongoDB). Memberikan latensi sinkronisasi lebih rendah karena penarikan log lebih cepat
ChangeStreamOplog dinonaktifkan, atau Anda menggunakan kluster Amazon DocumentDB non-elastis (change stream wajib). Memerlukan MongoDB 4.0 atau lebih baru. Sinkronisasi dua arah tidak didukung
Saat Architecture adalah Sharded Cluster dan Migration Method adalah ChangeStream, parameter Shard account dan Shard password tidak diperlukan.

Langkah 3: Uji konektivitas

Klik Test Connectivity and Proceed.

    Pastikan blok CIDR server DTS telah ditambahkan ke grup keamanan atau daftar izin (allowlist) baik sumber maupun tujuan. Lihat Tambahkan blok CIDR server DTS. Jika sumber atau tujuan menggunakan metode akses yang dikelola sendiri, klik Test Connectivity di dialog CIDR Blocks of DTS Servers terlebih dahulu.

    Langkah 4: Pilih objek untuk disinkronkan

    Pada langkah Configure Objects, konfigurasi hal berikut:

    ParameterDeskripsi
    Synchronization TypesTetap ke Incremental Data Synchronization. Tidak dapat diubah
    Data FormatTetap ke Canal Json. Untuk deskripsi field, lihat bagian Canal Json dalam format data topik kluster Kafka
    Source ObjectsPilih database atau koleksi untuk disinkronkan, lalu klik 向右 untuk memindahkannya ke Selected Objects
    Selected ObjectsTinjau objek yang dipilih. Klik zuoyi untuk menghapus objek. Klik kanan objek untuk mengonfigurasi granularitas sinkronisasi (tingkat database atau koleksi)

    Langkah 5: Konfigurasi pengaturan lanjutan

    Klik Next: Advanced Settings, lalu konfigurasi:

    ParameterDeskripsi
    Dedicated Cluster for Task SchedulingSecara default, DTS menjadwalkan tugas ke kluster bersama. Beli kluster khusus untuk stabilitas lebih tinggi. Lihat Apa itu kluster khusus DTS
    Retry Time for Failed ConnectionsDurasi DTS mencoba ulang saat sumber atau tujuan tidak dapat dijangkau. Rentang: 10–1440 menit. Default: 720. Atur minimal 30 menit. Jika beberapa tugas berbagi sumber atau tujuan yang sama, waktu retry terpendek yang berlaku
    Retry Time for Other IssuesDurasi DTS mencoba ulang operasi DDL atau DML yang gagal. Rentang: 1–1440 menit. Default: 10. Atur minimal 10 menit. Harus lebih pendek dari Retry Time for Failed Connections
    Obtain the entire document after it is updatedHanya untuk ChangeStream. Yes: kirim dokumen lengkap setelah pembaruan. No: kirim hanya field yang berubah
    Enable Throttling for Incremental Data SynchronizationBatasi throughput sinkronisasi untuk mengurangi beban pada tujuan. Konfigurasi RPS of Incremental Data Synchronization dan Data synchronization speed for incremental synchronization (MB/s)
    Environment TagTag untuk mengidentifikasi lingkungan instans DTS. Opsional
    Configure ETLAktifkan fitur ETL untuk mentransformasi data saat transit. Lihat Apa itu ETL? dan Konfigurasi ETL dalam tugas migrasi atau sinkronisasi data
    Monitoring and AlertingKirim peringatan saat tugas gagal atau latensi sinkronisasi melebihi ambang batas. Lihat Konfigurasi pemantauan dan peringatan saat membuat tugas DTS

    Langkah 6: Jalankan pemeriksaan awal

    Klik Next: Save Task Settings and Precheck.

    Untuk melihat pratinjau parameter API untuk konfigurasi tugas ini, arahkan kursor ke Next: Save Task Settings and Precheck lalu klik Preview OpenAPI parameters.

    DTS menjalankan pemeriksaan awal sebelum memulai tugas sinkronisasi. Tugas hanya dimulai setelah pemeriksaan awal berhasil.

    • Jika suatu item gagal: klik View Details di sebelah item yang gagal, perbaiki masalah yang dilaporkan, lalu klik Precheck Again.

    • Jika peringatan dipicu:

      • Jika peringatan tidak dapat diabaikan: klik View Details, perbaiki masalah, dan jalankan ulang pemeriksaan awal.

      • Jika peringatan dapat diabaikan: klik Confirm Alert Details, klik Ignore di dialog, konfirmasi dengan OK, lalu klik Precheck Again. Mengabaikan peringatan dapat menyebabkan inkonsistensi data.

    Langkah 7: Beli instans

    1. Tunggu hingga Success Rate mencapai 100%, lalu klik Next: Purchase Instance.

    2. Di halaman buy, konfigurasi:

    BagianParameterDeskripsi
    New Instance ClassBilling MethodSubscription atau Pay-as-you-go
    Resource Group SettingsKelompok sumber daya untuk instans. Default: default resource group. Lihat Apa itu Resource Management?
    Instance ClassMenentukan kecepatan sinkronisasi. Lihat Kelas instans untuk instansi sinkronisasi data
    Subscription DurationHanya tersedia untuk penagihan Subscription. Opsi: 1–9 bulan, atau 1, 2, 3, atau 5 tahun
    1. Baca dan pilih Data Transmission Service (Pay-as-you-go) Service Terms.

    2. Klik Buy and Start, lalu klik OK di dialog konfirmasi.

    Lacak progres tugas di daftar tugas.

    Langkah selanjutnya

    • Tulis penanganan event Function Compute Anda untuk memproses muatan Canal JSON. Gunakan field isDdl untuk membedakan logika DDL dan DML, serta deserialisasi string doc di setiap elemen data untuk mengakses field dokumen.

    • Untuk mengurangi beban dokumen dengan field besar pada fungsi Anda, konfigurasi filter ETL sebelum data mencapai fungsi. Lihat Konfigurasi ETL dalam tugas migrasi atau sinkronisasi data.

    • Untuk memantau kesehatan sinkronisasi, konfigurasi peringatan di Advanced Settings atau setelah tugas dibuat. Lihat Konfigurasi pemantauan dan peringatan saat membuat tugas DTS.

    • Jika tugas gagal, dukungan teknis DTS akan berusaha memulihkannya dalam waktu 8 jam. Selama pemulihan, tugas mungkin dimulai ulang dan parameter tugas (bukan parameter database) mungkin disesuaikan. Untuk daftar parameter yang mungkin dimodifikasi, lihat Modifikasi parameter instans.