本文为您介绍MongoDB Writer支持的数据类型、写入方式、字段映射和数据源等参数和配置示例。
背景信息
MongoDB Writer插件利用MongoDB的Java客户端MongoClient进行MongoDB的写操作。最新版本的Mongo已经将DB锁的粒度从DB级别降低到Document级别,配合MongoDB强大的索引功能,基本可以满足数据源向MongoDB写入数据的需求。针对数据更新的需求,也可以通过配置业务主键的方式实现。
说明
- 在开始配置MongoDB Writer插件前,请首先配置好数据源,详情请参见配置MongoDB数据源。
- 如果您使用的是云数据库MongoDB版,MongoDB默认会有root账号。
- 出于安全策略的考虑,数据集成仅支持使用 MongoDB数据库对应账号进行连接。您在添加使用MongoDB数据源时,请避免使用root作为访问账号。
MongoDB Writer通过数据集成框架获取Reader生成的协议数据,然后将支持的类型通过逐一判断转换为MongoDB支持的类型。数据集成本身不支持数组类型,但MongoDB支持数组类型,并且数组类型具有强大的索引功能。
您可以通过参数的特殊配置,将字符串转换为MongoDB中的数组。转换类型后,即可并行写入MongoDB。
类型转换列表
MongoDB Writer支持大部分MongoDB类型,但也存在部分没有支持的情况,请注意检查您的数据类型。
MongoDB Writer针对MongoDB类型的转换列表,如下所示。
类型分类 | MongoDB数据类型 |
---|---|
整数类 | INT和LONG |
浮点类 | DOUBLE |
字符串类 | STRING和ARRAY |
日期时间类 | DATE |
布尔型 | BOOL |
二进制类 | BYTES |
说明 此处DATE类型,写入至MongoDB后即为DATETIME类型。
参数说明
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
collectionName | MonogoDB的集合名。 | 是 | 无 |
column | MongoDB的文档列名,配置为数组形式表示MongoDB的多个列。
|
是 | 无 |
writeMode | 指定了传输数据时是否覆盖的信息,包括isReplace和replaceKey:
|
否 | 无 |
preSql | 表示数据同步写出MongoDB前的前置操作,例如清理历史数据等。如果preSql为空,表示没有配置前置操作。配置preSql时,需要确保preSql符合JSON语法要求。 | 否 | 无 |
执行数据集成作业时,会首先执行您已配置的preSql。完成preSql的执行后,才可以进入实际的数据读取或写出阶段。preSql本身不会影响读取和写出的数据内容。数据集成通过preSql参数,可以具备幂等执行特性。例如,您的preSql在每次任务执行前都会清理历史数据(根据您的业务规则进行清理)。此时,如果任务失败,您只需要重新执行数据集成作业即可。
preSql的格式要求如下:
- 需要配置type字段,表示前置操作类别,支持drop和remove,例如
"preSql":{"type":"remove"}
:- drop:表示删除集合和集合内的数据,collectionName参数配置的集合即是待删除的集合。
- remove:表示根据条件删除数据。
- json:您可以通过JSON控制待删除的数据条件,例如
"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}
。此处的${last_day}
为DataWorks调度参数,格式为$[yyyy-mm-dd]
。您可以根据需要具体使用其它MongoDB支持的条件操作符号($gt、$lt、$gte和$lte等)、逻辑操作符(and和or等)或函数(max、min、sum、avg和ISODate等)。数据集成通过如下MongoDB标准API执行您的数据,删除query。query=(BasicDBObject) com.mongodb.util.JSON.parse(json); col.deleteMany(query);
说明 如果您需要条件删除数据,建议优先使用JSON配置形式。 - item:您可以在item中配置数据过滤的列名(name)、条件(condition)和列值(value)。例如
"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}
。数据集成会基于您配置的item条件项,构造查询query条件,进而通过MongoDB标准API执行删除。例如
col.deleteMany(query);
。
- 不识别的preSql,无需进行任何前置删除操作。
向导开发介绍
暂不支持向导开发模式。
脚本开发介绍
使用脚本模式开发的详情请参见通过脚本模式配置任务。
配置写入MongoDB的数据同步作业,详情请参见上述参数说明。
{
"type": "job",
"version": "2.0",//版本号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",//插件名。
"parameter": {
"datasource": "",//数据源名。
"column": [
{
"name": "_id",//列名。
"type": "ObjectId"//数据类型。如果replacekey为_id,则此处的type必须配置为ObjectID。如果配置为string,会无法进行替换。
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {//写入模式。
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"//连接名称。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {//错误记录数。
"record": "0"
},
"speed": {
"jvmOption": "-Xms1024m -Xmx1024m",
"throttle": true,//false代表不限流,下面的限流的速度不生效;true代表限流。
"concurrent": 1,//作业并发数。
"mbps": "1"//限流的速度。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}