
DataWorks: MongoDB

Last Updated: Mar 26, 2026

The MongoDB data source lets Data Integration read from and write to MongoDB collections. Use it to:

  • Run offline batch sync tasks to extract and load MongoDB data at scale

  • Set up real-time sync tasks that stream changes from MongoDB

  • Synchronize an entire MongoDB database in a single whole-database sync task

Supported versions: MongoDB 4.x, 5.x, 6.x, 7.x, and 8.0.

Usage notes

Connection and authentication

Data Integration connects to MongoDB using a database account. If you use ApsaraDB for MongoDB, a root account is available by default — do not use it for sync tasks. Create a dedicated account with the minimum required permissions.

Self-managed MongoDB databases are only accessible over the Alibaba Cloud private network. Public network access is not supported. MongoDB clusters deployed with Docker are not supported.

Sharded cluster configuration

For sharded MongoDB clusters, configure the mongos address in the data source — not the mongod or shard node address. Pointing to a shard node causes the sync task to read from that shard only, missing data from other shards. For background on mongos and mongod, see mongos and mongod.

Concurrency and _id type consistency

When concurrency is greater than 1, Data Integration splits the task based on the _id field. All _id values in the collection must be the same type (for example, all strings or all ObjectIds). Mixed _id types with concurrency > 1 cause some records to fail.

If the collection has mixed _id types, set concurrency to 1 and leave splitFactor unconfigured (or set it to 1).
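As a sketch, the speed setting for such a task pins concurrency to 1; splitFactor is simply left out of the reader parameters (the throttle and mbps values shown are illustrative):

```json
{
  "setting": {
    "speed": {
      "throttle": true,
      "concurrent": 1,
      "mbps": "12"
    }
  }
}
```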

Array type

Data Integration does not natively support the MongoDB array type. To write array data to MongoDB, configure the splitter parameter to convert a delimited string into an array. After conversion, parallel writes are supported.
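For instance, a writer column entry that converts a comma-delimited source string into a MongoDB array could look like the following sketch (the field name hobby and the comma separator are illustrative):

```json
{"name": "hobby", "type": "array", "splitter": ","}
```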

Reading data from specific columns via the query parameter is not supported.

Default field mapping

If field structure cannot be retrieved from MongoDB during an offline sync task, Data Integration generates a default mapping with six fields: col1, col2, col3, col4, col5, and col6.

splitVector compatibility

Data Integration uses the splitVector command by default for task sharding. Some MongoDB versions do not support this command, resulting in:

no such cmd splitVector

To disable splitVector, open the sync task in the code editor and add the following to the MongoDB reader parameters:

"useSplitVector": false
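In context, the flag sits alongside the other reader parameters, for example (datasourceName, the collection name, and the column list are placeholders):

```json
{
  "stepType": "mongodb",
  "parameter": {
    "datasource": "datasourceName",
    "collectionName": "my_collection",
    "useSplitVector": false,
    "column": [
      {"name": "_id", "type": "string"}
    ]
  }
}
```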

Supported field types

Primitive types (MongoDB Reader)

For primitive types, MongoDB Reader reads field values automatically based on the field name and type configured in the sync task. No additional type configuration is needed.

| Type | Supported | Description |
| --- | --- | --- |
| ObjectId | Yes | Object ID type |
| Double | Yes | 64-bit floating-point |
| 32-bit integer | Yes | 32-bit integer |
| 64-bit integer | Yes | 64-bit integer |
| Decimal128 | Yes | When used as a nested or Combine type, add decimal128OutputType: bigDecimal to output as decimal |
| String | Yes | String type |
| Boolean | Yes | Boolean type |
| Timestamp | Yes | BsonTimestamp; no time zone conversion required |
| Date | Yes | Date type |

Complex types (MongoDB Reader)

For complex types, configure the type property in the column definition to control how data is processed.

| Type | Supported | Behavior |
| --- | --- | --- |
| Document | Yes | Without type: serialized as a JSON string. With type: document: nested properties are read by path (see Nested document parsing) |
| Array | Yes | arrays or array.json: serialized as a JSON string. array or document.array: concatenated into a string using the splitter separator |
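For instance, given a source document such as {"tags": ["a", "b"], "info": {"k": "v"}} (an illustrative document, not from the source), the two column entries below serialize tags as the JSON string ["a","b"] and read the nested value of info.k by path:

```json
{"name": "tags", "type": "arrays"}
{"name": "info.k", "type": "document"}
```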

Combine type (Data Integration custom type)

The Combine type is a Data Integration-specific type. When type is set to combine, MongoDB Reader removes the key for the configured column, then serializes all remaining fields in the document into a single JSON string.

Example: Given three documents with fixed fields a and b, plus variable fields x_n:

  • doc1: a b x_1 x_2

  • doc2: a b x_2 x_3 x_4

  • doc3: a b x_5

Configure the column mapping as:

"column": [
  {"name": "a", "type": "string"},
  {"name": "b", "type": "string"},
  {"name": "doc", "type": "combine"}
]

Output in MaxCompute:

| odps_column1 | odps_column2 | odps_column3 |
| --- | --- | --- |
| a | b | {x_1, x_2} |
| a | b | {x_2, x_3, x_4} |
| a | b | {x_5} |

Common fields (a and b) are automatically excluded from the Combine output. Only fields unique to each document are retained in the merged JSON string.

MongoDB Reader type conversions

| Data Integration type | MongoDB types |
| --- | --- |
| LONG | INT, LONG, document.INT, document.LONG |
| DOUBLE | DOUBLE, document.DOUBLE |
| STRING | STRING, ARRAY, document.STRING, document.ARRAY, COMBINE |
| DATE | DATE, document.DATE |
| BOOLEAN | BOOL, document.BOOL |
| BYTES | BYTES, document.BYTES |

MongoDB Writer type conversions

| Data Integration type | MongoDB types |
| --- | --- |
| Integer | INT, LONG |
| Floating-point | DOUBLE |
| String | STRING, ARRAY |
| Date and time | DATE |
| Boolean | BOOL |
| Binary | BYTES |

Nested document parsing

For multi-level nested documents, use type: document to read nested fields by path.

Source document in MongoDB:

{
  "name": "name1",
  "a": {
    "b": {
      "c": "this is value"
    }
  }
}

Column configuration:

{"name": "_id", "type": "string"}
{"name": "name", "type": "string"}
{"name": "a.b.c", "type": "document"}

After the sync task runs, the value of a.b.c ("this is value") is written to the destination field c.

Add a data source

Before creating a sync task, add MongoDB as a data source in DataWorks. See Data source management for the procedure and parameter descriptions.

Configure a sync task

Offline sync task

Real-time sync task

Whole-database sync task

Script reference

Use this section when configuring a sync task in the code editor. For the general script format, see Configure a task in the code editor.

Reader script

The following script reads data from MongoDB and writes to a stream destination. Delete all inline comments before running.

Important

Extracting a specific element from an array is not supported.

{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "category": "reader",
      "name": "Reader",
      "parameter": {
        "datasource": "datasourceName",
        "collectionName": "tag_data",
        "query": "",
        "column": [
          {"name": "unique_id", "type": "string"},
          {"name": "sid", "type": "string"},
          {"name": "user_id", "type": "string"},
          {"name": "auction_id", "type": "string"},
          {"name": "content_type", "type": "string"},
          {"name": "pool_type", "type": "string"},
          {"name": "frontcat_id", "type": "array", "splitter": ""},
          {"name": "categoryid", "type": "array", "splitter": ""},
          {"name": "gmt_create", "type": "string"},
          {"name": "taglist", "type": "array", "splitter": " "},
          {"name": "property", "type": "string"},
          {"name": "scorea", "type": "int"},
          {"name": "scoreb", "type": "int"},
          {"name": "scorec", "type": "int"},
          {"name": "a.b", "type": "document.int"},
          {"name": "a.b.c", "type": "document.array", "splitter": " "}
        ]
      },
      "stepType": "mongodb"
    },
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "common": {
      "column": {
        "timeZone": "GMT+0"
      }
    },
    "errorLimit": {
      "record": "0"
    },
    "speed": {
      "throttle": true,
      "concurrent": 1,
      "mbps": "12"
    }
  },
  "order": {
    "hops": [{"from": "Reader", "to": "Writer"}]
  }
}

Reader parameters

| Parameter | Description | Required | Default |
| --- | --- | --- | --- |
| datasource | Name of the data source. Must match the name configured in Data Integration. | Yes | - |
| collectionName | Name of the MongoDB collection. | Yes | - |
| column | Array of field definitions. Each entry specifies name and type. See supported types below. | Yes | - |
| query | Filter to limit the data range returned. Supports specific date formats only; JavaScript syntax is not supported. | No | - |
| hint | Forces the query optimizer to use a specific index. Example: {"collectionName": "test_collection", "hint": "{age:1}"}. See hint parameter. | No | - |
| batchSize | Number of records retrieved per batch. Decrease this value if the cursor times out. | No | 1000 |
| cursorTimeoutInMs | Cursor timeout in milliseconds. Set to a negative value to disable timeout (not recommended: an abandoned cursor persists on the server until restart). If the cursor times out, decrease batchSize or increase this value. | No | 600000 |
| splitFactor | Adjusts chunk granularity when data skew is significant. Increase this value to reduce chunk size without raising concurrency. | No | - |
| useSplitVector | Set to false to disable the splitVector command. Required for MongoDB versions that do not support splitVector. | No | true |
| decimal128OutputType | Set to bigDecimal to output Decimal128 fields as decimal when used in nested or Combine type configurations. | No | - |

Supported `column` type values:

| type value | Description |
| --- | --- |
| string | String |
| long | Integer |
| double | Floating-point number |
| date | Date |
| bool | Boolean |
| bytes | Binary |
| arrays | Array serialized as a JSON string, e.g., ["a","b","c"] |
| array | Array concatenated as a delimited string using splitter, e.g., a,b,c. Use arrays instead when possible. |
| combine | Merges all remaining document fields into a single JSON string |
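As a concrete comparison, for an illustrative source field tags holding ["a", "b", "c"], the two configurations below produce the JSON string ["a","b","c"] and the delimited string a,b,c, respectively:

```json
{"name": "tags", "type": "arrays"}
{"name": "tags", "type": "array", "splitter": ","}
```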

`query` examples:

{"query": "{ status: \"normal\"}"}
{"query": "{ status: { $in: [\"normal\", \"forbidden\"] }}"}
{"query": "{ status: \"normal\", age: { $lt: 30 }}"}
{"query": "{ createTime: {$gte: ISODate('2022-12-01T00:00:00.000+0800')}}"}
{"query": "{ createTime: {$gte: ISODate('$[yyyy-mm-dd]T00:00:00.000+0800')}}"}

For incremental sync on non-date fields, use an assignment node to convert the field to the required type before passing it to Data Integration. See Assignment node and Scenario: typical application scenarios of scheduling parameters in Data Integration.

For MongoDB query syntax, see the official MongoDB documentation.

Writer script

The following script writes data from a stream source to MongoDB. Delete all inline comments before running.

{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "mongodb",
      "parameter": {
        "datasource": "",
        "column": [
          {"name": "_id", "type": "ObjectId"},
          {"name": "age", "type": "int"},
          {"name": "id", "type": "long"},
          {"name": "wealth", "type": "double"},
          {"name": "hobby", "type": "array", "splitter": " "},
          {"name": "valid", "type": "boolean"},
          {"name": "date_of_join", "format": "yyyy-MM-dd HH:mm:ss", "type": "date"}
        ],
        "writeMode": {
          "isReplace": "true",
          "replaceKey": "_id"
        },
        "collectionName": "datax_test"
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {"record": "0"},
    "speed": {
      "throttle": true,
      "concurrent": 1,
      "mbps": "1"
    },
    "jvmOption": "-Xms1024m -Xmx1024m"
  },
  "order": {
    "hops": [{"from": "Reader", "to": "Writer"}]
  }
}

Writer parameters

| Parameter | Description | Required | Default |
| --- | --- | --- | --- |
| datasource | Name of the data source. Must match the name configured in Data Integration. | Yes | - |
| collectionName | Name of the MongoDB collection. | Yes | - |
| column | Array of field definitions. Each entry specifies name and type. See supported types below. | Yes | - |
| writeMode | Write behavior. Set isReplace to true to overwrite records with the same replaceKey. Set isReplace to false to skip overwriting. replaceKey specifies the business primary key (single key only). | No | - |
| preSql | Pre-operation to run before writing, such as clearing historical data. Supports drop (deletes the collection) and remove (deletes records matching a condition). See preSql configuration. | No | - |
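For example, a writeMode that overwrites records sharing the same business key could look like the following sketch (user_id is a hypothetical key; note the _id conflict caveat described later in this topic):

```json
"writeMode": {
  "isReplace": "true",
  "replaceKey": "user_id"
}
```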

`column` supported types for Writer:

| type value | Description |
| --- | --- |
| int | 32-bit integer |
| string | String |
| array | Split a source string into an array using splitter. Specify itemtype for element type (double, int, long, bool, bytes, or string). Example: {"type":"array","name":"col","splitter":",","itemtype":"string"} |
| json | JSON string |
| long | Long integer |
| date | Date |
| double | Floating-point number |

For nested types, prefix type with document. and use dot notation for name:

{"type": "document.string", "name": "col_nest.col_string"}
{"type": "document.array", "name": "col_nest.col_array", "splitter": ",", "itemtype": "string"}
Important

If isReplace is set to true and replaceKey is a field other than _id, a runtime error may occur when the written data contains _id values that conflict with the replaceKey (see FAQ for details):

After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"

preSql configuration

preSql runs before data writing begins. It does not affect the data being written, but enables idempotent task execution — if a task fails and is re-run, preSql clears the previous partial write first.

Supported `type` values:

  • drop — Deletes the collection specified by collectionName.

  • remove — Deletes records matching a condition.

Remove by JSON condition (recommended):

"preSql": {
  "type": "remove",
  "json": "{'operationTime': {'$gte': ISODate('${last_day}T00:00:00.424+0800')}}"
}

${last_day} is a DataWorks scheduling parameter in the format $[yyyy-mm-dd]. Supported MongoDB operators include $gt, $lt, $gte, and $lte, as well as logical operators (and, or). Functions such as ISODate, max, min, sum, and avg are also supported.

Internally, Data Integration runs:

query = (BasicDBObject) com.mongodb.util.JSON.parse(json);
col.deleteMany(query);

Remove by item conditions:

"preSql": {
  "type": "remove",
  "item": [
    {"name": "pv", "value": "100", "condition": "$gt"},
    {"name": "pid", "value": "10"}
  ]
}

Data Integration builds a query from the item conditions and runs col.deleteMany(query).

If preSql is not recognized, no pre-operation is performed.

Best practices

FAQ

Error: `no such cmd splitVector`

Some MongoDB versions do not support the splitVector command. Open the sync task in the code editor and add "useSplitVector": false to the MongoDB reader parameters. See splitVector compatibility.

Error: `After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"`

This error occurs when isReplace is true and replaceKey is set to a field other than _id. The replace operation matches an existing record by replaceKey and overwrites it with the source row; if the source row's _id differs from the matched record's _id, the write violates MongoDB's immutability constraint on _id.

To avoid this: use _id as the replaceKey, or make sure the source data has consistent _id values that match your replace key. See FAQ about offline synchronization for MongoDB for more details.