Data Transmission Service (DTS) supports data synchronization from MongoDB to a Kafka cluster. This topic describes the steps to perform the synchronization operation using a MongoDB instance that uses a replica set architecture as the source database and a Message Queue for Apache Kafka instance as the destination database.
Prerequisites
You have created a destination Message Queue for Apache Kafka instance.
NoteFor information about the supported versions of the source and destination databases, see Synchronization Solution Overview.
You have created a topic to receive data in the destination Message Queue for Apache Kafka instance.
If the source database is a ApsaraDB for MongoDB sharded cluster, you must apply for endpoints for all shard nodes. The shard nodes in a sharded cluster instance must share the same account password and endpoint. For more information about how to apply for an enpoint, see Apply for an endpoint for a shard.
Considerations
Type | Description |
Source database limits |
|
Other limits |
|
Billing
Synchronization type | Task configuration fee |
Full data synchronization | Free of charge. |
Incremental data synchronization | Charged. For more information, see Billing overview. |
Synchronization types
Synchronization type | Description |
Full synchronization | Synchronizes all historical data of the source synchronization object in ApsaraDB for MongoDB to the target Kafka instance. Note Full synchronization of DATABASE and COLLECTION is supported. |
Incremental synchronization | In addition to full synchronization, you can synchronize incremental updates from the source ApsaraDB for MongoDB to the target Kafka instance. Using OplogIncremental synchronization does not support databases created after the task starts. The following incremental updates are supported:
Using ChangeStreamThe following incremental updates are supported:
|
Database account permissions
Database | Required permissions | Account creation and authorization |
Source ApsaraDB for MongoDB | The read permission on the databases to be synchronized, the admin database, and the local database. |
Procedure
Use one of the following methods to go to the Data Synchronization page and select the region in which the data synchronization instance resides.
DTS console
Log on to the DTS console.
In the left-side navigation pane, click Data Synchronization.
In the upper-left corner of the page, select the region in which the data synchronization task resides.
DMS console
NoteThe actual operations may vary based on the mode and layout of the DMS console. For more information, see Simple mode and Customize the layout and style of the DMS console.
Log on to the DMS console.
In the top navigation bar, move the pointer over Data + AI and choose .
From the drop-down list to the right of Data Synchronization Tasks, select the region in which the data synchronization instance resides.
Click Create Task to go to the task configuration page.
Configure the source and destination databases. The following table describes the parameters.
Category
Configuration
Description
None
Task Name
The name of the DTS task. DTS automatically generates a task name. We recommend that you specify a descriptive name that makes it easy to identify the task. You do not need to specify a unique task name.
Source Database
Select Existing Connection
If you use a database instance that is registered with DTS, select the instance from the drop-down list. DTS automatically populates the following database parameters for the instance. For more information, see Manage database connections.
NoteIn the DMS console, you can select the database instance from the Select a DMS database instance drop-down list.
If you fail to register the instance with DTS, or you do not need to use the instance that is registered with DTS, you must configure the following database information.
Database Type
Select MongoDB.
Access Method
Select Alibaba Cloud Instance.
Instance Region
Select the region of the source ApsaraDB for MongoDB instance.
Replicate Data Across Alibaba Cloud Accounts
In this example, a database of the current Alibaba Cloud account is used. Select No.
Architecture
This example selects Replica Set.
NoteIf your source ApsaraDB for MongoDB instance is a Sharded Cluster, you also need to enter the Shard account and Shard password.
Migration Method
The method used to synchronize incremental data from the source database. Select a method based on your business requirements. Valid values:
Oplog (recommended):
This option is available if the oplog feature is enabled for the source database.
NoteBy default, the oplog feature is enabled for both self-managed MongoDB databases and ApsaraDB for MongoDB instances. This feature allows you to synchronize incremental data at a low latency because of a fast log pulling speed. Therefore, we recommend that you select Oplog for the Migration Method parameter.
ChangeStream:
This option is available if change streams are enabled for the source database. For more information, see Change Streams.
NoteIf the source database is an inelastic Amazon DocumentDB cluster, you can set the Migration Method parameter only to ChangeStream.
If you select Sharded Cluster for the Architecture parameter, you do not need to configure the Shard account and Shard password parameters.
Instance ID
Select the instance ID of the source ApsaraDB for MongoDB.
Authentication Database
Enter the name of the database for the database account in the source ApsaraDB for MongoDB instance. If you have not modified it, the default is admin.
Database Account
Enter the database account for the source ApsaraDB for MongoDB. For the permission requirements, see Permissions required for database accounts.
Database Password
The password that is used to access the database.
Encryption
Specifies whether to encrypt the connection to the source database. You can select Non-encrypted, SSL-encrypted, or Mongo Atlas SSL based on your business requirements. The options available for the Encryption parameter are determined by the values selected for the Access Method and Architecture parameters. The options displayed in the DTS console prevail.
NoteIf the Architecture parameter is set to Sharded Cluster, and the Migration Method parameter is set to Oplog for the ApsaraDB for MongoDB database, the Encryption parameter SSL-encrypted is unavailable.
If the source database is a self-managed MongoDB database that uses the Replica Set architecture, the Access Method parameter is not set to Alibaba Cloud Instance, and the Encryption parameter is set to SSL-encrypted, you can upload a certification authority (CA) certificate to verify the connection to the source database.
Destination Database
Select Existing Connection
If you use a database instance that is registered with DTS, select the instance from the drop-down list. DTS automatically populates the following database parameters for the instance. For more information, see Manage database connections.
NoteIn the DMS console, you can select the database instance from the Select a DMS database instance drop-down list.
If you fail to register the instance with DTS, or you do not need to use the instance that is registered with DTS, you must configure the following database information.
Database Type
Select Kafka.
Access Method
Select Alibaba Cloud Instance.
Instance Region
Select the region where the destination Kafka instance resides.
Kafka Instance ID
Select the ID of the destination Kafka instance.
Encryption
Based on your business and security requirements, select Non-encrypted or SCRAM-SHA-256.
Topic
From the drop-down list, select the topic that is used to receive data.
Use Kafka Schema Registry
Kafka Schema Registry is a metadata service layer that provides a RESTful interface to store and retrieve Avro schemas.
No: Do not use Kafka Schema Registry.
Yes: Use Kafka Schema Registry. You must enter the URL or IP address that is registered in Kafka Schema Registry for your Avro schemas.
Click Test Connectivity and Proceed in the lower part of the page.
NoteMake sure that the CIDR blocks of DTS servers can be automatically or manually added to the security settings of the source and destination databases to allow access from DTS servers. For more information, see Add the CIDR blocks of DTS servers.
If the source or destination database is a self-managed database and its Access Method is not set to Alibaba Cloud Instance, click Test Connectivity in the CIDR Blocks of DTS Servers dialog box.
Configure the objects to be synchronized.
In the Configure Objects step, configure the objects that you want to synchronize.
Configuration
Description
Synchronization Types
By default, Incremental Data Synchronization is selected. You can select only Full Data Synchronization. You cannot select Schema Synchronization. After the precheck is complete, DTS synchronizes the historical data of the selected objects from the source database to the destination database. The historical data is the basis for subsequent incremental synchronization.
Processing Mode of Conflicting Tables
Precheck and Report Errors: checks whether the destination database contains collections that have the same names as the collections in the source database. If the source and destination databases do not contain collections that have identical collection names, the precheck is passed. Otherwise, an error is returned during the precheck, and the data synchronization instance cannot be started.
NoteIf the source and destination databases have collections with identical names and the collections in the destination database cannot be deleted or renamed, you can use the object name mapping feature to rename the collections that are synchronized to the destination database. For more information, see Rename an object to be synchronized.
Ignore Errors and Proceed: skips the precheck for identical collection names in the source and destination databases.
WarningIf you select Ignore Errors and Proceed, data inconsistency may occur and your business may be exposed to potential risks.
If a data record in the destination database has the same primary key value or unique key value as a data record in the source database, DTS does not synchronize the data record to the destination database. The existing data record in the destination database is retained.
Data may fail to be initialized, only specific columns are synchronized, or the data synchronization instance fails.
Data Format in Kafka
Only Canal JSON is supported.
NoteThe data received by Kafka can be categorized into three scenarios.
Kafka Data Compression Format
The compression format for Kafka compressed data. Select a compression format based on your business requirements. Valid values:
LZ4 (default): low compression ratio and high compression speed.
GZIP: high compression ratio and low compression speed.
NoteGZIP compression consumes a large quantity of CPU resources.
Snappy: medium compression ratio and medium compression speed.
Policy for Shipping Data to Kafka Partitions
Select a policy based on your requirements.
Message acknowledgement mechanism
Select a message acknowledgment mechanism based on your requirements.
Topic That Stores DDL Information
From the drop-down list, select the topic that is used to store DDL information.
NoteIf you do not select a topic, DDL information is stored in the topic that receives data by default.
Capitalization of Object Names in Destination Instance
The capitalization of database names and collection names in the destination instance. By default, DTS default policy is selected. You can select other options to ensure that the capitalization of object names is consistent with the default capitalization of object names in the source or destination database. For more information, see Specify the capitalization of object names in the destination instance.
Source Objects
Select one or more objects from the Source Objects section and click the
icon to add the objects to the Selected Objects section. NoteYou can select objects to synchronize at the collection level.
Selected Objects
No additional configuration is required in this example.
You can use the mapping feature to set the mapping information in the destination Kafka instance for collections in the source database.
Click Next: Advanced Settings to configure advanced settings.
Configuration
Description
Dedicated Cluster for Task Scheduling
By default, DTS schedules the task to the shared cluster if you do not specify a dedicated cluster. If you want to improve the stability of data synchronization instances, purchase a dedicated cluster. For more information, see What is a DTS dedicated cluster.
Retry Time for Failed Connections
The retry time range for failed connections. If the source or destination database fails to be connected after the data synchronization task is started, DTS immediately retries a connection within the time range. Valid values: 10 to 1440. Unit: minutes. Default value: 720. We recommend that you set this parameter to a value greater than 30. If DTS reconnects to the source and destination databases within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.
NoteIf you specify different retry time ranges for multiple data synchronization tasks that have the same source or destination database, the shortest retry time range takes precedence.
When DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time range based on your business requirements. You can also release the DTS instance at your earliest opportunity after the source and destination instances are released.
Retry Time for Other Issues
The retry time range for other issues. For example, if the DDL or DML operations fail to be performed after the data synchronization task is started, DTS immediately retries the operations within the time range. Valid values: 1 to 1440. Unit: minutes. Default value: 10. We recommend that you set this parameter to a value greater than 10. If the failed operations are successfully performed within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.
ImportantThe value of the Retry Time for Other Issues parameter must be smaller than the value of the Retry Time for Failed Connections parameter.
Obtain the entire document after it is updated.
During incremental data synchronization, specifies whether to synchronize the complete data of the document that corresponds to an update operation to the destination.
NoteThis configuration item is available only when Migration Method is set to ChangeStream.
Yes: Synchronizes the full data of the document that contains the updated field.
ImportantThis feature is based on the native capabilities of MongoDB and may increase the load on the source database. This can reduce the speed of incremental data collection and cause latency in the synchronization instance.
If DTS fails to obtain the complete data, only the data of the updated field is synchronized.
No: Synchronizes only the data of the updated fields.
Enable Throttling for Full Data Synchronization
During full data synchronization, DTS uses the read and write resources of the source and destination databases. This may increase the load on the database servers. You can configure the Queries per second (QPS) to the source database, RPS of Full Data Migration, and Data migration speed for full migration (MB/s) parameters for full data synchronization tasks to reduce the load on the destination database server.
NoteYou can configure this parameter only if Full Data Synchronization is selected for the Synchronization Types parameter.
Only one data type for primary key _id in a table of the data to be synchronized
Only one data type for primary key _id in a single collection of the data to be synchronized.
NoteThis parameter is displayed only if Full Data Synchronization is selected for the Synchronization Types parameter.
Yes: During full data synchronization, DTS does not scan the data type for primary key in the data to be synchronized from the source database.
No: During full data synchronization, DTS scans the data type for primary key in the data to be synchronized from the source database.
Enable Throttling for Incremental Data Synchronization
Specifies whether to enable throttling for incremental data synchronization. You can enable throttling for incremental data synchronization based on your business requirements. To configure throttling, you must configure the RPS of Incremental Data Synchronization and Data synchronization speed for incremental synchronization (MB/s) parameters. This reduces the load on the destination database server.
Environment Tag
You can select an environment tag to identify the instance based on your requirements. In this example, you do not need to select a tag.
Configure ETL
Specifies whether to enable the extract, transform, and load (ETL) feature. For more information, see What is ETL? Valid values:
Yes: configures the ETL feature. You can enter data processing statements in the code editor. For more information, see Configure ETL in a data migration or data synchronization task.
No: does not configure the ETL feature.
Monitoring and Alerting
Specifies whether to configure alerting for the data synchronization instance. If the task fails or the synchronization latency exceeds the specified threshold, alert contacts will receive notifications. Valid values:
No: does not enable alerting.
Yes: configures alerting. In this case, you must also configure the alert threshold and alert notification settings. For more information, see the "Configure monitoring and alerting when you create a DTS task" section of the Configure monitoring and alerting topic.
Save the task settings and run a precheck.
To view the parameters to be specified when you call the relevant API operation to configure the DTS task, move the pointer over Next: Save Task Settings and Precheck and click Preview OpenAPI parameters.
If you do not need to view or have viewed the parameters, click Next: Save Task Settings and Precheck in the lower part of the page.
NoteBefore you can start the data synchronization task, DTS performs a precheck. You can start the data synchronization task only after the task passes the precheck.
If the data synchronization task fails the precheck, click View Details next to each failed item. After you analyze the causes based on the check results, troubleshoot the issues. Then, rerun the precheck.
If an alert is triggered for an item during the precheck:
If an alert item cannot be ignored, click View Details next to the failed item and troubleshoot the issue. Then, run a precheck again.
If an alert item can be ignored, click Confirm Alert Details. In the View Details dialog box, click Ignore. In the message that appears, click OK. Then, click Precheck Again to run a precheck again. If you ignore the alert item, data inconsistency may occur, and your business may be exposed to potential risks.
Purchase the instance.
Wait until the Success Rate becomes 100%. Then, click Next: Purchase Instance.
On the buy page, configure the Billing Method and Instance Class parameters for the data synchronization task. The following table describes the parameters.
Section
Parameter
Description
New Instance Class
Billing Method
Subscription: You pay for a subscription when you create a data synchronization instance. The subscription billing method is more cost-effective than the pay-as-you-go billing method for long-term use.
Pay-as-you-go: A pay-as-you-go instance is billed on an hourly basis. The pay-as-you-go billing method is suitable for short-term use. If you no longer require a pay-as-you-go data synchronization instance, you can release the instance to reduce costs.
Resource Group Settings
The resource group to which the data synchronization instance belongs. Default value: default resource group. For more information, see What is Resource Management?
Instance Class
DTS provides instance classes that vary in synchronization speed. You can select an instance class based on your business requirements. For more information, see Instance classes of data synchronization instances.
Subscription Duration
If you select the subscription billing method, specify the subscription duration and the number of data synchronization instances that you want to create. The subscription duration can be one to nine months, one year, two years, three years, or five years.
NoteThis parameter is available only if you select the Subscription billing method.
Read and select Data Transmission Service (Pay-as-you-go) Service Terms.
Click Buy and Start. In the dialog box that appears, click OK.
You can view the progress of the task in the task list.
Mapping information
In the Selected Objects area, hover the mouse pointer over the destination Topic name at the collection level.
Click Edit next to the destination Topic name.
In the Edit Table dialog box that appears, configure the mapping information.
Configuration
Description
Name of target Topic
The name of the destination Topic to which the source collection is synced, which defaults to the Topic selected in the Destination Database section of the Configurations for Source and Destination Databases step.
ImportantThe topic name you enter must exist in the destination Kafka instance. Otherwise, the data synchronization will fail.
If you modify the Name of target Topic, the data will be written to the Topic you enter.
Filter Conditions
For more information, see Set filter conditions.
Number of Partitions
The number of partitions for writing data to the destination topic.
Click OK.
Data delivery scenarios
Scenario 1: Synchronize incremental data using Oplog
Main instance configurations
For Migration Method, select Oplog.
Data delivery examples
Source incremental change type | Source incremental change statement | Data received by the destination topic |
| | |
| | |
| | |
| | |
| | |
| |
Scenario 2: Synchronize incremental data using ChangeStream (synchronize data of updated fields)
Main instance configurations
For Migration Method, select ChangeStream. For Retrieve Full Document After Update Operation, select No.
Data delivery examples
Source incremental change type | Source incremental change statement | Data received by the destination topic |
| | |
| | |
| | |
| | |
| | |
| |
Scenario 3: Synchronize incremental data using ChangeStream (synchronize the complete data of the document that corresponds to an updated field)
Main instance configurations
Set Migration Method to ChangeStream, and set Retrieve Full Document After Update Operation to Yes.
Data delivery examples
Source incremental change type | Source incremental change statement | Data received by the destination topic |
| | |
| | |
| | |
| | |
| | |
| |
Special cases
Precautions
When the fullDocument field of an update event is missing, the data delivery result is the same as when you synchronize incremental data using Oplog.
Example
Source base data | Source incremental change statement | Data received by the destination topic |
| |
FAQ
Can I modify the Kafka Data Compression Format?
Yes. For more information, see Modify the objects to be synchronized.
Can I modify the Message acknowledgement mechanism?
Yes. For more information, see Modify the objects to be synchronized.