Use the Data Integration feature of DataWorks to read JSON fields from a MongoDB collection and load them into a MaxCompute table as a batch sync job.
Prerequisites
Before you begin, ensure that you have:
- Activated MaxCompute and DataWorks. See Activate MaxCompute and DataWorks.
- Created a business flow in DataWorks (basic mode). See Create a business flow.
Limitations
The MongoDB instance must have an Internet endpoint enabled. The default resource group for DataWorks communicates with MongoDB over the public internet, so an instance that is reachable only within a Virtual Private Cloud (VPC) cannot be accessed.
How it works
Data Integration maps MongoDB JSON fields to MaxCompute columns using dot-notation paths. For example, given this source document:
```json
{
  "store": {
    "bicycle": {
      "color": "red",
      "price": 19.95
    }
  },
  "expensive": 10
}
```

The field `store.bicycle.color` maps to a MaxCompute column. You specify this path in the sync job configuration along with a type annotation:
| JSON field path | Type annotation | Note |
|---|---|---|
| `store.bicycle.color` | `document.String` | Nested field; use the `document.` prefix followed by the data type |
| `expensive` | `string` | Top-level field; specify the type directly |
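The path resolution can be sketched in a few lines of Python (an illustrative model of the mapping, not DataWorks' actual implementation): split the path on dots and walk the nested document.

```python
def resolve(doc: dict, path: str):
    """Walk a nested document along a dot-notation path."""
    value = doc
    for key in path.split("."):
        value = value[key]
    return value

doc = {
    "store": {"bicycle": {"color": "red", "price": 19.95}},
    "expensive": 10,
}

print(resolve(doc, "store.bicycle.color"))  # red
print(resolve(doc, "expensive"))            # 10
```

A top-level field such as `expensive` is simply a path with a single segment, which is why its type annotation needs no `document.` prefix.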
Step 1: Prepare test data in MongoDB
1. Create a database user for DataWorks to authenticate with. In the MongoDB shell, run:

   ```javascript
   db.createUser({user:"bookuser",pwd:"123456",roles:["user1"]})
   ```

   This creates a user named `bookuser` with password `123456` and the `user1` role.

2. Load test data into the collection. This tutorial uses an ApsaraDB for MongoDB instance with a VPC network type. Before proceeding, request an Internet endpoint for the instance; without it, the default resource group for DataWorks cannot reach the instance. Insert the following JSON document into the `userlog` collection of the `admin` database:

   ```json
   {
     "store": {
       "book": [
         {
           "category": "reference",
           "author": "Nigel Rees",
           "title": "Sayings of the Century",
           "price": 8.95
         },
         {
           "category": "fiction",
           "author": "Evelyn Waugh",
           "title": "Sword of Honour",
           "price": 12.99
         },
         {
           "category": "fiction",
           "author": "J. R. R. Tolkien",
           "title": "The Lord of the Rings",
           "isbn": "0-395-19395-8",
           "price": 22.99
         }
       ],
       "bicycle": {
         "color": "red",
         "price": 19.95
       }
     },
     "expensive": 10
   }
   ```

3. Verify that the data was inserted correctly. In the DMS console for MongoDB, run the following query against the `admin` database:

   ```javascript
   db.userlog.find().limit(10)
   ```
Step 2: Create the destination table in MaxCompute
1. Log on to the DataWorks console.
2. In the business flow you created, right-click and choose New Table > MaxCompute > Table.
3. On the Create Table page, select the engine type and enter a table name.
4. On the table editing page, click DDL Statement.
5. In the DDL Mode dialog box, enter the following statement and click Generate Table Schema:

   ```sql
   create table mqdata (mqdata string);
   ```

   Important: The table name in the DDL statement must match the table name you entered on the Create Table page.

6. Click Commit To Production Environment.
Step 3: Add the MongoDB data source
Add MongoDB as a data source in DataWorks. See Configure a MongoDB data source.
Step 4: Create and run the sync job
1. In the business flow, right-click the workflow and choose Create Node > Data Integration > Offline synchronization.
2. In the Create Node dialog box, enter a name and click Confirm.
3. In the top navigation bar, click the script mode icon to switch to script mode.
4. In script mode, click the template import icon. In the Import Template dialog box, set the source type, data source, target type, and data source fields, then click Confirm.
5. Replace the generated script with the following configuration. This script reads `store.bicycle.color` from the `userlog` collection and writes it to the `mqdata` column in MaxCompute.

   ```json
   {
     "type": "job",
     "steps": [
       {
         "stepType": "mongodb",
         "parameter": {
           "datasource": "mongodb_userlog",
           "column": [
             {
               "name": "store.bicycle.color",
               "type": "document.String"
             }
           ],
           "collectionName": "userlog"
         },
         "name": "Reader",
         "category": "reader"
       },
       {
         "stepType": "odps",
         "parameter": {
           "partition": "",
           "isCompress": false,
           "truncate": true,
           "datasource": "odps_first",
           "column": [
             "mqdata"
           ],
           "emptyAsNull": false,
           "table": "mqdata"
         },
         "name": "Writer",
         "category": "writer"
       }
     ],
     "version": "2.0",
     "order": {
       "hops": [
         {
           "from": "Reader",
           "to": "Writer"
         }
       ]
     },
     "setting": {
       "errorLimit": {
         "record": ""
       },
       "speed": {
         "concurrent": 2,
         "throttle": false
       }
     }
   }
   ```

   Key parameters:

   | Parameter | Value | Description |
   |---|---|---|
   | `datasource` (reader) | `mongodb_userlog` | Name of the MongoDB data source added in Step 3 |
   | `column[].name` | `store.bicycle.color` | Dot-notation path to the target JSON field |
   | `column[].type` | `document.String` | Type annotation for nested fields; use `string` for top-level fields |
   | `collectionName` | `userlog` | MongoDB collection to read from |
   | `datasource` (writer) | `odps_first` | Name of the MaxCompute data source |
   | `column` (writer) | `mqdata` | MaxCompute column to write to |
   | `table` | `mqdata` | MaxCompute destination table |
   | `speed.concurrent` | `2` | Number of concurrent read threads |

6. Click the run icon to run the job.
7. Check the Operation Log tab to confirm that the job completed without errors.
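Before pasting a script like this into DataWorks, it can help to sanity-check its shape locally. The sketch below is a hypothetical helper (not part of DataWorks) that verifies the reader and writer steps, checks that each reader column has a matching writer column, and confirms the Reader-to-Writer hop; the embedded script is an abbreviated version of the configuration above.

```python
import json

def check_job(script: str) -> bool:
    """Sanity-check the basic shape of a Data Integration job script."""
    job = json.loads(script)
    steps = {s["category"]: s for s in job["steps"]}
    assert steps["reader"]["stepType"] == "mongodb"
    assert steps["writer"]["stepType"] == "odps"
    # Each reader column needs a writer column to land in.
    assert (len(steps["reader"]["parameter"]["column"])
            == len(steps["writer"]["parameter"]["column"]))
    # The job must route data from Reader to Writer.
    assert {"from": "Reader", "to": "Writer"} in job["order"]["hops"]
    return True

script = json.dumps({
    "type": "job",
    "version": "2.0",
    "steps": [
        {"stepType": "mongodb", "category": "reader", "name": "Reader",
         "parameter": {"datasource": "mongodb_userlog",
                       "collectionName": "userlog",
                       "column": [{"name": "store.bicycle.color",
                                   "type": "document.String"}]}},
        {"stepType": "odps", "category": "writer", "name": "Writer",
         "parameter": {"datasource": "odps_first", "table": "mqdata",
                       "column": ["mqdata"]}},
    ],
    "order": {"hops": [{"from": "Reader", "to": "Writer"}]},
})
print(check_job(script))  # True
```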
Verify the result
1. In the business flow, right-click the workflow and choose New > MaxCompute > ODPS SQL.
2. In the Create Node dialog box, enter a node name and click Confirm.
3. On the ODPS SQL node editing page, enter the following query:

   ```sql
   SELECT * from mqdata;
   ```

4. Click the run icon to run the query.
5. Check the Operation Log tab to view the results. The output should contain the value `red`, which is the value of `store.bicycle.color` in the source document.
What's next
- To sync additional JSON fields, add more entries to the `column` array in the sync job script.
- To run the sync job on a schedule, configure a scheduling dependency in DataWorks.
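For example, to also sync the top-level `expensive` field, the reader's `column` array could look like the fragment below. This is a sketch: it assumes the destination table has a second column (here called `expensive`) listed in the writer's `column` array, which the one-column `mqdata` table in this tutorial does not have.

```json
"column": [
  { "name": "store.bicycle.color", "type": "document.String" },
  { "name": "expensive", "type": "string" }
]
```

Per the mapping table earlier, the nested field keeps the `document.` prefix while the top-level field states its type directly.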