Data Transmission Service (DTS) captures incremental changes from an ApsaraDB for MongoDB sharded cluster and delivers each change event to a Function Compute function in Canal JSON format. Write your function code to process, transform, or forward the events downstream.
## What your function receives

Each invocation passes an object with a `Records` array. Every element of the array represents one change event:
| Field | Type | Description |
|---|---|---|
| `isDdl` | Boolean | `true` for DDL operations; `false` for DML operations |
| `type` | String | DML: `INSERT`, `UPDATE`, or `DELETE`. DDL: `DDL` |
| `database` | String | MongoDB database name |
| `table` | String | Collection name |
| `pkNames` | String Array | Primary key names. Always `["_id"]` for MongoDB |
| `es` | Long | 13-digit UNIX timestamp (ms): when the operation occurred on the source |
| `ts` | Long | 13-digit UNIX timestamp (ms): when DTS started writing to the destination |
| `data` | Object Array | One-element array. The element has a `doc` key whose value is a JSON string representing the document. Deserialize the value to read the record |
| `old` | Object Array | Same format as `data`. Present only when `type` is `UPDATE`; contains the document state before the update |
| `id` | Int | Serial number of the operation |
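Because `es` and `ts` are both millisecond UNIX timestamps, their difference approximates the per-event synchronization latency. A minimal sketch in Python (the function name is illustrative, not part of the payload contract):

```python
def sync_latency_ms(record: dict) -> int:
    """Milliseconds between the source operation (es) and the moment
    DTS started writing the event to the destination (ts)."""
    return record["ts"] - record["es"]

# Values taken from the INSERT example below: 1694054784427 - 1694054783000
print(sync_latency_ms({"es": 1694054783000, "ts": 1694054784427}))  # 1427
```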
The function receives two categories of operations:

- DDL (schema changes): `CreateIndex`, `CreateCollection`, `DropIndex`, `DropCollection`
- DML (data changes): `INSERT`, `UPDATE`, `DELETE`
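A minimal Python event handler might look like the following sketch. It assumes the standard Function Compute event-handler signature and only logs what it receives; the payload fields match the table above.

```python
import json

def handler(event, context):
    # DTS delivers the Canal JSON payload as the raw event body.
    payload = json.loads(event)
    for record in payload.get("Records", []):
        # Each "data" element wraps the document in a "doc" JSON string.
        # Plain json.loads leaves Extended JSON such as {"$oid": ...} as a
        # nested dict; use bson.json_util.loads for native ObjectId values.
        docs = [json.loads(d["doc"]) for d in record.get("data", [])]
        target = f'{record["database"]}.{record["table"]}'
        if record["isDdl"]:
            print(f"DDL on {target}: {docs}")
        elif record["type"] == "UPDATE":
            # "old" is present only for UPDATE and holds the pre-update state.
            before = [json.loads(d["doc"]) for d in record.get("old", [])]
            print(f"UPDATE on {target}: {docs}, old: {before}")
        else:  # INSERT or DELETE
            print(f'{record["type"]} on {target}: {docs}')
    return "OK"
```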
## Example payloads

### Create a collection (DDL)

```javascript
db.createCollection("testCollection")
```

```json
{
  "Records": [{
    "data": [{"doc": "{\"create\": \"testCollection\", \"idIndex\": {\"v\": 2, \"key\": {\"_id\": 1}, \"name\": \"_id_\"}}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056437000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "testCollection",
    "ts": 1694056437510
  }]
}
```

### Drop a collection (DDL)
```javascript
db.testCollection.drop()
```

```json
{
  "Records": [{
    "data": [{"doc": "{\"drop\": \"testCollection\"}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056577000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "testCollection",
    "ts": 1694056577789
  }]
}
```

### Create an index (DDL)
```javascript
db.testCollection.createIndex({name: 1})
```

```json
{
  "Records": [{
    "data": [{"doc": "{\"createIndexes\": \"testCollection\", \"v\": 2, \"key\": {\"name\": 1}, \"name\": \"name_1\"}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056670000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "testCollection",
    "ts": 1694056670719
  }]
}
```

### Drop an index (DDL)
```javascript
db.testCollection.dropIndex({name: 1})
```

```json
{
  "Records": [{
    "data": [{"doc": "{\"dropIndexes\": \"testCollection\", \"index\": \"name_1\"}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056817000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "$cmd",
    "ts": 1694056818035
  }]
}
```

### Insert documents (DML)
```javascript
// Batch insert
db.runCommand({insert: "user", documents: [{"name": "jack", "age": 20}, {"name": "lili", "age": 20}]})

// Single insert
db.user.insert({"name": "jack", "age": 20})
```

```json
{
  "Records": [
    {
      "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"jack\", \"age\": 20}"}],
      "pkNames": ["_id"],
      "type": "INSERT",
      "es": 1694054783000,
      "database": "MongoDBTest",
      "id": 0,
      "isDdl": false,
      "table": "user",
      "ts": 1694054784427
    },
    {
      "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"lili\", \"age\": 20}"}],
      "pkNames": ["_id"],
      "type": "INSERT",
      "es": 1694054783000,
      "database": "MongoDBTest",
      "id": 0,
      "isDdl": false,
      "table": "user",
      "ts": 1694054784428
    }
  ]
}
```

### Update a document (DML)
```javascript
db.user.update({"name": "jack"}, {$set: {"age": 30}})
```

```json
{
  "Records": [{
    "data": [{"doc": "{\"$set\": {\"age\": 30}}"}],
    "pkNames": ["_id"],
    "old": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "type": "UPDATE",
    "es": 1694054989000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": false,
    "table": "user",
    "ts": 1694054990555
  }]
}
```

For UPDATE operations, DTS synchronizes only the `$set` command during incremental data synchronization: `data` carries the update operators, and `old` carries the `_id` of the document before the update.

### Delete a document (DML)
```javascript
db.user.remove({"name": "jack"})
```

```json
{
  "Records": [{
    "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "pkNames": ["_id"],
    "type": "DELETE",
    "es": 1694055452000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": false,
    "table": "user",
    "ts": 1694055452852
  }]
}
```
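If your function mirrors these events into another store, note that an UPDATE event splits its information across two fields: `data` carries the `$set` operators and `old` carries the `_id`. The following sketch shows one way to recombine them, assuming a hypothetical pymongo destination; the connection string and database name are placeholders.

```python
from bson import json_util  # parses Extended JSON such as {"$oid": ...}
from pymongo import MongoClient

# Hypothetical destination database; replace with your own endpoint.
dest = MongoClient("mongodb://localhost:27017")["mirror"]

def apply_record(record: dict) -> None:
    """Replay a single DML record against the destination collection."""
    coll = dest[record["table"]]
    doc = json_util.loads(record["data"][0]["doc"])
    if record["type"] == "INSERT":
        coll.insert_one(doc)
    elif record["type"] == "UPDATE":
        # "old" identifies the document; "data" holds the $set operators.
        before = json_util.loads(record["old"][0]["doc"])
        coll.update_one({"_id": before["_id"]}, doc)
    elif record["type"] == "DELETE":
        coll.delete_one({"_id": doc["_id"]})
```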
## Limitations

Review these constraints before creating your sync task.
Scope constraints:

- Only incremental data synchronization is supported. Full data synchronization is not supported.
- Cross-region synchronization is not supported.
- DTS cannot synchronize data from the `admin`, `config`, or `local` databases.
- Object name mapping is not supported.
- Transaction information is not retained. Transactions are converted to individual records at the destination.
Source database constraints:

- The source must use the sharded cluster architecture, with no more than 10 mongos nodes.
- The source cannot be an Azure Cosmos DB for MongoDB cluster or an Amazon DocumentDB elastic cluster.
- The collections to be synchronized must have PRIMARY KEY or UNIQUE constraints, and the constrained fields must be unique. Otherwise, the destination database may contain duplicate data records.
- A single document cannot exceed 16 MB. Documents over this size cannot be written to the destination function and trigger an error. Use the extract, transform, and load (ETL) feature to filter out large fields if needed.
- Synchronize up to 1,000 collections per task. To synchronize more collections, create multiple tasks or synchronize at the database level.
- The source instance cannot be scaled while a sync task is running.
- DTS cannot connect to a MongoDB database over an SRV endpoint.
- If the balancer is active on the source database, the task may experience delays.
- If the source is a self-managed MongoDB database that uses the sharded cluster architecture, set the Access Method parameter to Express Connect, VPN Gateway, or Smart Access Gateway, or to Cloud Enterprise Network (CEN).
Write constraints:

- For INSERT operations, the data being inserted must contain the shard key.
- For UPDATE operations, shard keys cannot be modified.
- Configure only one DTS task per destination function. Multiple tasks writing to the same function may cause data errors.
Oplog and change stream requirements:

- The oplog must be enabled and retain at least 7 days of log data, or change streams must be enabled and cover at least the last 7 days of changes. If neither condition is met, DTS may fail to capture source changes, which can cause data inconsistency or loss not covered by the DTS service level agreement (SLA).
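Before creating the task, you can verify the retention window by comparing the first and last oplog entries on each shard's replica set (the mongos router itself has no oplog). A sketch using pymongo; the connection string is a placeholder.

```python
from pymongo import MongoClient

# Connect to a shard's replica set directly, not to the mongos router.
client = MongoClient("mongodb://user:password@shard-host:3717/admin")

oplog = client["local"]["oplog.rs"]
first = oplog.find().sort("$natural", 1).limit(1).next()
last = oplog.find().sort("$natural", -1).limit(1).next()

# bson.Timestamp.time is seconds since the epoch.
window_hours = (last["ts"].time - first["ts"].time) / 3600
print(f"oplog window: {window_hours:.1f} hours (7 days = 168 hours)")
```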
Change stream restrictions (if applicable):

- Change streams require MongoDB 4.0 or later.
- Two-way synchronization is not supported when using change streams.
- For non-elastic Amazon DocumentDB clusters, use change streams: set Migration Method to ChangeStream and Architecture to Sharded Cluster.
New databases:

- DTS does not synchronize incremental data from databases created after the sync task starts.
## Supported operations
The operations captured depend on the migration method.
Using oplog (recommended):

- `CREATE COLLECTION`, `CREATE INDEX`
- `DROP DATABASE`, `DROP COLLECTION`, `DROP INDEX`
- `RENAME COLLECTION`
- Document-level `INSERT`, `UPDATE`, and `DELETE`

Using change streams:

- `DROP DATABASE`, `DROP COLLECTION`
- `RENAME COLLECTION`
- Document-level `INSERT`, `UPDATE`, and `DELETE`
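The `doc` string of a DDL event names the command through its leading key, as the example payloads above show (`create`, `createIndexes`, `drop`, `dropIndexes`). Below is a small classifier built only from those observed keys; other DDL commands such as a collection rename carry their own command keys and are not covered here.

```python
import json

# Command keys observed in the example DDL payloads above.
DDL_KINDS = {
    "create": "CREATE COLLECTION",
    "createIndexes": "CREATE INDEX",
    "drop": "DROP COLLECTION",
    "dropIndexes": "DROP INDEX",
}

def classify_ddl(record: dict) -> str:
    """Map a DDL record to the operation it represents."""
    doc = json.loads(record["data"][0]["doc"])
    for key, kind in DDL_KINDS.items():
        if key in doc:
            return kind
    return "UNKNOWN"
```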
## Billing
Incremental data synchronization is charged. For pricing details, see Billing overview.
| Billing method | Description |
|---|---|
| Subscription | Pay upfront for 1–9 months, or 1, 2, 3, or 5 years. More cost-effective for long-term use |
| Pay-as-you-go | Billed hourly. Release the instance when no longer needed to stop charges |
DTS charges for the instance during connection retry periods.
## Prerequisites
Before you begin, make sure that you have:
- A running ApsaraDB for MongoDB sharded cluster instance. See Create a sharded cluster instance.
- A Function Compute service and function, with Handler Type set to Event Handler. See Quickly create a function.
- A database account on the source ApsaraDB for MongoDB instance with read permissions on the source, `admin`, and `local` databases. See Manage the permissions of MongoDB database users.
## Create a sync task

### Step 1: Open the Data Synchronization page
Go to the Data Synchronization page using either console:
**DTS console**

1. Log on to the DTS console.
2. In the left-side navigation pane, click Data Synchronization.
3. In the upper-left corner, select the region where the sync task will run.
**DMS console**

The exact navigation path depends on your DMS console layout. See Simple mode and Customize the layout and style of the DMS console.

1. Log on to the DMS console.
2. In the top navigation bar, move the pointer over Data + AI and choose DTS (DTS) > Data Synchronization.
3. From the dropdown to the right of Data Synchronization Tasks, select the target region.
### Step 2: Configure source and destination databases
Click Create Task, then configure the following parameters:
| Section | Parameter | Description |
|---|---|---|
| N/A | Task Name | DTS auto-generates a name. Specify a descriptive name to make the task easy to identify. Uniqueness is not required |
| Source Database | Select Existing Connection | Select a registered database instance to auto-fill the parameters below, or configure them manually |
| | Database Type | Select MongoDB |
| | Access Method | Select Alibaba Cloud Instance |
| | Instance Region | Region of the source MongoDB instance |
| | Replicate Data Across Alibaba Cloud Accounts | Select No for same-account synchronization |
| | Architecture | Select Sharded Cluster |
| | Migration Method | How DTS captures incremental data. Options: Oplog (recommended) or ChangeStream (see details below) |
| | Instance ID | ID of the source MongoDB instance |
| | Authentication Database | Database that stores the account credentials. Default: admin |
| | Database Account | Source database account with the required read permissions |
| | Database Password | Password for the database account |
| | Shard account | Account for accessing shards in the source instance |
| | Shard password | Password for the shard account |
| | Encryption | Connection encryption mode. Options: Non-encrypted, SSL-encrypted, or Mongo Atlas SSL. Available options depend on the Access Method and Architecture selections. SSL-encrypted is unavailable when Architecture is Sharded Cluster and Migration Method is Oplog |
| Destination Database | Select Existing Connection | Select a registered Function Compute instance to auto-fill the parameters below, or configure them manually |
| | Database Type | Select Function Compute |
| | Access Method | Select Alibaba Cloud Instance |
| | Instance Region | Matches the source region. Cannot be changed |
| | Service | The Function Compute service that contains the destination function |
| | Function | The function that receives the synchronized data |
| | Service Version and Alias | Version or alias of the service. Options: Default Version (fixed to LATEST), Specified Version (requires Service Version), or Specified Alias (requires Service Alias). See Terms |
Choose a migration method:
| Method | When to use |
|---|---|
| Oplog (recommended) | The oplog is enabled on the source (the default for both self-managed MongoDB and ApsaraDB for MongoDB). Provides lower synchronization latency due to faster log pulling |
| ChangeStream | The oplog is disabled, or you are using a non-elastic Amazon DocumentDB cluster (change streams required). Requires MongoDB 4.0 or later. Two-way sync is not supported |
When Architecture is Sharded Cluster and Migration Method is ChangeStream, the Shard account and Shard password parameters are not required.
### Step 3: Test connectivity
Click Test Connectivity and Proceed.
Make sure the DTS server CIDR blocks are added to the security group or allowlist of both the source and destination. See Add the CIDR blocks of DTS servers. If the source or destination uses a self-managed access method, click Test Connectivity in the CIDR Blocks of DTS Servers dialog first.
### Step 4: Select objects to synchronize
In the Configure Objects step, configure the following:
| Parameter | Description |
|---|---|
| Synchronization Types | Fixed to Incremental Data Synchronization. Cannot be changed |
| Data Format | Fixed to Canal Json. For field descriptions, see the Canal Json section of the Data formats of a Kafka cluster topic |
| Source Objects | Select the databases or collections to synchronize, then move them to the Selected Objects section |
| Selected Objects | Review the selected objects. To remove an object, move it back to the Source Objects section |
### Step 5: Configure advanced settings
Click Next: Advanced Settings, then configure:
| Parameter | Description |
|---|---|
| Dedicated Cluster for Task Scheduling | By default, DTS schedules the task to the shared cluster. Purchase a dedicated cluster for higher stability. See What is a DTS dedicated cluster |
| Retry Time for Failed Connections | How long DTS retries when the source or destination is unreachable. Range: 10–1440 minutes. Default: 720. Set to at least 30 minutes. If multiple tasks share the same source or destination, the shortest retry time takes precedence |
| Retry Time for Other Issues | How long DTS retries failed DDL or DML operations. Range: 1–1440 minutes. Default: 10. Set to at least 10 minutes. Must be less than Retry Time for Failed Connections |
| Obtain the entire document after it is updated | ChangeStream only. Yes: send the full document after an update. No: send only the changed fields |
| Enable Throttling for Incremental Data Synchronization | Limit sync throughput to reduce load on the destination. Configure RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s) |
| Environment Tag | Tag to identify the DTS instance environment. Optional |
| Configure ETL | Enable the ETL feature to transform data in transit. See What is ETL? and Configure ETL in a data migration or data synchronization task |
| Monitoring and Alerting | Send alerts when the task fails or synchronization latency exceeds a threshold. See Configure monitoring and alerting when you create a DTS task |
### Step 6: Run the precheck
Click Next: Save Task Settings and Precheck.
To preview the API parameters for this task configuration, hover over Next: Save Task Settings and Precheck and click Preview OpenAPI parameters.
DTS runs a precheck before starting the sync task. The task starts only after the precheck passes.
If an item fails: click View Details next to the failed item, fix the reported issue, then click Precheck Again.
If an alert is triggered:

- If the alert cannot be ignored: click View Details, fix the issue, and rerun the precheck.
- If the alert can be ignored: click Confirm Alert Details, click Ignore in the dialog, confirm with OK, then click Precheck Again. Ignoring alerts may result in data inconsistency.
### Step 7: Purchase an instance
Wait until the Success Rate reaches 100%, then click Next: Purchase Instance.
On the buy page, configure:
| Section | Parameter | Description |
|---|---|---|
| New Instance Class | Billing Method | Subscription or Pay-as-you-go |
| | Resource Group Settings | Resource group for the instance. Default: default resource group. See What is Resource Management? |
| | Instance Class | Determines the synchronization speed. See Instance classes of data synchronization instances |
| | Subscription Duration | Available only for Subscription billing. Options: 1–9 months, or 1, 2, 3, or 5 years |
Read and select Data Transmission Service (Pay-as-you-go) Service Terms.
Click Buy and Start, then click OK in the confirmation dialog.
Track task progress in the task list.
## What's next
- Write your Function Compute event handler to process the Canal JSON payload. Use the `isDdl` field to branch between DDL and DML logic, and deserialize the `doc` string in each `data` element to access the document fields.
- To reduce the load of large-field documents on your function, configure ETL filters before the data reaches the function. See Configure ETL in a data migration or data synchronization task.
- To monitor synchronization health, configure alerting in Advanced Settings or after the task is created. See Configure monitoring and alerting when you create a DTS task.
- If a task fails, DTS technical support will attempt to restore it within 8 hours. During restoration, the task may be restarted and task parameters (not database parameters) may be adjusted. For a list of parameters that may be modified, see Modify instance parameters.