本文为您介绍Elasticsearch Reader的工作原理、功能和参数。

使用限制

  • 不支持同步scaled_float类型的字段。
  • 不支持同步字段中带有关键字 $ref的索引。

支持的Elasticsearch版本

DataWorks平台目前仅支持配置阿里云Elasticsearch5.x、6.x、7.x版本数据源,不支持配置自建Elasticsearch数据源。
重要 如果您使用的是6.x及以上版本,仅支持使用独享数据集成资源组

支持的字段类型

类型离线读(Elasticsearch Reader)离线写(Elasticsearch Writer)实时写
binary支持 支持 支持
boolean支持 支持 支持
keyword支持 支持 支持
constant_keyword 不支持不支持不支持
wildcard不支持不支持不支持
long支持 支持 支持
integer支持 支持 支持
short支持 支持 支持
byte支持 支持 支持
double支持 支持 支持
float支持 支持 支持
half_float不支持不支持不支持
scaled_float不支持不支持不支持
unsigned_long不支持不支持不支持
date支持 支持 支持
date_nanos不支持不支持不支持
alias不支持不支持不支持
object支持 支持 支持
flattened不支持不支持不支持
nested支持 支持 支持
join不支持不支持不支持
integer_range支持 支持 支持
float_range支持 支持 支持
long_range支持 支持 支持
double_range支持 支持 支持
date_range支持 支持 支持
ip_range不支持支持 支持
ip支持 支持 支持
version支持 支持 支持
murmur3不支持不支持不支持
aggregate_metric_double不支持不支持不支持
histogram不支持不支持不支持
text支持 支持 支持
annotated-text不支持不支持不支持
completion支持 不支持不支持
search_as_you_type不支持不支持不支持
token_count支持 不支持不支持
dense_vector不支持不支持不支持
rank_feature不支持不支持不支持
rank_features不支持不支持不支持
geo_point支持 支持 支持
geo_shape支持 支持 支持
point不支持不支持不支持
shape不支持不支持不支持
percolator不支持不支持不支持
string支持 支持 支持

工作原理

Elasticsearch在公共资源组上支持Elasticsearch5.x版本,在独享数据集成资源组上支持Elasticsearch5.x、6.x和7.x版本。独享数据集成资源组的详情请参见新增和使用独享数据集成资源组

Elasticsearch Reader的工作原理如下:
  • 通过Elasticsearch的_searchscrollslice(即游标分片)方式实现,slice结合数据集成任务的task多线程分片机制使用。
  • 根据Elasticsearch中的Mapping配置,转换数据类型。

更多详情请参见Elasticsearch官方文档

说明 Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。

基本配置

重要 实际运行时,请删除下述代码中的注释。
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //错误记录数。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,//并发数
            "throttle":true,//
                     "mbps":"12",//限流
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //读取列。
                    "id",
                    "name"
                ],
                "endpoint":"", //服务地址。
                "index":"",  //索引。
                "password":"",  //密码。
                "scroll":"",  //scroll标志。
                "search":"",  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
                "type":"default",
                "username":""  //用户名。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" //版本号。
}

高级功能

  • 支持全量拉取

    支持将Elasticsearch中一个文档的所有内容拉取为一个字段。配置详情请参见场景一:全量拉取

  • 支持提取半结构化到结构化数据
    分类描述相关文档
    产生背景 Elasticsearch中的数据特征为字段不固定,且有中文名、数据使用深层嵌套的形式。为更好地方便下游业务对数据的计算和存储需求,特推出从半结构化到结构化的转换解决方案。
    实现原理 将Elasticsearch获取到的JSON数据,利用JSON工具的路径获取特性,将嵌套数据扁平化为一维结构的数据。然后将数据映射至结构化数据表中,拆分Elasticsearch复合结构数据至多个结构化数据表。
    解决方案 JSON有嵌套的情况,通过path路径来解决。
    • 属性
    • 属性.子属性
    • 属性[0].子属性
    场景二:嵌套或对象字段属性同步
    附属信息有一对多的情况,需要进行拆表拆行处理,进行遍历。

    属性[*].子属性

    场景三:数组属性拆分为多行
    数组归并,一个字符串数组内容,归并为一个属性,并进行去重。

    属性[]

    场景四:数组属性去重归并
    多属性合一,将多个属性合并为一个属性。

    属性1,属性2

    场景五:多属性合一同步
    多属性选择处理。

    属性1|属性2

    场景六:多属性选择同步

