The MongoDB data source lets Data Integration read from and write to MongoDB collections. Use it to:
- Run offline batch sync tasks to extract and load MongoDB data at scale
- Set up real-time sync tasks that stream changes from MongoDB
- Synchronize an entire MongoDB database in a single whole-database sync task
Supported versions: MongoDB 4.x, 5.x, 6.x, 7.x, and 8.0.
Usage notes
Connection and authentication
Data Integration connects to MongoDB using a database account. If you use ApsaraDB for MongoDB, a root account is available by default — do not use it for sync tasks. Create a dedicated account with the minimum required permissions.
Self-managed MongoDB databases are only accessible over the Alibaba Cloud private network. Public network access is not supported. MongoDB clusters deployed with Docker are not supported.
Sharded cluster configuration
For sharded MongoDB clusters, configure the mongos address in the data source — not the mongod or shard node address. Pointing to a shard node causes the sync task to read from that shard only, missing data from other shards. For background on mongos and mongod, see mongos and mongod.
Concurrency and _id type consistency
When concurrency is greater than 1, Data Integration splits the task based on the _id field. All _id values in the collection must be of the same type (for example, all strings or all ObjectIds). Mixed _id types with concurrency greater than 1 cause some records to fail to sync.
If the collection has mixed _id types, set concurrency to 1 and leave splitFactor unconfigured (or set it to 1).
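As a minimal sketch (the data source and collection names are placeholders), a reader step for a collection with mixed _id types omits splitFactor and relies on the job-level concurrency setting, following the script format shown later in this topic:

```json
{
    "stepType": "mongodb",
    "category": "reader",
    "name": "Reader",
    "parameter": {
        "datasource": "my_mongodb_source",
        "collectionName": "mixed_id_collection",
        "column": [
            {"name": "_id", "type": "string"},
            {"name": "value", "type": "string"}
        ]
    }
}
```

Pair this with `"speed": {"concurrent": 1}` in the job `setting` block so the task is not split on _id.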
Array type
Data Integration does not natively support the MongoDB array type. To write array data to MongoDB, configure the splitter parameter to convert a delimited string into an array. After conversion, parallel writes are supported.
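For example (field name `tags` is a placeholder), a source string such as `a,b,c` can be written to MongoDB as an array with a writer column entry like the following; itemtype controls the element type, as described in the Writer parameters later in this topic:

```json
{"name": "tags", "type": "array", "splitter": ",", "itemtype": "string"}
```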
Reading data from specific columns via the query parameter is not supported.
Default field mapping
If field structure cannot be retrieved from MongoDB during an offline sync task, Data Integration generates a default mapping with six fields: col1, col2, col3, col4, col5, and col6.
splitVector compatibility
Data Integration uses the splitVector command by default for task sharding. Some MongoDB versions do not support this command, resulting in:
no such cmd splitVector
To disable splitVector, open the sync task in the code editor and add the following to the MongoDB reader parameters:
"useSplitVector": false
Supported field types
Primitive types (MongoDB Reader)
For primitive types, MongoDB Reader reads field values automatically based on the field name and type configured in the sync task. No additional type configuration is needed.
| Type | Supported | Description |
|---|---|---|
| ObjectId | Yes | Object ID type |
| Double | Yes | 64-bit floating-point |
| 32-bit integer | Yes | 32-bit integer |
| 64-bit integer | Yes | 64-bit integer |
| Decimal128 | Yes | When used as a nested or Combine type, add decimal128OutputType: bigDecimal to output as decimal |
| String | Yes | String type |
| Boolean | Yes | Boolean type |
| Timestamp | Yes | BsonTimestamp; no time zone conversion required |
| Date | Yes | Date type |
Complex types (MongoDB Reader)
For complex types, configure the type property in the column definition to control how data is processed.
| Type | Supported | Behavior |
|---|---|---|
| Document | Yes | Without type: serialized as a JSON string. With type: document: read nested properties by path (see Nested document parsing) |
| Array | Yes | array.json or arrays: serialized as a JSON string. array or document.array: concatenated into a string using the splitter separator |
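For illustration (the field name `tags` is a placeholder), the two array behaviors correspond to column entries such as:

```json
{"name": "tags", "type": "arrays"}
{"name": "tags", "type": "array", "splitter": ","}
```

The first reads the array as a JSON string such as ["a","b","c"]; the second concatenates the elements into a,b,c using the configured splitter.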
Combine type (Data Integration custom type)
The Combine type is a Data Integration-specific type. When type is set to combine, MongoDB Reader removes the key for the configured column, then serializes all remaining fields in the document into a single JSON string.
Example: Given three documents with fixed fields a and b, plus variable fields x_n:
- doc1: a b x_1 x_2
- doc2: a b x_2 x_3 x_4
- doc3: a b x_5
Configure the column mapping as:
"column": [
{"name": "a", "type": "string"},
{"name": "b", "type": "string"},
{"name": "doc", "type": "combine"}
]
Output in MaxCompute:
| odps_column1 | odps_column2 | odps_column3 |
|---|---|---|
| a | b | {x_1, x_2} |
| a | b | {x_2, x_3, x_4} |
| a | b | {x_5} |
Common fields (a and b) are automatically excluded from the Combine output. Only fields unique to each document are retained in the merged JSON string.
MongoDB Reader type conversions
| Data Integration type | MongoDB types |
|---|---|
| LONG | INT, LONG, document.INT, document.LONG |
| DOUBLE | DOUBLE, document.DOUBLE |
| STRING | STRING, ARRAY, document.STRING, document.ARRAY, COMBINE |
| DATE | DATE, document.DATE |
| BOOLEAN | BOOL, document.BOOL |
| BYTES | BYTES, document.BYTES |
MongoDB Writer type conversions
| Data Integration type | MongoDB types |
|---|---|
| Integer | INT, LONG |
| Floating-point | DOUBLE |
| String | STRING, ARRAY |
| Date and time | DATE |
| Boolean | BOOL |
| Binary | BYTES |
Nested document parsing
For multi-level nested documents, use type: document to read nested fields by path.
Source document in MongoDB:
{
"name": "name1",
"a": {
"b": {
"c": "this is value"
}
}
}
Column configuration:
{"name": "_id", "type": "string"}
{"name": "name", "type": "string"}
{"name": "a.b.c", "type": "document"}
After the sync task runs, the value of a.b.c (this is value) is written to the destination field c.
Add a data source
Before creating a sync task, add MongoDB as a data source in DataWorks. See Data source management for the procedure and parameter descriptions.
Configure a sync task
Offline sync task
- Codeless UI: Configure a task in the codeless UI
- Code editor: Configure a task in the code editor
- Script reference: See Script reference for all parameters and a full script example.
Real-time sync task
Whole-database sync task
Script reference
Use this section when configuring a sync task in the code editor. For the general script format, see Configure a task in the code editor.
Reader script
The following script reads data from MongoDB and writes to a stream destination. Delete all inline comments before running.
Extracting a specific element from an array is not supported.
{
"type": "job",
"version": "2.0",
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName",
"collectionName": "tag_data",
"query": "",
"column": [
{"name": "unique_id", "type": "string"},
{"name": "sid", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "auction_id", "type": "string"},
{"name": "content_type", "type": "string"},
{"name": "pool_type", "type": "string"},
{"name": "frontcat_id", "type": "array", "splitter": ""},
{"name": "categoryid", "type": "array", "splitter": ""},
{"name": "gmt_create", "type": "string"},
{"name": "taglist", "type": "array", "splitter": " "},
{"name": "property", "type": "string"},
{"name": "scorea", "type": "int"},
{"name": "scoreb", "type": "int"},
{"name": "scorec", "type": "int"},
{"name": "a.b", "type": "document.int"},
{"name": "a.b.c", "type": "document.array", "splitter": " "}
]
},
"stepType": "mongodb"
},
{
"stepType": "stream",
"parameter": {},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"common": {
"column": {
"timeZone": "GMT+0"
}
},
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": true,
"concurrent": 1,
"mbps": "12"
}
},
"order": {
"hops": [{"from": "Reader", "to": "Writer"}]
}
}
Reader parameters
| Parameter | Description | Required | Default |
|---|---|---|---|
| datasource | Name of the data source. Must match the name configured in Data Integration. | Yes | — |
| collectionName | Name of the MongoDB collection. | Yes | — |
| column | Array of field definitions. Each entry specifies name and type. See supported types below. | Yes | — |
| query | Filter to limit the data range returned. Supports specific date formats only; JavaScript syntax is not supported. | No | — |
| hint | Forces the query optimizer to use a specific index. Example: {"collectionName": "test_collection", "hint": "{age:1}"}. See hint parameter. | No | — |
| batchSize | Number of records retrieved per batch. Decrease this value if the cursor times out. | No | 1000 |
| cursorTimeoutInMs | Cursor timeout in milliseconds. A negative value disables the timeout (not recommended; an abandoned cursor persists on the server until restart). If the cursor times out, decrease batchSize or increase this value. | No | 600000 |
| splitFactor | Adjusts chunk granularity when data skew is significant. Increase this value to reduce chunk size without raising concurrency. | No | — |
| useSplitVector | Set to false to disable the splitVector command. Required for MongoDB versions that do not support splitVector. | No | true |
| decimal128OutputType | Set to bigDecimal to output Decimal128 fields as decimal in nested or Combine type configurations. | No | — |
Supported `column` type values:
| type value | Description |
|---|---|
| string | String |
| long | Integer |
| double | Floating-point number |
| date | Date |
| bool | Boolean |
| bytes | Binary |
| arrays | Array serialized as a JSON string, e.g., ["a","b","c"] |
| array | Array concatenated as a delimited string using splitter, e.g., a,b,c. Use arrays instead when possible. |
| combine | Merges all remaining document fields into a single JSON string |
`query` examples:
- {"query": "{ status: \"normal\"}"}
- {"query": "{ status: { $in: [\"normal\", \"forbidden\"] }}"}
- {"query": "{ status: \"normal\", age: { $lt: 30 }}"}
- {"query": "{ createTime: {$gte: ISODate('2022-12-01T00:00:00.000+0800')}}"}
- {"query": "{ createTime: {$gte: ISODate('$[yyyy-mm-dd]T00:00:00.000+0800')}}"}
For incremental sync on non-date fields, use an assignment node to convert the field to the required type before passing it to Data Integration. See Assignment node and Scenario: typical application scenarios of scheduling parameters in Data Integration.
For MongoDB query syntax, see the official MongoDB documentation.
Writer script
The following script writes data from a stream source to MongoDB. Delete all inline comments before running.
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",
"parameter": {
"datasource": "",
"column": [
{"name": "_id", "type": "ObjectId"},
{"name": "age", "type": "int"},
{"name": "id", "type": "long"},
{"name": "wealth", "type": "double"},
{"name": "hobby", "type": "array", "splitter": " "},
{"name": "valid", "type": "boolean"},
{"name": "date_of_join", "format": "yyyy-MM-dd HH:mm:ss", "type": "date"}
],
"writeMode": {
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {"record": "0"},
"speed": {
"throttle": true,
"concurrent": 1,
"mbps": "1"
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [{"from": "Reader", "to": "Writer"}]
}
}
Writer parameters
| Parameter | Description | Required | Default |
|---|---|---|---|
| datasource | Name of the data source. Must match the name configured in Data Integration. | Yes | — |
| collectionName | Name of the MongoDB collection. | Yes | — |
| column | Array of field definitions. Each entry specifies name and type. See supported types below. | Yes | — |
| writeMode | Write behavior. Set isReplace to true to overwrite records with the same replaceKey, or false to skip overwriting. replaceKey specifies the business primary key (single key only). | No | — |
| preSql | Pre-operation to run before writing, such as clearing historical data. Supports drop (deletes the collection) and remove (deletes records matching a condition). See preSql configuration. | No | — |
`column` supported types for Writer:
| type value | Description |
|---|---|
| int | 32-bit integer |
| string | String |
| array | Split a source string into an array using splitter. Specify itemtype for the element type (double, int, long, bool, bytes, or string). Example: {"type":"array","name":"col","splitter":",","itemtype":"string"} |
| json | JSON string |
| long | Long integer |
| date | Date |
| double | Floating-point number |
For nested types, prefix type with document. and use dot notation for name:
{"type": "document.string", "name": "col_nest.col_string"}
{"type": "document.array", "name": "col_nest.col_array", "splitter": ",", "itemtype": "string"}
If isReplace is set to true and replaceKey is a field other than _id, the task may fail at runtime with:
After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"
This happens when the written data contains _id values that conflict with the replaceKey. See FAQ for details.
preSql configuration
preSql runs before data writing begins. It does not affect the data being written, but enables idempotent task execution — if a task fails and is re-run, preSql clears the previous partial write first.
Supported `type` values:
- drop: Deletes the collection specified by collectionName.
- remove: Deletes records matching a condition.
Remove by JSON condition (recommended):
"preSql": {
"type": "remove",
"json": "{'operationTime': {'$gte': ISODate('${last_day}T00:00:00.424+0800')}}"
}
${last_day} is a DataWorks scheduling parameter in the format $[yyyy-mm-dd]. Supported MongoDB operators include $gt, $lt, $gte, and $lte, as well as the logical operators and and or. Functions such as ISODate, max, min, sum, and avg are also supported.
Internally, Data Integration runs:
query = (BasicDBObject) com.mongodb.util.JSON.parse(json);
col.deleteMany(query);
Remove by item conditions:
"preSql": {
"type": "remove",
"item": [
{"name": "pv", "value": "100", "condition": "$gt"},
{"name": "pid", "value": "10"}
]
}
Data Integration builds a query from the item conditions and runs col.deleteMany(query).
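Conceptually, the item example above corresponds to a filter document of the following form (how the string values are coerced against the stored field types depends on the Data Integration implementation, so treat this as a sketch rather than the exact server-side query):

```json
{
    "pv": {"$gt": "100"},
    "pid": "10"
}
```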
If preSql is not recognized, no pre-operation is performed.
Best practices
FAQ
Error: `no such cmd splitVector`
Some MongoDB versions do not support the splitVector command. Open the sync task in the code editor and add "useSplitVector": false to the MongoDB reader parameters. See splitVector compatibility.
Error: `After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"`
This error occurs when isReplace is true and replaceKey is set to a field other than _id. The update modifies a record whose _id does not match the replaceKey, violating MongoDB's immutability constraint on _id.
To avoid this: use _id as the replaceKey, or make sure the source data has consistent _id values that match your replace key. See FAQ about offline synchronization for MongoDB for more details.
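A minimal writeMode fragment that avoids this error by replacing on _id, matching the writer script example earlier in this topic:

```json
"writeMode": {
    "isReplace": "true",
    "replaceKey": "_id"
}
```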