DataWorks provides bidirectional data synchronization with MongoDB. This topic describes the data synchronization capabilities that DataWorks provides for MongoDB.
Supported versions
DataWorks supports MongoDB versions 4.x, 5.x, 6.x, 7.x, and 8.0.
Usage notes
Connect to a MongoDB database using an account created for that database. If you use an ApsaraDB for MongoDB data source, a root account is created by default. For security reasons, we recommend that you do not use the root account when you add a MongoDB data source.
If you use a MongoDB sharded cluster, you must configure the address of a mongos node for the data source. Do not configure the address of a mongod/shard node. Otherwise, a synchronization task may query data only from the specified shard instead of the entire dataset. For more information about mongos and mongod, see the mongos and mongod documentation.
MongoDB primary/secondary clusters are not supported.
If the concurrency is greater than 1, all _id fields in the collection that is configured for the synchronization task must have the same data type. For example, all _id fields must be of the string or ObjectId type. Otherwise, some data may not be synchronized.
Note: When the concurrency is greater than 1, the task is split based on the _id field. Therefore, mixed data types are not supported for the _id field in this scenario. If the _id field contains multiple data types, set the concurrency to 1 for data synchronization. To do this, do not configure the splitFactor parameter, or set the splitFactor parameter to 1. For a configuration sketch, see the example at the end of these usage notes.
Data Integration does not support the array type. However, MongoDB supports the array type and provides a powerful indexing feature. You can configure specific parameters to convert strings into MongoDB arrays. After the conversion, you can write the data to MongoDB in parallel.
Self-managed MongoDB databases do not support public network access. They can be accessed only over the Alibaba Cloud internal network.
MongoDB clusters that are deployed using Docker are not supported.
Data Integration does not support reading data from specified columns using the query parameter.
In a batch synchronization task, if Data Integration cannot retrieve the field structure from MongoDB, Data Integration generates field mappings for six fields by default. The field names are col1, col2, col3, col4, col5, and col6.
During task execution, the splitVector command is used by default to shard the task. Some MongoDB versions do not support the splitVector command, which may cause a "no such cmd splitVector" error. To prevent this error, switch to the code editor in the task configuration and add the following parameter to the MongoDB parameter configuration to prevent the use of splitVector: "useSplitVector": false
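The following fragment is a minimal sketch, for the code editor, of the settings mentioned in the preceding notes: running with a concurrency of 1 when the _id field contains mixed data types, and disabling the splitVector command. The data source and collection names are placeholders, not values from this topic's samples.

```json
{
  "steps": [
    {
      "stepType": "mongodb",
      "category": "reader",
      "parameter": {
        "datasource": "datasourceName",   // Hypothetical data source name.
        "collectionName": "tag_data",     // Hypothetical collection name.
        "splitFactor": 1,                 // Keep sharding at 1 (or omit this parameter) when _id contains mixed types.
        "useSplitVector": false           // Avoid the splitVector command on versions that do not support it.
      }
    }
  ],
  "setting": {
    "speed": {
      "concurrent": 1                     // Single-threaded synchronization for mixed _id types.
    }
  }
}
```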
Supported field types
MongoDB data types supported by MongoDB Reader
Data Integration supports most, but not all, MongoDB data types. Make sure that your data types are supported.
When Data Integration reads supported data types, it performs the following operations:
For primitive data types, Data Integration automatically reads data from the corresponding path based on the name of the field that is configured in the column parameter. For more information, see Appendix: Sample script and parameter description for MongoDB. Data Integration also automatically converts the data type. You do not need to specify the type property for the column.
Type | Batch read (MongoDB Reader) | Description |
--- | --- | --- |
ObjectId | Supported | The object ID type. |
Double | Supported | The 64-bit floating-point number type. |
32-bit integer | Supported | A 32-bit integer. |
64-bit integer | Supported | A 64-bit integer. |
Decimal128 | Supported | The Decimal128 type. Note: If a field is configured as a nested type or a combine type, it is processed as an object during JSON serialization. To output the data as a decimal, add the decimal128OutputType parameter and set it to bigDecimal. |
String | Supported | The string type. |
Boolean | Supported | The Boolean type. |
Timestamp | Supported | The timestamp type. Note: BsonTimestamp stores timestamps, so you do not need to consider the impact of time zones. For more information, see Time zone issues in MongoDB. |
Date | Supported | The date type. |
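As a sketch, the decimal128OutputType parameter can be added alongside the other reader parameters in the code editor. Its exact position shown below is an assumption; the data source and collection names are placeholders.

```json
{
  "stepType": "mongodb",
  "category": "reader",
  "parameter": {
    "datasource": "datasourceName",        // Hypothetical data source name.
    "collectionName": "tag_data",          // Hypothetical collection name.
    "decimal128OutputType": "bigDecimal"   // Output Decimal128 values as decimals instead of serialized objects.
  }
}
```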
For some complex data types, you can configure the type property for the column to perform custom processing.
Type | Batch read (MongoDB Reader) | Description |
--- | --- | --- |
Document | Supported | The embedded document type. If the type property is not configured, the Document is directly converted by using JSON serialization. If the type property is set to document, the field is treated as a nested type, and MongoDB Reader reads the Document properties based on the path. For a detailed example, see Example 2: Recursively parsing a multi-level nested Document below. |
Array | Supported | The array type. If type is set to array.json or arrays, the data is directly processed by using JSON serialization. If type is set to array or document.array, the elements are concatenated into a string. The separator, which is specified in the splitter property of the column, is a comma (,) by default. |

Important: Data Integration does not support the array type. However, MongoDB supports the array type and provides a powerful indexing feature. You can configure specific parameters to convert strings into MongoDB arrays. After the conversion, you can write the data to MongoDB in parallel.
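For example, the following column sketch (the field name hobby is a placeholder) shows how a source string such as "reading,swimming" could be written to MongoDB as an array by setting type to array and splitter to a comma; the writer script sample later in this topic uses the same mechanism.

```json
"column": [
  {
    "name": "hobby",     // Hypothetical field name.
    "type": "array",     // Convert the source string into a MongoDB array when writing.
    "splitter": ","      // Split on commas, for example "reading,swimming" -> ["reading", "swimming"].
  }
]
```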
Special Data Integration data type: combine
Type | Batch read (MongoDB Reader) | Description |
--- | --- | --- |
Combine | Supported | A custom data type in Data Integration. If type is set to combine, MongoDB Reader merges the fields in the document that are not explicitly configured in the column parameter into a single JSON string. For more information, see Example 1: Using the combine type below. |
MongoDB Reader data type mappings
The following table lists the mappings between MongoDB data types and Data Integration data types for MongoDB Reader.
Converted type category | MongoDB data type |
--- | --- |
LONG | INT, LONG, document.INT, and document.LONG |
DOUBLE | DOUBLE and document.DOUBLE |
STRING | STRING, ARRAY, document.STRING, document.ARRAY, and COMBINE |
DATE | DATE and document.DATE |
BOOLEAN | BOOL and document.BOOL |
BYTES | BYTES and document.BYTES |
MongoDB Writer data type mappings
Type category | MongoDB data type |
--- | --- |
Integer | INT and LONG |
Floating-point | DOUBLE |
String | STRING and ARRAY |
Date and time | DATE |
Boolean | BOOL |
Binary | BYTES |
Example 1: Using the combine type
The combine data type of the MongoDB Reader plugin lets you merge multiple fields in a MongoDB document into a single JSON string. For example, assume that you want to import fields from three MongoDB documents to MaxCompute. In the following example, the fields are represented by keys instead of key-value pairs. The fields a and b are common to all three documents, and x_n is a variable field.
doc1: a b x_1 x_2
doc2: a b x_2 x_3 x_4
doc3: a b x_5
In the configuration file, you must explicitly specify the fields that require one-to-one mapping. For the fields that you want to merge, assign a new name that is different from any existing field name in the document and set the type to COMBINE. The following code provides an example.
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]The following table shows the final output in MaxCompute.
odps_column1 | odps_column2 | odps_column3 |
--- | --- | --- |
a | b | {x_1,x_2} |
a | b | {x_2,x_3,x_4} |
a | b | {x_5} |
After you use the combine type to merge multiple fields in a MongoDB document, common fields are automatically deleted when the output is mapped to MaxCompute. Only the unique fields of the document are retained.
For example, a and b are common fields in all documents. After the fields in the document doc1: a b x_1 x_2 are merged using the combine type, the output is {a,b,x_1,x_2}. When this result is mapped to MaxCompute, the common fields a and b are deleted. The final output is {x_1,x_2}.
Example 2: Recursively parsing a multi-level nested Document
If a document in MongoDB has multiple levels of nesting, you can configure the document type to recursively parse it. The following code provides an example.
Source data in MongoDB:
{ "name": "name1", "a": { "b": { "c": "this is value" } } }MongoDB column configuration:
{"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"}
With the preceding configuration, the value of the nested source field a.b.c is written to the destination field c. After the synchronization task runs, the data that is written to the destination is this is value.
Add a data source
Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data Source Management. You can view the infotips of parameters in the DataWorks console to understand the meanings of the parameters when you add a data source.
Develop a data synchronization task
For information about the entry point and the procedure for configuring a synchronization task, see the following configuration guides.
Configure a batch synchronization task for a single table
For more information about the procedure, see Configure a task in the codeless UI and Configure a task in the code editor.
For more information about all parameters and a sample script for the code editor, see Appendix: Sample script and parameter description for MongoDB.
Configure a real-time synchronization task for a single table
For more information about the procedure, see Configure a real-time synchronization task in Data Integration and Configure a real-time synchronization task in DataStudio.
Configure a synchronization task for an entire database
You can configure tasks for batch synchronization, full and incremental real-time synchronization, or real-time synchronization from sharded databases for an entire database. For more information, see Batch synchronization task for an entire database and Configure a real-time synchronization task for an entire database.
Best practices
FAQ
Appendix: Sample script and parameter description for MongoDB
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a task in the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.
Reader script sample
The following script is an example of a job that is configured to extract data from MongoDB to a local environment. For more information about the parameters, see the parameter descriptions that follow.
Before you run the code, delete the comments.
You cannot extract specified elements from an array.
{
"type":"job",
"version":"2.0",// Version number.
"steps":[
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName", // Data source name.
"collectionName": "tag_data", // Collection name.
"query": "", // Data filtering query.
"column": [
{
"name": "unique_id", // Field name.
"type": "string" // Field type.
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "array",
"splitter": ""
},
{
"name": "categoryid",
"type": "array",
"splitter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
},
{
"name": "a.b",
"type": "document.int"
},
{
"name": "a.b.c",
"type": "document.array",
"splitter": " "
}
]
},
"stepType": "mongodb"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"common": {
"column": {
"timeZone": "GMT+0" // Time zone.
}
},
"errorLimit":{
"record":"0"// Number of error records.
},
"speed":{
"throttle":true,// Specifies whether to enable throttling. If you set this parameter to false, throttling is disabled and the mbps parameter does not take effect. If you set this parameter to true, throttling is enabled.
"concurrent":1, // Number of concurrent jobs.
"mbps":"12"// Throttling rate. 1 mbps = 1 MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Parameter | Description |
--- | --- |
datasource | The name of the data source. In the code editor, the value of this parameter must be the same as the name of the added data source. |
collectionName | The name of the MongoDB collection. |
hint | MongoDB supports the hint parameter, which forces the query optimizer to use a specific index to complete a query. In some cases, this can improve query performance. For more information, see the hint documentation. |
column | The document field names in MongoDB. Configure them as an array to represent multiple fields. |
batchSize | The number of records to retrieve in a batch. This parameter is optional and has a default value. |
cursorTimeoutInMs | The cursor timeout period. This parameter is optional and has a default value. |
query | You can use this parameter to filter the MongoDB data that is returned. Only specific time formats are supported; the UNIX timestamp format is not directly supported. For an example of a time-based filter, see the sample query after this table. Note: For more information about the query syntax of MongoDB, see the official MongoDB documentation. |
splitFactor | If severe data skew exists, consider increasing the splitFactor value to achieve finer-grained sharding without increasing the concurrency. |
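The following query sketch is modeled on the ISODate filter used in the preSql examples later in this topic. The field name operationTime and the ${last_day} scheduling parameter are placeholders that you would replace with your own values.

```json
"parameter": {
  "datasource": "datasourceName",   // Hypothetical data source name.
  "collectionName": "tag_data",     // Hypothetical collection name.
  // Read only the documents whose operationTime is on or after the given date.
  "query": "{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"
}
```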
Writer script sample
The following script is an example of a data synchronization job that is configured to write data to MongoDB. For more information about the parameters, see the parameter descriptions that follow.
{
"type": "job",
"version": "2.0",// Version number.
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",// Plugin name.
"parameter": {
"datasource": "",// Data source name.
"column": [
{
"name": "_id",// Column name.
"type": "ObjectId"// Data type. If replaceKey is _id, you must set type to ObjectId. If you set type to string, the replacement fails.
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {// Write mode.
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"// Collection name.
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {// Number of error records.
"record": "0"
},
"speed": {
"throttle": true,// Specifies whether to enable throttling. If you set this parameter to false, throttling is disabled and the mbps parameter does not take effect. If you set this parameter to true, throttling is enabled.
"concurrent": 1,// Number of concurrent jobs.
"mbps": "1"// Throttling rate. 1 mbps = 1 MB/s.
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer script parameters
Parameter | Description | Required | Default value |
--- | --- | --- | --- |
datasource | The name of the data source. In the code editor, the value of this parameter must be the same as the name of the added data source. | Yes | None |
collectionName | The name of the MongoDB collection. | Yes | None |
column | The document field names in MongoDB. Configure them as an array to represent multiple fields. | Yes | None |
writeMode | Specifies whether to overwrite data during transmission. It includes isReplace and replaceKey. Note: If isReplace is set to true and replaceKey is set to a field other than the _id field, the task may report an error during execution. This is because the data to be written can contain records in which the replaceKey value is the same but the _id value differs from the existing data. | No | None |
preSql | A pre-operation to execute before writing data to MongoDB, such as clearing historical data. If preSql is empty, no pre-operation is configured. When you configure preSql, make sure that its value complies with the JSON syntax. | No | None |
When you run a Data Integration job, the configured preSql is executed first. The actual data writing phase begins only after the preSql execution is complete. The preSql parameter does not affect the content of the data that is written. The preSql parameter provides idempotent execution for Data Integration. For example, your preSql can be used to clear historical data before each task run based on your business rules. In this case, if a task fails, you can simply rerun the Data Integration job.
The format requirements for preSql are as follows:
You must configure the type field to specify the pre-operation type. The supported values are drop and remove. Example: "preSql":{"type":"remove"}.
drop: Deletes the collection and the data in it. The collection to be deleted is specified by the collectionName parameter.
remove: Deletes data based on a condition.
json: You can use a JSON object to specify the conditions for data deletion. Example: "preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}. In this example, ${last_day} is a DataWorks scheduling parameter in the $[yyyy-mm-dd] format. You can also use other MongoDB-supported conditional operators (such as $gt, $lt, $gte, and $lte), logical operators (such as and and or), or functions (such as max, min, sum, avg, and ISODate) as needed. Data Integration executes the data deletion query by using the following standard MongoDB API:
query = (BasicDBObject) com.mongodb.util.JSON.parse(json);
col.deleteMany(query);
Note: To delete data based on conditions, we recommend that you use the JSON configuration.
item: You can configure the column name (name), condition (condition), and column value (value) for data filtering in an item. Example: "preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}. Data Integration constructs a query condition based on the configured item conditions and then executes the deletion by using a standard MongoDB API, for example: col.deleteMany(query);.
If the preSql is not recognized, no pre-deletion operation is performed.
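As a summary, the following sketch combines the writeMode and preSql settings described above into a single writer parameter block. All values are taken from the samples in this topic and should be replaced with your own data source, collection, and filter condition.

```json
"parameter": {
  "datasource": "datasourceName",        // Hypothetical data source name.
  "collectionName": "datax_test",        // Destination collection.
  "writeMode": {
    "isReplace": "true",                 // Replace existing documents on conflict.
    "replaceKey": "_id"                  // Use _id as the key that identifies the document to replace.
  },
  "preSql": {
    "type": "remove",                    // Delete data that matches the condition before writing.
    "json": "{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"
  }
}
```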