本文为您介绍MongoDB Reader支持的数据类型、字段映射和数据源等参数及配置示例。
MongoDB Reader插件通过MongoDB的Java客户端MongoClient,进行MongoDB的读操作。最新版本的Mongo已经将DB锁的粒度,从DB级别降低至document级别,配合MongoDB强大的索引功能,即可达到高性能读取MongoDB的需求。
MongoDB Reader通过数据集成框架从MongoDB并行地读取数据,通过主控的Job程序,按照指定规则对MongoDB中的数据进行分片并行读取,然后将MongoDB支持的类型通过逐一判断转换为数据集成支持的类型。
使用限制
- MongoDB版本限制:仅支持4.x版本。
- 数据集成支持使用MongoDB数据库对应账号进行连接,如果您使用的是云数据库MongoDB版,默认会有一个root账号。出于安全策略的考虑,在添加使用MongoDB数据源时,请避免使用root作为访问账号。
- 在并发大于1的情况下,同步任务配置的集合中所有
_id
字段类型必须一致(例如,_id
字段都为string类型或者ObjectId类型),否则会出现部分数据无法同步的问题。说明 并发大于1时,任务拆分会使用_id
字段进行划分,因而在此场景下_id
字段不支持混合类型。如果_id
有多种字段类型,您可以使用单并发的形式进行数据同步,且不配置splitFactor或splitFactor配置为1。
数据类型与类型转换
支持的MongoDB数据类型
MongoDB Reader支持大部分MongoDB类型,但也存在部分没有支持的情况,请注意检查您的数据类型。
- 基本类型的数据,会根据同步任务配置的读取字段(column,详见下文的全量参数说明)中的name自动读取对应path下的数据,并根据数据类型做自动转换,您无需指定column的type属性。
类型 是否支持 说明 ObjectId 支持 对象ID类型。 Double 支持 64位浮点数类型。 32-bit integer 支持 32位整数。 64-bit integer 支持 64位整数。 Decimal128 支持 Decimal128类型。 说明 如果配置为嵌套类型、Combine类型,JSON序列化时会被当做对象处理,需增加参数decimal128OutputType
为bigDecimal
,才能输出为decimal。String 支持 字符串类型。 Boolean 支持 布尔类型。 Timestamp 支持 时间戳类型。 说明 BsonTimestamp存储的是时间戳,无需考虑时区影响,详情请参见MongoDB中的时区问题。Date 支持 日期类型。 - 部分复杂类型的数据,您可通过配置column的type属性,进行自定义处理。
类型 是否支持 说明 Document 支持 嵌入文档类型。 - 如果没有配置type属性,则直接将Document转JSON序列化处理。
- 如果配置了type属性为
document
,则属于嵌套类型,MongoDB Reader会按path读取Document属性。详细示例请参见下文的数据类型示例2:递归解析处理多层嵌套的Document。
Array 支持 数组类型。 - 如果type配置为
array.json
、arrays
,直接JSON序列化处理。 - 如果type配置为
array
、document.array
,则拼接为字符串,分隔符(column中的splitter)默认为英文逗号。
数据集成特殊数据类型:combine
类型 | 是否支持 | 说明 |
---|---|---|
Combine | 支持 | 数据集成自定义类型。 如果type配置为 |
数据类型转换
转换后的类型分类 | MongoDB数据类型 |
---|---|
LONG | INT、LONG、document.INT和document.LONG |
DOUBLE | DOUBLE和document.DOUBLE |
STRING | STRING、ARRAY、document.STRING、document.ARRAY和COMBINE |
DATE | DATE和document.DATE |
BOOLEAN | BOOL和document.BOOL |
BYTES | BYTES和document.BYTES |
数据类型示例1:Combine类型使用示例
doc1: a b x_1 x_2
doc2: a b x_2 x_3 x_4
doc3: a b x_5
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]
最终导出的MaxCompute结果如下所示。odps_column1 | odps_column2 | odps_column3 |
---|---|---|
a | b | {x_1,x_2} |
a | b | {x_2,x_3,x_4} |
a | b | {x_5} |
使用COMBINE类型合并MongoDB Document中的多个字段后,输出结果映射至MaxCompute时会自动删除公共字段,仅保留Document的特有字段。
例如,a、b为所有Document均有的公共字段,Document文件doc1: a b x_1 x_2
使用COMBINE类型合并字段后,输出结果本应该为{a,b,x_1,x_2},该结果映射至MaxCompute后,会删除公共字段a和b,最终输出的结果为{x_1,x_2}。
数据类型示例2:递归解析处理多层嵌套的Document
- MongoDB源端数据为:
{ "name": "name1", "a": { "b": { "c": "this is value" } } }
- MongoDB列可配置为:
{"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"}
this is value
。全量参数说明
参数 | 描述 |
---|---|
datasource | 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。 |
collectionName | MonogoDB的集合名。 |
hint | MongoDB支持hint参数,使查询优化器使用特定索引来完成查询,在某些情况下,可以提高查询性能。详情请参见hint参数。示例如下:
|
column | MongoDB的文档列名,配置为数组形式表示MongoDB的多个列。
|
batchSize | 批量获取的记录数,该参数为选填参数。默认值为1000 条。 |
cursorTimeoutInMs | 游标超时时间,该参数为选填参数。默认值为1000 * 60 * 10 = 600000 。如果cursorTimeoutInMs配置为负值,则表示游标永不超时。说明
|
query | 您可以通过该配置型来限制返回MongoDB数据范围,仅支持以下时间格式,不支持直接使用时间戳类型的格式。 说明 query不支持JS语法。
说明 更多MongoDB的查询语法请参见MongoDB官方文档。 |
splitFactor | 如果存在比较严重的数据倾斜,可以考虑增加splitFactor,实现更小粒度的切分,无需增加并发数。 |
向导开发介绍
打开新建的数据同步节点,即可进行同步任务的配置,详情请参见通过向导模式配置离线同步任务。
- 选择数据源。 配置同步任务的数据来源和数据去向。
参数 描述 数据源 即上述参数说明中的datasource,通常输入您配置的数据源名称。 集合名称 即上述参数说明中的collectionName。 批量条数 从MongoDB批量获取的数据条数,默认值为1000。 游标超时时间 游标超时时间,默认值是3600000毫秒,如果配置为负数,则永不超时。 检索查询条件 即上述参数说明中的query。您可以使用调度参数来指定同步源表的数据范围,当同步任务运行时,任务中配置的参数都会被替换为调度参数表达式所表达的实际值,然后再执行数据同步。离线同步增量同步实现方式请参见数据集成使用调度参数的相关说明。 说明 非时间类型增量字段同步:可以通过赋值节点将字段处理为目标数据类型后,再传入数据集成进行数据同步。例如,当MongoDB存储的增量字段为时间戳,您可以通过赋值节点将时间类型字段通过引擎函数转换为时间戳,再传给离线同步任务使用,关于赋值节点的使用详情请参见:赋值节点。 - 字段映射,即上述参数说明中的column。默认使用同行映射。您可以单击
图标手动编辑源表字段。
- 通道控制。
参数 描述 任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。 说明 当前任务期望最大并发数仅支持配置为1。同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。 错误记录数 错误记录数,表示脏数据的最大容忍条数。 分布式处理能力 数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组和新增和使用独享数据集成资源组。
脚本开发介绍
使用脚本模式开发的详情请参见通过脚本模式配置离线同步任务。
- 实际运行时,请删除下述代码中的注释。
- 暂时不支持取出array中的指定元素。
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName", //数据源名称。
"collectionName": "tag_data", //集合名称。
"query": "", // 数据查询过滤。
"column": [
{
"name": "unique_id", //字段名称。
"type": "string" //字段类型。
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "array",
"splitter": ""
},
{
"name": "categoryid",
"type": "array",
"splitter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
},
{
"name": "a.b",
"type": "document.int"
},
{
"name": "a.b.c",
"type": "document.array",
"splitter": " "
}
]
},
"stepType": "mongodb"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1 //作业并发数。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
常见问题
Reader是否大小写敏感?
Column.name
为大小写敏感,如配置有误,会导致读出数据为null。例如:- MongoDB源数据为:
{ "MY_NAME": "zhangsan" }
- 同步任务的Column配置为:
{ "column": [ { "name": "my_name" } ] }
怎么配置Reader超时时长?
超时时长的配置参数为cursorTimeoutInMs
,默认为600000s(10分钟),参数含义为MongoDB Server执行Query总耗时,不包含数据传输时长。若全量读取的数据较大,可能导致报错:MongoDBReader$Task - operation exceeded time limitcom.mongodb.MongoExecutionTimeoutException: operation exceeded time limit
。
报错:no such cmd splitVector
- 可能原因:
在同步任务运行时,默认优先使用
splitVector
命令进行任务分片,在部分MongoDB版本中,不支持splitVector
命令,进而会导致报错no such cmd splitVector
。 - 解决方案:
- 进入同步任务配置页面后,单击顶部的转换脚本
按钮。将任务修改为脚本模式。
- 在MongoDB的parameter配置中,增加参数
以避免使用"useSplitVector" : false
splitVector
。
- 进入同步任务配置页面后,单击顶部的转换脚本