This topic describes the working principles and features of Elasticsearch Reader, the parameters that it supports, and how to configure Elasticsearch Reader by using the codeless user interface (UI) and the code editor.

Limits

DataWorks allows you to add Alibaba Cloud Elasticsearch V5.X, V6.X, and V7.X clusters as data sources. Self-managed Elasticsearch clusters are not supported.

How Elasticsearch Reader works

Elasticsearch Reader can read data from Elasticsearch V5.X clusters by using shared resource groups, and it can read data from Elasticsearch V5.X, V6.X, and V7.X clusters by using exclusive resource groups for Data Integration. For more information about exclusive resource groups for Data Integration, see Create and use an exclusive resource group for Data Integration.

Elasticsearch Reader works in the following way:
  • Uses the _search, scroll, and slice APIs of Elasticsearch. The slices are processed by multiple threads of a Data Integration node.
  • Converts data types based on the data type mapping of Elasticsearch.

For more information, visit the Elasticsearch official website.
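To illustrate the first point, the following request body shows what a sliced scroll search looks like at the Elasticsearch API level. The slice count, page size, and query shown here are illustrative; such a body is sent to POST /<index>/_search?scroll=5m, and each slice is consumed by a separate thread:

```json
{
    "slice": { "id": 0, "max": 2 },
    "size": 100,
    "query": { "match_all": {} }
}
```

A second request with "id": 1 retrieves the other slice, and subsequent pages of each slice are fetched by passing the returned _scroll_id to the scroll API.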

Basic configuration

Notice Delete the comments from the following code before you run the code.
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" // The maximum number of dirty data records allowed. 
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3, // The maximum number of parallel threads.
            "throttle":true, // Specifies whether to enable bandwidth throttling.
            "mbps":"12" // The maximum transmission rate. This parameter takes effect only if throttle is set to true.
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ // The columns from which you want to read data. 
                    "id",
                    "name"
                ],
                "endpoint":"", // The endpoint of the Elasticsearch cluster. 
                "index":"", // The name of the index. 
                "password":"", // The password. 
                "scroll":"", // The period of time for which the snapshot of the scroll is retained, such as 5m. 
                "search":"", // The search criteria. The value is the same as the query body used to call the _search API. 
                "type":"default",
                "username":"" // The username. 
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" // The version number. 
}

Advanced features

  • Extracts all data.

    You can extract all the data of an Elasticsearch document into a single field.

  • Converts semi-structured data to structured data.
    Background information: Data in Elasticsearch is deeply nested, fields vary in type and length, and field values may contain Chinese characters. To facilitate data computing and storage for downstream services, Elasticsearch Reader can convert semi-structured data to structured data.
    Principle: Elasticsearch Reader uses a JSON tool to obtain data paths and flattens the nested JSON-formatted data that it obtains from an Elasticsearch cluster into one-dimensional data. Then, Elasticsearch Reader maps the data to structured tables. This way, Elasticsearch data with a complex structure is converted into multiple structured tables.
    Solution
    • Uses paths to parse nested JSON-formatted data.
      • Property
      • Property.Child property
      • Property[0].Child property
    • Traverses all data of a property that has multiple child properties and splits the data to multiple tables or multiple rows.

      Property[*].Child property

    • Merges data in a string array to one property and removes duplicates.

      Property[] where duplicates are removed

    • Merges multiple properties to one property.

      Property 1,Property 2

    • Selects one of multiple properties for processing.

      Property 1|Property 2
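As a rough illustration of the path-based flattening described above, the following Python sketch converts a nested document into one-dimensional path-value pairs that use the Property.Child and Property[0].Child notation. This is illustrative only, assuming simple dot and index paths; it does not reproduce Elasticsearch Reader's internal parser.

```python
# Minimal sketch of path-based flattening of nested JSON data.
# Illustrative only; not Elasticsearch Reader's actual implementation.
def flatten(doc, prefix=""):
    """Flatten nested JSON into {path: value} pairs such as a.b or a[0].c."""
    rows = {}
    if isinstance(doc, dict):
        for key, value in doc.items():
            path = f"{prefix}.{key}" if prefix else key
            rows.update(flatten(value, path))
    elif isinstance(doc, list):
        for i, value in enumerate(doc):
            rows.update(flatten(value, f"{prefix}[{i}]"))
    else:
        rows[prefix] = doc
    return rows

source = {"name": "tom", "tags": ["a", "b"], "addr": {"city": "hz"}}
print(flatten(source))
# {'name': 'tom', 'tags[0]': 'a', 'tags[1]': 'b', 'addr.city': 'hz'}
```

Each flattened path can then be mapped to a column of a structured table.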

Parameters

datasource: The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. Required: Yes. No default value.
index: The name of the index in an Elasticsearch cluster. Required: Yes. No default value.
type: The name of the index type in an Elasticsearch cluster. Required: No. Default value: the index name.
search: The query parameter of Elasticsearch. The value is the same as the query body used to call the _search API. Required: Yes. No default value.
pageSize: The number of data records to read at a time. Required: No. Default value: 100.
scroll: The period of time for which the snapshot of the scroll is retained. Required: Yes. No default value.
sort: The field based on which the returned results are sorted. Required: No. No default value.
retryCount: The number of retries allowed after a failure. Required: No. Default value: 300.
connTimeOut: The connection timeout period of the client. Required: No. Default value: 600,000.
readTimeOut: The data read timeout period of the client. Required: No. Default value: 600,000.
multiThread: Specifies whether to use multiple threads for an HTTP request. Required: No. Default value: true.

Configure Elasticsearch Reader by using the codeless UI

Create a synchronization node and configure the node. For more information, see Configure a synchronization node by using the codeless UI.

Perform the following steps on the configuration tab of the synchronization node:
  1. Configure data sources.
    Configure Source and Target for the synchronization node.
    Parameter Description
    Connection The name of the data source that you have configured.
    Index The name of the index in the source Elasticsearch cluster.
    Query condition The query parameter of Elasticsearch.
    Paginate size The number of data records to read at a time. Default value: 100.
    Scroll parameter The period of time for which the snapshot of the scroll is retained, such as 5m.
    Advanced Settings The Advanced Settings section contains the following parameters:
    • Sort by: the field based on which the returned results are sorted.
    • All text as one field: specifies whether to store all data of each document in a single column.

      For example, if you want Elasticsearch Reader to read each document from an Elasticsearch cluster as a single column and synchronize the data to MaxCompute, you must specify the All text as one field parameter. The column that stores the data is named content and contains the _source information of each hit that the query returns.

      Note _id is an inherent attribute of Elasticsearch data. This attribute cannot be extracted and written to the destination by using a data synchronization node. You can set All text as one field to Yes to enable Elasticsearch Reader to read and synchronize each data record in an Elasticsearch cluster to the destination as a single field. For this setting, the full parameter in the JSON script is set to true. Then, you can use the GET_JSON_OBJECT function or other JSON functions to extract the _id attribute for further processing.
    • Split array fields name: specifies whether to split an array into multiple rows. If you enable this feature, you must specify the child properties to split.
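In the JSON script, the All text as one field setting corresponds to the full parameter. A minimal reader parameter fragment might look like the following; the endpoint and index values are placeholders, and the single column name content is an assumption based on the default described above:

```json
"parameter": {
    "endpoint": "http://es-cn-xxx.elasticsearch.aliyuncs.com:9200",
    "index": "aliyun_es_xx",
    "full": true,
    "column": [ "content" ]
}
```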
  2. Configure field mappings. This operation is equivalent to setting the column parameter that is described in the preceding section.
    Fields in the source on the left have a one-to-one mapping with fields in the destination on the right. You can click Add to add a field. To remove an added field, move the pointer over the field and click the Remove icon.
    Note The field mapping is not required if the source or destination is Lindorm, HBase, Tair, or Elasticsearch.
    Operation Description
    Map Fields with the Same Name Click Map Fields with the Same Name to establish mappings between fields with the same name. The data types of the fields must match.
    Map Fields in the Same Line Click Map Fields in the Same Line to establish mappings between fields in the same row. The data types of the fields must match.
    Delete All Mappings Click Delete All Mappings to remove the mappings that are established.
    Auto Layout Click Auto Layout. Then, the system sorts the fields based on specific rules.
    Add Click Add to add a field. You can add fields of the following types:
    • You can enter constants. Each constant must be enclosed in single quotation marks ('), such as 'abc' and '123'.
    • You can use scheduling parameters, such as ${bizdate}.
    • You can enter functions that are supported by relational databases, such as now() and count(1).
    • If the field that you entered cannot be parsed, the value of Type for the field is Unidentified.
  3. Configure channel control policies.
    Parameter Description
    Expected Maximum Concurrency The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source.
    Dirty Data Records Allowed The maximum number of dirty data records allowed.
    Distributed Execution The distributed execution mode allows you to split your node into pieces and distribute them to multiple Elastic Compute Service (ECS) instances for parallel execution. This speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in distributed execution mode, excessive access requests are sent to the data sources. Therefore, before you use the distributed execution mode, you must evaluate the access load on the data sources. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information about exclusive resource groups for Data Integration, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration.

Configure Elasticsearch Reader by using the code editor

In the following code, a synchronization node is configured to read data from an Elasticsearch cluster by using the code editor. For more information, see Create a synchronization node by using the code editor.
Notice Delete the comments from the following code before you run the code.
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" // The maximum number of dirty data records allowed. 
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ // The columns from which you want to read data. 
                    "id",
                    "name"
                ],
                "endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200",// The endpoint. 
                "index":"aliyun_es_xx",  // The index. 
                "password":"*******",  // The password. 
                "multiThread":true,
                "scroll":"5m", // The period of time for which the snapshot of the scroll is retained. 
                "pageSize":5000,
                "connTimeOut":600000,
                "readTimeOut":600000,
                "retryCount":30,
                "retrySleepTime":"10000",
                "search":{
                            "range":{
                                "gmt_modified":{
                                    "gte":0
                                }
                            }
                        },  // The search criteria. The value is the same as the Elasticsearch query that calls the _search API. 
                "type":"doc",
                "username":"aliyun_di"// The username. 
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" // The version number. 
}

Configure the resource group for Data Integration

  1. On the configuration tab of the synchronization node, click the Resource Group configuration tab in the right-side navigation pane.
  2. On the Resource Group configuration tab, select the desired exclusive resource group for Data Integration from the Exclusive Resource Groups drop-down list.
    Note
    • (Recommended) By default, you can select an exclusive resource group for Data Integration on the Resource Group configuration tab. To ensure the stability and performance of data synchronization, we recommend that you use exclusive resource groups for Data Integration.
    • If you want to select the shared resource group for Data Integration, click More in the lower-right corner of the page. In the Warning message, click OK. On the Resource Group configuration tab, select the shared resource group. For more information about custom resource groups for Data Integration and the shared resource group for Data Integration, see Shared resource groups.