参数说明

参数描述是否必选默认值
datasource数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。
indexElasticsearch中的index名。
typeElasticsearch中indextype名。index名
searchElasticsearch的query参数。
pageSize每次读取数据的条数。100
scrollElasticsearch的分页参数,设置游标存放时间。
  • 设置的过小时,如果获取两页数据间隔时间超出scroll,会导致游标过期,进而丢失数据。
  • 设置的过大时,如果同一时刻发起的查询过多,超出服务端max_open_scroll_context配置时,会导致数据查询报错。
sort返回结果的排序字段。
retryCount失败后重试的次数。300
connTimeOut客户端连接超时时间。600,000
readTimeOut客户端读取超时时间。600,000
multiThreadhttp请求,是否有多线程。true
preemptiveAuthhttp是否使用抢先模式请求false
retrySleepTime失败后重试的时间间隔。1000
discovery是否开启节点发现。
  • true:与集群中随机一个节点进行连接。启用节点发现将轮询并定期更新客户机中的服务器列表,并对发现的节点发起查询请求。
  • false:对配置的endpoint发起查询请求。
false
compression是否使用GZIP压缩请求正文,使用时需要在es节点上启用http.compression设置。false
dateFormat待同步字段存在date类型,且该字段mapping没有format配置时,需要配置dateFormat参数。配置形式如下: "dateFormat" : "yyyy-MM-dd||yyyy-MM-dd HH:mm:ss",该配置需要包含同步date类型字段的所有格式。
full是否将全文档内容作为一个字段同步至目标端,将Elasticsearch的查询数据作为一个字段,配置详情请参见场景一:全量拉取
multi该配置是一个高级功能具有五种用法,两个子属性分别为multi.keymulti.mult,配置详情请参见高级功能中表格内容。

向导开发介绍

打开新建的数据同步节点,即可进行同步任务的配置,详情请参见通过向导模式配置离线同步任务

您需要在数据同步任务的编辑页面进行以下配置:
  1. 选择数据源。
    配置同步任务的数据来源数据去向ES
    参数描述
    数据源通常填写您配置的数据源名称。
    索引Elasticsearch中的index名。
    检索查询条件Elasticsearch的query参数。
    分页大小每次读取数据的条数,默认为100。
    游标时间分页参数,设置游标存放时间。
    高级配置高级配置包括以下内容:
    • 排序方式:返回结果的排序字段。
    • 全文作为一列:是否将Elasticsearch的数据拉取为一个字段。

      例如,Elasticsearch Reader需要读取Elasticsearch的所有数据作为一列同步至MaxCompute,则需要设置全文作为一列。设置Elasticsearch Reader中的column为contentcontenthits[]中的一行信息_source全内容。

      说明 _id属性为Elasticsearch数据的固有属性,目前无法通过数据集成同步任务单独抽取并写入目的端。您可以将Elasticsearch Reader中全文作为一列参数配置为(即JSON脚本中full参数配置为true),将Elasticsearch中的每个数据都作为一个字段同步到目的端,然后在目的端使用get_json_object函数或其他JSON处理函数,将_id值单独取出来做后续处理。
    • 拆多行数组列名:是否将数组进行列拆多行的处理,需要辅助设置子属性,例如:属性[*].子属性。
  2. 字段映射,即上述参数说明中的column
    左侧的源头表字段和右侧的目标表字段为一一对应关系。单击添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击删除图标进行删除 。
    说明 来源或目标端有Lindom、HBase、Tair、Elasticsearch数据源,字段无需连线,直接编辑即可保存。
    字段映射
    参数描述
    同名映射单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射单击取消映射,可以取消建立的映射关系。
    自动排版可以根据相应的规律自动排版。
    添加一行单击添加一行,您可以输入以下类型的字段:
    • 可以配合调度参数使用,例如${bizdate}等。
    • 可以输入关系数据库支持的函数,例如now()count(1)等。
    • 如果您输入的值无法解析,则类型显示为未识别。
  3. 通道控制。通道配置
    参数描述
    任务期望最大并发数数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

