本文为您介绍Elasticsearch Reader的工作原理、功能和参数。

工作原理

Elasticsearch在公共资源组上支持Elasticsearch5.x版本,在独享数据集成资源组上支持Elasticsearch5.x和6.x版本。独享数据集成资源组的详情请参见独享数据集成资源组

Elasticsearch Reader的工作原理如下:
  • 通过Elasticsearch的_searchscrollslice(即游标分片)方式实现,slice结合数据集成任务的task多线程分片机制使用。
  • 根据Elasticsearch中的Mapping配置,转换数据类型。

更多详情请参见Elasticsearch官方文档

基本配置

注意 实际运行时,请删除下述代码中的注释。
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //错误记录数。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //读取列。
                    "id",
                    "name"
                ],
                "endpoint":"", //服务地址。
                "index":"",  //索引。
                "password":"",  //密码。
                "scroll":"",  //scroll标志。
                "search":"",  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
                "type":"default",
                "username":""  //用户名。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" //版本号。
}

高级功能

  • 支持全量拉取

    支持将Elasticsearch中一个文档的所有内容拉取为一个字段。

  • 支持提取半结构化到结构化数据
    分类 描述
    产生背景 Elasticsearch中的数据特征为字段不固定,且有中文名、数据使用深层嵌套的形式。为更好地方便下游业务对数据的计算和存储需求,特推出从半结构化到结构化的转换解决方案。
    实现原理 将Elasticsearch获取到的JSON数据,利用JSON工具的路径获取特性,将嵌套数据扁平化为一维结构的数据。然后将数据映射至结构化数据表中,拆分Elasticsearch复合结构数据至多个结构化数据表。
    解决方案
    • JSON有嵌套的情况,通过path路径来解决:
      • 属性
      • 属性.子属性
      • 属性[0].子属性
    • 附属信息有一对多的情况,需要进行拆表拆行处理,进行遍历。

      属性[*].子属性

    • 数组归并,一个字符串数组内容,归并为一个属性,并进行去重。

      属性[] 去重

    • 多属性合一,将多个属性合并为一个属性。

      属性1,属性2

    • 多属性选择处理

      属性1|属性2

参数说明

参数 描述 是否必选 默认值
datasource 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。
index Elasticsearch中的index名。
type Elasticsearch中indextype名。 index名
search Elasticsearch的query参数。
pageSize 每次读取数据的条数。 100
scroll Elasticsearch的分页参数,设置游标存放时间。
sort 返回结果的排序字段。
retryCount 失败后重试的次数。 300
connTimeOut 客户端连接超时时间。 600,000
readTimeOut 客户端读取超时时间。 600,000
multiThread http请求,是否有多线程。 true

向导开发介绍

打开新建的数据同步节点,即可进行同步任务的配置,详情请参见通过向导模式配置任务

您需要在数据同步任务的编辑页面进行以下配置:
  1. 选择数据源。
    配置同步任务的数据来源数据去向ES
    参数 描述
    数据源 通常填写您配置的数据源名称。
    索引 Elasticsearch中的index名。
    索引类型 Elasticsearch中index的type名。
    检索查询条件 Elasticsearch的query参数。
    分页大小 每次读取数据的条数,默认为100。
    游标时间 分页参数,设置游标存放时间。
    高级配置 高级配置包括以下内容:
    • 排序方式:返回结果的排序字段。
    • 重试次数:失败后重试的次数。
    • 连接超时时间:客户端连接超时时间。
    • 读取超时时间:客户端读取超时时间。
    • 是否多线程并发:HTTP请求,是否有多线程。
    • 全文作为一列:是否将Elasticsearch的数据拉取为一个字段。

      例如,Elasticsearch Reader需要读取Elasticsearch的所有数据作为一列同步至MaxCompute,则需要设置全文作为一列。设置Elasticsearch Reader中的column为contentcontenthits[]中的一行信息_source全内容。

    • 将数组进行列拆多行:是否将数组进行列拆多行的处理,需要辅助设置子属性。
  2. 字段映射,即上述参数说明中的column

    来源或目标端有Lindom、HBase、Tair、Elasticsearch数据源,字段无需连线,直接编辑即可保存。

  3. 通道控制。通道控制
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。

脚本开发介绍

配置一个从Elasticsearch读取数据的JSON示例,使用脚本开发的详情请参见通过脚本模式配置任务
注意 实际运行时,请删除下述代码中的注释。
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //错误记录数。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //读取列。
                    "id",
                    "name"
                ],
                "endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", //服务地址。
                "index":"aliyun_es_xx",  //索引。
                "password":"*******",  //密码。
                "multiThread":true,
                "scroll":"5m",  //scroll标志。
                "pageSize":5000,
                "connTimeOut":600000,
                "readTimeOut":600000,
                "retryCount":30,
                "retrySleepTime":"10000",
                "search":{
                            "range":{
                                "gmt_modified":{
                                    "gte":0
                                }
                            }
                        },  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
                "type":"doc",
                "username":"aliyun_di"  //用户名。
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" //版本号。
}