This topic describes the data types and parameters that are supported by MongoDB Writer and how to configure MongoDB Writer by using the codeless user interface (UI) and code editor.
Background information
- Before you configure MongoDB Writer, you must configure a MongoDB data source. For more information, see Add a MongoDB data source.
- If you use ApsaraDB for MongoDB, a root account is provided for the MongoDB database by default.
- For security purposes, Data Integration can connect to a MongoDB database only by using an account of that database. When you add a MongoDB data source, do not use the root account.
MongoDB Writer obtains data from a reader and converts the data from data types supported by Data Integration to data types supported by MongoDB. Data Integration does not support arrays. MongoDB supports arrays, and arrays support the indexing feature.
You can configure parameters to convert strings to MongoDB arrays. Then, MongoDB Writer uses parallel threads to write the arrays to a MongoDB database.
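As a rough illustration of the string-to-array conversion described above, the following Python sketch splits a delimited string into a list, similar to what a column configured with the array type and a splitter implies. The helper name split_to_array is hypothetical, and the handling of empty values is an assumption; this is not the writer's actual implementation.

```python
def split_to_array(value, splitter=" "):
    """Split a delimited string into a list of strings.

    Hypothetical helper: mirrors the idea of a column configured with
    "type": "array" and a "splitter"; not the writer's actual code.
    """
    if value is None or value == "":
        # Assumption: an empty source value becomes an empty array.
        return []
    return value.split(splitter)


hobbies = split_to_array("reading hiking chess", " ")
print(hobbies)  # ['reading', 'hiking', 'chess']
```

The resulting list is what would be stored as a MongoDB array, so each element can be matched and indexed individually.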
Data type mappings
MongoDB Writer supports most MongoDB data types. Make sure that the data types of your database are supported.
Category | MongoDB data type |
---|---|
Integer | INT and LONG |
Floating point | DOUBLE |
String | STRING and ARRAY |
Date and time | DATE |
Boolean | BOOL |
Binary | BYTES |
Parameters
Parameter | Description | Required | Default value |
---|---|---|---|
datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
collectionName | The name of the collection in MongoDB. | Yes | No default value |
column | The names of the document fields to which you want to write data. Specify the names in an array. | Yes | No default value |
writeMode | The write mode. The following parameters are included: isReplace and replaceKey. Note: If you set isReplace to true and set a field other than the _id field as the primary key, an error similar to the following may occur when the data synchronization node is run: After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2". The reason is that the value of the _id field does not match the value of the replaceKey parameter for some of the data that is written to the destination table. For more information, see Error: After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2". | No | No default value |
preSql | The SQL statement that you want to execute before the synchronization node is run. For example, you can set this parameter to the SQL statement that is used to delete outdated data. If the preSql parameter is left empty, no SQL statement is executed before the synchronization node is run. Make sure that the value of the preSql parameter is specified based on the JSON syntax. | No | No default value |
Before the synchronization node is run, Data Integration executes the SQL statement specified by the preSql parameter. Then, Data Integration starts to write data. The preSql parameter does not affect the data that is written. You can configure the preSql parameter to ensure the idempotence of the write operation. For example, you can configure the preSql parameter to delete outdated data before a synchronization node is run based on your business requirements. If the synchronization node fails, you only need to rerun the synchronization node.
- Configure the type parameter to specify the action type. Valid values: drop and remove. Example: "preSql":{"type":"remove"}.
  - drop: deletes the collection specified by the collectionName parameter and the data in the collection.
  - remove: deletes data based on specified conditions.
- json: the conditions used to delete data. Example: "preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}. ${last_day} is a scheduling parameter of DataWorks. You can configure this parameter in the format of $[yyyy-mm-dd]. Other operators and functions are also supported, such as comparison operators $gt, $lt, $gte, and $lte, logical operators $and and $or, and functions max, min, sum, avg, and ISODate. You can use them based on your business requirements. Data Integration uses the following standard MongoDB API to query and delete the specified data:
  query=(BasicDBObject) com.mongodb.util.JSON.parse(json);
  col.deleteMany(query);
  Note If you want to delete data based on conditions, we recommend that you specify the conditions in the JSON format.
- item: the name, condition, and value for filtering data. Example: "preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}. Data Integration configures query conditions based on the value of the item parameter and deletes data by using the standard MongoDB API. Example: col.deleteMany(query);
- If the value of the preSql parameter cannot be recognized, no SQL statement is executed.
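The preSql rules above can be sketched in Python as a small planner that decides which action a given preSql value implies. The function names plan_pre_sql and build_filter_from_items are hypothetical. The mapping from item entries to a filter document (an entry without a condition means equality, values are not cast) is an assumption about how the conditions are combined, not a description of the actual Data Integration code.

```python
def build_filter_from_items(items):
    """Turn a preSql "item" list into a MongoDB-style filter dict.

    Assumption: an entry without "condition" means equality, and values
    are passed through unchanged (no type casting).
    """
    query = {}
    for item in items:
        condition = item.get("condition")
        if condition:
            query[item["name"]] = {condition: item["value"]}
        else:
            query[item["name"]] = item["value"]
    return query


def plan_pre_sql(pre_sql):
    """Return (action, detail) where action is "drop", "remove", or "noop".

    Unrecognized values map to "noop", matching the rule that nothing is
    executed when the preSql value cannot be recognized.
    """
    if not isinstance(pre_sql, dict):
        return ("noop", None)
    action = pre_sql.get("type")
    if action == "drop":
        return ("drop", None)
    if action == "remove":
        if "json" in pre_sql:
            return ("remove", pre_sql["json"])  # raw condition string, left unparsed
        if "item" in pre_sql:
            return ("remove", build_filter_from_items(pre_sql["item"]))
        return ("remove", None)  # no condition supplied
    return ("noop", None)


example = {"type": "remove", "item": [
    {"name": "pv", "value": "100", "condition": "$gt"},
    {"name": "pid", "value": "10"},
]}
print(plan_pre_sql(example))
```

Previewing the planned action this way can help confirm that a rerun of the node deletes only the data you expect before new data is written.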
Configure MongoDB Writer by using the codeless UI
- Configure data sources. Configure the source and destination for the synchronization node.
Parameter | Description |
---|---|
Connection | The name of the data source to which you want to write data. This parameter is equivalent to the datasource parameter that is described in the preceding section. |
CollectionName | The name of the collection in MongoDB. This parameter is equivalent to the collectionName parameter that is described in the preceding section. |
WriteMode(overwrite or not) | The write mode. This parameter is equivalent to the writeMode parameter that is described in the preceding section. Note: If you set this parameter to true and set a field other than the _id field as the primary key, an error similar to the following may occur when the data synchronization node is run: After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2". The reason is that the value of the _id field does not match the value of the replaceKey parameter for some of the data that is written to the destination table. For more information, see Error: After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2". |
PreSql | The SQL statement that you want to execute before the synchronization node is run. This parameter is equivalent to the preSql parameter that is described in the preceding section. For example, you can set this parameter to the SQL statement that is used to delete outdated data. If the preSql parameter is left empty, no SQL statement is executed before the synchronization node is run. Make sure that the value of the preSql parameter is specified based on the JSON syntax. |
- Configure field mappings. This operation is equivalent to setting the column parameter that is described in the preceding section. By default, the system maps the field in a row of the source to the field in the same row of the destination. You can click the icon to manually edit fields in the destination.
- Configure channel control policies.
Parameter | Description |
---|---|
Expected Maximum Concurrency | The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI. |
Bandwidth Throttling | Specifies whether to enable throttling. You can enable throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source. |
Dirty Data Records Allowed | The maximum number of dirty data records allowed. |
Distributed Execution | The distributed execution mode that allows you to split your node into pieces and distribute them to multiple Elastic Compute Service (ECS) instances for parallel execution. This speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in distributed execution mode, excessive access requests are sent to the data sources. Therefore, before you use the distributed execution mode, you must evaluate the access load on the data sources. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information about exclusive resource groups for Data Integration, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration. |
Configure MongoDB Writer by using the code editor
For more information about how to configure a data synchronization node by using the code editor, see Configure a batch synchronization node by using the code editor.
{
  "type": "job",
  "version": "2.0", // The version number.
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "mongodb", // The writer type.
      "parameter": {
        "datasource": "", // The name of the data source.
        "column": [
          {
            "name": "_id", // The name of the field.
            "type": "ObjectId" // The data type of the field. If you set the replaceKey parameter to _id, you must set the type parameter to ObjectId. If you set the type parameter to string, the data cannot be overwritten.
          },
          {
            "name": "age",
            "type": "int"
          },
          {
            "name": "id",
            "type": "long"
          },
          {
            "name": "wealth",
            "type": "double"
          },
          {
            "name": "hobby",
            "type": "array",
            "splitter": " "
          },
          {
            "name": "valid",
            "type": "boolean"
          },
          {
            "name": "date_of_join",
            "format": "yyyy-MM-dd HH:mm:ss",
            "type": "date"
          }
        ],
        "writeMode": { // The write mode.
          "isReplace": "true",
          "replaceKey": "_id"
        },
        "collectionName": "datax_test" // The name of the collection.
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": { // The maximum number of dirty data records allowed.
      "record": "0"
    },
    "speed": {
      "throttle": true, // Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
      "concurrent": 1, // The maximum number of parallel threads.
      "mbps": "1" // The maximum transmission rate.
    },
    "jvmOption": "-Xms1024m -Xmx1024m"
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}
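Before you submit a configuration like the preceding sample, a quick local check can catch two issues that the parameter descriptions call out: missing required parameters, and an _id column whose type is not ObjectId when replaceKey is _id. The following Python sketch is only an illustration. The function check_writer_parameter and the data source name my_mongodb are hypothetical, and the sketch does not reproduce the validation that Data Integration itself performs.

```python
REQUIRED_KEYS = ("datasource", "collectionName", "column")


def check_writer_parameter(parameter):
    """Return a list of problems found in a MongoDB Writer "parameter" dict."""
    problems = []
    for key in REQUIRED_KEYS:
        if not parameter.get(key):
            problems.append(f"missing required parameter: {key}")

    write_mode = parameter.get("writeMode") or {}
    # The sample stores isReplace as the string "true"; a boolean is accepted too.
    is_replace = str(write_mode.get("isReplace", "")).lower() == "true"
    if is_replace and write_mode.get("replaceKey") == "_id":
        id_types = [c.get("type") for c in parameter.get("column") or []
                    if c.get("name") == "_id"]
        if id_types and id_types[0] != "ObjectId":
            problems.append("replaceKey is _id, so the _id column type must be ObjectId")
    return problems


sample = {
    "datasource": "my_mongodb",  # hypothetical data source name
    "collectionName": "datax_test",
    "column": [{"name": "_id", "type": "string"}, {"name": "age", "type": "int"}],
    "writeMode": {"isReplace": "true", "replaceKey": "_id"},
}
print(check_writer_parameter(sample))
```

Note that the sample configuration contains // comments, so it must be stripped of comments before a standard JSON parser can load it into a dict for a check like this.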
Error: no such cmd splitVector
- Possible cause: When a data synchronization node is run, the node preferentially runs the splitVector command to split the node into shards. However, some MongoDB versions do not support the splitVector command. If you run this command in a node that is used to write data to a MongoDB database of one of these versions, the no such cmd splitVector error occurs.
- Solution:
  - On the configuration tab of the data synchronization node, click the icon in the top toolbar to switch to the code editor mode.
  - Add the following parameter to the parameter configurations in the preceding sample code:
    "useSplitVector" : false
    Set the added parameter to false to prevent the splitVector command from being used.
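If you manage node configurations as files, the same change can be applied programmatically. The following Python sketch sets useSplitVector to false in the parameter block of every step whose stepType is mongodb. The function name disable_split_vector is hypothetical, and the job dict below is a trimmed stand-in for the sample configuration rather than a complete node definition.

```python
import copy
import json


def disable_split_vector(job):
    """Return a copy of the job config with "useSplitVector": false set
    on every step whose stepType is "mongodb"."""
    patched = copy.deepcopy(job)
    for step in patched.get("steps", []):
        if step.get("stepType") == "mongodb":
            step.setdefault("parameter", {})["useSplitVector"] = False
    return patched


job = {
    "type": "job",
    "steps": [
        {"stepType": "stream", "parameter": {}, "name": "Reader", "category": "reader"},
        {"stepType": "mongodb", "parameter": {"collectionName": "datax_test"},
         "name": "Writer", "category": "writer"},
    ],
}
patched = disable_split_vector(job)
print(json.dumps(patched["steps"][1]["parameter"], indent=2))
```

The function works on a deep copy, so the original configuration stays unchanged and the two versions can be compared before the updated one is pasted back into the code editor.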
Error: After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"
- Problem description: If you set WriteMode(overwrite or not) to Yes and set a field other than the _id field as the primary key when you configure a data synchronization node by using the codeless UI, an error may occur.
- Possible cause: The value of the _id field does not match the value of the replaceKey parameter for some of the data that is written to the destination table. In the sample configuration, you set replaceKey to my_id.
- Solution:
- Scenario 1: Modify the configurations of the data synchronization node to ensure that the value of the replaceKey parameter is the same as that of the _id field.
- Scenario 2: Set the _id field as the primary key for the data synchronization node.
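To see why the mismatch triggers the error, the following Python sketch simulates a replace-by-key write against an in-memory list of documents. It is a simplified model, not MongoDB driver code: the names replace_by_key and ImmutableIdError are hypothetical, and the only behavior modeled is that the _id of an existing document cannot be changed by a replacement.

```python
class ImmutableIdError(Exception):
    """Raised when a replacement would change the _id of a stored document."""


def replace_by_key(collection, replace_key, new_doc):
    """Replace the stored document whose replace_key value matches new_doc,
    or insert new_doc if no stored document matches."""
    key_value = new_doc[replace_key]
    for index, stored in enumerate(collection):
        if stored.get(replace_key) == key_value:
            if "_id" in new_doc and new_doc["_id"] != stored["_id"]:
                raise ImmutableIdError(
                    "the (immutable) field '_id' was found to have been "
                    f"altered to _id: \"{new_doc['_id']}\""
                )
            collection[index] = {**new_doc, "_id": stored["_id"]}
            return "replaced"
    collection.append(dict(new_doc))
    return "inserted"


docs = [{"_id": "1", "my_id": "a", "pv": 1}]
try:
    # Matches on my_id but carries a different _id, so the write is rejected.
    replace_by_key(docs, "my_id", {"_id": "2", "my_id": "a", "pv": 5})
except ImmutableIdError as error:
    print("error:", error)

# Using _id as the key avoids the conflict.
print(replace_by_key(docs, "_id", {"_id": "1", "my_id": "a", "pv": 5}))
```

Both scenarios in the solution remove the conflict in this model: either the incoming _id always agrees with the document matched by replaceKey, or _id itself is the key.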