Data Transmission Service: Synchronize data from an ApsaraDB for MongoDB sharded cluster instance to a Function Compute function

Last Updated: Mar 28, 2026

Data Transmission Service (DTS) captures incremental changes from an ApsaraDB for MongoDB sharded cluster and delivers each change event to a Function Compute function in Canal JSON format. Write your function code to process, transform, or forward the events downstream.

What your function receives

Each invocation passes an object with a Records array. Every element in the array represents one change event:

Field | Type | Description
isDdl | Boolean | true for DDL operations; false for DML operations
type | String | DML: INSERT, UPDATE, or DELETE. DDL: DDL
database | String | MongoDB database name
table | String | Collection name
pkNames | String Array | Primary key names. Always ["_id"] for MongoDB
es | Long | 13-digit UNIX timestamp in milliseconds: when the operation occurred on the source
ts | Long | 13-digit UNIX timestamp in milliseconds: when DTS started writing to the destination
data | Object Array | One-element array. The element has a doc key whose value is a JSON string representing the document. Deserialize the value to read the record
old | Object Array | Same format as data. Present only when type is UPDATE; contains the document state before the update
id | Int | Serial number of the operation

The function receives two categories of operations:

  • DDL — schema changes: CreateIndex, CreateCollection, DropIndex, DropCollection

  • DML — data changes: INSERT, UPDATE, DELETE
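
The sketch below shows one way to consume this payload in your function code. It is a minimal illustration, not DTS-provided code, and assumes the Function Compute Node.js runtime, where the handler receives the payload as a Buffer in its event argument.

exports.handler = function (event, context, callback) {
  // Parse the invocation payload into the Records array described above.
  const payload = JSON.parse(event.toString());
  for (const record of payload.Records) {
    // data[0].doc is a JSON string; deserialize it to reach the document fields.
    const doc = JSON.parse(record.data[0].doc);
    // es and ts are millisecond timestamps, so ts - es approximates delivery lag.
    const lagMs = record.ts - record.es;
    if (record.isDdl) {
      console.log('DDL on ' + record.database + '.' + record.table, doc);
    } else {
      console.log(record.type + ' on ' + record.database + '.' + record.table +
        ' (lag ' + lagMs + ' ms)', doc);
    }
  }
  callback(null, 'ok');
};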

Example payloads

Create a collection (DDL)

db.createCollection("testCollection")
{
  "Records": [{
    "data": [{"doc": "{\"create\": \"testCollection\", \"idIndex\": {\"v\": 2, \"key\": {\"_id\": 1}, \"name\": \"_id_\"}}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056437000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "testCollection",
    "ts": 1694056437510
  }]
}

Drop a collection (DDL)

db.testCollection.drop()
{
  "Records": [{
    "data": [{"doc": "{\"drop\": \"testCollection\"}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056577000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "testCollection",
    "ts": 1694056577789
  }]
}

Create an index (DDL)

db.testCollection.createIndex({name: 1})
{
  "Records": [{
    "data": [{"doc": "{\"createIndexes\": \"testCollection\", \"v\": 2, \"key\": {\"name\": 1}, \"name\": \"name_1\"}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056670000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "testCollection",
    "ts": 1694056670719
  }]
}

Drop an index (DDL)

db.testCollection.dropIndex({name: 1})
{
  "Records": [{
    "data": [{"doc": "{\"dropIndexes\": \"testCollection\", \"index\": \"name_1\"}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056817000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "$cmd",
    "ts": 1694056818035
  }]
}
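
As the four DDL payloads above show, the deserialized doc identifies the command through its top-level key (create, drop, createIndexes, or dropIndexes). A small classifier sketch along those lines, with an illustrative helper name:

// Hypothetical helper: map a deserialized DDL doc to the operation it represents.
function ddlKind(doc) {
  if (doc.create) return 'CreateCollection';
  if (doc.drop) return 'DropCollection';
  if (doc.createIndexes) return 'CreateIndex';
  if (doc.dropIndexes) return 'DropIndex';
  return 'unknown';
}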

Insert documents (DML)

// Batch insert
db.runCommand({insert: "user", documents: [{"name": "jack", "age": 20}, {"name": "lili", "age": 20}]})

// Single insert
db.user.insert({"name": "jack", "age": 20})
{
  "Records": [
    {
      "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"jack\", \"age\": 20}"}],
      "pkNames": ["_id"],
      "type": "INSERT",
      "es": 1694054783000,
      "database": "MongoDBTest",
      "id": 0,
      "isDdl": false,
      "table": "user",
      "ts": 1694054784427
    },
    {
      "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"lili\", \"age\": 20}"}],
      "pkNames": ["_id"],
      "type": "INSERT",
      "es": 1694054783000,
      "database": "MongoDBTest",
      "id": 0,
      "isDdl": false,
      "table": "user",
      "ts": 1694054784428
    }
  ]
}

Update a document (DML)

db.user.update({"name": "jack"}, {$set: {"age": 30}})
{
  "Records": [{
    "data": [{"doc": "{\"$set\": {\"age\": 30}}"}],
    "pkNames": ["_id"],
    "old": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "type": "UPDATE",
    "es": 1694054989000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": false,
    "table": "user",
    "ts": 1694054990555
  }]
}
During incremental data synchronization, DTS delivers UPDATE operations as $set commands only: the data field carries the $set document with the changed fields, and the old field carries the _id of the updated document.
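
To act on such an event, read the changed fields from data and the document identity from old. A sketch, using an illustrative helper name:

// Hypothetical helper: split an UPDATE record into the target _id and the changed fields.
function parseUpdate(record) {
  const changes = JSON.parse(record.data[0].doc); // e.g. {"$set": {"age": 30}}
  const before = JSON.parse(record.old[0].doc);   // e.g. {"_id": {"$oid": "..."}}
  return { id: before._id, updatedFields: changes.$set || {} };
}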

Delete a document (DML)

db.user.remove({"name": "jack"})
{
  "Records": [{
    "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "pkNames": ["_id"],
    "type": "DELETE",
    "es": 1694055452000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": false,
    "table": "user",
    "ts": 1694055452852
  }]
}

Limitations

Review these constraints before creating your sync task.

Scope constraints:

  • Only incremental data synchronization is supported. Full data synchronization is not supported.

  • Cross-region synchronization is not supported.

  • DTS cannot synchronize data from the admin, config, or local databases.

  • Object mapping is not supported.

  • Transaction information is not retained. Transactions are converted to individual records at the destination.

Source database constraints:

  • The source must use the sharded cluster architecture, with no more than 10 Mongos nodes.

  • The source cannot be an Azure Cosmos DB for MongoDB cluster or an Amazon DocumentDB elastic cluster.

  • The collections to be synchronized must have PRIMARY KEY or UNIQUE constraints and all fields must be unique. Otherwise, the destination database may contain duplicate data records.

  • A single document cannot exceed 16 MB. Documents over this size cannot be written to the destination function and trigger an error. Use the extract, transform, and load (ETL) feature to filter large fields if needed.

  • Synchronize up to 1,000 collections per task. For more collections, create multiple tasks or synchronize at the database level.

  • The source instance cannot be scaled while a sync task is running.

  • DTS cannot connect to a MongoDB database over an SRV endpoint.

  • If the source database balancer is active, the task may experience delays.

  • If the source database is a self-managed MongoDB database that uses the sharded cluster architecture, set the Access Method parameter to Express Connect, VPN Gateway, or Smart Access Gateway, or to Cloud Enterprise Network (CEN).

Write constraints:

  • For INSERT operations, the data being inserted must contain shard keys.

  • For UPDATE operations, shard keys cannot be modified.

  • Configure only one DTS task per destination function. Multiple tasks writing to the same function may cause data errors.

Oplog and change stream requirements:

  • The oplog must be enabled and retain at least 7 days of log data, OR change streams must be enabled to cover at least the last 7 days of changes. If neither condition is met, DTS may fail to capture source changes, which can cause data inconsistency or loss not covered by the DTS service level agreement (SLA).
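
One way to verify the retained oplog window is the standard replica set helper in the mongo shell, run against each shard (a quick sanity check, not a DTS-prescribed procedure):

// Prints the oplog size and the time span it covers ("log length start to end").
// Compare that span against the 7-day (168-hour) requirement above.
rs.printReplicationInfo()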

Change stream restrictions (if applicable):

  • Change streams require MongoDB 4.0 or later.

  • Two-way synchronization is not supported when using change streams.

  • For non-elastic Amazon DocumentDB clusters, use change streams: set Migration Method to ChangeStream and Architecture to Sharded Cluster.

New databases:

  • DTS does not synchronize incremental data from databases created after the sync task starts.

Supported operations

The operations captured depend on the migration method.

Using oplog (recommended):

  • CREATE COLLECTION, CREATE INDEX

  • DROP DATABASE, DROP COLLECTION, DROP INDEX

  • RENAME COLLECTION

  • Document-level INSERT, UPDATE, and DELETE

Using change streams:

  • DROP DATABASE, DROP COLLECTION

  • RENAME COLLECTION

  • Document-level INSERT, UPDATE, and DELETE

Billing

Incremental data synchronization is charged. For pricing details, see Billing overview.

Billing method | Description
Subscription | Pay upfront for 1–9 months, or 1, 2, 3, or 5 years. More cost-effective for long-term use
Pay-as-you-go | Billed hourly. Release the instance when it is no longer needed to stop charges

Note: DTS charges for the instance during connection retry periods.

Prerequisites

Before you begin, make sure that you have:

  • A source ApsaraDB for MongoDB instance that uses the sharded cluster architecture, in the same region as the destination (cross-region synchronization is not supported).

  • A Function Compute service and function created as the destination.

Create a sync task

Step 1: Open the Data Synchronization page

Go to the Data Synchronization page using either console:

DTS console

  1. Log on to the DTS console.

  2. In the left-side navigation pane, click Data Synchronization.

  3. In the upper-left corner, select the region where the sync task will run.

DMS console

The exact navigation path depends on your DMS console layout. See Simple mode and Customize the layout and style of the DMS console.

  1. Log on to the DMS console.

  2. In the top navigation bar, move the pointer over Data + AI and choose DTS (DTS) > Data Synchronization.

  3. From the dropdown to the right of Data Synchronization Tasks, select the target region.

Step 2: Configure source and destination databases

Click Create Task, then configure the following parameters:

Section | Parameter | Description
N/A | Task Name | DTS auto-generates a name. Specify a descriptive name to make the task easy to identify. Uniqueness is not required
Source Database | Select Existing Connection | Select a registered database instance to auto-fill the parameters below, or configure them manually
  | Database Type | Select MongoDB
  | Access Method | Select Alibaba Cloud Instance
  | Instance Region | Region of the source MongoDB instance
  | Replicate Data Across Alibaba Cloud Accounts | Select No for same-account synchronization
  | Architecture | Select Sharded Cluster
  | Migration Method | How DTS captures incremental data. Options: Oplog (recommended) or ChangeStream (see details below)
  | Instance ID | ID of the source MongoDB instance
  | Authentication Database | Database that stores the account credentials. Default: admin
  | Database Account | Source database account with required read permissions
  | Database Password | Password for the database account
  | Shard account | Account for accessing shards in the source instance
  | Shard password | Password for the shard account
  | Encryption | Connection encryption mode. Options: Non-encrypted, SSL-encrypted, or Mongo Atlas SSL. Available options depend on the Access Method and Architecture selections. SSL-encrypted is unavailable when Architecture is Sharded Cluster and Migration Method is Oplog
Destination Database | Select Existing Connection | Select a registered Function Compute instance to auto-fill the parameters below, or configure them manually
  | Database Type | Select Function Compute
  | Access Method | Select Alibaba Cloud Instance
  | Instance Region | Matches the source region. Cannot be changed
  | Service | The Function Compute service that contains the destination function
  | Function | The function that receives the synchronized data
  | Service Version and Alias | Version or alias of the service. Options: Default Version (fixed to LATEST), Specified Version (requires Service Version), or Specified Alias (requires Service Alias). See Terms

Choose a migration method:

Method | When to use
Oplog (recommended) | The oplog is enabled on the source (the default for both self-managed MongoDB and ApsaraDB for MongoDB). Provides lower synchronization latency due to faster log pulling
ChangeStream | The oplog is disabled, or you are using a non-elastic Amazon DocumentDB cluster (change streams required). Requires MongoDB 4.0 or later. Two-way sync is not supported

Note: When Architecture is Sharded Cluster and Migration Method is ChangeStream, the Shard account and Shard password parameters are not required.

Step 3: Test connectivity

Click Test Connectivity and Proceed.

Make sure the DTS server CIDR blocks are added to the security group or allowlist of both the source and destination. See Add the CIDR blocks of DTS servers. If the source or destination uses a self-managed access method, click Test Connectivity in the CIDR Blocks of DTS Servers dialog first.

Step 4: Select objects to synchronize

In the Configure Objects step, configure the following:

Parameter | Description
Synchronization Types | Fixed to Incremental Data Synchronization. Cannot be changed
Data Format | Fixed to Canal Json. For field descriptions, see the Canal Json section of the Data formats of a Kafka cluster topic
Source Objects | Select databases or collections to synchronize, then click the right arrow icon to move them to Selected Objects
Selected Objects | Review the selected objects. Click the left arrow icon to remove objects. Right-click an object to configure synchronization granularity (database or collection level)

Step 5: Configure advanced settings

Click Next: Advanced Settings, then configure:

Parameter | Description
Dedicated Cluster for Task Scheduling | By default, DTS schedules the task to the shared cluster. Purchase a dedicated cluster for higher stability. See What is a DTS dedicated cluster
Retry Time for Failed Connections | How long DTS retries when the source or destination is unreachable. Range: 10–1440 minutes. Default: 720. Set to at least 30 minutes. If multiple tasks share the same source or destination, the shortest retry time takes precedence
Retry Time for Other Issues | How long DTS retries failed DDL or DML operations. Range: 1–1440 minutes. Default: 10. Set to at least 10 minutes. Must be less than Retry Time for Failed Connections
Obtain the entire document after it is updated | ChangeStream only. Yes: send the full document after an update. No: send only the changed fields
Enable Throttling for Incremental Data Synchronization | Limit sync throughput to reduce load on the destination. Configure RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s)
Environment Tag | Tag to identify the DTS instance environment. Optional
Configure ETL | Enable the ETL feature to transform data in transit. See What is ETL? and Configure ETL in a data migration or data synchronization task
Monitoring and Alerting | Send alerts when the task fails or synchronization latency exceeds a threshold. See Configure monitoring and alerting when you create a DTS task

Step 6: Run the precheck

Click Next: Save Task Settings and Precheck.

To preview the API parameters for this task configuration, hover over Next: Save Task Settings and Precheck and click Preview OpenAPI parameters.

DTS runs a precheck before starting the sync task. The task starts only after the precheck passes.

  • If an item fails: click View Details next to the failed item, fix the reported issue, then click Precheck Again.

  • If an alert is triggered:

    • If the alert cannot be ignored: click View Details, fix the issue, and rerun the precheck.

    • If the alert can be ignored: click Confirm Alert Details, click Ignore in the dialog, confirm with OK, then click Precheck Again. Ignoring alerts may result in data inconsistency.

Step 7: Purchase an instance

  1. Wait until the Success Rate reaches 100%, then click Next: Purchase Instance.

  2. On the buy page, configure:

Section | Parameter | Description
New Instance Class | Billing Method | Subscription or Pay-as-you-go
  | Resource Group Settings | Resource group for the instance. Default: default resource group. See What is Resource Management?
  | Instance Class | Determines the synchronization speed. See Instance classes of data synchronization instances
  | Subscription Duration | Available only for Subscription billing. Options: 1–9 months, or 1, 2, 3, or 5 years

  3. Read and select Data Transmission Service (Pay-as-you-go) Service Terms.

  4. Click Buy and Start, then click OK in the confirmation dialog.

Track task progress in the task list.

What's next

  • Write your Function Compute event handler to process the Canal JSON payload. Use the isDdl field to branch between DDL and DML logic, and deserialize the doc string in each data element to access the document fields.

  • To reduce the load of large-field documents on your function, configure ETL filters before the data reaches the function. See Configure ETL in a data migration or data synchronization task.

  • To monitor synchronization health, configure alerting in Advanced Settings or after the task is created. See Configure monitoring and alerting when you create a DTS task.

  • If a task fails, DTS technical support will attempt to restore it within 8 hours. During restoration, the task may be restarted and task parameters (not database parameters) may be adjusted. For a list of parameters that may be modified, see Modify instance parameters.