Elasticsearch数据源为您提供读取和写入Elasticsearch双向通道的功能,本文为您介绍DataWorks的Elasticsearch数据同步的能力支持情况。
背景信息
Elasticsearch在公共资源组上支持Elasticsearch 5.x版本,在独享数据集成资源组和新版资源组(通用型资源组)上支持Elasticsearch 5.x、6.x、7.x和8.x版本。
独享数据集成资源组的详情请参见新增和使用独享数据集成资源组。
新版资源组的详情请参见新增和使用新版资源组。
Elasticsearch是遵从Apache开源条款的一款开源产品,是当前主流的企业级搜索引擎。Elasticsearch是一个基于Lucene的搜索和数据分析工具,它提供分布式服务。Elasticsearch核心概念同数据库核心概念的对应关系如下所示。
Relational DB(实例)-> Databases(数据库)-> Tables(表)-> Rows(一行数据)-> Columns(一行数据的一列)
Elasticsearch -> Index -> Types -> Documents -> Fields
Elasticsearch中可以有多个索引或数据库,每个索引可以包括多个类型或表,每个类型可以包括多个文档或行,每个文档可以包括多个字段或列。Elasticsearch Writer插件使用Elasticsearch的Rest API接口,批量把从Reader读入的数据写入Elasticsearch中。
支持的版本
DataWorks平台目前仅支持配置阿里云Elasticsearch 5.x、6.x、7.x和8.x版本数据源,不支持配置自建Elasticsearch数据源。
使用限制
离线读写
Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。
如果您使用的是6.x及以上版本,仅支持使用独享数据集成资源组或使用新版资源组。
不支持同步scaled_float类型的字段。
不支持同步字段中带有关键字
$ref
的索引。
支持的字段类型
类型 | 离线读(Elasticsearch Reader) | 离线写(Elasticsearch Writer) | 实时写 |
binary | 支持 | 支持 | 支持 |
boolean | 支持 | 支持 | 支持 |
keyword | 支持 | 支持 | 支持 |
constant_keyword | 不支持 | 不支持 | 不支持 |
wildcard | 不支持 | 不支持 | 不支持 |
long | 支持 | 支持 | 支持 |
integer | 支持 | 支持 | 支持 |
short | 支持 | 支持 | 支持 |
byte | 支持 | 支持 | 支持 |
double | 支持 | 支持 | 支持 |
float | 支持 | 支持 | 支持 |
half_float | 不支持 | 不支持 | 不支持 |
scaled_float | 不支持 | 不支持 | 不支持 |
unsigned_long | 不支持 | 不支持 | 不支持 |
date | 支持 | 支持 | 支持 |
date_nanos | 不支持 | 不支持 | 不支持 |
alias | 不支持 | 不支持 | 不支持 |
object | 支持 | 支持 | 支持 |
flattened | 不支持 | 不支持 | 不支持 |
nested | 支持 | 支持 | 支持 |
join | 不支持 | 不支持 | 不支持 |
integer_range | 支持 | 支持 | 支持 |
float_range | 支持 | 支持 | 支持 |
long_range | 支持 | 支持 | 支持 |
double_range | 支持 | 支持 | 支持 |
date_range | 支持 | 支持 | 支持 |
ip_range | 不支持 | 支持 | 支持 |
ip | 支持 | 支持 | 支持 |
version | 支持 | 支持 | 支持 |
murmur3 | 不支持 | 不支持 | 不支持 |
aggregate_metric_double | 不支持 | 不支持 | 不支持 |
histogram | 不支持 | 不支持 | 不支持 |
text | 支持 | 支持 | 支持 |
annotated-text | 不支持 | 不支持 | 不支持 |
completion | 支持 | 不支持 | 不支持 |
search_as_you_type | 不支持 | 不支持 | 不支持 |
token_count | 支持 | 不支持 | 不支持 |
dense_vector | 不支持 | 不支持 | 不支持 |
rank_feature | 不支持 | 不支持 | 不支持 |
rank_features | 不支持 | 不支持 | 不支持 |
geo_point | 支持 | 支持 | 支持 |
geo_shape | 支持 | 支持 | 支持 |
point | 不支持 | 不支持 | 不支持 |
shape | 不支持 | 不支持 | 不支持 |
percolator | 不支持 | 不支持 | 不支持 |
string | 支持 | 支持 | 支持 |
工作原理
Elasticsearch Reader的工作原理如下:
通过Elasticsearch的_searchscrollslice(即游标分片)方式实现,slice结合数据集成任务的task多线程分片机制使用。
根据Elasticsearch中的Mapping配置,转换数据类型。
更多详情请参见Elasticsearch官方文档。
Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。
基本配置
实际运行时,请删除下述代码中的注释。
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" //错误记录数。
},
"jvmOption":"",
"speed":{
"concurrent":3,//并发数
"throttle":true,//
"mbps":"12",//限流,此处1mbps = 1MB/s。
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ //读取列。
"id",
"name"
],
"endpoint":"", //服务地址。
"index":"", //索引。
"password":"", //密码。
"scroll":"", //scroll标志。
"search":"", //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
"type":"default",
"username":"" //用户名。
},
"stepType":"elasticsearch"
},
{
"stepType": "elasticsearch",
"parameter": {
"column": [ //写入列
{
"name": "id",
"type": "integer"
},
{
"name": "name",
"type": "text"
}
],
"index": "test", //写入索引
"indexType": "", //写入索引类型,es7不填
"actionType": "index", //写入方式
"cleanup": false, //是否重建索引
"datasource": "test", //数据源名称
"primaryKeyInfo": { //主键取值方式
"fieldDelimiterOrigin": ",",
"column": [
"id"
],
"type": "specific",
"fieldDelimiter": ","
},
"dynamic": false, //动态映射
"batchSize": 1024 //批量写文档数
},
"name": "Writer",
"category": "writer"
}
],
"type":"job",
"version":"2.0" //版本号。
}
高级功能
支持全量拉取
支持将Elasticsearch中一个文档的所有内容拉取为一个字段。配置详情请参见场景一:全量拉取。
支持提取半结构化到结构化数据
分类
描述
相关文档
产生背景
Elasticsearch中的数据特征为字段不固定,且有中文名、数据使用深层嵌套的形式。为更好地方便下游业务对数据的计算和存储需求,特推出从半结构化到结构化的转换解决方案。
—
实现原理
将Elasticsearch获取到的JSON数据,利用JSON工具的路径获取特性,将嵌套数据扁平化为一维结构的数据。然后将数据映射至结构化数据表中,拆分Elasticsearch复合结构数据至多个结构化数据表。
—
解决方案
JSON有嵌套的情况,通过path路径来解决。
属性
属性.子属性
属性[0].子属性
附属信息有一对多的情况,需要进行拆表拆行处理,进行遍历。
属性[*].子属性
数组归并,一个字符串数组内容,归并为一个属性,并进行去重。
属性[]
多属性合一,将多个属性合并为一个属性。
属性1,属性2
多属性选择处理。
属性1|属性2
数据同步任务开发
Elasticsearch数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源。
单表离线同步任务配置指导
操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
单表实时写同步任务配置指导
操作流程请参见DataStudio侧实时同步任务配置。
整库离线写、单表/整库全增量实时写同步任务配置指导
操作流程请参见数据集成侧同步任务配置。
附录:脚本Demo与参数说明
附录:离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。
Reader脚本Demo
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" //错误记录数。
},
"jvmOption":"",
"speed":{
"concurrent":3,
"throttle":false
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ //读取列。
"id",
"name"
],
"endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", //服务地址。
"index":"aliyun_es_xx", //索引。
"password":"*******", //密码。
"multiThread":true,
"scroll":"5m", //scroll标志。
"pageSize":5000,
"connTimeOut":600000,
"readTimeOut":600000,
"retryCount":30,
"retrySleepTime":"10000",
"search":{
"range":{
"gmt_modified":{
"gte":0
}
}
}, //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
"type":"doc",
"username":"aliyun_di" //用户名。
},
"stepType":"elasticsearch"
},
{
"category":"writer",
"name":"Writer",
"parameter":{ },
"stepType":"stream"
}
],
"type":"job",
"version":"2.0" //版本号。
}
Reader脚本参数
参数 | 描述 | 是否必选 | 默认值 |
datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
index | Elasticsearch中的index名。 | 是 | 无 |
type | Elasticsearch中index的type名。 | 否 | index名 |
search | Elasticsearch的query参数。 | 是 | 无 |
pageSize | 每次读取数据的条数。 | 否 | 100 |
scroll | Elasticsearch的分页参数,设置游标存放时间。
| 是 | 无 |
strictMode | 以严格模式读取Elasticsearch中的数据,当出现Elasticsearch的shard.failed时会停止读取,避免读取少数据。 | 否 | true |
sort | 返回结果的排序字段。 | 否 | 无 |
retryCount | 失败后重试的次数。 | 否 | 300 |
connTimeOut | 客户端连接超时时间。 | 否 | 600,000 |
readTimeOut | 客户端读取超时时间。 | 否 | 600,000 |
multiThread | http请求,是否有多线程。 | 否 | true |
preemptiveAuth | http是否使用抢先模式请求 | 否 | false |
retrySleepTime | 失败后重试的时间间隔。 | 否 | 1000 |
discovery | 是否开启节点发现。
| 否 | false |
compression | 是否使用GZIP压缩请求正文,使用时需要在es节点上启用http.compression设置。 | 否 | false |
dateFormat | 待同步字段存在date类型,且该字段mapping没有format配置时,需要配置dateFormat参数。配置形式如下: | 否 | 无 |
full | 是否将全文档内容作为一个字段同步至目标端,将Elasticsearch的查询数据作为一个字段,配置详情请参见场景一:全量拉取。 | 否 | 无 |
multi | 该配置是一个高级功能具有五种用法,两个子属性分别为 | 否 | 无 |
Writer脚本Demo
{
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {
},
"stepType": "stream"
},
{
"category": "writer",
"name": "Writer",
"parameter": {
"datasource":"xxx",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"discovery": false,
"primaryKeyInfo":{
"type":"pk",
"fieldDelimiter":",",
"column":[]
},
"batchSize": 1000,
"dynamic":false,
"esPartitionColumn":[
{
"name":"col1",
"comment":"xx",
"type":"STRING"
}
],
"column": [
{
"name": "pk",
"type": "id"
},
{
"name": "col_ip",
"type": "ip"
},
{
"name": "col_array",
"type": "long",
"array": true,
},
{
"name": "col_double",
"type": "double"
},
{
"name": "col_long",
"type": "long"
},
{
"name": "col_integer",
"type": "integer"
{
"name": "col_keyword",
"type": "keyword"
},
{
"name": "col_text",
"type": "text",
"analyzer": "ik_max_word",
"other_params":
{
"doc_values": false
},
},
{
"name": "col_geo_point",
"type": "geo_point"
},
{
"name": "col_date",
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
{
"name": "col_nested1",
"type": "nested"
},
{
"name": "col_nested2",
"type": "nested"
},
{
"name": "col_object1",
"type": "object"
},
{
"name": "col_object2",
"type": "object"
},
{
"name": "col_integer_array",
"type": "integer",
"array": true
},
{
"name": "col_geo_shape",
"type": "geo_shape",
"tree": "quadtree",
"precision": "10m"
}
]
},
"stepType": "elasticsearch"
}
],
"type": "job",
"version": "2.0"
}
VPC环境的Elasticsearch运行在默认资源组会存在网络不通的情况。您需要使用独享数据集成资源组,才能连通VPC进行数据同步。添加资源的详情请参见独享数据集成资源组。
Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
datasource | 选择需要同步的Elasticsearch数据源,若还未在DataWorks创建该数据源,请先创建,详情请参见配置Elasticsearch数据源。 | 是 | 无 |
index | Elasticsearch中的index名。 | 是 | 无 |
indexType | Elasticsearch中index的type名。 | 否 | Elasticsearch |
cleanup | 定义当前任务在索引index已存在的情况是否要删除数据。
| 否 | false |
batchSize | 定义同步任务一次性插入ElasticSearch的Document条数。 | 否 | 1,000 |
trySize | 定义往ElasticSearch写入数据失败后的重试次数。 | 否 | 30 |
timeout | 客户端超时时间。 | 否 | 600,000 |
discovery | 任务是否启动节点发现功能。
| 否 | false |
compression | HTTP请求,开启压缩。 | 否 | true |
multiThread | HTTP请求,是否有多线程。 | 否 | true |
ignoreWriteError | 忽略写入错误,不重试,继续写入。 | 否 | false |
ignoreParseError | 忽略解析数据格式错误,继续写入。 | 否 | true |
alias | Elasticsearch的别名类似于数据库的视图机制,为索引my_index创建一个别名my_index_alias,对my_index_alias的操作与my_index的操作一致。 配置alias表示在数据导入完成后,为指定的索引创建别名。 | 否 | 无 |
aliasMode | 数据导入完成后增加别名的模式,包括append(增加模式)和exclusive(只留这一个):
后续会转换别名为实际的索引名称,别名可以用来进行索引迁移和多个索引的查询统一,并可以用来实现视图的功能。 | 否 | append |
settings | 创建index时的settings,与Elasticsearch官方一致。 | 否 | 无 |
column | column用来配置文档的多个字段Filed信息,具体每个字段项可以配置name(名称)、type(类型)等基础配置,以及Analyzer、Format和Array等扩展配置。 Elasticsearch所支持的字段类型如下所示。
列类型的说明如下:
如果需要在column中配置除了type以外的属性值,您可以使用other_params参数,该参数配置在column中,在update mappings时,用于描述column中除了type以外的Elasticsearch属性信息。
如果您希望源端数据写入为Elasticsearch时按照数组类型写入,您可按照JSON格式或指定分隔符的方式来解析源端数据。配置详情请参见附录:ElasticSearch写入的格式期望是数组类型。 | 是 | 无 |
dynamic | 定义当在文档中发现未存在的字段时,同步任务是否通过Elasticsearch动态映射机制为字段添加映射。
Elasticsearch 7.x版本的默认type为_doc。使用Elasticsearch的自动mappings时,请配置_doc和esVersion为7。 您需要转换为脚本模式,添加一个版本参数: | 否 | false |
actionType | 表示Elasticsearch在数据写出时的action类型,目前数据集成支持index和update两种actionType,默认值为index:
| 否 | index |
primaryKeyInfo | 定义当前写入ElasticSearch的主键取值方式。
| 是 | specific |
esPartitionColumn | 定义写入ElasticSearch时是否开启分区,用于修改ElasticSearch中的routing的参数。
| 否 | false |
enableWriteNull | 该参数用于是否支持将来源端的空值字段同步至Elasticsearch。取值如下:
| 否 | true |
场景一:全量拉取
背景说明:将Elasticsearch中文档查询的结果拉取为一个字段。
配置示例:
## 读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "IXgdO4MB4GR_1DmrjTXP", "_score": 1.0, "_source": { "feature1": "value1", "feature2": "value2", "feature3": "value3" } }] ##数据集成Elasticsearch Reader插件配置 "parameter": { "column": [ "content" ], "full":true } ##写端结果:同步至目标端1行1列 {"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}
场景二:嵌套或对象字段属性同步
背景说明:Object对象或nested嵌套字段的属性时,通过path路径来解决。
配置形式:
属性
属性.子属性
属性[0].子属性
脚本配置:
"multi":{ "multi":true }
说明向导模式暂不支持配置。
配置示例:
## 读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "7XAOOoMB4GR_1Dmrrust", "_score": 1.0, "_source": { "level1": { "level2": [ { "level3": "testlevel3_1" }, { "level3": "testlevel3_2" } ] } } } ] ##数据集成Elasticsearch reader插件配置 "parameter": { "column": [ "level1", "level1.level2", "level1.level2[0]", "level1.level2.level3" ], "multi":{ "multi":true } } ##写端结果:1行数据4列 column1(level1): {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]} column2(level1.level2): [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}] column3(level1.level2[0]): {"level3":"testlevel3_1"} column4(level1.level2.level3): null
说明获取的节点上层有数组时结果为null,如上样例获取level1.level2.level3不会报错,同步结果为null,需要配置为level1.level2[0].level3或level1.level2[1].level3,当前不支持level1.level2[*].level3。
不支持key出现"."的数据, 如"level1.level2":{"level3":"testlevel3_1"}, 此时该条数据获取结果为null。
场景三:数组属性拆分为多行
背景说明:附属信息有一对多的情况,需要将数组列拆成多行。
配置形式:属性[*].子属性
效果示意:源端数据{ "splitKey" :[1,2,3,4,5]},拆完后写到目标端为5行:{"splitKey[0]":1,"splitKey[1]":2,"splitKey[2]":3,"splitKey[3]":4,"splitKey[4]":5}
脚本配置:
"multi":{ "multi":true, "key": "headers" }
说明向导模式下配置拆多行数组列名,会自动生成脚本配置,具有相同效果。
value必须为List,否则会报错。
配置示例:
## 读端:Elasticsearch中的原始数据 [ { "_index": "lmtestjson", "_type": "_doc", "_id": "nhxmIYMBKDL4VkVLyXRN", "_score": 1.0, "_source": { "headers": [ { "remoteip": "192.0.2.1" }, { "remoteip": "192.0.2.2" } ] } }, { "_index": "lmtestjson", "_type": "_doc", "_id": "wRxsIYMBKDL4VkVLcXqf", "_score": 1.0, "_source": { "headers": [ { "remoteip": "192.0.2.3" }, { "remoteip": "192.0.2.4" } ] } } ] ##数据集成Elasticsearch reader插件配置 { "column":[ "headers[*].remoteip" ] "multi":{ "multi":true, "key": "headers" } } ##写端结果:4行 192.0.2.1 192.0.2.2 192.0.2.3 192.0.2.4
场景四:数组属性去重归并
背景说明:数组去重归并,将一个数组属性去重归并后写入为字符串属性,数组属性可以为子属性如name1.name2,去重采用tostring结果作为标准。
配置形式:属性[]。
column里面带有 [] 关键字就会认为对该属性做去重归并。
脚本配置:
"multi":{ "multi":true }
说明向导模式暂不支持配置。
配置示例:
## 读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "4nbUOoMB4GR_1Dmryj8O", "_score": 1.0, "_source": { "feature1": [ "value1", "value1", "value2", "value2", "value3" ] } } ] ##数据集成Elasticsearch reader插件配置 "parameter": { "column":[ "feature1[]" ], "multi":{ "multi":true } } ##写端结果:1行1列数据 "value1,value2,value3"
场景五:多属性合一同步
背景说明:多属性选择处理,返回第一个有值的属性,都不存在时将写入null。
配置形式:属性1|属性2|...
column里面带有 "|"关键字就会对该项做多属性选择。
脚本配置:
"multi":{ "multi":true }
说明向导模式暂不支持该配置。
配置示例:
##读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "v3ShOoMB4GR_1DmrZN22", "_score": 1.0, "_source": { "feature1": "feature1", "feature2": [ 1, 2, 3 ], "feature3": { "child": "feature3" } } }] ##数据集成Elasticsearch reade插件配置 "parameter": { "column":[ "feature1|feature2|feature3" ], "multi":{ "multi":true } } ##写端结果:1行1列数据 "feature1"
场景六:多属性选择同步
背景说明:多属性选择处理 ,返回第一个有值的属性,都不存在时写入null。
配置形式:属性1|属性2|...
column里面带有 "|"关键字就会对该项做多属性选择
脚本配置:
"multi":{ "multi":true }
说明向导模式暂不支持该配置。
配置示例:
##读端:Elasticsearch中的原始数据 "hits": [ { "_index": "mutiltest_1", "_type": "_doc", "_id": "v3ShOoMB4GR_1DmrZN22", "_score": 1.0, "_source": { "feature1": "feature1", "feature2": [ 1, 2, 3 ], "feature3": { "child": "feature3" } } }] ##数据集成Elasticsearch reader插件配置 "parameter": { "column":[ "feature1,feature2,feature3" ], "multi":{ "multi":true } } ##写端结果:1行1列数据 "feature1,[1,2,3],{"child":"feature3"}"
附录:ElasticSearch写入的格式期望是数组类型
支持以下两种方式将源端数据按照数组类型写入ElasticSearch。
按JSON格式解析源端数据
例如:源端数据为
"[1,2,3,4,5]"
,配置json_array=true对其进行解析,同步将以数组格式写入ElasticSearch。"parameter" : { { "name":"docs_1", "type":"keyword", "json_array":true } }
按分隔符解析源端数据
例如:源端数据为
"1,2,3,4,5"
, 配置分隔符splitter=","对其进行解析,同步将以数组格式写入ElasticSearch。说明一个任务仅支持配置一种分隔符,splitter全局唯一,不支持多array字段配置为不同的分隔符。例如源端字段列col1="1,2,3,4,5" , col2="6-7-8-9-10", splitter无法针对每列单独配置使用。
"parameter" : { "column": [ { "name": "docs_2", "array": true, "type": "long" } ], "splitter":","//注意:splitter配置与column配置同级。 }
相关文档
数据集成支持其他更多数据源接入,更多信息,请参见支持的数据源及同步方案。