Use Data Transmission Service (DTS) to stream incremental changes from an ApsaraDB for MongoDB replica set instance directly into a Function Compute function. Each INSERT, UPDATE, DELETE, and DDL event is delivered in Canal JSON format, so your function code can process, transform, or forward the data downstream without building a separate consumer layer.
Prerequisites
Before you begin, make sure you have:
An ApsaraDB for MongoDB replica set instance. See Create a replica set instance.
A Function Compute service and function with Handler Type set to Event Handler. See Quickly create a function.
Billing
Incremental data synchronization is a paid feature.

| Synchronization type | Task configuration fee |
|---|---|
| Incremental data synchronization | Charged. For more information, see Billing overview. |
Limitations
Source database
| Constraint | Details |
|---|---|
| Bandwidth | The server hosting the source database must have sufficient outbound bandwidth; otherwise synchronization speed is affected. |
| Primary key or unique key | Collections to be synchronized must have a PRIMARY KEY or UNIQUE constraint, and all fields must be unique. Otherwise, the destination function may receive duplicate records. |
| Single record size | 16 MB maximum per record. DTS reports an error for larger records. Use the extract, transform, and load (ETL) feature to filter out large fields. |
| Collection count | Up to 1,000 collections per task. For more than 1,000 collections, configure multiple tasks or synchronize at the database level. |
| Unsupported sources | Azure Cosmos DB for MongoDB clusters and Amazon DocumentDB elastic clusters are not supported. |
| oplog / change streams | The oplog must be enabled on the source database and retain at least seven days of log data, OR change streams must be enabled to cover the past seven days. If neither condition is met, DTS may miss changes, which can cause data inconsistency or loss — this is not covered by the DTS service level agreement (SLA). |
| MongoDB version for change streams | Change streams require MongoDB 4.0 or later. Two-way synchronization is not supported when using change streams. |
| Amazon DocumentDB (non-elastic) | Enable change streams and set Migration Method to ChangeStream and Architecture to Sharded Cluster. |
| SRV endpoints | DTS cannot connect to MongoDB over a SRV endpoint. |
Other limitations
DTS cannot synchronize the admin, config, or local databases.
Full data synchronization is not supported; only incremental data synchronization is available.
Cross-region synchronization is not supported.
The object name mapping feature is not supported.
Assign only one DTS task per destination function to avoid data errors.
Transaction context is not preserved. Each transaction is converted to individual records.
If a DTS task fails, DTS technical support attempts to restore it within 8 hours. The task may be restarted and task parameters (not database parameters) may be modified during restoration.
Special cases (self-managed MongoDB)
A primary/secondary switchover while the task is running causes the task to fail.
Synchronization latency is calculated from the latest synchronized record's timestamp versus the current source timestamp. If no writes occur on the source for an extended period, the reported latency may be inaccurate. Write a record to the source database to refresh the latency reading.
If you synchronize an entire database, create a heartbeat table. DTS updates the heartbeat table every second, which keeps the latency reading accurate.
Supported operations
The operations DTS synchronizes depend on the Migration Method you choose.
| Operation | Oplog | Change streams |
|---|---|---|
| INSERT | Supported | Supported |
| UPDATE | Supported | Supported |
| DELETE | Supported | Supported |
| CREATE COLLECTION | Supported | Not supported |
| CREATE INDEX | Supported | Not supported |
| DROP DATABASE | Supported | Supported |
| DROP COLLECTION | Supported | Supported |
| DROP INDEX | Supported | Not supported |
| RENAME COLLECTION | Supported | Supported |
Use oplog
A DTS task does not synchronize incremental data from databases that are created after the task starts to run. DTS synchronizes incremental data generated by the following operations:
CREATE COLLECTION and INDEX
DROP DATABASE, COLLECTION, and INDEX
RENAME COLLECTION
The operations that are performed to insert, update, and delete documents in a collection.
Note: When you synchronize incremental data of documents, only update operations that use the $set command are supported.
Use change streams
DTS synchronizes incremental data generated by the following operations:
DROP DATABASE and COLLECTION
RENAME COLLECTION
The operations that are performed to insert, update, and delete documents in a collection.
Note: When you synchronize incremental data of documents, only update operations that use the $set command are supported.
Required database account permissions
| Database | Required permissions | Reference |
|---|---|---|
| Source MongoDB instance | Read on the source, admin, and local databases | Manage the permissions of MongoDB database users |
Create a synchronization task
Step 1: Open the Data Synchronization page
Use one of the following consoles:
DTS console
Log on to the DTS console.
In the left-side navigation pane, click Data Synchronization.
In the upper-left corner, select the region where the synchronization task will run.
DMS console
The exact navigation steps vary based on the DMS console mode and layout. See Simple mode and Customize the layout and style of the DMS console.
Log on to the DMS console.
In the top navigation bar, move the pointer over Data + AI and choose DTS (DTS) > Data Synchronization.
From the drop-down list to the right of Data Synchronization Tasks, select the region.
Step 2: Configure source and destination databases
Click Create Task, then configure the parameters described in the following tables.
General
| Parameter | Description |
|---|---|
| Task Name | A name for the DTS task. DTS generates a default name. Use a descriptive name to identify the task — uniqueness is not required. |
Source database
| Parameter | Description |
|---|---|
| Select Existing Connection | Select a registered database instance from the drop-down list, or configure the connection manually. |
| Database Type | Select MongoDB. |
| Access Method | Select Alibaba Cloud Instance. |
| Instance Region | The region where the source MongoDB instance resides. |
| Replicate Data Across Alibaba Cloud Accounts | Select No for same-account synchronization. |
| Architecture | Select Replica Set. |
| Migration Method | How DTS reads incremental data from the source. Oplog (recommended): Available when the oplog feature is enabled on the source. Oplog provides low latency because of fast log pulling. ChangeStream: Available when change streams are enabled. See Change Streams. If Sharded Cluster is selected for Architecture, the Shard account and Shard password parameters are not required. |
| Instance ID | The ID of the source MongoDB instance. |
| Authentication Database | The database that stores accounts and passwords. Default: admin. |
| Database Account | The account with the required read permissions. |
| Database Password | The password for the database account. |
| Encryption | Connection encryption: Non-encrypted, SSL-encrypted, or Mongo Atlas SSL. Available options depend on the Access Method and Architecture values. SSL-encrypted is unavailable when Architecture is Sharded Cluster and Migration Method is Oplog. For a self-managed MongoDB replica set not using Alibaba Cloud Instance access, you can upload a CA certificate when SSL-encrypted is selected. |
Destination database
| Parameter | Description |
|---|---|
| Select Existing Connection | Select a registered instance from the drop-down list, or configure manually. |
| Database Type | Select Function Compute. |
| Access Method | Select Alibaba Cloud Instance. |
| Instance Region | Defaults to the same region as the source and cannot be changed. |
| Service | The Function Compute service containing the destination function. |
| Function | The destination function that receives the synchronized data. |
| Service Version and Alias | Determines which version of the function receives data: Default Version fixes the service version to LATEST. Specified Version requires configuring the Service Version parameter. Specified Alias requires configuring the Service Alias parameter. See Terms for Function Compute terminology. |
Step 3: Test connectivity
Click Test Connectivity and Proceed at the bottom of the page.
DTS server CIDR blocks must be added to the security settings of the source and destination databases. See Add the CIDR blocks of DTS servers. If the source or destination is a self-managed database not using Alibaba Cloud Instance access, click Test Connectivity in the CIDR Blocks of DTS Servers dialog box.
Step 4: Configure objects to synchronize
In the Configure Objects step, set the following parameters.
| Parameter | Description |
|---|---|
| Synchronization Types | Fixed to Incremental Data Synchronization. |
| Data Format | Fixed to Canal Json. See Data formats of a Kafka cluster for field descriptions. |
| Source Objects | Select databases or collections, then click the right-arrow icon to move them to Selected Objects. |
| Selected Objects | Review the objects to synchronize. To remove an object, select it and click the left-arrow icon. To configure database- or collection-level synchronization, right-click in Selected Objects. |
Click Next: Advanced Settings.
Step 5: Configure advanced settings
| Parameter | Description |
|---|---|
| Dedicated Cluster for Task Scheduling | By default, DTS schedules the task to a shared cluster. Purchase a dedicated cluster to improve stability. See What is a DTS dedicated cluster. |
| Retry Time for Failed Connections | How long DTS retries when the source or destination database is unreachable. Valid range: 10–1,440 minutes. Default: 720. Set this to more than 30 minutes. If multiple tasks share the same database, the shortest retry time takes effect. DTS charges for the instance during retry periods. |
| Retry Time for Other Issues | How long DTS retries when DDL or DML operations fail. Valid range: 1–1,440 minutes. Default: 10. Set this to more than 10 minutes. This value must be less than Retry Time for Failed Connections. |
| Obtain the entire document after it is updated | Available only when Migration Method is ChangeStream. Yes: Synchronize the complete document after an update. No: Synchronize only the changed fields. |
| Enable Throttling for Incremental Data Synchronization | Limit throughput to reduce load on the destination. Configure RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s). |
| Environment Tag | An optional tag to identify the DTS instance. |
| Configure ETL | Enable the ETL feature to filter or transform data before it reaches the destination. See What is ETL? and Configure ETL in a data migration or data synchronization task. |
| Monitoring and Alerting | Configure alerts for task failures or latency exceeding a threshold. See Configure monitoring and alerting when you create a DTS task. |
Step 6: Run the precheck
Click Next: Save Task Settings and Precheck.
To preview the OpenAPI parameters for this task configuration, move the pointer over Next: Save Task Settings and Precheck and click Preview OpenAPI parameters.
DTS runs a precheck before starting the task. If the precheck fails:
For each failed item, click View Details, resolve the issue, then click Precheck Again.
For alert items that can be ignored: click Confirm Alert Details > Ignore > OK > Precheck Again. Ignoring alerts may result in data inconsistency.
Step 7: Purchase an instance
Wait until Success Rate reaches 100%, then click Next: Purchase Instance.
On the purchase page, configure the following parameters.
| Parameter | Description |
|---|---|
| Billing Method | Subscription: Prepaid; more cost-effective for long-term use. Pay-as-you-go: Billed hourly; suitable for short-term use. Release the instance when no longer needed to avoid charges. |
| Resource Group Settings | The resource group for the instance. Default: default resource group. See What is Resource Management? |
| Instance Class | The synchronization throughput class. See Instance classes of data synchronization instances. |
| Subscription Duration | Available when Billing Method is Subscription. Options: 1–9 months, or 1, 2, 3, or 5 years. |
Accept Data Transmission Service (Pay-as-you-go) Service Terms.
Click Buy and Start, then click OK in the dialog box.
The task appears in the task list. Monitor its progress from there.
What's next
If a record exceeds 16 MB, the task reports an error. Modify the synchronized objects or use ETL to filter large records. See Modify the ETL configurations of an existing data synchronization task and Modify the objects to be synchronized.
Write your function code to process the incoming data. See Overview.
Data format received by the function
DTS delivers data to the function as an Object. Incremental records are in the Records field as an array. Each element in the array is an Object with the following fields.
The function receives two categories of operations:
DDL: Schema changes — CreateIndex, CreateCollection, DropIndex, DropCollection.
DML: Data changes — INSERT, UPDATE, DELETE.
| Field | Type | Description |
|---|---|---|
| isDdl | Boolean | True if DDL; False if DML. |
| type | String | DML: DELETE, UPDATE, or INSERT. DDL: DDL. |
| database | String | The MongoDB database name. |
| table | String | The collection name. |
| pkNames | String Array | The primary key names. Always ["_id"] for MongoDB. |
| es | Long | UNIX timestamp (milliseconds) when the operation ran on the source database. |
| ts | Long | UNIX timestamp (milliseconds) when DTS began writing to the destination. |
| data | Object Array | A one-element array. The element has the key doc and a JSON string as its value. Deserialize the value to get the record. |
| old | Object Array | The original data, in the same format as the data field. Available only when type is UPDATE. |
| id | Int | The serial number of the operation. |
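The fields above can be consumed in a few lines of handler code. The following is a minimal Node.js sketch, not an official DTS or Function Compute sample: the function name is illustrative, and only the record fields come from the table above. It deserializes the JSON string in data[0].doc for DML records and passes DDL records through as metadata.

```javascript
// Minimal sketch of parsing DTS Canal JSON records in a Function
// Compute event handler. Handler wiring is omitted; processRecords
// receives the raw event payload (a Buffer or string of JSON).
function processRecords(event) {
  const body = JSON.parse(event.toString());
  const results = [];
  for (const record of body.Records || []) {
    if (record.isDdl) {
      // DDL records carry no document payload to deserialize.
      results.push({ kind: 'DDL', database: record.database, table: record.table });
      continue;
    }
    // data is a one-element array; its "doc" value is a JSON string.
    const doc = JSON.parse(record.data[0].doc);
    results.push({ kind: record.type, database: record.database, table: record.table, doc });
  }
  return results;
}

module.exports = { processRecords };
```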
DDL examples
Creating a collection, deleting a collection, creating an index, and deleting an index each produce a record with isDdl set to true and type set to DDL.
DML examples
Insert data
Database commands:
// Insert multiple records at once
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})
// Insert one record at a time
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})

Data received by the function:
{
"Records": [
{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"jack\", \"age\": 20}"}],
"pkNames": ["_id"],
"type": "INSERT",
"es": 1694054783000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054784427
},
{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"lili\", \"age\": 20}"}],
"pkNames": ["_id"],
"type": "INSERT",
"es": 1694054783000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054784428
}
]
}

Update data
Database command:
db.user.update({"name":"jack"},{$set:{"age":30}})

Data received by the function:
{
"Records": [{
"data": [{"doc": "{\"$set\": {\"age\": 30}}"}],
"pkNames": ["_id"],
"old": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
"type": "UPDATE",
"es": 1694054989000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694054990555
}]
}

Delete data
Database command:
db.user.remove({"name":"jack"})

Data received by the function:
{
"Records": [{
"data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
"pkNames": ["_id"],
"type": "DELETE",
"es": 1694055452000,
"database": "MongoDBTest",
"id": 0,
"isDdl": false,
"table": "user",
"ts": 1694055452852
}]
}
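Note that an UPDATE record's data field carries only the update document (for example {"$set": {"age": 30}}), while old carries the original primary key, so reconstructing the change takes an extra parsing step. The following is a hedged Node.js sketch of that step; the function name is illustrative and the field layout follows the UPDATE example above.

```javascript
// Sketch: extract the primary key and the changed fields from a DTS
// Canal JSON UPDATE record shaped like the example above.
function parseUpdate(record) {
  if (record.type !== 'UPDATE') {
    throw new Error('not an UPDATE record');
  }
  // data[0].doc holds the update document, e.g. {"$set": {"age": 30}}.
  const updateDoc = JSON.parse(record.data[0].doc);
  // old[0].doc holds the original key, e.g. {"_id": {"$oid": "..."}}.
  const keyDoc = JSON.parse(record.old[0].doc);
  return {
    id: keyDoc._id && keyDoc._id.$oid,
    changedFields: updateDoc.$set || {},
  };
}

module.exports = { parseUpdate };
```

With the extracted key and changed fields, the function can issue a targeted write to whatever downstream store it feeds.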