Data Transmission Service (DTS) lets you stream change data from ApsaraDB for MongoDB to a Message Queue for Apache Kafka instance. The following steps walk through creating a synchronization task using a MongoDB replica set as the source and a Kafka instance as the destination.
What this task covers:
Supported architectures: Replica set and sharded cluster
Incremental sync methods: Oplog (recommended) and Change Streams
Data format: Canal JSON delivered to Kafka topics
Sync scope: Collection-level; full and incremental synchronization
Billing: Full data synchronization is free; incremental data synchronization is charged
Prerequisites
Before you begin, ensure that you have:
A Message Queue for Apache Kafka instance
A topic created in the destination Kafka instance to receive data
(Sharded cluster only) An endpoint applied for each shard node; all shard accounts must use the same username and password. See Apply for an endpoint for a shard
For supported source and destination database versions, see Synchronization solution overview.
Limitations
Review the following limitations before creating a synchronization task.
Source database limits
The source server must have enough outbound bandwidth. Insufficient bandwidth reduces synchronization speed.
When configuring name mapping for collections, a single task can synchronize up to 1,000 collections. Exceeding this limit causes a request error. To synchronize more than 1,000 collections, create multiple tasks or synchronize the entire database without name mapping.
For sharded cluster sources: the `_id` field in each synchronized collection must be unique, or data inconsistency may occur.
For sharded cluster sources: the number of mongos nodes cannot exceed 10, and the instance must not contain orphaned documents. See the FAQ topic for how to remove orphaned documents.
Standalone ApsaraDB for MongoDB instances, Azure Cosmos DB for MongoDB clusters, and Amazon DocumentDB elastic clusters are not supported as sources.
DTS cannot connect to MongoDB over an SRV endpoint.
The oplog must be enabled and retain log data for at least 7 days, OR change streams must be enabled so that DTS can subscribe to data changes from the last 7 days. If neither condition is met, DTS may fail to fetch source changes, and any resulting data loss or inconsistency falls outside the scope of the service level agreement (SLA). A quick way to check the current oplog window appears after this list.
Important
- Use the oplog to record source database changes (recommended).
- Change streams require MongoDB 4.0 or later. Two-way synchronization is not supported when using change streams.
- For non-elastic Amazon DocumentDB clusters, enable change streams and set Migration Method to ChangeStream and Architecture to Sharded Cluster.
During full data synchronization, do not modify database or collection schemas, or change data of the ARRAY type.
For sharded cluster sources, do not run the following commands during synchronization: `shardCollection`, `reshardCollection`, `unshardCollection`, `moveCollection`, or `movePrimary`. These commands can cause data inconsistency.
If the balancer of a sharded cluster source is rebalancing data, synchronization latency may increase.
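To check the current oplog window on the source replica set, a minimal mongosh sketch:

```javascript
// "log length start to end" in the output is the time span the oplog currently covers;
// it should comfortably exceed 7 days before you start the DTS task.
rs.printReplicationInfo()
```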
Other limits
Only collection-level synchronization is supported.
The `admin`, `config`, and `local` databases cannot be synchronized.
A single record exceeding 10 MB causes the task to fail. A scan sketch for finding oversized documents appears at the end of this list.
Transactions are not preserved. DTS converts each transaction into individual records at the destination.
If broker nodes are added or removed in the destination Kafka instance while a DTS task is running, restart the DTS task.
Make sure that DTS can connect to the source and destination instances. For example, the security settings of the database instances and the `listeners` and `advertised.listeners` parameters in the `server.properties` file of a self-managed Kafka instance must not restrict access from DTS.
Run synchronization during off-peak hours, when CPU utilization on both the source and destination databases is below 30%.
DTS automatically retries failed instances that have been running for less than 7 days. Before switching traffic to the destination, stop or release the synchronization instance to prevent automatic resumption from overwriting destination data.
DTS calculates incremental synchronization latency based on the timestamp of the most recently synchronized record in the destination database and the current timestamp in the source database. If no update operation is performed on the source database for an extended period, the reported latency may be stale. If the reported latency is excessively high, perform an update operation on the source database to refresh it.
If a DTS task fails to run, DTS technical support attempts to restore it within 8 hours. During restoration, the task may be restarted and its parameters modified. Only the parameters of the DTS task may be modified; database parameters are not changed. The parameters that may be modified include, but are not limited to, those in the "Modify instance parameters" section.
For sharded cluster sources with Oplog as the incremental sync method, DTS does not guarantee write order across shards to the target Kafka topic.
For sharded cluster sources, disable the MongoDB balancer during full data synchronization. Re-enable it only after full sync completes and incremental sync begins. See Manage the ApsaraDB for MongoDB balancer.
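If you manage the balancer manually, the disable/re-enable cycle looks like the following mongosh sketch (run against a mongos of the source cluster; `sh.stopBalancer()` also waits for any in-progress balancing round to finish):

```javascript
// Run in mongosh, connected to a mongos of the source sharded cluster.
sh.stopBalancer()      // disable the balancer before full data synchronization starts
sh.getBalancerState()  // verify that it returns false

// ...after DTS reports that full sync is finished and incremental sync has begun:
sh.startBalancer()
```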
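Related to the 10 MB per-record limit above: you can scan a collection for oversized documents before starting the task. A minimal sketch using the `$bsonSize` aggregation operator (MongoDB 4.4 or later; `kafka_test` is a placeholder collection name):

```javascript
// List the _id and size of documents larger than 10 MB.
db.kafka_test.aggregate([
  { $match: { $expr: { $gt: [ { $bsonSize: "$$ROOT" }, 10 * 1024 * 1024 ] } } },
  { $project: { _id: 1, sizeBytes: { $bsonSize: "$$ROOT" } } }
])
```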
Billing
| Synchronization type | Cost |
|---|---|
| Full data synchronization | Free |
| Incremental data synchronization | Charged. See Billing overview |
Synchronization types and supported operations
Full synchronization copies all historical data from the selected MongoDB collections to the destination Kafka topic. DTS supports DATABASE and COLLECTION objects.
Incremental synchronization continuously delivers change events after full sync completes. Supported operations depend on the incremental sync method:
| Operation | Oplog | Change streams |
|---|---|---|
| INSERT | Supported | Supported |
| UPDATE | Supported | Supported |
| DELETE | Supported | Supported |
| CREATE COLLECTION / INDEX | Supported | Not supported |
| DROP DATABASE / COLLECTION / INDEX | Supported | DROP DATABASE and COLLECTION only |
| RENAME COLLECTION | Supported | Supported |
Incremental synchronization does not pick up databases created after the task starts.
Required database account permissions
| Database | Required permissions |
|---|---|
| Source ApsaraDB for MongoDB | Read permission on the databases to be synchronized, the admin database, and the local database |
For account creation instructions, see Account management.
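A minimal mongosh sketch of creating such an account (the user name `dtsuser`, the password, and the database `dtsdemo` are placeholders):

```javascript
// Run against the admin database of the source ApsaraDB for MongoDB instance.
use admin
db.createUser({
  user: "dtsuser",
  pwd: "Your_Password_123",
  roles: [
    { role: "read", db: "dtsdemo" },  // each database to be synchronized
    { role: "read", db: "admin" },
    { role: "read", db: "local" }
  ]
})
```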
Create a synchronization task
Step 1: Open the Data Synchronization page
Use either the DTS console or the DMS console.
DTS console
Log on to the DTS console.
In the left-side navigation pane, click Data Synchronization.
In the upper-left corner, select the region where the synchronization task resides.
DMS console
Steps may vary depending on the DMS console mode. See Simple mode and Customize the layout and style of the DMS console.
Log on to the DMS console.
In the top navigation bar, move the pointer over Data + AI and choose DTS (DTS) > Data Synchronization.
From the drop-down list to the right of Data Synchronization Tasks, select the region.
Step 2: Configure source and destination databases
Click Create Task, then configure the parameters described in the following tables.
General
| Parameter | Description |
|---|---|
| Task Name | A name for the DTS task. DTS generates a name automatically. Specify a descriptive name to identify the task. The name does not need to be unique. |
Source database
| Parameter | Description |
|---|---|
| Select Existing Connection | If the source instance is registered with DTS, select it from the drop-down list. DTS populates the remaining fields automatically. Otherwise, configure the fields below. In the DMS console, select from the Select a DMS database instance list. |
| Database Type | Select MongoDB. |
| Access Method | Select Alibaba Cloud Instance. |
| Instance Region | Select the region of the source ApsaraDB for MongoDB instance. |
| Replicate Data Across Alibaba Cloud Accounts | Select No if the source database belongs to the current Alibaba Cloud account. |
| Architecture | Select Replica Set for this example. If the source is a Sharded Cluster, also enter the Shard account and Shard password. |
| Migration Method | The method for synchronizing incremental data. Oplog is recommended when the oplog feature is enabled. ChangeStream is available when change streams are enabled on the source. See Change Streams for details. Notes: For non-elastic Amazon DocumentDB clusters, only ChangeStream is supported. When Architecture is set to Sharded Cluster and Migration Method is set to ChangeStream, the Shard account and Shard password fields are not required. |
| Instance ID | Select the instance ID of the source ApsaraDB for MongoDB instance. |
| Authentication Database | The database that contains the account credentials. The default is admin. |
| Database Account | The account for the source database. See Required database account permissions. |
| Database Password | The password for the database account. |
| Encryption | Specifies the connection encryption type: Non-encrypted, SSL-encrypted, or Mongo Atlas SSL. Available options depend on the Access Method and Architecture values. Notes: SSL-encrypted is unavailable when Architecture is Sharded Cluster and Migration Method is Oplog. For self-managed MongoDB with Replica Set architecture and SSL-encrypted selected, upload a CA certificate to verify the connection. |
Destination database
| Parameter | Description |
|---|---|
| Select Existing Connection | If the destination Kafka instance is registered with DTS, select it from the drop-down list. Otherwise, configure the fields below. |
| Database Type | Select Kafka. |
| Access Method | Select Alibaba Cloud Instance. |
| Instance Region | Select the region of the destination Kafka instance. |
| Kafka Instance ID | Select the destination Kafka instance ID. |
| Encryption | Select Non-encrypted or SCRAM-SHA-256 based on your security requirements. |
| Topic | Select the topic to receive the synchronized data. |
| Use Kafka Schema Registry | Kafka Schema Registry is a RESTful metadata service for storing and retrieving Avro schemas. Select No to skip, or select Yes and provide the URL or IP address of your Schema Registry. |
Step 3: Test connectivity
Click Test Connectivity and Proceed at the bottom of the page.
DTS adds its server CIDR blocks to the security settings of the source and destination databases automatically, if allowed. For manual setup, see Add the CIDR blocks of DTS servers.
For self-managed databases not using Alibaba Cloud Instance as the access method, click Test Connectivity in the CIDR Blocks of DTS Servers dialog box.
Step 4: Configure objects to synchronize
In the Configure Objects step, set the following parameters.
| Parameter | Description |
|---|---|
| Synchronization Types | Incremental Data Synchronization is selected by default. Optionally select Full Data Synchronization as well. Schema Synchronization is not available. When full sync is enabled, DTS copies all historical data first before starting incremental sync. |
| Processing Mode of Conflicting Tables | Precheck and Report Errors: the precheck fails if the destination contains collections with the same names as the source. Use object name mapping to resolve naming conflicts. Ignore Errors and Proceed: skips the name-conflict check. If a record in the destination has the same primary key or unique key as a source record, the destination record is kept. Warning This option may cause data inconsistency. |
| Data Format in Kafka | Only Canal JSON is supported. |
| Kafka Data Compression Format | The compression format for data written to Kafka. Options: LZ4 (default — low compression ratio, high speed), GZIP (high compression ratio, low speed, high CPU usage), Snappy (balanced). |
| Policy for Shipping Data to Kafka Partitions | Select a partition routing policy based on your requirements. |
| Message acknowledgement mechanism | Select a message acknowledgment mechanism based on your requirements. |
| Topic That Stores DDL Information | Select a topic to store DDL events. If left blank, DDL events are written to the same topic as data events. |
| Capitalization of Object Names in Destination Instance | Controls capitalization of database and collection names in the destination. Default is DTS default policy. See Specify the capitalization of object names in the destination instance. |
| Source Objects | Select objects from the Source Objects section and click the arrow icon to move them to Selected Objects. Only collection-level selection is supported. |
Step 5: Configure advanced settings
Click Next: Advanced Settings and configure the following parameters.
| Parameter | Description |
|---|---|
| Dedicated Cluster for Task Scheduling | By default, DTS schedules the task on the shared cluster. Purchase a dedicated cluster for higher stability. |
| Retry Time for Failed Connections | How long DTS retries when the source or destination is unreachable. Range: 10–1440 minutes. Default: 720 minutes. We recommend that you set this parameter to a value greater than 30. If you specify different retry times for multiple tasks sharing the same database, the shortest value takes effect. DTS charges for retry time. |
| Retry Time for Other Issues | How long DTS retries when DDL or DML operations fail. Range: 1–1440 minutes. Default: 10 minutes. Set to at least 10 minutes. Must be shorter than Retry Time for Failed Connections. |
| Obtain the entire document after it is updated | Available only when Migration Method is ChangeStream. Yes: synchronizes the full document for each update, which may increase source load and cause latency. If DTS cannot fetch the full document, only the updated fields are sent. No: synchronizes only the changed fields. |
| Enable Throttling for Full Data Synchronization | When enabled, configure QPS to the source database, RPS of Full Data Migration, and Data migration speed for full migration (MB/s) to reduce load on the destination. Available only when Full Data Synchronization is selected. |
| Only one data type for primary key _id in a table of the data to be synchronized | Available only when Full Data Synchronization is selected. Yes: DTS skips scanning the _id field data type during full sync. No: DTS scans the _id field data type. |
| Enable Throttling for Incremental Data Synchronization | When enabled, configure RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s) to reduce load on the destination. |
| Environment Tag | An optional tag to identify the instance. |
| Configure ETL | Enable to apply extract, transform, and load (ETL) logic. Enter data processing statements in the code editor. See Configure ETL in a data migration or data synchronization task. |
| Monitoring and Alerting | When enabled, specify an alert threshold and notification settings. DTS notifies alert contacts when the task fails or synchronization latency exceeds the threshold. See Configure monitoring and alerting. |
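For background on Obtain the entire document after it is updated: the setting corresponds to the `fullDocument` option of MongoDB change streams. A minimal mongosh sketch of the underlying mechanism (illustrative only; DTS opens and manages its own change stream internally):

```javascript
// With fullDocument: "updateLookup", each update event carries the post-update document
// when MongoDB can still look it up; otherwise only the changed fields are reported.
const cursor = db.kafka_test.watch([], { fullDocument: "updateLookup" });
while (cursor.hasNext()) {
  printjson(cursor.next());
}
```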
Step 6: Run the precheck
Click Next: Save Task Settings and Precheck.
To preview the API parameters for this configuration, hover over Next: Save Task Settings and Precheck and click Preview OpenAPI parameters before proceeding.
The task cannot start until the precheck passes.
If the precheck fails, click View Details next to each failed item, resolve the issue, then click Precheck Again.
If an alert is raised for an item: fix non-ignorable alerts before proceeding. For ignorable alerts, click Confirm Alert Details, then Ignore in the dialog, confirm, and click Precheck Again. Ignoring alerts may cause data inconsistency.
Step 7: Purchase and start the instance
Wait for Success Rate to reach 100%, then click Next: Purchase Instance.
On the buy page, configure the following parameters.
| Parameter | Description |
|---|---|
| Billing Method | Subscription: pay upfront; more cost-effective for long-term use. Pay-as-you-go: billed hourly; suitable for short-term use. Release the instance when it is no longer needed to stop billing. |
| Resource Group Settings | The resource group for the synchronization instance. Default: default resource group. See What is Resource Management? |
| Instance Class | The synchronization speed tier. See Instance classes of data synchronization instances. |
| Subscription Duration | Available for the Subscription billing method. Options: 1–9 months, 1 year, 2 years, 3 years, or 5 years. |

Read and accept Data Transmission Service (Pay-as-you-go) Service Terms.
Click Buy and Start, then click OK in the confirmation dialog.
The task appears in the task list. Monitor its progress there.
Configure collection-to-topic mapping
By default, all collections are written to the topic selected in the destination database configuration. To route a specific collection to a different topic:
In the Selected Objects area, hover over the destination topic name at the collection level.
Click Edit next to the topic name.
In the Edit Table dialog, configure the following settings.
| Parameter | Description |
|---|---|
| Name of target Topic | The destination topic for this collection. The topic must exist in the Kafka instance. If modified, data is written to the new topic instead. Default: the topic selected during destination database configuration. |
| Filter Conditions | Optional row filter. See Set filter conditions. |
| Number of Partitions | The number of partitions for writing data to the topic. |

Click OK.
Data delivery examples
Each incremental change from MongoDB is serialized as a Canal JSON message and delivered to the configured Kafka topic. The message structure varies depending on the incremental sync method and update settings.
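To inspect what actually lands in the topic, you can attach any Kafka consumer. A minimal sketch using the kafkajs Node.js client (the broker address, topic name, and group ID are placeholders, and a plaintext listener is assumed; add SASL/SSL options as your instance requires):

```javascript
// Consume and print Canal JSON messages from the destination topic.
const { Kafka } = require("kafkajs");

const kafka = new Kafka({ clientId: "dts-inspector", brokers: ["your-broker:9092"] });
const consumer = kafka.consumer({ groupId: "dts-inspect-group" });

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: "your-dts-topic", fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ message }) => {
      // Each message value is one Canal JSON envelope.
      console.log(JSON.parse(message.value.toString()));
    },
  });
}

run().catch(console.error);
```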
Choose your scenario
| Goal | Configuration |
|---|---|
| Low-latency sync with full DDL support | Set Migration Method to Oplog (Scenario 1) |
| Change Streams with partial document updates | Set Migration Method to ChangeStream and Obtain the entire document after it is updated to No (Scenario 2) |
| Change Streams with full document on each update | Set Migration Method to ChangeStream and Obtain the entire document after it is updated to Yes (Scenario 3) |
Canal JSON message fields
All three scenarios use the same top-level Canal JSON envelope. The following fields appear in every message.
| Field | Type | Description |
|---|---|---|
| `database` | string | Source database name |
| `table` | string | Source collection name |
| `type` | string | Operation type: INSERT, UPDATE, DELETE, or DDL |
| `isDdl` | boolean | `true` for DDL events (collection drop, rename); `false` for DML events |
| `es` | number | Event timestamp in the source database (Unix milliseconds) |
| `ts` | number | Timestamp when DTS processed the event (Unix milliseconds) |
| `id` | number | DTS internal event ID |
| `pkNames` | array | Primary key field names (typically `["_id"]`) |
| `data` | array | Document data after the operation. For partial updates (Oplog or ChangeStream without full document retrieval), contains only the changed fields using MongoDB update operators (`$set`, `$unset`). `null` for DDL events. |
| `old` | array | Document state before an update; only the `_id` field is included. `null` for INSERT and DDL events. |
| `sql` | object or null | DDL statement details (for `isDdl: true`). `null` for DML events. |
| `gtid` | null | Not applicable for MongoDB (reserved for MySQL compatibility). |
| `mysqlType` | null | Not applicable for MongoDB. |
| `serverId` | null | Not applicable for MongoDB. |
| `sqlType` | null | Not applicable for MongoDB. |
Scenario 1: Oplog
Set Migration Method to Oplog.
| Source change type | Source statement | Data received by the destination topic |
|---|---|---|
| insert | `db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})` | See example below |
| update $set | `db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})` | See example below |
| update $set new field | `db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})` | See example below |
| update $unset (remove field) | `db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})` | See example below |
| delete | `db.kafka_test.deleteOne({"cid":"a"})` | See example below |
| ddl drop | `db.kafka_test.drop()` | See example below |
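The following is an illustrative sketch of the envelope for the insert statement above, assembled from the field table in the previous section; it is not verbatim DTS output, and the `_id`, timestamps, `id` value, and database name `dts_test` are placeholders:

```json
{
  "data": [
    {
      "_id": "65f0c1e2a4b3d90001f2a3b4",
      "cid": "a",
      "person": { "name": "testName", "age": 18, "skills": ["database", "ai"] }
    }
  ],
  "database": "dts_test",
  "table": "kafka_test",
  "type": "INSERT",
  "isDdl": false,
  "es": 1721900000000,
  "ts": 1721900000500,
  "id": 1,
  "pkNames": ["_id"],
  "old": null,
  "sql": null,
  "gtid": null,
  "mysqlType": null,
  "serverId": null,
  "sqlType": null
}
```

For the `$set` and `$unset` updates, `type` becomes `UPDATE`, `data` carries the update operator document (for example `{"$set": {"person.age": 20}}`), and `old` contains only the `_id` of the updated record. The delete and drop rows follow the same envelope with `type` set to `DELETE` and `DDL` respectively.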
Scenario 2: ChangeStream — updated fields only
Set Migration Method to ChangeStream. Set Obtain the entire document after it is updated to No.
Insert and delete messages are identical to Scenario 1. Update messages contain only the changed fields.
Scenario 3: ChangeStream — full document on update
Set Migration Method to ChangeStream. Set Obtain the entire document after it is updated to Yes.
Update events deliver the full document after the change instead of just the changed fields.
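An abbreviated, illustrative sketch of the `person.age` update from Scenario 1 under this setting (only the fields that differ from the insert sketch above are shown; values are placeholders):

```json
{
  "data": [
    {
      "_id": "65f0c1e2a4b3d90001f2a3b4",
      "cid": "a",
      "person": { "name": "testName", "age": 20, "skills": ["database", "ai"] }
    }
  ],
  "old": [ { "_id": "65f0c1e2a4b3d90001f2a3b4" } ],
  "type": "UPDATE",
  "isDdl": false
}
```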
Special case: missing fullDocument in ChangeStream
When a ChangeStream update event's fullDocument field is missing — for example, when a document moves across shards in a sharded collection — the delivered message falls back to Oplog behavior (partial update with $set or $unset operators).
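For instance, the `updateOne` in the example that follows could be delivered as a partial-update message along these lines (abbreviated, illustrative sketch, not verbatim DTS output):

```json
{
  "data": [ { "_id": 1, "$set": { "name": "b" } } ],
  "old": [ { "_id": 1 } ],
  "type": "UPDATE",
  "isDdl": false
}
```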
Example: sharded collection update where fullDocument is missing
Source base data:
```javascript
use admin
db.runCommand({ enablesharding: "dts_test" })
```

Source incremental change:

```javascript
use dts_test
sh.shardCollection("dts_test.cstest", { "name": "hashed" })
db.cstest.insert({ "_id": 1, "name": "a" })
db.cstest.updateOne({ "_id": 1, "name": "a" }, { $set: { "name": "b" } })
```

FAQ
Can I change the Kafka Data Compression Format after the task is created?
Yes. See Modify the objects to be synchronized.
Can I change the Message acknowledgment mechanism after the task is created?
Yes. See Modify the objects to be synchronized.