This topic describes how Elasticsearch Reader works, its features, and its parameters.

How it works

The shared resource group supports Elasticsearch Reader for Elasticsearch 5.X. Exclusive resource groups for Data Integration support Elasticsearch Reader for Elasticsearch 5.X and 6.X. For more information about exclusive resource groups for Data Integration, see Exclusive resource groups for Data Integration.

Elasticsearch Reader works in the following way:
  • Uses the _search, scroll, and slice APIs of Elasticsearch. Slices are processed in parallel by multiple threads of a Data Integration node, as sketched at the end of this section.
  • Converts data types based on the mapping configuration of Elasticsearch.

For more information, visit the Elasticsearch official website.
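
For reference, the following requests sketch the sliced scroll pattern that Elasticsearch Reader relies on. The index name my_index, the slice count, and the 1m scroll validity period are illustrative values and are not settings that Elasticsearch Reader exposes one-to-one.
// Each slice is read by one thread. The first request of a slice takes this form:
// POST /my_index/_search?scroll=1m
{
    "slice":{
        "id":0,     // The sequence number of this slice: 0, 1, ..., max-1.
        "max":3     // The total number of slices that the scroll is split into.
    },
    "query":{
        "match_all":{ }
    }
}
// Later pages of the same slice are fetched with the scroll ID that the previous response returned:
// POST /_search/scroll
{
    "scroll":"1m",  // Keeps the search context alive for another minute.
    "scroll_id":"<scroll ID from the previous response>"
}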

Basic configuration

Notice Delete the comments from the following code before you run the code.
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" // The number of error records.
         },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ // The columns to be read.
                     "id",
                    "name"
                ],
                "endpoint":"", // The endpoint.
                 "index":"",  // The index.
                 "password":"",  // The password.
                 "scroll":"",  // The scroll ID.
                 "search":"",  // The search criteria. The value is the same as the Elasticsearch query that uses the _search API.
                 "type":"default",
                "username":""  // The username.
             },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" // The version number.
}

Advanced features

  • Supports full extraction.

    You can extract all data of an Elasticsearch document to a field.

  • Converts semi-structured data to structured data.

    Background: Data in Elasticsearch can be deeply nested, may contain fields of various types and lengths, and may use Chinese field names. To facilitate data computing and storage in downstream services, Elasticsearch Reader can convert such semi-structured data to structured data.

    How it works: Elasticsearch Reader uses a JSON tool to obtain paths and flattens the nested JSON data that it obtains from Elasticsearch into single-dimensional data. Then, Elasticsearch Reader maps the data to structured tables. This way, Elasticsearch data in a complex structure is converted to multiple structured tables.

    Solution (see the sketch after this list):
    • Uses paths to parse nested JSON data:
      • Property
      • Property.Child property
      • Property[0].Child property
    • Traverses all data of a property that has multiple child properties and splits the data into multiple tables or multiple rows:

      Property[*].Child property

    • Merges the data in a string array into one property and removes duplicates:

      Property[] with duplicates removed

    • Merges multiple properties into one property:

      Property 1,Property 2

    • Selects the properties to be processed from multiple properties:

      Property 1|Property 2
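
    The following snippet sketches how the path syntax listed above might appear in a column configuration. The document structure and the field names (name, address.city, tags) are hypothetical, and whether your version of Elasticsearch Reader accepts each path form directly in the column parameter depends on your environment.
    "parameter":{
        "column":[ // Hypothetical columns that use the path syntax described above.
            "name",          // A top-level property.
            "address.city",  // A child property that is referenced by path.
            "tags[0]",       // The first element of an array property.
            "tags[*]"        // All elements of an array property, which can be split into multiple rows.
        ],
        "endpoint":"",  // The endpoint.
        "index":"",     // The index.
        "search":"",    // The search criteria.
        "type":"default"
    }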

Parameters

Parameter | Description | Required | Default value
datasource | The name of the connection. It must be the same as the name of the added connection. You can add connections in the code editor. | Yes | N/A
index | The name of the index in Elasticsearch. | Yes | N/A
type | The name of the type in the Elasticsearch index. | No | Index name
search | The query parameter of Elasticsearch. The value is the same as the query that is used with the _search API. | Yes | N/A
pageSize | The number of data records to read at a time. | No | 100
scroll | The scroll parameter of Elasticsearch, which specifies how long the snapshot that is taken for a scroll remains valid. | Yes | N/A
sort | The field by which the returned results are sorted. | No | N/A
retryCount | The number of retries after a failure. | No | 300
connTimeOut | The connection timeout period of the client. | No | 600,000
readTimeOut | The data read timeout period of the client. | No | 600,000
multiThread | Specifies whether to use multiple threads for an HTTP request. | No | true
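
For clarity, the following fragment shows the search and scroll parameters together. The range filter reuses the gmt_modified field from the code editor example later in this topic, and "1m" is an ordinary Elasticsearch scroll validity period; both are illustrative values rather than required settings.
"pageSize":100,   // Read 100 records per request (the default).
"scroll":"1m",    // Keep the scroll snapshot valid for one minute between requests.
"search":{        // Query DSL, as used with the _search API.
    "range":{
        "gmt_modified":{
            "gte":0
        }
    }
}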

Codeless UI mode

Create a sync node and configure the node. For more information, see Create a sync node by using the codeless UI.

Perform the following steps on the configuration tab of the sync node:
  1. Configure the connections.
    Configure the connections to the source and destination data stores for the sync node.
    Parameter Description
    Connection The name of the connection that you have configured.
    Index The index name in Elasticsearch.
    Index Type The type name in the index of Elasticsearch.
    Search Condition The query parameter of Elasticsearch.
    Records Per Read The number of data records to read at a time. Default value: 100.
    Scroll Snapshot Timestamp The scroll parameter, which sets the timestamp of the snapshot that is taken for a scroll.
    Advanced Settings The Advanced Settings section contains the following parameters:
    • Sort By: the field based on which the returned results are sorted.
    • Maximum Retries: the number of retries after a failure.
    • Connect Timeout: the connection timeout period of the client.
    • Read Timeout: the data reading timeout period of the client.
    • Multi Thread: specifies whether to use multiple threads for an HTTP request.
    • Create Column Recording ES Data: specifies whether to store all data of Elasticsearch in a column.

      For example, if you want Elasticsearch Reader to read all data of each Elasticsearch document as a single column and synchronize the data to MaxCompute, set the Create Column Recording ES Data parameter. The column that stores the data is named content and contains the source information from the hits[] array.

    • Split Array into Multi Rows: specifies whether to split an array into multiple rows. If you enable this feature, you must specify the child properties.
  2. Configure field mappings. This operation is equivalent to setting the column parameter in the preceding configuration examples.

    Field mapping is not required if the source or destination data store is Lindorm, HBase, Tair, or Elasticsearch.

  3. Configure channel control policies. A sketch of the equivalent code editor settings follows the table.
    Parameter Description
    Expected Maximum Concurrency The maximum number of concurrent threads that the sync node uses to read data from the source data store and write data to the destination data store. You can configure the concurrency for the sync node on the codeless user interface (UI).
    Bandwidth Throttling You can enable bandwidth throttling and set a maximum transmission rate to prevent heavy read workloads on the source data store. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source data store.
    Dirty Data Records Allowed The maximum number of dirty data records that are allowed.
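
    In the code editor, these options map onto the setting block that appears in the configuration examples in this topic. The following is a minimal sketch, assuming that a maximum transmission rate is specified with an mbps value when throttling is enabled; choose values that suit your source data store.
    "setting":{
        "errorLimit":{
            "record":"0"        // Dirty Data Records Allowed: the maximum number of dirty data records.
        },
        "speed":{
            "concurrent":3,     // Expected Maximum Concurrency.
            "throttle":true,    // Bandwidth Throttling is enabled.
            "mbps":1            // The maximum transmission rate when throttling is enabled (illustrative value).
        }
    }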

Code Editor mode

The following example shows how to configure a sync node in JSON to read data from Elasticsearch. For more information about how to use the code editor, see Create a sync node by using the code editor.
Notice Delete the comments from the following code before you run the code.
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" // The number of error records.
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ // The columns to be read.
                    "id",
                    "name"
                ],
                "endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", // The endpoint.
                "index":"aliyun_es_xx",  // The index.
                "password":"*******",// The password.
                "multiThread":true,
                "scroll":"",  // The scroll ID.
                "pageSize":5000,
                "connTimeOut":600000,
                "readTimeOut":600000,
                "retryCount":30,
                "retrySleepTime":"10000",
                "search":{
                            "range":{
                                "gmt_modified":{
                                    "gte":0
                                }
                            }
                        },  // The search criteria. The value is the same as the Elasticsearch query that uses the _search API.
                "type":"doc",
                "username":"aliyun_di"  // The username.
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" // The version number.
}