DataWorks:Elasticsearch data source

Last Updated:Feb 26, 2024

DataWorks provides Elasticsearch Reader and Elasticsearch Writer for you to read data from and write data to Elasticsearch data sources. This topic describes the capabilities of synchronizing data from or to Elasticsearch data sources.

Background information

You can use a shared resource group to run a synchronization task to read data from or write data to an Elasticsearch V5.X cluster. You can use an exclusive resource group for Data Integration to run a synchronization task to read data from or write data to an Elasticsearch V5.X, V6.X, or V7.X cluster. For information about exclusive resource groups for Data Integration, see Create and use an exclusive resource group for Data Integration.

Elasticsearch is an open source service that is released based on the Apache License. The service is a popular search engine for enterprises. Elasticsearch is a distributed search and analytics engine built on top of Apache Lucene. The following description provides the mappings between the core concepts of Elasticsearch and those of a relational database:

Relational database instance -> Database -> Table -> Row       -> Column
Elasticsearch                -> Index    -> Types -> Documents -> Fields

An Elasticsearch cluster can contain multiple indexes (databases). Each index can contain multiple types (tables). Each type can contain multiple documents (rows). Each document can contain multiple fields (columns). Elasticsearch Writer obtains data records from a reader and uses the RESTful API of Elasticsearch to write the data records to Elasticsearch in batches.
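
To make the batch write more concrete, the following Python sketch builds the newline-delimited request body that the Elasticsearch _bulk REST API accepts. It assumes that a batch write corresponds to a _bulk request, which this topic does not state explicitly. The function name build_bulk_body and the sample records are hypothetical and are not part of DataWorks.

```python
import json


def build_bulk_body(index, records, id_field=None):
    """Build an NDJSON body for the Elasticsearch _bulk API.

    Each record becomes two lines: an action line and a source line.
    """
    lines = []
    for record in records:
        action = {"index": {"_index": index}}
        if id_field is not None:
            # Reuse a source field as the document _id.
            action["index"]["_id"] = str(record[id_field])
        lines.append(json.dumps(action))
        lines.append(json.dumps(record))
    # The _bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"


body = build_bulk_body(
    "test",
    [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}],
    id_field="id",
)
print(body)
```

The sketch only constructs the request body. Sending it, handling partial failures, and retrying are left out.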

Supported Elasticsearch versions

DataWorks allows you to add only Alibaba Cloud Elasticsearch V5.X, V6.X, and V7.X clusters as data sources. Self-managed Elasticsearch clusters are not supported.

Limits

Batch data read and write

  • Elasticsearch Reader obtains shard information on the server for data synchronization. You must make sure that the shards on the server are in the active state during data synchronization. Otherwise, data inconsistency may occur.

  • If you add an Alibaba Cloud Elasticsearch V6.X or V7.X cluster to DataWorks as a data source and configure a synchronization task for the data source, you can use only an exclusive resource group for Data Integration to run the synchronization task.

  • Fields of the scaled_float data type cannot be synchronized.

  • Indexes that contain fields with the keyword $ref in field names cannot be synchronized.

Supported data types

| Data type | Elasticsearch Reader for batch data read | Elasticsearch Writer for batch data write | Elasticsearch Writer for real-time data write |
| --- | --- | --- | --- |
| binary | Supported | Supported | Supported |
| boolean | Supported | Supported | Supported |
| keyword | Supported | Supported | Supported |
| constant_keyword | Not supported | Not supported | Not supported |
| wildcard | Not supported | Not supported | Not supported |
| long | Supported | Supported | Supported |
| integer | Supported | Supported | Supported |
| short | Supported | Supported | Supported |
| byte | Supported | Supported | Supported |
| double | Supported | Supported | Supported |
| float | Supported | Supported | Supported |
| half_float | Not supported | Not supported | Not supported |
| scaled_float | Not supported | Not supported | Not supported |
| unsigned_long | Not supported | Not supported | Not supported |
| date | Supported | Supported | Supported |
| date_nanos | Not supported | Not supported | Not supported |
| alias | Not supported | Not supported | Not supported |
| object | Supported | Supported | Supported |
| flattened | Not supported | Not supported | Not supported |
| nested | Supported | Supported | Supported |
| join | Not supported | Not supported | Not supported |
| integer_range | Supported | Supported | Supported |
| float_range | Supported | Supported | Supported |
| long_range | Supported | Supported | Supported |
| double_range | Supported | Supported | Supported |
| date_range | Supported | Supported | Supported |
| ip_range | Not supported | Supported | Supported |
| ip | Supported | Supported | Supported |
| version | Supported | Supported | Supported |
| murmur3 | Not supported | Not supported | Not supported |
| aggregate_metric_double | Not supported | Not supported | Not supported |
| histogram | Not supported | Not supported | Not supported |
| text | Supported | Supported | Supported |
| annotated-text | Not supported | Not supported | Not supported |
| completion | Supported | Not supported | Not supported |
| search_as_you_type | Not supported | Not supported | Not supported |
| token_count | Supported | Not supported | Not supported |
| dense_vector | Not supported | Not supported | Not supported |
| rank_feature | Not supported | Not supported | Not supported |
| rank_features | Not supported | Not supported | Not supported |
| geo_point | Supported | Supported | Supported |
| geo_shape | Supported | Supported | Supported |
| point | Not supported | Not supported | Not supported |
| shape | Not supported | Not supported | Not supported |
| percolator | Not supported | Not supported | Not supported |
| string | Supported | Supported | Supported |

How Elasticsearch Reader works

Elasticsearch Reader works in the following way:

  • Uses the _search, scroll, and slice APIs of Elasticsearch. The slices are processed by multiple threads of a synchronization task in Data Integration.

  • Converts data types based on the mapping configuration of Elasticsearch.

For more information, see the documentation for open source Elasticsearch.

Note

Elasticsearch Reader obtains shard information on the server for data synchronization. You must make sure that the shards on the server are in the active state during data synchronization. Otherwise, data inconsistency may occur.
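
The combination of the _search, scroll, and slice APIs can be sketched as follows. The Python code only builds the request paths and bodies that open source Elasticsearch documents for a sliced scroll. It does not send them and it is not the implementation of Elasticsearch Reader. The function names and the index name in the usage lines are hypothetical.

```python
def build_sliced_scroll_requests(index, query, slices, scroll="5m", page_size=100):
    """Return one (path, body) pair per slice for the initial _search call."""
    requests = []
    for slice_id in range(slices):
        body = {"size": page_size, "query": query}
        if slices > 1:
            # A sliced scroll needs at least two slices, so the slice
            # clause is omitted when only one worker reads the index.
            body["slice"] = {"id": slice_id, "max": slices}
        path = f"/{index}/_search?scroll={scroll}"
        requests.append((path, body))
    return requests


def build_next_page_request(scroll_id, scroll="5m"):
    """Return the (path, body) pair that fetches the next page of a scroll."""
    return "/_search/scroll", {"scroll": scroll, "scroll_id": scroll_id}


# One request per worker thread; each worker then pages through its own slice.
for path, body in build_sliced_scroll_requests("my_index", {"match_all": {}}, 3):
    print(path, body)
```

Each worker would keep calling the request from build_next_page_request with the scroll ID returned by the previous response until a page comes back empty.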

Basic configuration

Important

You must delete the comments from the following sample code before you run the code.

{
 "order":{
  "hops":[
   {
    "from":"Reader",
    "to":"Writer"
   }
  ]
 },
 "setting":{
  "errorLimit":{
   "record":"0" // The maximum number of dirty data records allowed.
  },
  "jvmOption":"",
  "speed":{
   "concurrent":3, // The maximum number of parallel threads.
   "throttle":true, // Specifies whether to enable throttling. The mbps parameter takes effect only when throttle is set to true.
   "mbps":"12" // The maximum transmission rate. Unit: MB/s.
  }
 },
 "steps":[
  {
   "category":"reader",
   "name":"Reader",
   "parameter":{
    "column":[ // The names of the fields.
     "id",
     "name"
    ],
    "endpoint":"", // The endpoint of the Elasticsearch cluster.
    "index":"", // The name of the index from which you want to read data.
    "password":"", // The password of the Elasticsearch cluster.
    "scroll":"", // The keep-alive time of the scroll context, such as 5m.
    "search":"", // The search criteria. The value is the same as the Elasticsearch query that uses the _search API.
    "type":"default",
    "username":"" // The username of the Elasticsearch cluster.
   },
   "stepType":"elasticsearch"
  },
  {
   "category":"writer",
   "name":"Writer",
   "parameter":{
    "column":[ // The names of the fields to which you want to write data.
     {
      "name":"id",
      "type":"integer"
     },
     {
      "name":"name",
      "type":"text"
     }
    ],
    "index":"test", // The name of the index to which you want to write data.
    "indexType":"", // The type of the index to which you want to write data. If you use an Elasticsearch V7.0 cluster, leave this parameter empty.
    "actionType":"index", // The write mode.
    "cleanup":false, // Specifies whether to delete and recreate the index before data write.
    "datasource":"test", // The name of the data source.
    "primaryKeyInfo":{ // The value assignment method of the primary key.
     "fieldDelimiterOrigin":",",
     "column":[
      "id"
     ],
     "type":"specific",
     "fieldDelimiter":","
    },
    "dynamic":false, // Specifies whether to use the dynamic mapping mechanism to establish mappings between fields.
    "batchSize":1024 // The number of documents to write at a time.
   },
   "stepType":"elasticsearch"
  }
 ],
 "type":"job",
 "version":"2.0" // The version number.
}
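
Because the sample code contains // comments, it is not valid JSON until the comments are removed. The following Python sketch shows one way to strip them without damaging values such as endpoint URLs that contain // inside a string. The helper strip_line_comments and the shortened sample are hypothetical and are not part of DataWorks.

```python
import json


def strip_line_comments(text):
    """Remove // comments that appear outside JSON string literals."""
    out = []
    in_string = False
    escaped = False
    i = 0
    while i < len(text):
        ch = text[i]
        if in_string:
            out.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            i += 1
        elif ch == '"':
            in_string = True
            out.append(ch)
            i += 1
        elif text.startswith("//", i):
            # Skip to the end of the line and keep the newline itself.
            while i < len(text) and text[i] != "\n":
                i += 1
        else:
            out.append(ch)
            i += 1
    return "".join(out)


sample = '''{
  "endpoint": "http://example.invalid:9200", // The endpoint.
  "index": "test" // The index name.
}'''
config = json.loads(strip_line_comments(sample))
print(config["endpoint"])
```

The helper tracks whether the scanner is inside a string literal, which is why the // in the URL survives while the trailing comments are dropped.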

Advanced features

Develop a data synchronization task

For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.

Add a data source

Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.

Configure a batch synchronization task to synchronize data of a single table

Configure a real-time synchronization task to synchronize data of a single table

For more information about the configuration procedure, see Configure a real-time synchronization task in DataStudio.

Configure synchronization settings to implement batch synchronization of all data in a database or one-time synchronization of full data and real-time synchronization of incremental data in a single table or a database

For more information about the configuration procedure, see Configure a synchronization task in Data Integration.

Appendix: Code and parameters

Appendix: Configure a batch synchronization task by using the code editor

If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.

Code for Elasticsearch Reader

{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" // The maximum number of dirty data records allowed. 
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,
            "throttle":false
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ // The names of the fields. 
                    "id",
                    "name"
                ],
                "endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200",// The endpoint of the Elasticsearch cluster. 
                "index":"aliyun_es_xx",  // The name of the index. 
                "password":"*******",  // The password of the Elasticsearch cluster. 
                "multiThread":true,
                "scroll":"5m",  // The keep-alive time of the scroll context. 
                "pageSize":5000,
                "connTimeOut":600000,
                "readTimeOut":600000,
                "retryCount":30,
                "retrySleepTime":"10000",
                "search":{
                    "range":{
                        "gmt_modified":{
                            "gte":0
                        }
                    }
                },  // The search criteria. The value is the same as the Elasticsearch query that calls the _search API. 
                "type":"doc",
                "username":"aliyun_di"  // The username of the Elasticsearch cluster. 
            },
            "stepType":"elasticsearch"
        },
        {
            "category":"writer",
            "name":"Writer",
            "parameter":{ },
            "stepType":"stream"
        }
    ],
    "type":"job",
    "version":"2.0" // The version number. 
}

Parameters in code for Elasticsearch Reader

Parameter

Description

Required

Default value

datasource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

Yes

No default value

index

The name of the index from which you want to read data.

Yes

No default value

type

The name of the index type in the Elasticsearch cluster.

No

Index name

search

The query parameter of Elasticsearch.

Yes

No default value

pageSize

The number of data records to read at a time.

No

100

scroll

The keep-alive time of the scroll context, which is the snapshot that Elasticsearch keeps for a scroll. Example: 5m.

  • If you set this parameter to an excessively small value and the interval between two consecutive page fetches exceeds the value of the scroll parameter, the scroll may expire and data may be lost.

  • If you set this parameter to an excessively large value and the number of scroll queries that are open at the same time exceeds the value of the max_open_scroll_context parameter on the server side, an error may be reported for data queries.

Yes

No default value

strictMode

Specifies whether to read data from the Elasticsearch cluster in strict mode. In strict mode, if a shard-related error is reported, Elasticsearch Reader stops reading data to prevent only part of the data from being synchronized.

No

true

sort

The field based on which the returned results are sorted.

No

No default value

retryCount

The number of retries after a failure.

No

300

connTimeOut

The connection timeout period of the client.

No

600,000

readTimeOut

The read timeout period of the client.

No

600,000

multiThread

Specifies whether to use multiple threads for an HTTP request.

No

true

preemptiveAuth

Specifies whether to use the preemptive request mode for an HTTP request.

No

false

retrySleepTime

The interval between retries after a failure.

No

1000

discovery

Specifies whether to enable the node discovery mechanism.

  • true: Enables the node discovery mechanism. Data Integration connects to a random node in the Elasticsearch cluster. If node discovery is enabled, the server list in the client is polled and regularly updated, and the nodes that are discovered are queried.

  • false: Disables the node discovery mechanism. Data Integration initiates a request to the endpoint of the Elasticsearch cluster.

No

false

compression

Specifies whether to compress a request body in the GZIP format. If you set this parameter to true, you must enable the http.compression settings on the Elasticsearch cluster.

No

false

dateFormat

The dateFormat parameter is required if the fields to be synchronized include fields of a date data type and the format setting is not configured for mappings of the fields. You must specify all formats that are required to synchronize the fields of a date data type in this parameter, such as "dateFormat" : "yyyy-MM-dd||yyyy-MM-dd HH:mm:ss".

No

No default value

full

Specifies whether to synchronize each queried document as a whole to a single field in the destination. If you set this parameter to true, all fields of a document in the Elasticsearch cluster are written to the destination as one field. For more information, see Scenario 1: Extract all data.

No

No default value

multi

You can configure this parameter to enable the advanced feature that supports five solutions to help Elasticsearch Reader convert semi-structured data into structured data. The two child properties are multi.key and multi.mult. For more information about the configurations of the advanced feature, see Advanced features.

No

No default value
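
The required flags and default values in the preceding table can be summarized in a small Python sketch. It checks a reader parameter object for the parameters that the table marks as required and fills in the documented defaults. The helper resolve_reader_parameters and the data source name in the usage lines are hypothetical, and the sketch does not describe how DataWorks validates a task internally.

```python
READER_REQUIRED = ("datasource", "index", "search", "scroll")

# Default values copied from the parameter table.
READER_DEFAULTS = {
    "pageSize": 100,
    "strictMode": True,
    "retryCount": 300,
    "connTimeOut": 600000,
    "readTimeOut": 600000,
    "multiThread": True,
    "preemptiveAuth": False,
    "retrySleepTime": 1000,
    "discovery": False,
    "compression": False,
}


def resolve_reader_parameters(parameter):
    """Check required parameters and apply the documented defaults."""
    missing = [name for name in READER_REQUIRED if name not in parameter]
    if missing:
        raise ValueError("missing required parameters: " + ", ".join(missing))
    resolved = dict(READER_DEFAULTS)
    resolved.update(parameter)
    # The table lists the index name as the default value of type.
    resolved.setdefault("type", resolved["index"])
    return resolved


resolved = resolve_reader_parameters({
    "datasource": "es_source",
    "index": "aliyun_es_xx",
    "search": {"match_all": {}},
    "scroll": "5m",
    "pageSize": 5000,
})
print(resolved["pageSize"], resolved["retryCount"], resolved["type"])
```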

Code for Elasticsearch Writer

{
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":1, // The maximum number of parallel threads. 
            "mbps":"12"// The maximum transmission rate. Unit: MB/s. 
        }
    },
    "steps": [
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {

            },
            "stepType": "stream"
        },
        {
            "category": "writer",
            "name": "Writer",
            "parameter": {
                "datasource":"xxx",
                "index": "test-1",
                "type": "default",
                "cleanup": true,
                "settings": {
                        "number_of_shards": 1,
                        "number_of_replicas": 0
                },
                "discovery": false,
                "primaryKeyInfo": {
                    "type": "pk",
                    "fieldDelimiter": ",",
                    "column": []
                },
                "batchSize": 1000,
                "dynamic": false,
                "esPartitionColumn": [
                    {
                        "name": "col1",
                        "comment": "xx",
                        "type": "STRING"
                    }
                ],
                "column": [
                    {
                        "name": "pk",
                        "type": "id"
                    },
                    {
                        "name": "col_ip",
                        "type": "ip"
                    },
                    {
                        "name": "col_array",
                        "type": "long",
                        "array": true
                    },
                    {
                        "name": "col_double",
                        "type": "double"
                    },
                    {
                        "name": "col_long",
                        "type": "long"
                    },
                    {
                        "name": "col_integer",
                        "type": "integer"
                    },
                    {
                        "name": "col_keyword",
                        "type": "keyword"
                    },
                    {
                        "name": "col_text",
                        "type": "text",
                        "analyzer": "ik_max_word",
                        "other_params": {
                            "doc_values": false
                        }
                    },
                    {
                        "name": "col_geo_point",
                        "type": "geo_point"
                    },
                    {
                        "name": "col_date",
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    {
                        "name": "col_nested1",
                        "type": "nested"
                    },
                    {
                        "name": "col_nested2",
                        "type": "nested"
                    },
                    {
                        "name": "col_object1",
                        "type": "object"
                    },
                    {
                        "name": "col_object2",
                        "type": "object"
                    },
                    {
                        "name": "col_integer_array",
                        "type": "integer",
                        "array": true
                    },
                    {
                        "name": "col_geo_shape",
                        "type": "geo_shape",
                        "tree": "quadtree",
                        "precision": "10m"
                    }
                ]
            },
            "stepType": "elasticsearch"
        }
    ],
    "type": "job",
    "version": "2.0"
}

Note

A connection failure may occur if you use the shared resource group for Data Integration to connect to an Elasticsearch cluster that is deployed in a virtual private cloud (VPC). To connect to an Elasticsearch cluster that is deployed in a VPC and synchronize data from or to the Elasticsearch cluster, use an exclusive resource group for Data Integration. For information about exclusive resource groups for Data Integration, see Exclusive resource groups for Data Integration.

Parameters in code for Elasticsearch Writer

Parameter

Description

Required

Default value

datasource

The name of the data source. If no data sources are available, add an Elasticsearch cluster to DataWorks as a data source.

Yes

No default value

index

The name of the index to which you want to write data.

Yes

No default value

indexType

The name of the index type in the destination Elasticsearch cluster.

No

Elasticsearch

cleanup

Specifies whether to delete the existing data from the index before data write. Valid values:

  • true: Deletes the original index and creates an index whose name is the same as the original index. This way, the existing data in the index is deleted.

  • false: Retains the existing data in the index.

No

false

batchSize

The number of data records to write at a time.

No

1,000

trySize

The maximum number of retries that are allowed after a write failure occurs.

No

30

timeout

The connection timeout of the client.

No

600,000

discovery

Specifies whether to enable node discovery.

  • true: Enables the node discovery mechanism. Data Integration connects to a random node in the Elasticsearch cluster. The server list in the client is polled and regularly updated.

  • false: Disables the node discovery mechanism. In this case, Data Integration connects to the endpoint of the Elasticsearch cluster.

No

false

compression

Specifies whether to enable compression for an HTTP request.

No

true

multiThread

Specifies whether to use multiple threads for an HTTP request.

No

true

ignoreWriteError

Specifies whether to ignore write errors and proceed with data write operations without retries.

No

false

ignoreParseError

Specifies whether to ignore format parsing errors and proceed with data write operations.

No

true

alias

The alias of the index to which you want to write data. The alias feature of Elasticsearch is similar to the view feature of a database. For example, if you create an alias named my_index_alias for the index my_index, the operations that are performed on my_index_alias also take effect on my_index.

If you configure the alias parameter, the alias that you specify is created for the index after data is written to the index.

No

No default value

aliasMode

The mode in which an alias is added after data is written to the index. Valid values: append and exclusive.

  • If you set the aliasMode parameter to append, an alias is added for the index. One alias maps to multiple indexes.

  • If you set the aliasMode parameter to exclusive, the existing alias of the index is deleted and a new alias is added for the index. One alias maps to one index.

Elasticsearch Writer can convert aliases to actual index names. You can use aliases to migrate data from one index to another index, search for data across multiple indexes in a unified manner, and create a view on a subset of data in an index.

No

append

settings

The settings of the index. The settings must follow the specifications of open source Elasticsearch.

No

No default value

column

The fields of the document. The parameters for each field include basic parameters such as name and type, and advanced parameters such as analyzer, format, and array.

Elasticsearch Writer supports the following types of fields:

- id  // The id type corresponds to the _id type in Elasticsearch and can be considered as the unique primary key. When Elasticsearch Writer writes data to Elasticsearch, a record whose ID already exists overwrites the existing document instead of being indexed as a new document. 
- string
- text
- keyword
- long
- integer
- short
- byte
- double
- float
- date
- boolean
- binary
- integer_range
- float_range
- long_range
- double_range
- date_range
- geo_point
- geo_shape
- ip
- token_count
- array
- object
- nested

The following information describes the field types:

  • If the type of fields to which you want to write data is text, you can configure parameters such as analyzer, norms, and index_options. Example:

    {
        "name": "col_text",
        "type": "text",
        "analyzer": "ik_max_word"
    }
  • If the type of fields to which you want to write data is date, you can use one of the following methods to enable Elasticsearch Writer to parse source data:

    • Method 1: Enable Elasticsearch Writer to directly write data that is read from the source to fields of a date data type in the Elasticsearch data source:

      • Configure the origin:true setting.

      • Configure the format parameter. The format parameter specifies the format properties that must be configured in mappings of fields when Elasticsearch Writer creates mappings for the fields. Sample code:

          {
              "parameter": {
                  "column": [{
                      "name": "col_date",
                      "type": "date",
                      "format": "yyyy-MM-dd HH:mm:ss",
                      "origin": true
                  }]
              }
          }
    • Method 2: Convert the time zone. If you want to enable Data Integration to convert the time zone, configure the Timezone parameter. The format parameter specifies the time format that is obtained after Data Integration converts the time zone. Sample code:

        {
            "parameter": {
                "column": [{
                    "name": "col_date",
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss",
                    "Timezone": "UTC"
                }]
            }
        }
  • If the type of fields to which you want to write data is geo_shape, you can configure the tree (geohash or quadtree) and precision parameters. Sample code:

    {
        "name": "col_geo_shape",
        "type": "geo_shape",
        "tree": "quadtree",
        "precision": "10m"
    }

If you want to define properties related to the Elasticsearch data source in addition to the field type when you configure the column parameter, you can configure the other_params parameter and define the properties in the other_params parameter. The configurations in the other_params parameter are used when Elasticsearch Writer updates the mappings of the destination index in the Elasticsearch data source.

 {
   "name": "guid",
   "type": "text",
   "other_params": {
     "doc_values": false
   }
 }

If you want to write source data to Elasticsearch as arrays, you can enable Elasticsearch Writer to parse the source data in the JSON format or based on a specified delimiter. For more information, see Appendix: Write data to Elasticsearch as arrays.

Yes

No default value

dynamic

Specifies whether to use the dynamic mapping mechanism of Elasticsearch to establish mappings for fields that are written to the index.

  • true: Uses the dynamic mapping mechanism of Elasticsearch to establish mappings.

  • false: Establishes field mappings and updates the mapping configurations of the index based on the setting of the column parameter. This is the default value. If you leave the dynamic parameter empty, the default value is used.

In Elasticsearch V7.X, the default value of the type parameter is _doc. If you set this parameter to true to use the dynamic mapping mechanism, set the type parameter to _doc and the esVersion parameter to 7.

You must add the following parameter configuration that specifies the version information to the code: "esVersion": "7".

No

false

actionType

The type of the action that can be performed when data is written to the destination Elasticsearch cluster. Data Integration supports only the following types of actions: index and update. Default value: index.

  • index: Elasticsearch Writer uses Index.Builder of an Elasticsearch SDK to construct a request for writing multiple data records at a time. In index mode, Elasticsearch Writer first checks whether an ID is specified for the document that you want to insert.

    • If no ID is specified, Elasticsearch Writer generates a unique ID. In this case, the document is directly inserted into the destination Elasticsearch cluster.

    • If an ID is specified, the existing document is replaced with the document that you want to insert. You cannot modify specific fields in the document.

      Note

      The replace operation in this case is different from the replace operation in Elasticsearch in which specific fields can be modified.

  • update: Elasticsearch Writer updates the existing document based on the ID that you specify. If the specified ID does not exist in the index, a new document is inserted. If the specified ID exists in the index, Elasticsearch Writer updates only the fields that are specified in the column parameter. Other fields in the document remain unchanged. In update mode, filter conditions are not supported, and the update operation can be performed based only on a specified ID. Each time Elasticsearch Writer updates a document, it must first obtain the whole document, which greatly affects data synchronization performance.

    Note

    If you set the actionType parameter to update, you must configure the primaryKeyInfo parameter.

No

index

primaryKeyInfo

Specifies the value assignment method of the _id column that is used as the primary key to write data to Elasticsearch.

  • Business primary key (pk): The value of the _id column is the value of a specific field.

    "parameter":{
        "primaryKeyInfo":{
            "type":"pk",
            "column":["id"]
        }
    }
  • Composite primary key (specific): The value of the _id column is obtained by concatenating the values of specific fields. The delimiter that is used to concatenate the values is the same as the delimiter that is specified by using the fieldDelimiter parameter.

    Note

    The names of the fields whose values you want to concatenate are the names of fields to which Elasticsearch Writer writes data. When you configure the batch synchronization task by using the codeless UI, the system extracts only fields that exist in the index as the configurations of primary key columns.

    "parameter":{
        "primaryKeyInfo":{
            "type":"specific",
            "fieldDelimiter":",",
            "column":["col1","col2"]
        }
    }
  • No primary key (nopk): The value of the _id column is automatically generated when data is written to Elasticsearch.

    "primaryKeyInfo":{
    "type":"nopk"
    }

Yes

specific

esPartitionColumn

Specifies whether to enable partitioning for the index when data is written to Elasticsearch. This parameter is used to change the routing setting of the Elasticsearch cluster.

  • true: Enables partitioning. In this case, the value of a specific column is used to insert a document to a specific shard or update a document in a specific shard. If you set this parameter to true, you must specify the partition key columns.

    {
        "esPartitionColumn": [
            {
                "name": "col1",
                "comment": "xx",
                "type": "STRING"
            }
        ]
    }
  • false: Disables partitioning. If you leave this parameter empty, the values of the _id column are used to evenly distribute documents to shards. This prevents data skew.

No

false

enableWriteNull

Specifies whether source fields whose values are NULL can be synchronized to Elasticsearch. Valid values:

  • true: Source fields whose values are NULL can be synchronized to Elasticsearch. After the fields are synchronized, the values of the fields in Elasticsearch are also NULL.

  • false: Source fields whose values are NULL cannot be synchronized to Elasticsearch. The fields are not displayed in Elasticsearch.

No

true
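As a rough illustration of the primaryKeyInfo (specific) and enableWriteNull parameters described above, the following Python sketch shows how a composite _id could be built and how NULL fields could be kept or omitted. The function names and the use of str() to convert values are assumptions made for this example and do not describe the internal implementation of Elasticsearch Writer.

```python
from typing import Any, Dict, List


def build_doc_id(record: Dict[str, Any], columns: List[str], field_delimiter: str) -> str:
    """Concatenate the values of the key columns, in order, with the delimiter.

    Converting each value with str() is an assumption of this sketch.
    """
    return field_delimiter.join(str(record[name]) for name in columns)


def apply_write_null(record: Dict[str, Any], enable_write_null: bool) -> Dict[str, Any]:
    """Keep None-valued fields when enable_write_null is True; otherwise omit them."""
    if enable_write_null:
        return dict(record)
    return {key: value for key, value in record.items() if value is not None}


# Hypothetical source record used only for illustration.
record = {"col1": "a", "col2": 42, "col3": None}

doc_id = build_doc_id(record, ["col1", "col2"], ",")
with_nulls = apply_write_null(record, True)
without_nulls = apply_write_null(record, False)
```

In this sketch, doc_id corresponds to "type":"specific" with "fieldDelimiter":"," and "column":["col1","col2"], and without_nulls corresponds to a document written with enableWriteNull set to false.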

Scenario 1: Extract all data

  • Description: You can extract an entire document in an Elasticsearch cluster, including all of its fields, into a single destination field.

  • Example:

    
    ## Source: raw data in an Elasticsearch cluster
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "IXgdO4MB4GR_1DmrjTXP",
            "_score": 1.0,
            "_source": {
                "feature1": "value1",
                "feature2": "value2",
                "feature3": "value3"
            }
        }]
    
    ## Configurations of Elasticsearch Reader
    "parameter": {
      "column": [
          "content"
      ],
      "full":true
    }
    
    ## Destination: Data is synchronized to one column of a row in the destination table.
    {"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}

Scenario 2: Synchronize nested JSON-formatted data or object properties

  • Description: You can use paths to parse nested JSON-formatted data or object properties.

  • Configuration format:

    • Property

    • Property.Child property

    • Property[0].Child property

  • Configuration in the code editor:

    "multi":{
        "multi":true
    }

    Note

    This configuration is not supported in the codeless UI.

  • Example:

    ## Source: raw data in an Elasticsearch cluster
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "7XAOOoMB4GR_1Dmrrust",
            "_score": 1.0,
            "_source": {
                "level1": {
                    "level2": [
                        {
                            "level3": "testlevel3_1"
                        },
                        {
                            "level3": "testlevel3_2"
                        }
                    ]
                }
            }
        }
    ]
    ## Configurations of Elasticsearch Reader
    "parameter": {
      "column": [
          "level1",
          "level1.level2",
          "level1.level2[0]",
          "level1.level2.level3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ## Destination: four columns of a row
    column1(level1):            {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]}
    column2(level1.level2):     [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]
    column3(level1.level2[0]):  {"level3":"testlevel3_1"}
    column4(level1.level2.level3):  null

    Note

    • If a parent property on the path to a child property is an array, you must address the parent property in the Property[N] format. Otherwise, null is synchronized as the value of the column. In the preceding example, level2 is an array and level1.level2.level3 is specified as a column. No error message is returned, but null is synchronized as the value of the level1.level2.level3 column. To obtain the value of level3, specify level1.level2[0].level3 or level1.level2[1].level3 in the column parameter. level1.level2[*].level3 is not supported.

    • Data that contains periods (.) in the key cannot be read. For example, if the data to read is "level1.level2":{"level3":"testlevel3_1"}, null is returned.
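The path rules in this scenario can be sketched in Python as follows. This is a minimal illustration, not the resolver that Elasticsearch Reader uses: the resolve_path function and its regular expression are assumptions, and the sketch only reproduces the behavior shown in the preceding example and notes.

```python
import re
from typing import Any

# A path segment is a property name, optionally followed by a numeric index: name or name[N].
_SEGMENT = re.compile(r"^([^\[\]]+)(?:\[(\d+)\])?$")


def resolve_path(source: dict, path: str) -> Any:
    """Resolve a dotted path such as 'level1.level2[0].level3'; return None if it cannot be followed."""
    current: Any = source
    for segment in path.split("."):
        match = _SEGMENT.match(segment)
        # An array reached without an index, or a malformed segment, yields None.
        if match is None or not isinstance(current, dict):
            return None
        name, index = match.group(1), match.group(2)
        current = current.get(name)
        if index is not None:
            if not isinstance(current, list) or int(index) >= len(current):
                return None
            current = current[int(index)]
    return current


source = {"level1": {"level2": [{"level3": "testlevel3_1"}, {"level3": "testlevel3_2"}]}}

whole_array = resolve_path(source, "level1.level2")
first_item = resolve_path(source, "level1.level2[0]")
missing_index = resolve_path(source, "level1.level2.level3")
indexed_child = resolve_path(source, "level1.level2[1].level3")
dotted_key = resolve_path({"level1.level2": {"level3": "testlevel3_1"}}, "level1.level2.level3")
```

Because the path is split on periods before any lookup, a key that itself contains a period can never be matched, which is consistent with the second note above.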

Scenario 3: Split an array into multiple rows

  • Description: If the value of a property is an array that contains multiple child elements, you can split the array so that each element is written to a separate row.

  • Configuration format: Property[*].Child property.

  • Example: After the source data { "splitKey" :[1,2,3,4,5]} is split, the source data is written to five rows in the destination: { "splitKey[0]":1,"splitKey[1]":2,"splitKey[2]":3,"splitKey[3]":4,"splitKey[4]":5}.

  • Configuration in the code editor:

    "multi":{
        "multi":true,
        "key": "headers"
    }

    Note
    • If you configure Split array fields name in the codeless UI, a script is automatically generated, and the code in the script has the same effect as the code configured in the code editor.

    • The value of the source data must be in the List format. Otherwise, an error is reported.

  • Example:

    ## Source: raw data in an Elasticsearch cluster
    [
        {
            "_index": "lmtestjson",
            "_type": "_doc",
            "_id": "nhxmIYMBKDL4VkVLyXRN",
            "_score": 1.0,
            "_source": {
                "headers": [
                    {
                        "remoteip": "192.0.2.1"
                    },
                    {
                        "remoteip": "192.0.2.2"
                    }
                ]
            }
        },
        {
            "_index": "lmtestjson",
            "_type": "_doc",
            "_id": "wRxsIYMBKDL4VkVLcXqf",
            "_score": 1.0,
            "_source": {
                "headers": [
                    {
                        "remoteip": "192.0.2.3"
                    },
                    {
                        "remoteip": "192.0.2.4"
                    }
                ]
            }
        }
    ]
    ## Configurations of Elasticsearch Reader
    {
      "column":[
          "headers[*].remoteip"
      ],
      "multi":{
          "multi":true,
          "key": "headers"
      }
    }
    
    ## Destination: four rows
    192.0.2.1
    192.0.2.2
    192.0.2.3
    192.0.2.4
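The effect of the split in the preceding example can be approximated with the following Python sketch. The split_rows function is hypothetical and handles only the simple Property[*].Child property form. The ValueError mirrors the rule that the source value must be a list, but the actual error that Elasticsearch Reader reports is not specified here.

```python
from typing import Any, Dict, List


def split_rows(source: Dict[str, Any], key: str, child: str) -> List[Any]:
    """Return one output value per element of the array stored under `key`."""
    values = source.get(key)
    if not isinstance(values, list):
        # The source value must be a list; otherwise the split cannot be performed.
        raise ValueError(f"'{key}' must be a list to be split into rows")
    return [item.get(child) if isinstance(item, dict) else None for item in values]


hits = [
    {"_source": {"headers": [{"remoteip": "192.0.2.1"}, {"remoteip": "192.0.2.2"}]}},
    {"_source": {"headers": [{"remoteip": "192.0.2.3"}, {"remoteip": "192.0.2.4"}]}},
]

# Two documents with two array elements each produce four rows.
rows = [value for hit in hits for value in split_rows(hit["_source"], "headers", "remoteip")]
```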

Scenario 4: Merge data in an array into one property and remove duplicates

  • Description: You can merge the data in an array into one property and remove duplicates. The property can be a child property, such as name1.name2. Duplicates are identified based on the toString result of each element.

  • Configuration format: Property[]

    If a value in the column parameter contains the keyword [], the data of that property is merged, and duplicates are removed.

  • Configuration in the code editor:

    "multi":{
        "multi":true
    }

    Note

    This configuration is not supported in the codeless UI.

  • Example:

    ## Source: raw data in an Elasticsearch cluster
    "hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "4nbUOoMB4GR_1Dmryj8O",
        "_score": 1.0,
        "_source": {
            "feature1": [
                "value1",
                "value1",
                "value2",
                "value2",
                "value3"
            ]
        }
    }
    ]
    ## Configurations of Elasticsearch Reader
    "parameter": {
      "column":[
            "feature1[]"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ## Destination: one column of a row
    "value1,value2,value3"

Scenario 5: Merge multiple properties into one property

  • Description: The specified properties are checked in order, and the value of the first property that has a value is returned. If none of the properties has a value, null is returned.

  • Configuration format: property 1|property 2|...

    This processing is applied if a value that you specify in the column parameter contains vertical bars (|).

  • Configuration in the code editor:

    "multi":{
        "multi":true
    }

    Note

    This configuration is not supported in the codeless UI.

  • Example:

    ## Source: raw data in an Elasticsearch cluster
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "v3ShOoMB4GR_1DmrZN22",
            "_score": 1.0,
            "_source": {
                "feature1": "feature1",
                "feature2": [
                    1,
                    2,
                    3
                ],
                "feature3": {
                    "child": "feature3"
                }
            }
        }]
    
    ## Configurations of Elasticsearch Reader
    "parameter": {
      "column":[
            "feature1|feature2|feature3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ## Destination: one column of a row
    "feature1"

Scenario 6: Select properties from multiple properties for processing

  • Description: You can select multiple properties and combine their values into one column. The values are concatenated in the order in which the properties are specified and are separated by commas (,), as shown in the following example.

  • Configuration format: property 1,property 2,...

    This processing is applied if a value that you specify in the column parameter contains commas (,).

  • Configuration in the code editor:

    "multi":{
        "multi":true
    }

    Note

    This configuration is not supported in the codeless UI.

  • Example:

    ## Source: raw data in an Elasticsearch cluster
    "hits": [
        {
            "_index": "mutiltest_1",
            "_type": "_doc",
            "_id": "v3ShOoMB4GR_1DmrZN22",
            "_score": 1.0,
            "_source": {
                "feature1": "feature1",
                "feature2": [
                    1,
                    2,
                    3
                ],
                "feature3": {
                    "child": "feature3"
                }
            }
        }]
    ## Configurations of Elasticsearch Reader
    "parameter": {
      "column":[
            "feature1,feature2,feature3"
      ],
      "multi":{
            "multi":true
        }
    }
    
    ## Destination: one column of a row
    "feature1,[1,2,3],{"child":"feature3"}"

Appendix: Write data to Elasticsearch as arrays

You can use one of the following methods to configure Elasticsearch Writer to write data to Elasticsearch as arrays.

  • Enable Elasticsearch Writer to parse source data in the JSON format

    For example, the value of a source field is "[1,2,3,4,5]". If you want to enable Elasticsearch Writer to write the value to Elasticsearch as an array, you can set the json_array parameter to true in the code of the synchronization task. This way, Elasticsearch Writer writes the source data record to Elasticsearch as an array.

    "parameter" : {
      {
        "name":"docs_1",
        "type":"keyword",
        "json_array":true
      }
    }
  • Enable Elasticsearch Writer to parse source data based on a specified delimiter

    For example, the value of a source field is "1,2,3,4,5". If you want to enable Elasticsearch Writer to write the value to Elasticsearch as an array, you can set the splitter parameter to a comma (,) in the code of the synchronization task. This way, Elasticsearch Writer parses the value based on the delimiter and writes the value to Elasticsearch as an array.

    Note

    A synchronization task supports only one type of delimiter. You cannot specify different delimiters for different fields that you want to write to Elasticsearch as arrays. For example, you cannot specify a comma (,) as a delimiter for the source field col1="1,2,3,4,5" and a hyphen (-) as a delimiter for the source field col2="6-7-8-9-10".

    "parameter" : {
          "column": [
            {
              "name": "docs_2",
              "array": true,
              "type": "long"
            }
          ],
          "splitter":","// You must configure the splitter parameter at the same level as the column parameter. 
    }
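The difference between the two methods can be sketched in Python as follows. The to_array function is hypothetical and only mimics how a source string might be turned into an array before it is written. Conversion of the split elements to the configured field type (for example, long) is left out of the sketch.

```python
import json
from typing import Any, List, Optional


def to_array(raw: str, json_array: bool = False, splitter: Optional[str] = None) -> Any:
    """Turn a source string into a list by JSON parsing or by splitting on a delimiter."""
    if json_array:
        parsed = json.loads(raw)
        if not isinstance(parsed, list):
            raise ValueError("the source value is not a JSON array")
        return parsed
    if splitter is not None:
        # Elements stay strings here; type conversion is outside the scope of this sketch.
        return raw.split(splitter)
    return raw


from_json = to_array("[1,2,3,4,5]", json_array=True)
from_delimiter = to_array("1,2,3,4,5", splitter=",")
unchanged = to_array("1,2,3,4,5")
```

A single splitter argument is used for every value, which reflects the rule that a synchronization task supports only one type of delimiter.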

References

For information about data source types supported by Data Integration, see Supported data source types and synchronization operations.