配置一个从Elasticsearch读取数据的JSON示例,使用脚本开发的详情请参见通过脚本模式配置离线同步任务
重要 实际运行时,请删除下述代码中的注释。
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //错误记录数。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //读取列。
                    "id",
                    "name"
                ],
                "endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", //服务地址。
                "index":"aliyun_es_xx",  //索引。
                "password":"*******",  //密码。
                "multiThread":true,
                "scroll":"5m",  //scroll标志。
                "pageSize":5000,
                "connTimeOut":600000,
                "readTimeOut":600000,
                "retryCount":30,
                "retrySleepTime":"10000",
                "search":{
                            "range":{
                                "gmt_modified":{
                                    "gte":0
                                }
                            }
                        },  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
                "type":"doc",
                "username":"aliyun_di"  //用户名。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" //版本号。
}

配置数据集成资源组

  1. 单击数据同步任务编辑页面右侧的数据集成资源组配置
  2. 根据提示选择对应的独享数据集成资源组
    独享数据集成资源组-zh
    说明
    • (推荐)数据集成资源组配置页面默认支持选择独享数据集成资源组,为确保数据同步的稳定性和性能要求,推荐使用独享数据集成资源组。
    • 如果您需要选择公共资源组,请在页面右下方单击更多选项,在弹出的警告对话框单击确认,在数据集成资源组配置子页面进行选择。关于自定义数据集成资源组和公共资源组,详情请参见公共资源组

场景一:全量拉取

