
Data Transmission Service:Synchronize data from an ApsaraDB for MongoDB replica set instance to a Function Compute function

Last Updated:Mar 28, 2026

Use Data Transmission Service (DTS) to stream incremental changes from an ApsaraDB for MongoDB replica set instance directly into a Function Compute function. Each INSERT, UPDATE, DELETE, and DDL event is delivered in Canal JSON format, so your function code can process, transform, or forward the data downstream without building a separate consumer layer.

Prerequisites

Before you begin, make sure you have:

  • An ApsaraDB for MongoDB replica set instance to use as the source.

  • A Function Compute function to use as the destination, in the same region as the source instance (cross-region synchronization is not supported).

Billing

Incremental data synchronization is a paid feature.

Synchronization type: Incremental data synchronization
Task configuration fee: Charged. For more information, see Billing overview.

Limitations

Source database

Bandwidth: The server hosting the source database must have sufficient outbound bandwidth; otherwise, synchronization speed is affected.
Primary key or unique key: Collections to be synchronized must have a PRIMARY KEY or UNIQUE constraint, and all fields must be unique. Otherwise, the destination function may receive duplicate records.
Single record size: 16 MB maximum per record. DTS reports an error for larger records. Use the extract, transform, and load (ETL) feature to filter out oversized fields.
Collection count: Up to 1,000 collections per task. To synchronize more than 1,000 collections, configure multiple tasks or synchronize at the database level.
Unsupported sources: Azure Cosmos DB for MongoDB clusters and Amazon DocumentDB elastic clusters are not supported.
oplog / change streams: The oplog must be enabled on the source database and retain at least seven days of log data, or change streams must be enabled to cover the past seven days. If neither condition is met, DTS may miss changes, which can cause data inconsistency or loss. This risk is not covered by the DTS service level agreement (SLA).
MongoDB version for change streams: Change streams require MongoDB 4.0 or later. Two-way synchronization is not supported when change streams are used.
Amazon DocumentDB (non-elastic): Enable change streams, set Migration Method to ChangeStream, and set Architecture to Sharded Cluster.
SRV endpoints: DTS cannot connect to MongoDB over an SRV endpoint.

Other limitations

  • DTS cannot synchronize the admin, config, or local databases.

  • Full data synchronization is not supported; only incremental data synchronization is supported.

  • Cross-region synchronization is not supported.

  • The object name mapping feature is not supported.

  • Assign only one DTS task per destination function to avoid data errors.

  • Transaction context is not preserved. Each transaction is converted to individual records.

  • If a DTS task fails, DTS technical support attempts to restore it within 8 hours. The task may be restarted and task parameters (not database parameters) may be modified during restoration.

Special cases (self-managed MongoDB)

  • A primary/secondary switchover while the task is running causes the task to fail.

  • Synchronization latency is calculated from the latest synchronized record's timestamp versus the current source timestamp. If no writes occur on the source for an extended period, the reported latency may be inaccurate. Write a record to the source database to refresh the latency reading.

If you synchronize an entire database, create a heartbeat table. DTS updates the heartbeat table every second, which keeps the latency reading accurate.

Supported operations

The operations DTS synchronizes depend on the Migration Method you choose.

Operation            Oplog        Change streams
INSERT               Supported    Supported
UPDATE               Supported    Supported
DELETE               Supported    Supported
CREATE COLLECTION    Supported    Not supported
CREATE INDEX         Supported    Not supported
DROP DATABASE        Supported    Supported
DROP COLLECTION      Supported    Supported
DROP INDEX           Supported    Not supported
RENAME COLLECTION    Supported    Supported

Use oplog

A DTS task does not synchronize incremental data from databases that are created after the task starts to run. DTS synchronizes incremental data generated by the following operations:

  • CREATE COLLECTION and CREATE INDEX

  • DROP DATABASE, DROP COLLECTION, and DROP INDEX

  • RENAME COLLECTION

  • Insert, update, and delete operations on documents in a collection.

    Note

    When you synchronize incremental data of documents, only update operations that use the $set operator are supported.

Use change streams

DTS synchronizes incremental data generated by the following operations:

  • DROP DATABASE and DROP COLLECTION

  • RENAME COLLECTION

  • Insert, update, and delete operations on documents in a collection.

    Note

    When you synchronize incremental data of documents, only update operations that use the $set operator are supported.

In both modes, only document updates that use the $set operator are synchronized. In Oplog mode, DTS does not synchronize incremental data from databases that are created after the task starts.

Required database account permissions

Database: Source MongoDB instance
Required permissions: Read permissions on the source, admin, and local databases
Reference: Manage the permissions of MongoDB database users

Create a synchronization task

Step 1: Open the Data Synchronization page

Use one of the following consoles:

DTS console

  1. Log on to the DTS console.

  2. In the left-side navigation pane, click Data Synchronization.

  3. In the upper-left corner, select the region where the synchronization task will run.

DMS console

The exact navigation steps vary based on the DMS console mode and layout. See Simple mode and Customize the layout and style of the DMS console.
  1. Log on to the DMS console.

  2. In the top navigation bar, move the pointer over Data + AI and choose DTS > Data Synchronization.

  3. From the drop-down list to the right of Data Synchronization Tasks, select the region.

Step 2: Configure source and destination databases

Click Create Task, then configure the parameters described in the following tables.

General

Task Name: A name for the DTS task. DTS generates a default name. Use a descriptive name to identify the task; uniqueness is not required.

Source database

Select Existing Connection: Select a registered database instance from the drop-down list, or configure the connection manually.
Database Type: Select MongoDB.
Access Method: Select Alibaba Cloud Instance.
Instance Region: The region where the source MongoDB instance resides.
Replicate Data Across Alibaba Cloud Accounts: Select No for same-account synchronization.
Architecture: Select Replica Set.
Migration Method: How DTS reads incremental data from the source.
  • Oplog (recommended): Available when the oplog feature is enabled on the source. Oplog provides low latency because logs are pulled quickly.
  • ChangeStream: Available when change streams are enabled. See Change Streams. If Sharded Cluster is selected for Architecture, the Shard account and Shard password parameters are not required.
Instance ID: The ID of the source MongoDB instance.
Authentication Database: The database that stores accounts and passwords. Default: admin.
Database Account: The account with the required read permissions.
Database Password: The password for the database account.
Encryption: The connection encryption method: Non-encrypted, SSL-encrypted, or Mongo Atlas SSL. Available options depend on the Access Method and Architecture values. SSL-encrypted is unavailable when Architecture is Sharded Cluster and Migration Method is Oplog. For a self-managed MongoDB replica set that does not use Alibaba Cloud Instance access, you can upload a CA certificate when SSL-encrypted is selected.

Destination database

Select Existing Connection: Select a registered instance from the drop-down list, or configure the connection manually.
Database Type: Select Function Compute.
Access Method: Select Alibaba Cloud Instance.
Instance Region: Defaults to the same region as the source and cannot be changed.
Service: The Function Compute service that contains the destination function.
Function: The destination function that receives the synchronized data.
Service Version and Alias: Determines which version of the function receives data:
  • Default Version: fixes the service version to LATEST.
  • Specified Version: requires the Service Version parameter.
  • Specified Alias: requires the Service Alias parameter.
See Terms for Function Compute terminology.

Step 3: Test connectivity

Click Test Connectivity and Proceed at the bottom of the page.

DTS server CIDR blocks must be added to the security settings of the source and destination databases. See Add the CIDR blocks of DTS servers. If the source or destination is a self-managed database not using Alibaba Cloud Instance access, click Test Connectivity in the CIDR Blocks of DTS Servers dialog box.

Step 4: Configure objects to synchronize

In the Configure Objects step, set the following parameters.

Synchronization Types: Fixed to Incremental Data Synchronization.
Data Format: Fixed to Canal Json. See Data formats of a Kafka cluster for field descriptions.
Source Objects: Select databases or collections, then click the right-arrow icon to move them to Selected Objects.
Selected Objects: Review the objects to synchronize. To remove an object, select it and click the left-arrow icon. To configure database- or collection-level synchronization, right-click an object in Selected Objects.

Click Next: Advanced Settings.

Step 5: Configure advanced settings

Dedicated Cluster for Task Scheduling: By default, DTS schedules the task to a shared cluster. Purchase a dedicated cluster to improve stability. See What is a DTS dedicated cluster.
Retry Time for Failed Connections: How long DTS retries when the source or destination database is unreachable. Valid range: 10 to 1,440 minutes. Default: 720. Set this to more than 30 minutes. If multiple tasks share the same database, the shortest retry time takes effect. DTS charges for the instance during retry periods.
Retry Time for Other Issues: How long DTS retries when DDL or DML operations fail. Valid range: 1 to 1,440 minutes. Default: 10. Set this to more than 10 minutes. This value must be less than Retry Time for Failed Connections.
Obtain the entire document after it is updated: Available only when Migration Method is ChangeStream. Yes: synchronize the complete document after an update. No: synchronize only the changed fields.
Enable Throttling for Incremental Data Synchronization: Limit throughput to reduce load on the destination. Configure RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s).
Environment Tag: An optional tag that identifies the DTS instance.
Configure ETL: Enable the ETL feature to filter or transform data before it reaches the destination. See What is ETL? and Configure ETL in a data migration or data synchronization task.
Monitoring and Alerting: Configure alerts for task failures or for latency that exceeds a threshold. See Configure monitoring and alerting when you create a DTS task.

Step 6: Run the precheck

Click Next: Save Task Settings and Precheck.

To preview the OpenAPI parameters for this task configuration, move the pointer over Next: Save Task Settings and Precheck and click Preview OpenAPI parameters.

DTS runs a precheck before starting the task. If the precheck fails:

  • For each failed item, click View Details, resolve the issue, then click Precheck Again.

  • For alert items that can be ignored: click Confirm Alert Details > Ignore > OK > Precheck Again. Ignoring alerts may result in data inconsistency.

Step 7: Purchase an instance

  1. Wait until Success Rate reaches 100%, then click Next: Purchase Instance.

  2. On the purchase page, configure the following parameters.

Billing Method:
  • Subscription: prepaid; more cost-effective for long-term use.
  • Pay-as-you-go: billed hourly; suitable for short-term use. Release the instance when it is no longer needed to avoid charges.
Resource Group Settings: The resource group for the instance. Default: default resource group. See What is Resource Management?
Instance Class: The synchronization throughput class. See Instance classes of data synchronization instances.
Subscription Duration: Available when Billing Method is Subscription. Options: 1 to 9 months, or 1, 2, 3, or 5 years.
  3. Read and accept the Data Transmission Service (Pay-as-you-go) Service Terms.

  4. Click Buy and Start, then click OK in the dialog box.

The task appears in the task list. Monitor its progress from there.

What's next

Data format received by the function

DTS delivers data to the function as a JSON object. Incremental records are stored as an array in the Records field. Each element of the array is an object with the following fields.

The function receives two categories of operations:

  • DDL: schema changes, such as CreateIndex, CreateCollection, DropIndex, and DropCollection.

  • DML: data changes: INSERT, UPDATE, and DELETE.
isDdl (Boolean): True for DDL operations; False for DML operations.
type (String): For DML: DELETE, UPDATE, or INSERT. For DDL: DDL.
database (String): The MongoDB database name.
table (String): The collection name.
pkNames (String array): The primary key names. Always ["_id"] for MongoDB.
es (Long): The UNIX timestamp, in milliseconds, when the operation ran on the source database.
ts (Long): The UNIX timestamp, in milliseconds, when DTS began writing to the destination.
data (Object array): A one-element array whose element has the key doc and a JSON string as its value. Deserialize the value to get the record.
old (Object array): The original data, in the same format as the data field. Present only when type is UPDATE.
id (Int): The serial number of the operation.
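The fields above can be consumed directly in function code. The following Python sketch assumes a Function Compute event handler whose event argument carries the raw payload; the parsing of Records and of each record's doc string follows the format described in the table, while the handler body itself is illustrative.

```python
import json


def handler(event, context):
    """Function Compute event handler (sketch).

    `event` is the raw payload delivered by DTS. It contains a
    `Records` array in the Canal JSON format described above.
    """
    payload = json.loads(event)  # event arrives as bytes or str
    for record in payload.get("Records", []):
        if record["isDdl"]:
            # DDL records describe schema changes; here they are skipped.
            continue
        # `data` is a one-element array; its `doc` value is a JSON string.
        doc = json.loads(record["data"][0]["doc"])
        print(record["type"], record["database"], record["table"], doc)
    return "OK"
```

From here, the function can transform the document or forward it downstream instead of printing it.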

DDL examples

Create a collection

Delete a collection

Create an index

Delete an index

DML examples

Insert data

Commands run on the source database:

// Insert multiple records at once
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})

// Insert one record at a time
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})

Data received by the function:

{
  "Records": [
    {
      "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"jack\", \"age\": 20}"}],
      "pkNames": ["_id"],
      "type": "INSERT",
      "es": 1694054783000,
      "database": "MongoDBTest",
      "id": 0,
      "isDdl": false,
      "table": "user",
      "ts": 1694054784427
    },
    {
      "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"lili\", \"age\": 20}"}],
      "pkNames": ["_id"],
      "type": "INSERT",
      "es": 1694054783000,
      "database": "MongoDBTest",
      "id": 0,
      "isDdl": false,
      "table": "user",
      "ts": 1694054784428
    }
  ]
}

Update data

Command run on the source database:

db.user.update({"name":"jack"},{$set:{"age":30}})

Data received by the function:

{
  "Records": [{
    "data": [{"doc": "{\"$set\": {\"age\": 30}}"}],
    "pkNames": ["_id"],
    "old": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "type": "UPDATE",
    "es": 1694054989000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": false,
    "table": "user",
    "ts": 1694054990555
  }]
}
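If the function mirrors changes into another store, an UPDATE record like the one above must be split into a filter and an update document: old carries the document's _id, and data carries the $set payload. A minimal sketch, where extract_update is an illustrative helper rather than part of any DTS API:

```python
import json


def extract_update(record):
    """Split a Canal JSON UPDATE record into (filter, update) parts."""
    # `old` holds the original document's _id; `data` holds the $set payload.
    filter_doc = json.loads(record["old"][0]["doc"])
    update_doc = json.loads(record["data"][0]["doc"])
    return filter_doc, update_doc


record = {
    "data": [{"doc": "{\"$set\": {\"age\": 30}}"}],
    "old": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "type": "UPDATE",
}
flt, upd = extract_update(record)
print(flt)  # the _id filter
print(upd)  # {'$set': {'age': 30}}
```

The two parts map naturally onto the filter and update arguments of a MongoDB-style update call against a downstream store.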

Delete data

Command run on the source database:

db.user.remove({"name":"jack"})

Data received by the function:

{
  "Records": [{
    "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "pkNames": ["_id"],
    "type": "DELETE",
    "es": 1694055452000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": false,
    "table": "user",
    "ts": 1694055452852
  }]
}
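Taken together, the three DML examples suggest a simple dispatch pattern for the function body. The handler names below (on_insert, on_update, on_delete) are illustrative placeholders for your own processing logic:

```python
import json


def on_insert(doc):
    print("insert", doc)


def on_update(set_payload, old):
    print("update", old, "->", set_payload)


def on_delete(doc):
    print("delete", doc)


def dispatch(record):
    """Route a Canal JSON DML record to the matching handler (sketch)."""
    doc = json.loads(record["data"][0]["doc"])
    kind = record["type"]
    if kind == "INSERT":
        on_insert(doc)
    elif kind == "UPDATE":
        # For UPDATE, `data` holds the $set payload and `old` holds the _id.
        old = json.loads(record["old"][0]["doc"])
        on_update(doc, old)
    elif kind == "DELETE":
        # For DELETE, `data` holds the _id of the removed document.
        on_delete(doc)
    return kind
```

Because DDL records carry type DDL rather than a DML verb, they fall through all three branches and can be handled separately if needed.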