本文为您介绍Elasticsearch Reader的工作原理、功能和参数。
使用限制
- 不支持同步scaled_float类型的字段。
- 不支持同步字段中带有关键字
$ref
的索引。
支持的Elasticsearch版本
DataWorks平台目前仅支持配置阿里云Elasticsearch5.x、6.x、7.x版本数据源,不支持配置自建Elasticsearch数据源。
重要 如果您使用的是6.x及以上版本,仅支持使用独享数据集成资源组。
支持的字段类型
类型 | 离线读(Elasticsearch Reader) | 离线写(Elasticsearch Writer) | 实时写 |
---|---|---|---|
binary | 支持 | 支持 | 支持 |
boolean | 支持 | 支持 | 支持 |
keyword | 支持 | 支持 | 支持 |
constant_keyword | 不支持 | 不支持 | 不支持 |
wildcard | 不支持 | 不支持 | 不支持 |
long | 支持 | 支持 | 支持 |
integer | 支持 | 支持 | 支持 |
short | 支持 | 支持 | 支持 |
byte | 支持 | 支持 | 支持 |
double | 支持 | 支持 | 支持 |
float | 支持 | 支持 | 支持 |
half_float | 不支持 | 不支持 | 不支持 |
scaled_float | 不支持 | 不支持 | 不支持 |
unsigned_long | 不支持 | 不支持 | 不支持 |
date | 支持 | 支持 | 支持 |
date_nanos | 不支持 | 不支持 | 不支持 |
alias | 不支持 | 不支持 | 不支持 |
object | 支持 | 支持 | 支持 |
flattened | 不支持 | 不支持 | 不支持 |
nested | 支持 | 支持 | 支持 |
join | 不支持 | 不支持 | 不支持 |
integer_range | 支持 | 支持 | 支持 |
float_range | 支持 | 支持 | 支持 |
long_range | 支持 | 支持 | 支持 |
double_range | 支持 | 支持 | 支持 |
date_range | 支持 | 支持 | 支持 |
ip_range | 不支持 | 支持 | 支持 |
ip | 支持 | 支持 | 支持 |
version | 支持 | 支持 | 支持 |
murmur3 | 不支持 | 不支持 | 不支持 |
aggregate_metric_double | 不支持 | 不支持 | 不支持 |
histogram | 不支持 | 不支持 | 不支持 |
text | 支持 | 支持 | 支持 |
annotated-text | 不支持 | 不支持 | 不支持 |
completion | 支持 | 不支持 | 不支持 |
search_as_you_type | 不支持 | 不支持 | 不支持 |
token_count | 支持 | 不支持 | 不支持 |
dense_vector | 不支持 | 不支持 | 不支持 |
rank_feature | 不支持 | 不支持 | 不支持 |
rank_features | 不支持 | 不支持 | 不支持 |
geo_point | 支持 | 支持 | 支持 |
geo_shape | 支持 | 支持 | 支持 |
point | 不支持 | 不支持 | 不支持 |
shape | 不支持 | 不支持 | 不支持 |
percolator | 不支持 | 不支持 | 不支持 |
string | 支持 | 支持 | 支持 |
工作原理
Elasticsearch在公共资源组上支持Elasticsearch5.x版本,在独享数据集成资源组上支持Elasticsearch5.x、6.x和7.x版本。独享数据集成资源组的详情请参见新增和使用独享数据集成资源组。
Elasticsearch Reader的工作原理如下:
- 通过Elasticsearch的_searchscrollslice(即游标分片)方式实现,slice结合数据集成任务的task多线程分片机制使用。
- 根据Elasticsearch中的Mapping配置,转换数据类型。
更多详情请参见Elasticsearch官方文档。
说明 Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。
基本配置
重要 实际运行时,请删除下述代码中的注释。
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" //错误记录数。
},
"jvmOption":"",
"speed":{
"concurrent":3,//并发数
"throttle":true,//
"mbps":"12",//限流
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ //读取列。
"id",
"name"
],
"endpoint":"", //服务地址。
"index":"", //索引。
"password":"", //密码。
"scroll":"", //scroll标志。
"search":"", //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
"type":"default",
"username":"" //用户名。
},
"stepType":"elasticsearch"
},
{
"category":"writer",
"name":"Writer",
"parameter":{ },
"stepType":"stream"
}
],
"type":"job",
"version":"2.0" //版本号。
}
高级功能
- 支持全量拉取
支持将Elasticsearch中一个文档的所有内容拉取为一个字段。配置详情请参见场景一:全量拉取。
- 支持提取半结构化到结构化数据
分类 描述 相关文档 产生背景 Elasticsearch中的数据特征为字段不固定,且有中文名、数据使用深层嵌套的形式。为更好地方便下游业务对数据的计算和存储需求,特推出从半结构化到结构化的转换解决方案。 — 实现原理 将Elasticsearch获取到的JSON数据,利用JSON工具的路径获取特性,将嵌套数据扁平化为一维结构的数据。然后将数据映射至结构化数据表中,拆分Elasticsearch复合结构数据至多个结构化数据表。 — 解决方案 JSON有嵌套的情况,通过path路径来解决。 - 属性
- 属性.子属性
- 属性[0].子属性
场景二:嵌套或对象字段属性同步 附属信息有一对多的情况,需要进行拆表拆行处理,进行遍历。 属性[*].子属性
场景三:数组属性拆分为多行 数组归并,一个字符串数组内容,归并为一个属性,并进行去重。 属性[]
场景四:数组属性去重归并 多属性合一,将多个属性合并为一个属性。 属性1,属性2
场景五:多属性合一同步 多属性选择处理。 属性1|属性2
场景六:多属性选择同步
参数说明
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
index | Elasticsearch中的index名。 | 是 | 无 |
type | Elasticsearch中index的type名。 | 否 | index名 |
search | Elasticsearch的query参数。 | 是 | 无 |
pageSize | 每次读取数据的条数。 | 否 | 100 |
scroll | Elasticsearch的分页参数,设置游标存放时间。
| 是 | 无 |
sort | 返回结果的排序字段。 | 否 | 无 |
retryCount | 失败后重试的次数。 | 否 | 300 |
connTimeOut | 客户端连接超时时间。 | 否 | 600,000 |
readTimeOut | 客户端读取超时时间。 | 否 | 600,000 |
multiThread | http请求,是否有多线程。 | 否 | true |
preemptiveAuth | http是否使用抢先模式请求 | 否 | false |
retrySleepTime | 失败后重试的时间间隔。 | 否 | 1000 |
discovery | 是否开启节点发现。
| 否 | false |
compression | 是否使用GZIP压缩请求正文,使用时需要在es节点上启用http.compression设置。 | 否 | false |
dateFormat | 待同步字段存在date类型,且该字段mapping没有format配置时,需要配置dateFormat参数。配置形式如下: "dateFormat" : "yyyy-MM-dd||yyyy-MM-dd HH:mm:ss" ,该配置需要包含同步date类型字段的所有格式。 | 否 | 无 |
full | 是否将全文档内容作为一个字段同步至目标端,将Elasticsearch的查询数据作为一个字段,配置详情请参见场景一:全量拉取。 | 否 | 无 |
multi | 该配置是一个高级功能具有五种用法,两个子属性分别为multi.key 和multi.mult ,配置详情请参见高级功能中表格内容。 | 否 | 无 |
向导开发介绍
打开新建的数据同步节点,即可进行同步任务的配置,详情请参见通过向导模式配置离线同步任务。
您需要在数据同步任务的编辑页面进行以下配置:
- 选择数据源。 配置同步任务的数据来源和数据去向。
参数 描述 数据源 通常填写您配置的数据源名称。 索引 Elasticsearch中的index名。 检索查询条件 Elasticsearch的query参数。 分页大小 每次读取数据的条数,默认为100。 游标时间 分页参数,设置游标存放时间。 高级配置 高级配置包括以下内容: - 排序方式:返回结果的排序字段。
- 全文作为一列:是否将Elasticsearch的数据拉取为一个字段。
例如,Elasticsearch Reader需要读取Elasticsearch的所有数据作为一列同步至MaxCompute,则需要设置全文作为一列。设置Elasticsearch Reader中的column为content,content为hits[]中的一行信息_source全内容。
说明 _id属性为Elasticsearch数据的固有属性,目前无法通过数据集成同步任务单独抽取并写入目的端。您可以将Elasticsearch Reader中全文作为一列参数配置为是(即JSON脚本中full参数配置为true),将Elasticsearch中的每个数据都作为一个字段同步到目的端,然后在目的端使用get_json_object
函数或其他JSON处理函数,将_id
值单独取出来做后续处理。 - 拆多行数组列名:是否将数组进行列拆多行的处理,需要辅助设置子属性,例如:属性[*].子属性。
- 字段映射,即上述参数说明中的column。左侧的源头表字段和右侧的目标表字段为一一对应关系。单击添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击删除图标进行删除 。说明 来源或目标端有Lindom、HBase、Tair、Elasticsearch数据源,字段无需连线,直接编辑即可保存。
参数 描述 同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。 同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。 取消映射 单击取消映射,可以取消建立的映射关系。 自动排版 可以根据相应的规律自动排版。 添加一行 单击添加一行,您可以输入以下类型的字段: - 可以配合调度参数使用,例如${bizdate}等。
- 可以输入关系数据库支持的函数,例如now()、count(1)等。
- 如果您输入的值无法解析,则类型显示为未识别。
- 通道控制。
参数 描述 任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。 同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。 错误记录数 错误记录数,表示脏数据的最大容忍条数。 分布式处理能力 数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组和新增和使用独享数据集成资源组。
脚本开发介绍
配置一个从Elasticsearch读取数据的JSON示例,使用脚本开发的详情请参见通过脚本模式配置离线同步任务。
重要 实际运行时,请删除下述代码中的注释。
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" //错误记录数。
},
"jvmOption":"",
"speed":{
"concurrent":3,
"throttle":false
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ //读取列。
"id",
"name"
],
"endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", //服务地址。
"index":"aliyun_es_xx", //索引。
"password":"*******", //密码。
"multiThread":true,
"scroll":"5m", //scroll标志。
"pageSize":5000,
"connTimeOut":600000,
"readTimeOut":600000,
"retryCount":30,
"retrySleepTime":"10000",
"search":{
"range":{
"gmt_modified":{
"gte":0
}
}
}, //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
"type":"doc",
"username":"aliyun_di" //用户名。
},
"stepType":"elasticsearch"
},
{
"category":"writer",
"name":"Writer",
"parameter":{ },
"stepType":"stream"
}
],
"type":"job",
"version":"2.0" //版本号。
}
配置数据集成资源组
- 单击数据同步任务编辑页面右侧的数据集成资源组配置。
- 根据提示选择对应的独享数据集成资源组。说明
- (推荐)数据集成资源组配置页面默认支持选择独享数据集成资源组,为确保数据同步的稳定性和性能要求,推荐使用独享数据集成资源组。
- 如果您需要选择公共资源组,请在页面右下方单击更多选项,在弹出的警告对话框单击确认,在数据集成资源组配置子页面进行选择。关于自定义数据集成资源组和公共资源组,详情请参见公共资源组。
场景一:全量拉取
说明 向导模式暂不支持该配置。
- 背景说明:将Elasticsearch中文档查询的结果拉取为一个字段。
- 配置示例:
## 读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "IXgdO4MB4GR_1DmrjTXP", "_score": 1.0, "_source": { "feature1": "value1", "feature2": "value2", "feature3": "value3" } }] ##数据集成Elasticsearch Reader插件配置 "parameter": { "column": [ "content" ], "full":true } ##写端结果:同步至目标端1行1列 {"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}
场景二:嵌套或对象字段属性同步
- 背景说明:Object对象或nested嵌套字段的属性时,通过path路径来解决。
- 配置形式:
- 属性
- 属性.子属性
- 属性[0].子属性
- 脚本配置:
"multi":{ "multi":true }
说明 向导模式暂不支持配置。 - 配置示例:
## 读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "7XAOOoMB4GR_1Dmrrust", "_score": 1.0, "_source": { "level1": { "level2": [ { "level3": "testlevel3_1" }, { "level3": "testlevel3_2" } ] } } } ] ##数据集成Elasticsearch reader插件配置 "parameter": { "column": [ "level1", "level1.level2", "level1.level2[0]", "level1.level2.level3" ], "multi":{ "multi":true } } ##写端结果:1行数据4列 column1(level1): {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]} column2(level1.level2): [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}] column3(level1.level2[0]): {"level3":"testlevel3_1"} column4(level1.level2.level3): null
说明- 获取的节点上层有数组时结果为null,如上样例获取level1.level2.level3不会报错,同步结果为null,需要配置为level1.level2[0].level3或level1.level2[1].level3,当前不支持level1.level2[*].level3。
- 不支持key出现"."的数据, 如"level1.level2":{"level3":"testlevel3_1"}, 此时该条数据获取结果为null。
场景三:数组属性拆分为多行
- 背景说明:附属信息有一对多的情况,需要将数组列拆成多行。
- 配置形式:属性[*].子属性
- 效果示意:源端数据{ "splitKey" :[1,2,3,4,5]},拆完后写到目标端为5行:{"splitKey[0]":1,"splitKey[1]":2,"splitKey[2]":3,"splitKey[3]":4,"splitKey[4]":5}
- 脚本配置:
"multi":{ "multi":true, "key": "headers" }
说明- 向导模式下配置拆多行数组列名,会自动生成脚本配置,具有相同效果。
- value必须为List,否则会报错。
- 配置示例:
## 读端:Elasticsearch中的原始数据
[
{
"_index": "lmtestjson",
"_type": "_doc",
"_id": "nhxmIYMBKDL4VkVLyXRN",
"_score": 1.0,
"_source": {
"headers": [
{
"remoteip": "192.0.2.1"
},
{
"remoteip": "192.0.2.2"
}
]
}
},
{
"_index": "lmtestjson",
"_type": "_doc",
"_id": "wRxsIYMBKDL4VkVLcXqf",
"_score": 1.0,
"_source": {
"headers": [
{
"remoteip": "192.0.2.3"
},
{
"remoteip": "192.0.2.4"
}
]
}
}
]
##数据集成Elasticsearch reader插件配置
{
"column":[
"headers[*].remoteip"
]
"multi":{
"multi":true,
"key": "headers"
}
}
##写端结果:4行
192.0.2.1
192.0.2.2
192.0.2.3
192.0.2.4
场景四:数组属性去重归并
- 背景说明:数组去重归并,将一个数组属性去重归并后写入为字符串属性,数组属性可以为子属性如name1.name2,去重采用tostring结果作为标准。
- 配置形式:属性[]。
column里面带有 [] 关键字就会认为对该属性做去重归并。
- 脚本配置:
"multi":{ "multi":true }
说明 向导模式暂不支持配置。 - 配置示例:
## 读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "4nbUOoMB4GR_1Dmryj8O", "_score": 1.0, "_source": { "feature1": [ "value1", "value1", "value2", "value2", "value3" ] } } ] ##数据集成Elasticsearch reader插件配置 "parameter": { "column":[ "feature1[]" ], "multi":{ "multi":true } } ##写端结果:1行1列数据 "value1,value2,value3"
场景五:多属性合一同步
- 背景说明:多属性选择处理,返回第一个有值的属性,都不存在时将写入null。
- 配置形式:属性1|属性2|...
column里面带有 "|"关键字就会对该项做多属性选择。
- 脚本配置:
"multi":{ "multi":true }
说明 向导模式暂不支持该配置。 - 配置示例:
##读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "v3ShOoMB4GR_1DmrZN22", "_score": 1.0, "_source": { "feature1": "feature1", "feature2": [ 1, 2, 3 ], "feature3": { "child": "feature3" } } }] ##数据集成Elasticsearch reade插件配置 "parameter": { "column":[ "feature1|feature2|feature3" ], "multi":{ "multi":true } } ##写端结果:1行1列数据 "feature1"
场景六:多属性选择同步
- 背景说明:多属性选择处理 ,返回第一个有值的属性,都不存在时写入null。
- 配置形式:属性1|属性2|...
column里面带有 "|"关键字就会对该项做多属性选择
- 脚本配置:
"multi":{ "multi":true }
说明 向导模式暂不支持该配置。 - 配置示例:
##读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "v3ShOoMB4GR_1DmrZN22", "_score": 1.0, "_source": { "feature1": "feature1", "feature2": [ 1, 2, 3 ], "feature3": { "child": "feature3" } } }] ##数据集成Elasticsearch reader插件配置 "parameter": { "column":[ "feature1,feature2,feature3" ], "multi":{ "multi":true } } ##写端结果:1行1列数据 "feature1,[1,2,3],{"child":"feature3"}"