Data Transmission Service: Synchronize data from ApsaraDB for MongoDB to Message Queue for Apache Kafka

Last Updated: Oct 10, 2025

Data Transmission Service (DTS) can synchronize data from MongoDB to a Kafka cluster. This topic describes how to configure a data synchronization task. In the example, the source database is a MongoDB instance that uses the replica set architecture, and the destination database is a Message Queue for Apache Kafka instance.

Prerequisites

  • You have created a destination Message Queue for Apache Kafka instance.

    Note

    For information about the supported versions of the source and destination databases, see Synchronization Solution Overview.

  • You have created a topic to receive data in the destination Message Queue for Apache Kafka instance.

  • If the source database is an ApsaraDB for MongoDB sharded cluster instance, you must apply for endpoints for all shard nodes. The shard nodes in a sharded cluster instance must share the same account password and endpoint. For more information about how to apply for an endpoint, see Apply for an endpoint for a shard.

Considerations

Type

Description

Source database limits

  • Bandwidth requirements: The server on which the source database is deployed must have sufficient outbound bandwidth. Otherwise, the data synchronization speed is affected.

  • If you want to modify collections in the destination database, such as configuring name mapping for collections, you can synchronize up to 1,000 collections in a single data synchronization task. If you run a task to synchronize more than 1,000 collections, a request error occurs. In this case, we recommend that you configure multiple tasks to synchronize the collections or configure a task to synchronize the entire database.

  • If the source database is an ApsaraDB for MongoDB sharded cluster instance, the _id field in the collection to be synchronized must be unique. Otherwise, data inconsistency may occur.

  • If the source database is an ApsaraDB for MongoDB sharded cluster instance, the number of Mongos nodes in the instance cannot exceed 10. You must also make sure that the source ApsaraDB for MongoDB sharded cluster instance does not contain orphaned documents. Otherwise, data inconsistency may occur and the task may fail. For more information, see the MongoDB documentation and the How do I delete orphaned documents of a MongoDB database deployed in the sharded cluster architecture? section of the FAQ topic.

  • You cannot use a standalone ApsaraDB for MongoDB instance, an Azure Cosmos DB for MongoDB cluster, or an Amazon DocumentDB elastic cluster as the source database.

  • The oplog feature must be enabled for the source database and must retain log data for at least seven days. Alternatively, change streams must be enabled to ensure that DTS can subscribe to data changes in the source database within the last seven days. Otherwise, DTS may fail to obtain data changes in the source database and data synchronization fails. In some circumstances, data inconsistency or data loss may occur. Issues that arise in such circumstances are not covered by the service level agreement (SLA) of DTS.

    Important
    • We recommend that you use the oplog to record data changes in the source database.

    • Only MongoDB 4.0 and later allow you to use change streams to obtain data changes in the source database. Two-way synchronization is not supported when you use change streams to obtain data changes in the source database.

    • If the source database is a non-elastic Amazon DocumentDB cluster, you must enable change streams and set the Migration Method parameter to ChangeStream and the Architecture parameter to Sharded Cluster.

  • Limits on operations to be performed on the source database:

    • During full data synchronization, do not change the schemas of databases or collections or data of the ARRAY type. Otherwise, the data synchronization task fails, or data inconsistency occurs between the source and destination databases.

    • If the source MongoDB instance uses the sharded cluster architecture, do not run commands that change the data distribution of the objects to be synchronized during the synchronization task. These commands include shardCollection, reshardCollection, unshardCollection, moveCollection, and movePrimary. Otherwise, data inconsistency may occur.

  • If the source database is a MongoDB instance that uses the sharded cluster architecture and the balancer of the source database balances data, latency may occur in the instance.

  • DTS cannot connect to a MongoDB database over an SRV endpoint.
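
The 1,000-collection limit above applies to a single task that modifies collections in the destination, for example through name mapping. The following is a minimal sketch of how a long collection list could be split into groups that each fit into one task. The function name and the collection names are hypothetical, and the sketch only plans the groups. It does not call any DTS API.

```python
from typing import Iterator, List

# Limit stated above for a task that modifies collections (for example, name mapping).
MAX_COLLECTIONS_PER_TASK = 1000


def split_into_tasks(
    collections: List[str], limit: int = MAX_COLLECTIONS_PER_TASK
) -> Iterator[List[str]]:
    """Yield consecutive groups of at most `limit` collection names."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    for start in range(0, len(collections), limit):
        yield collections[start:start + limit]


# Hypothetical collection names, for illustration only.
names = [f"kafkadb.coll_{i}" for i in range(2500)]
groups = list(split_into_tasks(names))
print([len(g) for g in groups])  # prints [1000, 1000, 500]
```

Each group would then be configured as its own synchronization task. If no per-collection changes are needed, synchronizing the entire database in one task avoids the split.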

Other limits

  • Only collection-level synchronization is supported.

  • DTS cannot synchronize data from the admin, config, or local database.

  • If a single piece of data to be synchronized exceeds 10 MB, the task fails.

  • If the source database is a sharded cluster MongoDB instance:

    • Make sure that the MongoDB balancer of the source database is disabled during full data synchronization. Do not enable the balancer until all full data synchronization is complete and incremental data synchronization starts. Otherwise, data inconsistency may occur. For more information about the MongoDB balancer, see Manage the ApsaraDB for MongoDB balancer.

    • If the incremental data synchronization method is Oplog, DTS cannot guarantee the order in which data from different shards of the source database is written to the destination Kafka instance.

  • Transaction information is not retained. When transactions are synchronized to the destination database, the transactions are converted into single records.

  • If the number of broker nodes in the destination Kafka instance is increased or decreased during a DTS task, you must restart the DTS task.

  • Make sure that DTS can connect to the source and destination instances. For example, make sure that the security settings of the database instances and the listeners and advertised.listeners parameters in the server.properties file of a self-managed Kafka instance do not restrict access from DTS.

  • During a full data synchronization task, DTS uses the read and write resources of the source and destination databases. This may increase the loads of the database servers. Therefore, we recommend that you evaluate the performance of the source and destination databases before you start a DTS instance and synchronize data during off-peak hours, for example, when the CPU loads of the source and destination databases are less than 30%.

  • DTS attempts to resume failed instances that have been running for less than seven days. Before you switch your business to the destination instance, end or release the synchronization instance. This prevents the instance from being automatically resumed, which would overwrite data in the destination database.

  • DTS calculates the latency of incremental data synchronization based on the timestamp of the latest synchronized data in the destination database and the current timestamp in the source database. If no update operation is performed on the source database for an extended period of time, the synchronization latency may be inaccurate. If the latency of the data synchronization task is excessively high, you can perform an update operation on the source database to update the latency.

  • If a DTS task fails to run, DTS technical support will try to restore the task within 8 hours. During the restoration, the task may be restarted, and the parameters of the task may be modified.

    Note

    Only the parameters of the DTS task may be modified. The parameters of databases are not modified. The parameters that may be modified include but are not limited to the parameters in the "Modify instance parameters" section of the Modify the parameters of a DTS instance topic.
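
The latency rule described in the list above can be illustrated with a small calculation. This is a sketch under the stated description only: the function name and the timestamps are hypothetical, and the actual DTS implementation is not documented here.

```python
def sync_latency_ms(source_now_ms: int, last_synced_ms: int) -> int:
    """Latency as described above: the current timestamp in the source minus
    the timestamp of the latest record synchronized to the destination."""
    return max(0, source_now_ms - last_synced_ms)


# Hypothetical timestamps in milliseconds since the epoch.
last_write_ms = 1_741_848_289_000
source_now_ms = last_write_ms + 10 * 60 * 1000  # source idle for 10 minutes

# Nothing is pending, yet the reported latency keeps growing while the source is idle.
print(sync_latency_ms(source_now_ms, last_write_ms))  # prints 600000

# After a fresh update is synchronized, the latency drops back.
fresh_write_ms = source_now_ms - 2_000
print(sync_latency_ms(source_now_ms, fresh_write_ms))  # prints 2000
```

This is why performing an update on an idle source database brings the displayed latency back to a realistic value.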

Billing

Synchronization type | Task configuration fee
Full data synchronization | Free of charge.
Incremental data synchronization | Charged. For more information, see Billing overview.

Synchronization types

Synchronization type

Description

Full synchronization

Synchronizes all historical data of the selected objects from the source ApsaraDB for MongoDB instance to the destination Kafka instance.

Note

Full synchronization of DATABASE and COLLECTION is supported.

Incremental synchronization

In addition to full synchronization, you can synchronize incremental updates from the source ApsaraDB for MongoDB to the target Kafka instance.

Using Oplog

Incremental synchronization does not support databases created after the task starts. The following incremental updates are supported:

  • CREATE COLLECTION, INDEX

  • DROP DATABASE, COLLECTION, INDEX

  • RENAME COLLECTION

  • Operations to insert, update, and delete documents in a collection.

Using ChangeStream

The following incremental updates are supported:

  • DROP DATABASE, COLLECTION

  • RENAME COLLECTION

  • Operations to insert, update, and delete documents in a collection.
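
The two lists above differ only in a few DDL operations. As a sketch, they can be captured in a lookup table so that a script can check whether an operation is expected to be delivered for a given Migration Method. The operation labels and the function name are illustrative and are not DTS identifiers.

```python
# Incremental operations listed above for each Migration Method.
SUPPORTED = {
    "Oplog": {
        "CREATE COLLECTION", "CREATE INDEX",
        "DROP DATABASE", "DROP COLLECTION", "DROP INDEX",
        "RENAME COLLECTION",
        "INSERT", "UPDATE", "DELETE",
    },
    "ChangeStream": {
        "DROP DATABASE", "DROP COLLECTION",
        "RENAME COLLECTION",
        "INSERT", "UPDATE", "DELETE",
    },
}


def is_supported(method: str, operation: str) -> bool:
    """Return True if `operation` is listed for the given Migration Method."""
    try:
        return operation.upper() in SUPPORTED[method]
    except KeyError:
        raise ValueError(f"unknown migration method: {method!r}") from None


# Operations that are synchronized with Oplog but not with ChangeStream.
oplog_only = sorted(SUPPORTED["Oplog"] - SUPPORTED["ChangeStream"])
print(oplog_only)  # prints ['CREATE COLLECTION', 'CREATE INDEX', 'DROP INDEX']
```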

Database account permissions

Database | Required permissions | Account creation and authorization
Source ApsaraDB for MongoDB | The read permission on the databases to be synchronized, the admin database, and the local database. | Account Management

Procedure

  1. Use one of the following methods to go to the Data Synchronization page and select the region in which the data synchronization instance resides.

    DTS console

    1. Log on to the DTS console.

    2. In the left-side navigation pane, click Data Synchronization.

    3. In the upper-left corner of the page, select the region in which the data synchronization task resides.

    DMS console

    Note

    The actual operations may vary based on the mode and layout of the DMS console. For more information, see Simple mode and Customize the layout and style of the DMS console.

    1. Log on to the DMS console.

    2. In the top navigation bar, move the pointer over Data + AI and choose DTS (DTS) > Data Synchronization.

    3. From the drop-down list to the right of Data Synchronization Tasks, select the region in which the data synchronization instance resides.

  2. Click Create Task to go to the task configuration page.

  3. Configure the source and destination databases. The following table describes the parameters.

    Category

    Configuration

    Description

    None

    Task Name

    The name of the DTS task. DTS automatically generates a task name. We recommend that you specify a descriptive name that makes it easy to identify the task. You do not need to specify a unique task name.

    Source Database

    Select Existing Connection

    • If you use a database instance that is registered with DTS, select the instance from the drop-down list. DTS automatically populates the following database parameters for the instance. For more information, see Manage database connections.

      Note

      In the DMS console, you can select the database instance from the Select a DMS database instance drop-down list.

    • If the instance is not registered with DTS, or you do not want to use a registered instance, you must configure the following database information.

    Database Type

    Select MongoDB.

    Access Method

    Select Alibaba Cloud Instance.

    Instance Region

    Select the region of the source ApsaraDB for MongoDB instance.

    Replicate Data Across Alibaba Cloud Accounts

    In this example, a database of the current Alibaba Cloud account is used. Select No.

    Architecture

    In this example, select Replica Set.

    Note

    If your source ApsaraDB for MongoDB instance is a Sharded Cluster, you also need to enter the Shard account and Shard password.

    Migration Method

    The method used to synchronize incremental data from the source database. Select a method based on your business requirements. Valid values:

    • Oplog (recommended):

      This option is available if the oplog feature is enabled for the source database.

      Note

      By default, the oplog feature is enabled for both self-managed MongoDB databases and ApsaraDB for MongoDB instances. This feature allows you to synchronize incremental data at a low latency because of a fast log pulling speed. Therefore, we recommend that you select Oplog for the Migration Method parameter.

    • ChangeStream:

      This option is available if change streams are enabled for the source database. For more information, see Change Streams.

      Note
      • If the source database is a non-elastic Amazon DocumentDB cluster, you can set the Migration Method parameter only to ChangeStream.

      • If you select Sharded Cluster for the Architecture parameter, you do not need to configure the Shard account and Shard password parameters.

    Instance ID

    Select the instance ID of the source ApsaraDB for MongoDB.

    Authentication Database

    Enter the name of the authentication database to which the database account in the source ApsaraDB for MongoDB instance belongs. If you have not changed it, the default is admin.

    Database Account

    Enter the database account for the source ApsaraDB for MongoDB. For the permission requirements, see Permissions required for database accounts.

    Database Password

    The password that is used to access the database.

    Encryption

    Specifies whether to encrypt the connection to the source database. You can select Non-encrypted, SSL-encrypted, or Mongo Atlas SSL based on your business requirements. The options available for the Encryption parameter are determined by the values selected for the Access Method and Architecture parameters. The options displayed in the DTS console prevail.

    Note
    • If the Architecture parameter is set to Sharded Cluster and the Migration Method parameter is set to Oplog for the ApsaraDB for MongoDB database, the SSL-encrypted option of the Encryption parameter is unavailable.

    • If the source database is a self-managed MongoDB database that uses the Replica Set architecture, the Access Method parameter is not set to Alibaba Cloud Instance, and the Encryption parameter is set to SSL-encrypted, you can upload a certification authority (CA) certificate to verify the connection to the source database.

    Destination Database

    Select Existing Connection

    • If you use a database instance that is registered with DTS, select the instance from the drop-down list. DTS automatically populates the following database parameters for the instance. For more information, see Manage database connections.

      Note

      In the DMS console, you can select the database instance from the Select a DMS database instance drop-down list.

    • If the instance is not registered with DTS, or you do not want to use a registered instance, you must configure the following database information.

    Database Type

    Select Kafka.

    Access Method

    Select Alibaba Cloud Instance.

    Instance Region

    Select the region where the destination Kafka instance resides.

    Kafka Instance ID

    Select the ID of the destination Kafka instance.

    Encryption

    Based on your business and security requirements, select Non-encrypted or SCRAM-SHA-256.

    Topic

    From the drop-down list, select the topic that is used to receive data.

    Use Kafka Schema Registry

    Kafka Schema Registry is a metadata service layer that provides a RESTful interface to store and retrieve Avro schemas.

    • No: Do not use Kafka Schema Registry.

    • Yes: Use Kafka Schema Registry. You must enter the URL or IP address that is registered in Kafka Schema Registry for your Avro schemas.

  4. Click Test Connectivity and Proceed in the lower part of the page.

    Note
    • Make sure that the CIDR blocks of DTS servers can be automatically or manually added to the security settings of the source and destination databases to allow access from DTS servers. For more information, see Add the CIDR blocks of DTS servers.

    • If the source or destination database is a self-managed database and its Access Method is not set to Alibaba Cloud Instance, click Test Connectivity in the CIDR Blocks of DTS Servers dialog box.

  5. Configure the objects to be synchronized.

    1. In the Configure Objects step, configure the objects that you want to synchronize.

      Configuration

      Description

      Synchronization Types

      By default, Incremental Data Synchronization is selected. In addition, you can select only Full Data Synchronization. Schema Synchronization cannot be selected. After the precheck is complete, DTS synchronizes the historical data of the selected objects from the source database to the destination database. The historical data is the basis for subsequent incremental synchronization.

      Processing Mode of Conflicting Tables

      • Precheck and Report Errors: checks whether the destination database contains collections that have the same names as the collections in the source database. If the source and destination databases do not contain collections that have identical collection names, the precheck is passed. Otherwise, an error is returned during the precheck, and the data synchronization instance cannot be started.

        Note

        If the source and destination databases have collections with identical names and the collections in the destination database cannot be deleted or renamed, you can use the object name mapping feature to rename the collections that are synchronized to the destination database. For more information, see Rename an object to be synchronized.

      • Ignore Errors and Proceed: skips the precheck for identical collection names in the source and destination databases.

        Warning

        If you select Ignore Errors and Proceed, data inconsistency may occur and your business may be exposed to potential risks.

        • If a data record in the destination database has the same primary key value or unique key value as a data record in the source database, DTS does not synchronize the data record to the destination database. The existing data record in the destination database is retained.

        • Data may fail to be initialized, only specific columns are synchronized, or the data synchronization instance fails.

      Data Format in Kafka

      Only Canal JSON is supported.

      Note

      The data received by Kafka falls into three scenarios. For more information, see the Data delivery scenarios section of this topic.

      Kafka Data Compression Format

      The compression format for Kafka compressed data. Select a compression format based on your business requirements. Valid values:

      • LZ4 (default): low compression ratio and high compression speed.

      • GZIP: high compression ratio and low compression speed.

        Note

        GZIP compression consumes a large quantity of CPU resources.

      • Snappy: medium compression ratio and medium compression speed.

      Policy for Shipping Data to Kafka Partitions

      Select a policy based on your requirements.

      Message acknowledgement mechanism

      Select a message acknowledgment mechanism based on your requirements.

      Topic That Stores DDL Information

      From the drop-down list, select the topic that is used to store DDL information.

      Note

      If you do not select a topic, DDL information is stored in the topic that receives data by default.

      Capitalization of Object Names in Destination Instance

      The capitalization of database names and collection names in the destination instance. By default, DTS default policy is selected. You can select other options to ensure that the capitalization of object names is consistent with the default capitalization of object names in the source or destination database. For more information, see Specify the capitalization of object names in the destination instance.

      Source Objects

      Select one or more objects from the Source Objects section and click the right arrow icon to add the objects to the Selected Objects section.

      Note

      You can select objects to synchronize at the collection level.

      Selected Objects

      No additional configuration is required in this example.

      You can use the mapping feature to set the mapping information in the destination Kafka instance for collections in the source database.

    2. Click Next: Advanced Settings to configure advanced settings.

      Configuration

      Description

      Dedicated Cluster for Task Scheduling

      By default, DTS schedules the task to the shared cluster if you do not specify a dedicated cluster. If you want to improve the stability of data synchronization instances, purchase a dedicated cluster. For more information, see What is a DTS dedicated cluster.

      Retry Time for Failed Connections

      The retry time range for failed connections. If the source or destination database fails to be connected after the data synchronization task is started, DTS immediately retries a connection within the time range. Valid values: 10 to 1440. Unit: minutes. Default value: 720. We recommend that you set this parameter to a value greater than 30. If DTS reconnects to the source and destination databases within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.

      Note
      • If you specify different retry time ranges for multiple data synchronization tasks that have the same source or destination database, the shortest retry time range takes precedence.

      • When DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time range based on your business requirements. You can also release the DTS instance at your earliest opportunity after the source and destination instances are released.

      Retry Time for Other Issues

      The retry time range for other issues. For example, if the DDL or DML operations fail to be performed after the data synchronization task is started, DTS immediately retries the operations within the time range. Valid values: 1 to 1440. Unit: minutes. Default value: 10. We recommend that you set this parameter to a value greater than 10. If the failed operations are successfully performed within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.

      Important

      The value of the Retry Time for Other Issues parameter must be smaller than the value of the Retry Time for Failed Connections parameter.

      Obtain the entire document after it is updated.

      During incremental data synchronization, specifies whether to synchronize the complete data of the document that corresponds to an update operation to the destination.

      Note

      This configuration item is available only when Migration Method is set to ChangeStream.

      • Yes: Synchronizes the full data of the document that contains the updated field.

        Important
        • This feature is based on the native capabilities of MongoDB and may increase the load on the source database. This can reduce the speed of incremental data collection and cause latency in the synchronization instance.

        • If DTS fails to obtain the complete data, only the data of the updated field is synchronized.

      • No: Synchronizes only the data of the updated fields.

      Enable Throttling for Full Data Synchronization

      During full data synchronization, DTS uses the read and write resources of the source and destination databases. This may increase the load on the database servers. You can configure the Queries per second (QPS) to the source database, RPS of Full Data Migration, and Data migration speed for full migration (MB/s) parameters for full data synchronization tasks to reduce the load on the destination database server.

      Note

      You can configure this parameter only if Full Data Synchronization is selected for the Synchronization Types parameter.

      Only one data type for primary key _id in a table of the data to be synchronized

      Specifies whether the primary key _id has only one data type within each collection to be synchronized.

      Note

      This parameter is displayed only if Full Data Synchronization is selected for the Synchronization Types parameter.

      • Yes: During full data synchronization, DTS does not scan the data type for primary key in the data to be synchronized from the source database.

      • No: During full data synchronization, DTS scans the data type for primary key in the data to be synchronized from the source database.

      Enable Throttling for Incremental Data Synchronization

      Specifies whether to enable throttling for incremental data synchronization. You can enable throttling for incremental data synchronization based on your business requirements. To configure throttling, you must configure the RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s) parameters. This reduces the load on the destination database server.

      Environment Tag

      You can select an environment tag to identify the instance based on your requirements. In this example, you do not need to select a tag.

      Configure ETL

      Specifies whether to enable the extract, transform, and load (ETL) feature. For more information, see What is ETL? Valid values:

      Monitoring and Alerting

      Specifies whether to configure alerting for the data synchronization instance. If the task fails or the synchronization latency exceeds the specified threshold, alert contacts will receive notifications. Valid values:

  6. Save the task settings and run a precheck.

    • To view the parameters to be specified when you call the relevant API operation to configure the DTS task, move the pointer over Next: Save Task Settings and Precheck and click Preview OpenAPI parameters.

    • If you do not need to view or have viewed the parameters, click Next: Save Task Settings and Precheck in the lower part of the page.

    Note
    • Before you can start the data synchronization task, DTS performs a precheck. You can start the data synchronization task only after the task passes the precheck.

    • If the data synchronization task fails the precheck, click View Details next to each failed item. After you analyze the causes based on the check results, troubleshoot the issues. Then, rerun the precheck.

    • If an alert is triggered for an item during the precheck:

      • If an alert item cannot be ignored, click View Details next to the failed item and troubleshoot the issue. Then, run a precheck again.

      • If an alert item can be ignored, click Confirm Alert Details. In the View Details dialog box, click Ignore. In the message that appears, click OK. Then, click Precheck Again to run a precheck again. If you ignore the alert item, data inconsistency may occur, and your business may be exposed to potential risks.

  7. Purchase the instance.

    1. Wait until the Success Rate becomes 100%. Then, click Next: Purchase Instance.

    2. On the buy page, configure the Billing Method and Instance Class parameters for the data synchronization task. The following table describes the parameters.

      Section

      Parameter

      Description

      New Instance Class

      Billing Method

      • Subscription: You pay for a subscription when you create a data synchronization instance. The subscription billing method is more cost-effective than the pay-as-you-go billing method for long-term use.

      • Pay-as-you-go: A pay-as-you-go instance is billed on an hourly basis. The pay-as-you-go billing method is suitable for short-term use. If you no longer require a pay-as-you-go data synchronization instance, you can release the instance to reduce costs.

      Resource Group Settings

      The resource group to which the data synchronization instance belongs. Default value: default resource group. For more information, see What is Resource Management?

      Instance Class

      DTS provides instance classes that vary in synchronization speed. You can select an instance class based on your business requirements. For more information, see Instance classes of data synchronization instances.

      Subscription Duration

      If you select the subscription billing method, specify the subscription duration and the number of data synchronization instances that you want to create. The subscription duration can be one to nine months, one year, two years, three years, or five years.

      Note

      This parameter is available only if you select the Subscription billing method.

    3. Read and select Data Transmission Service (Pay-as-you-go) Service Terms.

    4. Click Buy and Start. In the dialog box that appears, click OK.

      You can view the progress of the task in the task list.
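
The constraints on the Retry Time for Failed Connections and Retry Time for Other Issues parameters from the advanced settings step can be checked before a task is configured. The following is a minimal sketch that encodes the ranges, defaults, and ordering rule stated in that step. The class, field, and function names are hypothetical and do not correspond to DTS API parameters.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class RetrySettings:
    """Retry windows in minutes, with the default values given in this topic."""
    failed_connections_min: int = 720  # valid range: 10 to 1440
    other_issues_min: int = 10         # valid range: 1 to 1440

    def validate(self) -> None:
        if not 10 <= self.failed_connections_min <= 1440:
            raise ValueError("Retry Time for Failed Connections must be 10 to 1440")
        if not 1 <= self.other_issues_min <= 1440:
            raise ValueError("Retry Time for Other Issues must be 1 to 1440")
        if self.other_issues_min >= self.failed_connections_min:
            raise ValueError(
                "Retry Time for Other Issues must be smaller than "
                "Retry Time for Failed Connections"
            )


def effective_connection_retry(values_min: Iterable[int]) -> int:
    """For tasks that share a source or destination database,
    the shortest retry time range takes precedence."""
    return min(values_min)


RetrySettings().validate()                     # the defaults pass
print(effective_connection_retry([720, 60]))   # prints 60
```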

Mapping information

  1. In the Selected Objects area, hover the mouse pointer over the destination Topic name at the collection level.

  2. Click Edit next to the destination Topic name.

  3. In the Edit Table dialog box that appears, configure the mapping information.

    Configuration

    Description

    Name of target Topic

    The name of the destination topic to which the source collection is synchronized. By default, this is the topic selected in the Destination Database section of the Configurations for Source and Destination Databases step.

    Important
    • The topic name you enter must exist in the destination Kafka instance. Otherwise, the data synchronization will fail.

    • If you modify the Name of target Topic, the data will be written to the Topic you enter.

    Filter Conditions

    For more information, see Set filter conditions.

    Number of Partitions

    The number of partitions for writing data to the destination topic.

  4. Click OK.

Data delivery scenarios

Scenario 1: Synchronize incremental data using Oplog

Main instance configurations

For Migration Method, select Oplog.

Data delivery examples

Source incremental change type

Source incremental change statement

Data received by the destination topic

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973438,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

{
	"data": [{
		"$set": {
			"person.age": 20
		}
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848051984,
	"type": "UPDATE"
}

update $set new field

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

{
	"data": [{
		"$set": {
			"salary": 100.0
		}
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848147734,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

{
	"data": [{
		"$unset": {
			"salary": true
		}
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848208186,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848289798,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 1741847893000000005,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847893760,
	"type": "DDL"
}
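A consumer that reads these records usually branches on the `type` and `isDdl` fields. The following is a minimal Python sketch of that dispatch, assuming the record value has already been read from the topic as a JSON string. The function name `classify_record` and the abridged sample are illustrative and not part of DTS. The masked `id` values shown in the examples are not valid JSON, so the sample omits that field.

```python
import json


def classify_record(raw):
    """Return (kind, payload) for one record value in the format shown above."""
    record = json.loads(raw)
    if record.get("isDdl"):
        # DDL records (for example, drop) carry the command in "sql"; "data" is null.
        return "ddl", record["sql"]
    kind = record["type"].lower()  # "insert", "update", or "delete"
    if kind == "update":
        # "old" identifies the updated document; "data" holds the change.
        return kind, {"key": record["old"][0]["_id"], "change": record["data"][0]}
    # insert: "data" holds the new document; delete: "data" holds only the _id.
    return kind, record["data"][0]


# Abridged copy of the "update $set" record above (masked "id" field omitted).
sample = json.dumps({
    "data": [{"$set": {"person.age": 20}}],
    "isDdl": False,
    "old": [{"_id": {"$oid": "67d27da49591697476e1****"}}],
    "type": "UPDATE",
})
kind, payload = classify_record(sample)
print(kind, payload["change"])
```

The sketch relies only on the fields that appear in the examples in this topic. Record types that are not shown here would need their own branch.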

Scenario 2: Synchronize incremental data using ChangeStream (synchronize data of updated fields)

Main instance configurations

Set Migration Method to ChangeStream, and set Retrieve Full Document After Update Operation to No.

Data delivery examples

Source incremental change type

Source incremental change statement

Data received by the destination topic

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973803,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

{
	"data": [{
		"$set": {
			"person.age": 20
		}
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848052912,
	"type": "UPDATE"
}

update $set new field

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

{
	"data": [{
		"$set": {
			"salary": 100.0
		}
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848148056,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

{
	"data": [{
		"$unset": {
			"salary": 1
		}
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848209142,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848290254,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 174184789300000****,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847894679,
	"type": "DDL"
}
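Because update records in this scenario carry only the `$set` and `$unset` operators, a consumer that keeps its own copy of each document has to apply the change itself. The following Python sketch shows one way to do that for dotted field paths such as `person.age`. The function name `apply_delta` is hypothetical, and the sketch assumes that every path segment names a nested document. Array positions in a path (for example, `skills.0`) and other update operators are not handled.

```python
import copy


def apply_delta(doc, change):
    """Apply a {"$set": ...} / {"$unset": ...} change to a copy of doc."""
    result = copy.deepcopy(doc)
    for path, value in change.get("$set", {}).items():
        *parents, leaf = path.split(".")
        target = result
        for key in parents:
            # Create intermediate documents that do not exist yet.
            target = target.setdefault(key, {})
        target[leaf] = value
    for path in change.get("$unset", {}):
        *parents, leaf = path.split(".")
        target = result
        for key in parents:
            target = target.get(key)
            if not isinstance(target, dict):
                break  # The parent is missing, so there is nothing to remove.
        else:
            target.pop(leaf, None)
    return result


# Document state after the insert example above (_id omitted for brevity).
doc = {"cid": "a", "person": {"name": "testName", "age": 18, "skills": ["database", "ai"]}}
after_set = apply_delta(doc, {"$set": {"person.age": 20}})
after_new_field = apply_delta(after_set, {"$set": {"salary": 100.0}})
after_unset = apply_delta(after_new_field, {"$unset": {"salary": 1}})
```

Working on a deep copy keeps the previous state intact, which can help when a consumer needs to compare the document before and after the change.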

Scenario 3: Synchronize incremental data using ChangeStream (synchronize the complete data of the document that corresponds to an updated field)

Main instance configurations

Set Migration Method to ChangeStream, and set Retrieve Full Document After Update Operation to Yes.

Data delivery examples

Source incremental change type

Source incremental change statement

Data received by the destination topic

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973128,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848052219,
	"type": "UPDATE"
}

update $set new field

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"salary": 100.0,
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848147327,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848208401,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848290499,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 174184789300000****,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847894045,
	"type": "DDL"
}
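When update records carry the complete document, as in the examples above, a consumer can maintain a local copy of the collection by replacing the stored document on every insert or update. The following Python sketch illustrates this with an in-memory dictionary, assuming that each record has already been parsed from JSON. The names `doc_key` and `apply_to_mirror` are hypothetical, and only the record types shown in this topic are handled.

```python
import json


def doc_key(id_value):
    """Build a hashable key from an _id value such as {"$oid": "..."} or 1.0."""
    return json.dumps(id_value, sort_keys=True)


def apply_to_mirror(mirror, record):
    """Apply one parsed record to an in-memory {key: document} mirror."""
    if record["isDdl"]:
        # The only DDL shown in this topic is drop, which empties the collection.
        if "drop" in (record["sql"] or {}):
            mirror.clear()
        return
    doc = record["data"][0]
    if record["type"] == "DELETE":
        # Delete records carry only the _id of the removed document.
        mirror.pop(doc_key(doc["_id"]), None)
    else:
        # INSERT and UPDATE both deliver the complete document, so replace it.
        mirror[doc_key(doc["_id"])] = doc


# Abridged records modeled on the examples above.
oid = {"$oid": "67d27da49591697476e1****"}
mirror = {}
apply_to_mirror(mirror, {"isDdl": False, "type": "INSERT", "sql": None,
                         "data": [{"_id": oid, "cid": "a"}]})
apply_to_mirror(mirror, {"isDdl": False, "type": "UPDATE", "sql": None,
                         "data": [{"_id": oid, "cid": "a", "salary": 100.0}]})
```

Serializing the `_id` value to a string makes the key hashable whether the `_id` is an ObjectId wrapper or a plain number.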

Special cases

Precautions

If the fullDocument field of an update event is missing, the delivered data is the same as in Oplog-based incremental synchronization: the data field contains only the update operators (for example, $set) rather than the complete document.

Example

Source base data

Source incremental change statement

Data received by the destination topic

use admin
db.runCommand({ enablesharding:"dts_test" })
use dts_test
sh.shardCollection("dts_test.cstest",{"name":"hashed"})
db.cstest.insert({"_id":1,"name":"a"})

db.cstest.updateOne({"_id":1,"name":"a"},{$set:{"name":"b"}})

{
	"data": [{
		"$set": {
			"name": "b"
		}
	}],
	"database": "dts_test",
	"es": 1740720994000,
	"gtid": null,
	"id": 174072099400000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"name": "a",
		"_id": 1.0
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "cstest",
	"ts": 1740721007099,
	"type": "UPDATE"
}
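Because of this special case, a consumer that expects complete documents may still receive an update whose `data` contains only operators. The following Python sketch shows one way to tell the two shapes apart, assuming that top-level field names in the source documents do not begin with `$`. The function name `update_payload_kind` is hypothetical. The first sample is abridged from the record above, where `old` also contains the `name` field in addition to `_id`.

```python
def update_payload_kind(record):
    """Return "delta" if an UPDATE record holds operators, else "full_document"."""
    change = record["data"][0]
    # Operator payloads look like {"$set": {...}} or {"$unset": {...}}.
    if change and all(key.startswith("$") for key in change):
        return "delta"
    return "full_document"


# Abridged from the special-case record above.
fallback = {
    "data": [{"$set": {"name": "b"}}],
    "old": [{"name": "a", "_id": 1.0}],
    "type": "UPDATE",
}
# Abridged from a Scenario 3 update record.
complete = {
    "data": [{"_id": {"$oid": "67d27da49591697476e1****"}, "cid": "a", "salary": 100.0}],
    "old": [{"_id": {"$oid": "67d27da49591697476e1****"}}],
    "type": "UPDATE",
}
fallback_kind = update_payload_kind(fallback)
complete_kind = update_payload_kind(complete)
```

A consumer could route `"delta"` records to the same logic it uses for operator-based updates and treat `"full_document"` records as replacements.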

FAQ

  • Can I modify the Kafka Data Compression Format?

    Yes. For more information, see Modify the objects to be synchronized.

  • Can I modify the Message acknowledgement mechanism?

    Yes. For more information, see Modify the objects to be synchronized.