
DataWorks:MongoDB data source

Last Updated: Nov 26, 2025

DataWorks provides bidirectional data synchronization with MongoDB. This topic describes the data synchronization capabilities that DataWorks provides for MongoDB.

Supported versions

DataWorks supports MongoDB versions 4.x, 5.x, 6.x, 7.x, and 8.0.

Usage notes

  • Connect to a MongoDB database using an account created for that database. If you use an ApsaraDB for MongoDB data source, a root account is created by default. For security reasons, we recommend that you do not use the root account when you add a MongoDB data source.

  • If you use a MongoDB sharded cluster, you must configure the address of a mongos node for the data source. Do not configure the address of a mongod/shard node. Otherwise, a synchronization task may query data only from the specified shard instead of the entire dataset. For more information about mongos and mongod, see the mongos and mongod documentation.

  • MongoDB primary/secondary clusters are not supported.

  • If the concurrency is greater than 1, all _id fields in the collection that is configured for the synchronization task must have the same data type. For example, all _id fields must be of the string or ObjectId type. Otherwise, some data may not be synchronized.

    Note
    • When the concurrency is greater than 1, the task is split based on the _id field. Therefore, mixed data types are not supported for the _id field in this scenario. If the _id field contains multiple data types, set the concurrency to 1 for data synchronization. To do this, do not configure the splitFactor parameter, or set the splitFactor parameter to 1.

  • Data Integration does not support the array type. However, MongoDB supports the array type and provides a powerful indexing feature. You can configure specific parameters to convert strings into MongoDB arrays. After the conversion, you can write the data to MongoDB in parallel.

  • Self-managed MongoDB databases do not support public network access. They can be accessed only over the Alibaba Cloud internal network.

  • MongoDB clusters that are deployed using Docker are not supported.

  • Data Integration does not support reading data from specified columns using the query parameter.

  • In a batch synchronization task, if Data Integration cannot retrieve the field structure from MongoDB, Data Integration generates field mappings for six fields by default. The field names are col1, col2, col3, col4, col5, and col6.

  • During task execution, the splitVector command is used by default to shard the task. Some MongoDB versions do not support the splitVector command, which may cause a no such cmd splitVector error. To prevent this error, switch the task configuration to the code editor and add the following parameter to the MongoDB reader configuration to disable splitVector.

    "useSplitVector" : false

Supported field types

MongoDB data types supported by MongoDB Reader

Data Integration supports most, but not all, MongoDB data types. Make sure that your data types are supported.

When Data Integration reads supported data types, it performs the following operations:

  • For primitive data types, Data Integration automatically reads data from the corresponding path based on the name of the field that is configured in the column parameter. For more information, see Appendix: Sample script and parameter description for MongoDB. Data Integration also automatically converts the data type. You do not need to specify the type property for the column.

    | Type | Batch read (MongoDB Reader) | Description |
    | --- | --- | --- |
    | ObjectId | Supported | The object ID type. |
    | Double | Supported | The 64-bit floating-point number type. |
    | 32-bit integer | Supported | A 32-bit integer. |
    | 64-bit integer | Supported | A 64-bit integer. |
    | Decimal128 | Supported | The Decimal128 type. Note: If the field is configured as a nested type or a combine type, it is processed as an object during JSON serialization. To output the data as a decimal, add the decimal128OutputType parameter and set it to bigDecimal (a sketch follows these type tables). |
    | String | Supported | The string type. |
    | Boolean | Supported | The Boolean type. |
    | Timestamp | Supported | The timestamp type. Note: BsonTimestamp stores timestamps, so you do not need to consider the impact of time zones. For more information, see Time zone issues in MongoDB. |
    | Date | Supported | The date type. |

  • For some complex data types, you can configure the type property for the column to perform custom processing.

    | Type | Batch read (MongoDB Reader) | Description |
    | --- | --- | --- |
    | Document | Supported | The embedded document type. If the type property is not configured, the Document is converted directly using JSON serialization. If the type property is set to document, the field is treated as a nested type, and MongoDB Reader reads the Document properties based on the path. For a detailed example, see Example 2: Recursively parsing a multi-level nested Document below. |
    | Array | Supported | The array type. If type is set to array.json or arrays, the data is processed directly using JSON serialization. If type is set to array or document.array, the elements are concatenated into a string. The separator, which is specified in the splitter property of the column, is a comma (,) by default. Important: Data Integration does not support the array type, but you can configure specific parameters to convert strings into MongoDB arrays and then write the data to MongoDB in parallel. |
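The following is a minimal sketch for the decimal128OutputType parameter that is referenced in the Decimal128 row above. The field name price is a placeholder, and placing the parameter at the reader parameter level is an assumption based on the note; adjust the placement to match your environment.

"parameter": {
    "datasource": "my_mongodb_source", // placeholder data source name
    "collectionName": "my_collection", // placeholder collection name
    "decimal128OutputType": "bigDecimal", // assumed placement: output Decimal128 values as decimals instead of JSON objects
    "column": [
        {"name": "price", "type": "document"} // hypothetical nested field that contains a Decimal128 value
    ]
}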

Special Data Integration data type: combine

| Type | Batch read (MongoDB Reader) | Description |
| --- | --- | --- |
| Combine | Supported | A custom data type in Data Integration. If type is set to combine, MongoDB Reader removes the keys corresponding to the configured columns and serializes all other information in the entire Document into a single JSON output. For a detailed example, see Example 1: Using the combine type below. |

MongoDB Reader data type mappings

The following table lists the mappings between MongoDB data types and Data Integration data types for MongoDB Reader.

| Converted type category | MongoDB data type |
| --- | --- |
| LONG | INT, LONG, document.INT, document.LONG |
| DOUBLE | DOUBLE, document.DOUBLE |
| STRING | STRING, ARRAY, document.STRING, document.ARRAY, COMBINE |
| DATE | DATE, document.DATE |
| BOOLEAN | BOOL, document.BOOL |
| BYTES | BYTES, document.BYTES |

MongoDB Writer data type mappings

| Type category | MongoDB data type |
| --- | --- |
| Integer | INT, LONG |
| Floating-point | DOUBLE |
| String | STRING, ARRAY |
| Date and time | DATE |
| Boolean | BOOL |
| Binary | BYTES |

Example 1: Using the combine type

The combine data type of the MongoDB Reader plugin lets you merge multiple fields in a MongoDB document into a single JSON string. For example, assume that you want to import fields from three MongoDB documents to MaxCompute. In the following example, the fields are represented by keys instead of key-value pairs. The fields a and b are common to all three documents, and x_n is a variable field.

  • doc1: a b x_1 x_2

  • doc2: a b x_2 x_3 x_4

  • doc3: a b x_5

In the configuration file, you must explicitly specify the fields that require one-to-one mapping. For the fields that you want to merge, assign a new name that is different from any existing field name in the document and set the type to COMBINE. The following code provides an example.

"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]

The following table shows the final output in MaxCompute.

| odps_column1 | odps_column2 | odps_column3 |
| --- | --- | --- |
| a | b | {x_1,x_2} |
| a | b | {x_2,x_3,x_4} |
| a | b | {x_5} |

Note

After you use the combine type to merge multiple fields in a MongoDB document, common fields are automatically deleted when the output is mapped to MaxCompute. Only the unique fields of the document are retained.

For example, a and b are common fields in all documents. After the fields in the document doc1: a b x_1 x_2 are merged using the combine type, the output is {a,b,x_1,x_2}. When this result is mapped to MaxCompute, the common fields a and b are deleted. The final output is {x_1,x_2}.

Example 2: Recursively parsing a multi-level nested Document

If a document in MongoDB has multiple levels of nesting, you can configure the document type to recursively parse it. The following code provides an example.

  • Source data in MongoDB:

    {
        "name": "name1",
        "a":
        {
            "b":
            {
                "c": "this is value"
            }
        }
    }
  • MongoDB column configuration:

    {"name":"_id","type":"string"}
    {"name":"name","type":"string"}
    {"name":"a.b.c","type":"document"}

With the preceding configuration, the value of the nested source field a.b.c is written to the destination field c. After the synchronization task runs, the data that is written to the destination is this is value.
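If you need to read more than one leaf from the same nested document, a reasonable sketch under the same assumptions is to list each path as its own column; the a.b.d field below is hypothetical. Each configured path should be parsed recursively and written to its own destination field.

{"name":"_id","type":"string"}
{"name":"name","type":"string"}
{"name":"a.b.c","type":"document"}
{"name":"a.b.d","type":"document"}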

Add a data source

Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data Source Management. You can view the infotips of parameters in the DataWorks console to understand the meanings of the parameters when you add a data source.

Develop a data synchronization task

For the entry point and the procedure for configuring a synchronization task, see the following configuration guides.

Configure a batch synchronization task for a single table

Configure a real-time synchronization task for a single table

For more information about the procedure, see Configure a real-time synchronization task in Data Integration and Configure a real-time synchronization task in DataStudio.

Configure a synchronization task for an entire database

You can configure tasks for batch synchronization, full and incremental real-time synchronization, or real-time synchronization from sharded databases for an entire database. For more information, see Batch synchronization task for an entire database and Configure a real-time synchronization task for an entire database.

Appendix: Sample script and parameter description for MongoDB

Configure a batch synchronization task by using the code editor

If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a task in the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.

Reader script sample

The following script is an example of a job that is configured to extract data from MongoDB to a local environment. For more information about the parameters, see the parameter descriptions that follow.

Important
  • Before you run the code, delete the comments.

  • You cannot extract specified elements from an array.

{
    "type":"job",
    "version":"2.0",// Version number.
    "steps":[
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {
                "datasource": "datasourceName", // Data source name.
                "collectionName": "tag_data", // Collection name.
                "query": "", // Data filtering query.
                "column": [
                    {
                        "name": "unique_id", // Field name.
                        "type": "string" // Field type.
                    },
                    {
                        "name": "sid",
                        "type": "string"
                    },
                    {
                        "name": "user_id",
                        "type": "string"
                    },
                    {
                        "name": "auction_id",
                        "type": "string"
                    },
                    {
                        "name": "content_type",
                        "type": "string"
                    },
                    {
                        "name": "pool_type",
                        "type": "string"
                    },
                    {
                        "name": "frontcat_id",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "categoryid",
                        "type": "array",
                        "splitter": ""
                    },
                    {
                        "name": "gmt_create",
                        "type": "string"
                    },
                    {
                        "name": "taglist",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "property",
                        "type": "string"
                    },
                    {
                        "name": "scorea",
                        "type": "int"
                    },
                    {
                        "name": "scoreb",
                        "type": "int"
                    },
                    {
                        "name": "scorec",
                        "type": "int"
                    },
                    {
                        "name": "a.b",
                        "type": "document.int"
                    },
                    {
                        "name": "a.b.c",
                        "type": "document.array",
                        "splitter": " "
                    }
                ]
            },
            "stepType": "mongodb"
        },
        { 
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "common": { 
            "column": { 
                "timeZone": "GMT+0" // Time zone.
            } 
        },
        "errorLimit":{
            "record":"0"// Number of error records.
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. If you set this parameter to false, throttling is disabled and the mbps parameter does not take effect. If you set this parameter to true, throttling is enabled.
            "concurrent":1, // Number of concurrent jobs.
            "mbps":"12"// Throttling rate. 1 mbps = 1 MB/s.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

The following entries describe each reader parameter.

datasource

The name of the data source. In the code editor, the value of this parameter must be the same as the name of the added data source.

collectionName

The name of the MongoDB collection.

hint

MongoDB supports the hint parameter, which forces the query optimizer to use a specific index to complete a query. In some cases, this can improve query performance. For more information, see hint parameter. The following code provides an example:

{
    "collectionName": "test_collection",
    "hint": "{age:1}"
}

column

The document field names in MongoDB. Configure them as an array to represent multiple fields.

  • name: The name of the column.

  • The supported types for the type parameter include the following:

    • string: String.

    • long: Integer.

    • double: Floating-point number.

    • date: Date.

    • bool: Boolean value.

    • bytes: Binary sequence.

    • arrays: Read as a JSON string, such as ["a","b","c"].

    • array: Read as a string with elements separated by the splitter, such as a,b,c. We recommend that you use the arrays format.

    • combine: When you use the MongoDB Reader plugin to read data, you can merge multiple fields in a MongoDB document into a single JSON string.

  • splitter: MongoDB supports the array type, but the Data Integration framework does not. Therefore, the array type read from MongoDB must be merged into a string using this separator.

batchSize

The number of records to retrieve in a batch. This parameter is optional. Default value: 1000.

cursorTimeoutInMs

The cursor timeout period, in milliseconds. This parameter is optional. Default value: 600000 (1000 × 60 × 10, that is, 10 minutes). If cursorTimeoutInMs is set to a negative value, the cursor never times out.

Note
  • We do not recommend setting the cursor to never time out. If the client program exits unexpectedly, a cursor that never times out will remain on the MongoDB server until the service is restarted.

  • If a cursor timeout occurs, you can perform the following operations:

    • Decrease the number of records retrieved in a batch using the batchSize parameter.

    • Increase the cursor timeout period using the cursorTimeoutInMs parameter.
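For reference, the following is a minimal sketch that combines the batchSize and cursorTimeoutInMs adjustments. The data source and collection names are placeholders, and the specific values are illustrative.

"parameter": {
    "datasource": "my_mongodb_source", // placeholder data source name
    "collectionName": "my_collection", // placeholder collection name
    "batchSize": 500, // fetch fewer records per batch (default: 1000)
    "cursorTimeoutInMs": 1200000 // allow 20 minutes before the cursor times out (default: 600000)
}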

query

You can use this parameter to filter the MongoDB data that is returned. Only the time formats shown in the following examples are supported; the UNIX timestamp format is not directly supported.

Note
  • The query parameter does not support JavaScript syntax.

  • Reading data from specified columns is not supported.

The following code provides common examples for the query parameter:

  • Query data with a status of normal.

    {
      ...
      "query":"{ status: "normal"}"
      ...
    }
  • IN syntax: Query data with a status of normal or forbidden.

    {
      ...
      "query":"{ status: { $in: [ "normal", "forbidden" ] }}"
      ...
    }
  • AND syntax: Query data with a status of normal and an age less than 30.

    {
      ...
      "query":"{ status: "normal", age: { $lt: 30 }}"
      ...
    }
  • Date syntax: Query data with a creation time greater than or equal to 2022-12-01 00:00:00.000. +0800 indicates the UTC+8 time zone.

    {
      ...
      "query":"{ createTime:{$gte:ISODate('2022-12-01T00:00:00.000+0800')}}"
      ...
    }
  • Date syntax with a scheduling parameter placeholder: Query data with a creation time greater than or equal to a specific point in time.

    {
      ...
      "query":"{ createTime:{$gte:ISODate('$[yyyy-mm-dd]T00:00:00.000+0800')}}"
      ...
    }
    Note

    For more information about scheduling parameters, see Scenarios: Typical application scenarios of scheduling parameters in Data Integration. For information about how to implement incremental synchronization for batch tasks, see Use scheduling parameters in Data Integration.

  • Incremental synchronization for non-time fields.

    You can use an assignment node to process a field into the target data type and then pass it to Data Integration for data synchronization. For example, if the incremental field in MongoDB is a UNIX timestamp, you can use an assignment node to convert a time-type field into a UNIX timestamp using an engine function. Then, you can pass the timestamp to the batch synchronization task. For more information about how to use an assignment node, see Assignment node.

Note

For more information about the query syntax of MongoDB, see the official MongoDB documentation.

splitFactor

If severe data skew exists, consider increasing the splitFactor to achieve finer-grained sharding without increasing the concurrency.
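For reference, the following is a minimal sketch with splitFactor. The data source and collection names are placeholders, and the value 4 is illustrative.

"parameter": {
    "datasource": "my_mongodb_source", // placeholder data source name
    "collectionName": "my_collection", // placeholder collection name
    "splitFactor": 4 // shard the task at a finer granularity to mitigate data skew
}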

Writer script sample

The following script is an example of a data synchronization job that is configured to write data to MongoDB. For more information about the parameters, see the parameter descriptions that follow.

{
    "type": "job",
    "version": "2.0",// Version number.
    "steps": [
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "mongodb",// Plugin name.
            "parameter": {
                "datasource": "",// Data source name.
                "column": [
                    {
                        "name": "_id",// Column name.
                        "type": "ObjectId"// Data type. If replaceKey is _id, you must set type to ObjectId. If you set type to string, the replacement fails.
                    },
                    {
                        "name": "age",
                        "type": "int"
                    },
                    {
                        "name": "id",
                        "type": "long"
                    },
                    {
                        "name": "wealth",
                        "type": "double"
                    },
                    {
                        "name": "hobby",
                        "type": "array",
                        "splitter": " "
                    },
                    {
                        "name": "valid",
                        "type": "boolean"
                    },
                    {
                        "name": "date_of_join",
                        "format": "yyyy-MM-dd HH:mm:ss",
                        "type": "date"
                    }
                ],
                "writeMode": {// Write mode.
                    "isReplace": "true",
                    "replaceKey": "_id"
                },
                "collectionName": "datax_test"// Collection name.
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {// Number of error records.
            "record": "0"
        },
        "speed": {
            "throttle": true,// Specifies whether to enable throttling. If you set this parameter to false, throttling is disabled and the mbps parameter does not take effect. If you set this parameter to true, throttling is enabled.
            "concurrent": 1,// Number of concurrent jobs.
            "mbps": "1"// Throttling rate. 1 mbps = 1 MB/s.
        },
       "jvmOption": "-Xms1024m -Xmx1024m"
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Writer script parameters

The following entries describe each writer parameter, whether it is required, and its default value.

datasource

Required. No default value. The name of the data source. In the code editor, the value of this parameter must be the same as the name of the added data source.

collectionName

Required. No default value. The name of the MongoDB collection.

column

Required. No default value. The document field names in MongoDB. Configure them as an array to represent multiple fields.

  • name: The name of the column.

  • type: The type of the column.

    • int: A 32-bit integer.

    • string: A string.

    • array: The splitter parameter is required. It is used to separate the source string. For example:

      If the source data is a,b,c and splitter is set to a comma (,), the data is split into the array ["a","b","c"] and written to MongoDB.

      {"type":"array","name":"col_split_array","splitter":",","itemtype":"string"}
      Note

      For the array type, the itemtype parameter supports the following enumeration types: double, int, long, bool, bytes, and string.

    • json: A JSON string.

    • long: A long integer.

    • date: A date.

    • double: A floating-point number.

    Note

    MongoDB Writer also supports writing nested types. Add the type prefix document. to indicate a nested type. You can configure a cascaded name for the name parameter. Example:

    {"type":"document.string","name":"col_nest.col_string"}
    {"type":"document.array","name":"col_nest.col_split_array","splitter":",","itemtype":"string"}
  • splitter: A special separator. Use this parameter only when a string needs to be split into a character array. The string is split into an array using the specified separator and stored in MongoDB.


writeMode

Optional. No default value. Specifies whether to overwrite data during transmission. The writeMode parameter includes isReplace and replaceKey:

  • isReplace: If set to true, an overwrite operation is performed for the same replaceKey. If set to false, no overwrite operation is performed.

  • replaceKey: Specifies the business primary key for each record, which is used for overwriting. Multiple keys are not supported for replaceKey. This usually refers to the primary key in MongoDB.

Note

If isReplace is set to true and a field other than the _id field is configured as the replaceKey, an error similar to the following one may occur during runtime:

After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"

This is because the data to be written contains records where the _id does not match the replaceKey. For more information, see the FAQ: Error: After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2".


preSql

Optional. No default value. A pre-operation to execute before writing data to MongoDB, such as clearing historical data. If preSql is empty, no pre-operation is configured. When you configure preSql, make sure that its value complies with the JSON syntax.

When you run a Data Integration job, the configured preSql is executed first, and the data writing phase begins only after the preSql execution is complete. The preSql parameter does not affect the content of the data that is written. Configuring preSql also helps you keep Data Integration jobs idempotent: for example, you can use preSql to clear historical data before each task run based on your business rules, so that if a task fails, you can simply rerun the job.

The format requirements for preSql are as follows:

  • You must configure the type field to specify the pre-operation type. The supported values are drop and remove. Example: "preSql":{"type":"remove"}.

    • drop: Deletes the collection and the data in it. The collection to be deleted is specified by the collectionName parameter.

    • remove: Deletes data based on a condition. Use the json or item field, described below, to specify the condition.

    • json: You can use a JSON object to specify the conditions for data deletion. Example: "preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}. In this example, ${last_day} is a DataWorks scheduling parameter in the $[yyyy-mm-dd] format. You can also use other MongoDB-supported conditional operators (such as $gt, $lt, $gte, and $lte), logical operators (such as and and or), or functions (such as max, min, sum, avg, and ISODate) as needed.

      Data Integration executes the data deletion query using the following standard MongoDB API.

      query = (BasicDBObject) com.mongodb.util.JSON.parse(json);
      col.deleteMany(query);
      Note

      To delete data based on conditions, we recommend that you use the JSON configuration.

    • item: You can configure the column name (name), condition (condition), and column value (value) for data filtering in an item. Example: "preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}.

      Data Integration constructs a query condition based on the configured item conditions and then executes the deletion using a standard MongoDB API. For example: col.deleteMany(query);.

  • If the preSql is not recognized, no pre-deletion operation is performed.
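Putting the preceding pieces together, the following is a minimal sketch of a writer parameter block that clears matching historical data before each run. The data source name is a placeholder, and the ${last_day} scheduling parameter follows the earlier example.

"parameter": {
    "datasource": "my_mongodb_source", // placeholder data source name
    "collectionName": "datax_test",
    "preSql": {
        "type": "remove", // delete matching documents before the write phase starts
        "json": "{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"
    },
    "column": [
        {"name": "_id", "type": "ObjectId"}
    ],
    "writeMode": {
        "isReplace": "true", // overwrite records that have the same replaceKey
        "replaceKey": "_id"
    }
}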