说明 向导模式暂不支持该配置。
  • 背景说明:将Elasticsearch中文档查询的结果拉取为一个字段。
  • 配置示例:
    
    ## 读端:Elasticsearch中的原始数据
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "IXgdO4MB4GR_1DmrjTXP",
            "_score": 1.0,
            "_source": {
                "feature1": "value1",
                "feature2": "value2",
                "feature3": "value3"
            }
        }]
    
    ##数据集成Elasticsearch Reader插件配置
    "parameter": {
      "column": [
          "content"
      ],
      "full":true
    }
    
    ##写端结果:同步至目标端1行1列
    {"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}

场景二:嵌套或对象字段属性同步

  • 背景说明:Object对象或nested嵌套字段的属性时,通过path路径来解决。
  • 配置形式:
    • 属性
    • 属性.子属性
    • 属性[0].子属性
  • 脚本配置:
    "multi":{
        "multi":true
    }
    说明 向导模式暂不支持配置。
  • 配置示例:
    ## 读端:Elasticsearch中的原始数据
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "7XAOOoMB4GR_1Dmrrust",
            "_score": 1.0,
            "_source": {
                "level1": {
                    "level2": [
                        {
                            "level3": "testlevel3_1"
                        },
                        {
                            "level3": "testlevel3_2"
                        }
                    ]
                }
            }
        }
    ]
    ##数据集成Elasticsearch reader插件配置
    "parameter": {
      "column": [
          "level1",
          "level1.level2",
          "level1.level2[0]",
          "level1.level2.level3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##写端结果:1行数据4列
    column1(level1):            {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]}
    column2(level1.level2):     [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]
    column3(level1.level2[0]):  {"level3":"testlevel3_1"}
    column4(level1.level2.level3):  null
    说明
    • 获取的节点上层有数组时结果为null,如上样例获取level1.level2.level3不会报错,同步结果为null,需要配置为level1.level2[0].level3或level1.level2[1].level3,当前不支持level1.level2[*].level3。
    • 不支持key出现"."的数据, 如"level1.level2":{"level3":"testlevel3_1"}, 此时该条数据获取结果为null。

场景三:数组属性拆分为多行

  • 背景说明:附属信息有一对多的情况,需要将数组列拆成多行。
  • 配置形式:属性[*].子属性
  • 效果示意:源端数据{ "splitKey" :[1,2,3,4,5]},拆完后写到目标端为5行:{"splitKey[0]":1,"splitKey[1]":2,"splitKey[2]":3,"splitKey[3]":4,"splitKey[4]":5}
  • 脚本配置:
    "multi":{   
           "multi":true,    
            "key": "headers"
    }
    说明
    • 向导模式下配置拆多行数组列名,会自动生成脚本配置,具有相同效果。
    • value必须为List,否则会报错。
  • 配置示例:
## 读端:Elasticsearch中的原始数据
[
    {
        "_index": "lmtestjson",
        "_type": "_doc",
        "_id": "nhxmIYMBKDL4VkVLyXRN",
        "_score": 1.0,
        "_source": {
            "headers": [
                {
                    "remoteip": "192.0.2.1"
                },
                {
                    "remoteip": "192.0.2.2"
                }
            ]
        }
    },
    {
        "_index": "lmtestjson",
        "_type": "_doc",
        "_id": "wRxsIYMBKDL4VkVLcXqf",
        "_score": 1.0,
        "_source": {
            "headers": [
                {
                    "remoteip": "192.0.2.3"
                },
                {
                    "remoteip": "192.0.2.4"
                }
            ]
        }
    }
]
##数据集成Elasticsearch reader插件配置
{
   "column":[
      "headers[*].remoteip"
  ]
  "multi":{
      "multi":true,
      "key": "headers"
  }
}

##写端结果:4行
192.0.2.1
192.0.2.2
192.0.2.3
192.0.2.4

场景四:数组属性去重归并

  • 背景说明:数组去重归并,将一个数组属性去重归并后写入为字符串属性,数组属性可以为子属性如name1.name2,去重采用tostring结果作为标准。
  • 配置形式:属性[]。

    column里面带有 [] 关键字就会认为对该属性做去重归并。

  • 脚本配置:
    "multi":{
        "multi":true
    }
    说明 向导模式暂不支持配置。
  • 配置示例:
    ## 读端:Elasticsearch中的原始数据
    "hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "4nbUOoMB4GR_1Dmryj8O",
        "_score": 1.0,
        "_source": {
            "feature1": [
                "value1",
                "value1",
                "value2",
                "value2",
                "value3"
            ]
        }
    }
    ]
    ##数据集成Elasticsearch reader插件配置
    "parameter": {
      "column":[
            "feature1[]"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##写端结果:1行1列数据
    "value1,value2,value3"

场景五:多属性合一同步

  • 背景说明:多属性选择处理,返回第一个有值的属性,都不存在时将写入null。
  • 配置形式:属性1|属性2|...

    column里面带有 "|"关键字就会对该项做多属性选择。

  • 脚本配置:
    "multi":{    
        "multi":true
    }
    说明 向导模式暂不支持该配置。
  • 配置示例:
    ##读端:Elasticsearch中的原始数据
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "v3ShOoMB4GR_1DmrZN22",
            "_score": 1.0,
            "_source": {
                "feature1": "feature1",
                "feature2": [
                    1,
                    2,
                    3
                ],
                "feature3": {
                    "child": "feature3"
                }
            }
        }]
    
    ##数据集成Elasticsearch reade插件配置
    "parameter": {
      "column":[
            "feature1|feature2|feature3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##写端结果:1行1列数据
    "feature1"

场景六:多属性选择同步

  • 背景说明:多属性选择处理 ,返回第一个有值的属性,都不存在时写入null。
  • 配置形式:属性1|属性2|...

    column里面带有 "|"关键字就会对该项做多属性选择

  • 脚本配置:
    "multi":{
        "multi":true
    }
    说明 向导模式暂不支持该配置。
  • 配置示例:
    ##读端:Elasticsearch中的原始数据
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "v3ShOoMB4GR_1DmrZN22",
            "_score": 1.0,
            "_source": {
                "feature1": "feature1",
                "feature2": [
                    1,
                    2,
                    3
                ],
                "feature3": {
                    "child": "feature3"
                }
            }
        }]
    ##数据集成Elasticsearch reader插件配置
    "parameter": {
      "column":[
            "feature1,feature2,feature3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ##写端结果:1行1列数据
    "feature1,[1,2,3],{"child":"feature3"}"