This topic describes the parameters that are supported by Elasticsearch Writer and how to configure Elasticsearch Writer by using the codeless user interface (UI) and code editor.

Supported Elasticsearch versions

DataWorks allows you to add only Alibaba Cloud Elasticsearch V5.X, V6.X, and V7.X clusters as data sources. Self-managed Elasticsearch clusters are not supported.

Supported data types

| Data type | Elasticsearch Reader (batch read) | Elasticsearch Writer (batch write) | Elasticsearch Writer (real-time write) |
| --- | --- | --- | --- |
| binary | Supported | Supported | Supported |
| boolean | Supported | Supported | Supported |
| keyword | Supported | Supported | Supported |
| constant_keyword | Not supported | Not supported | Not supported |
| wildcard | Not supported | Not supported | Not supported |
| long | Supported | Supported | Supported |
| integer | Supported | Supported | Supported |
| short | Supported | Supported | Supported |
| byte | Supported | Supported | Supported |
| double | Supported | Supported | Supported |
| float | Supported | Supported | Supported |
| half_float | Not supported | Not supported | Not supported |
| scaled_float | Not supported | Not supported | Not supported |
| unsigned_long | Not supported | Not supported | Not supported |
| date | Supported | Supported | Supported |
| date_nanos | Not supported | Not supported | Not supported |
| alias | Not supported | Not supported | Not supported |
| object | Supported | Supported | Supported |
| flattened | Not supported | Not supported | Not supported |
| nested | Supported | Supported | Supported |
| join | Not supported | Not supported | Not supported |
| integer_range | Supported | Supported | Supported |
| float_range | Supported | Supported | Supported |
| long_range | Supported | Supported | Supported |
| double_range | Supported | Supported | Supported |
| date_range | Supported | Supported | Supported |
| ip_range | Not supported | Supported | Supported |
| ip | Supported | Supported | Supported |
| version | Supported | Supported | Supported |
| murmur3 | Not supported | Not supported | Not supported |
| aggregate_metric_double | Not supported | Not supported | Not supported |
| histogram | Not supported | Not supported | Not supported |
| text | Supported | Supported | Supported |
| annotated-text | Not supported | Not supported | Not supported |
| completion | Supported | Not supported | Not supported |
| search_as_you_type | Not supported | Not supported | Not supported |
| token_count | Supported | Not supported | Not supported |
| dense_vector | Not supported | Not supported | Not supported |
| rank_feature | Not supported | Not supported | Not supported |
| rank_features | Not supported | Not supported | Not supported |
| geo_point | Supported | Supported | Supported |
| geo_shape | Supported | Supported | Supported |
| point | Not supported | Not supported | Not supported |
| shape | Not supported | Not supported | Not supported |
| percolator | Not supported | Not supported | Not supported |
| string | Supported | Supported | Supported |

Background information

Elasticsearch Writer can write data to Elasticsearch V5.X data sources by using the shared resource group for Data Integration and to Elasticsearch V5.X, V6.X, and V7.X data sources by using exclusive resource groups for Data Integration. For information about exclusive resource groups for Data Integration, see Create and use an exclusive resource group for Data Integration.

Elasticsearch is an open source product that is released under the Apache License and is a popular enterprise search engine. Elasticsearch is a distributed search and analytics engine built on top of Apache Lucene. The following description provides the mappings between the core concepts of Elasticsearch and those of a relational database:
Relational database: Instance -> Database -> Table -> Row      -> Column
Elasticsearch:       Cluster  -> Index    -> Type  -> Document -> Field

An Elasticsearch cluster can contain multiple indexes (databases). Each index can contain multiple types (tables). Each type can contain multiple documents (rows). Each document can contain multiple fields (columns). Elasticsearch Writer obtains data records from a reader and uses the RESTful API of Elasticsearch to write the data records to Elasticsearch in batches.
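To illustrate the batch write described above, the following sketch builds the newline-delimited payload that a client could send to the `_bulk` endpoint of the Elasticsearch RESTful API. The index name, field names, and the `build_bulk_body` helper are hypothetical; this is a simplified illustration of the API contract, not the actual Elasticsearch Writer implementation.

```python
import json

def build_bulk_body(index, docs):
    """Build an NDJSON body for the Elasticsearch _bulk REST endpoint.

    Each document becomes two lines: an action line ({"index": ...})
    and a source line. Documents that carry an "_id" key are written
    with that ID, so re-writing the same ID overwrites the document.
    """
    lines = []
    for doc in docs:
        action = {"index": {"_index": index}}
        if "_id" in doc:
            action["index"]["_id"] = doc.pop("_id")
        lines.append(json.dumps(action))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"  # _bulk requires a trailing newline

body = build_bulk_body("my_index", [
    {"_id": "1", "col_long": 42, "col_text": "hello"},
    {"col_long": 43, "col_text": "world"},  # no ID: Elasticsearch generates one
])
# The body would be POSTed to http://<host>:9200/_bulk
# with the header Content-Type: application/x-ndjson.
```

Batching many documents into one `_bulk` request (rather than one HTTP request per document) is what makes parameters such as batchSize meaningful.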

Parameters

datasource
The name of the data source. If no data sources are available, add an Elasticsearch cluster to DataWorks as a data source. For more information, see Add an Elasticsearch data source.
Required: Yes. Default value: none.

index
The name of the index in the destination Elasticsearch cluster.
Required: Yes. Default value: none.

indexType
The name of the index type in the destination Elasticsearch cluster.
Required: No. Default value: Elasticsearch.

cleanup
Specifies whether to delete the existing data from the index. Valid values:
  • true: Deletes the original index and creates an index with the same name. This way, the existing data in the index is deleted.
  • false: Retains the existing data in the index.
Required: No. Default value: false.

batchSize
The number of data records to write at a time.
Required: No. Default value: 1,000.

trySize
The maximum number of retries that can be performed after a failure occurs.
Required: No. Default value: 30.

timeout
The connection timeout of the client, in milliseconds.
Required: No. Default value: 600,000.

discovery
Specifies whether to enable node discovery. Valid values:
  • true: Enables the node discovery mechanism. Data Integration connects to a random node in the Elasticsearch cluster, and the server list in the client is polled and periodically updated.
  • false: Disables the node discovery mechanism. In this case, Data Integration connects only to the configured Elasticsearch endpoint.
Required: No. Default value: false.

compression
Specifies whether to enable compression for HTTP requests.
Required: No. Default value: true.

multiThread
Specifies whether to use multiple threads for HTTP requests.
Required: No. Default value: true.

ignoreWriteError
Specifies whether to ignore write errors and proceed with data write operations without retries.
Required: No. Default value: false.

ignoreParseError
Specifies whether to ignore format parsing errors and proceed with data write operations.
Required: No. Default value: true.
alias
The alias feature of Elasticsearch is similar to the view feature of a database. For example, if you create an alias named my_index_alias for the index my_index, operations that are performed on my_index_alias also take effect on my_index.
If you configure the alias parameter, the alias that you specify is created for the index after data is written to the index.
Required: No. Default value: none.

aliasMode
The mode in which an alias is added after data is written to the index. Valid values: append and exclusive.
  • append: An alias is added for the index. One alias can map to multiple indexes.
  • exclusive: The existing alias of the index is deleted, and a new alias is added for the index. One alias maps to only one index.
Elasticsearch Writer can convert aliases to actual index names. You can use aliases to migrate data from one index to another index, search for data across multiple indexes in a unified manner, and create a view on a subset of data in an index.
Required: No. Default value: append.
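The difference between the two alias modes can be sketched as the action list a client could send to the `_aliases` endpoint of Elasticsearch after a write completes. The helper name and surrounding logic below are hypothetical; only the add/remove action format follows the open source Elasticsearch API.

```python
def build_alias_actions(alias, new_index, mode, existing_indexes=()):
    """Build the actions body for the Elasticsearch _aliases endpoint.

    append: only add the alias, so one alias can map to many indexes.
    exclusive: remove the alias from every index it currently maps to,
    then add it to the new index, so the alias maps to exactly one index.
    """
    actions = []
    if mode == "exclusive":
        for index in existing_indexes:
            actions.append({"remove": {"index": index, "alias": alias}})
    actions.append({"add": {"index": new_index, "alias": alias}})
    return {"actions": actions}

# append mode: my_index_alias keeps pointing at my_index_v1 as well.
append_body = build_alias_actions("my_index_alias", "my_index_v2", "append")
# exclusive mode: the alias is moved so it points only at my_index_v2.
exclusive_body = build_alias_actions(
    "my_index_alias", "my_index_v2", "exclusive",
    existing_indexes=["my_index_v1"])
```

The exclusive flow is what enables the index-migration pattern mentioned above: readers keep querying the alias while the alias is switched atomically to the new index.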
settings
The settings of the index. The settings must follow the specifications of open source Elasticsearch.
Required: No. Default value: none.
column
The fields of the document. The parameters for each field include basic parameters such as name and type, and advanced parameters such as analyzer, format, and array.
Elasticsearch Writer supports the following field types:
- id  // The id type corresponds to the _id column in Elasticsearch and can be considered the unique primary key. Data that has the same ID is overwritten instead of being indexed as a new document. 
- string
- text
- keyword
- long
- integer
- short
- byte
- double
- float
- date
- boolean
- binary
- integer_range
- float_range
- long_range
- double_range
- date_range
- geo_point
- geo_shape
- ip
- token_count
- array
- object
- nested
The following information describes the field types:
  • If the field type is text, you can configure the analyzer, norms, and index_options parameters. Example:
    {
        "name": "col_text",
        "type": "text",
        "analyzer": "ik_max_word"
    }
  • If the field type is date, you can configure the column parameter to control how Elasticsearch Writer parses source data.
    • If you want to write data that is read by a reader plug-in from source fields to the fields in the Elasticsearch document as-is, configure the following settings:
      • Set origin to true. This way, data in source fields is directly written to the fields in the Elasticsearch document.
      • Configure the format parameter. When Elasticsearch Writer creates mappings between source fields and destination fields, the format parameter is required for destination fields. Sample code:
          {
             "parameter": {
               "column": [{
                   "name": "col_date",
                   "type": "date",
                   "format": "yyyy-MM-dd HH:mm:ss",
                   "origin": true
               }]
             }
          }
    • If you want to use Data Integration to convert the time zone, configure the Timezone parameter. Sample code:
          {
             "parameter": {
               "column": [{
                   "name": "col_date",
                   "type": "date",
                   "format": "yyyy-MM-dd HH:mm:ss",
                   "Timezone": "UTC"
               }]
             }
          }
  • If the field type is geo_shape, you can configure the tree (geohash or quadtree) and precision parameters. Sample code:
    {
        "name": "col_geo_shape",
        "type": "geo_shape",
        "tree": "quadtree",
        "precision": "10m"
    }

If you want to define attributes related to the Elasticsearch cluster in addition to the field type when you configure the column parameter, you can configure the other_params parameter in column to define the attributes. The other_params parameter is used when Elasticsearch Writer updates the mapping configurations of the Elasticsearch cluster.

 {
    "name": "guid",
    "type": "text",
    "other_params": {
        "doc_values": false
    }
 }

If you want to write source data to Elasticsearch as arrays, you can enable Elasticsearch Writer to parse the source data in the JSON format or based on a specified delimiter. For more information, see Appendix: Write data to Elasticsearch as arrays.

Required: Yes. Default value: none.
dynamic
Specifies whether to use the dynamic mapping mechanism of Elasticsearch to establish mappings for fields that are written to the index. Valid values:
  • true: Uses the dynamic mapping mechanism of Elasticsearch to establish mappings.
  • false: Establishes field mappings and updates the mapping configurations of the index based on the setting of the column parameter. If you leave the dynamic parameter empty, false is used as its value.
In Elasticsearch V7.X, the default value of the type parameter is _doc. If you set this parameter to true to use the dynamic mapping mechanism, set the type parameter to _doc and the esVersion parameter to 7. You must add the following parameter configuration, which specifies the version information, to the code: "esVersion": "7".
Required: No. Default value: false.
actionType
The type of action to perform when data is written to the destination Elasticsearch cluster. Data Integration supports only the index and update actions.
  • index: Elasticsearch Writer uses Index.Builder of an Elasticsearch SDK to construct a request that writes multiple data records at a time. In index mode, Elasticsearch Writer first checks whether an ID is specified for the document that you want to insert.
    • If no ID is specified, Elasticsearch Writer generates a unique ID, and the document is directly inserted into the destination Elasticsearch cluster.
    • If an ID is specified, the existing document is replaced with the document that you want to insert. You cannot modify specific fields in the document.
      Note The replace operation in this case is different from an update in Elasticsearch, in which specific fields can be modified.
  • update: Elasticsearch Writer updates the existing document based on the ID that you specify. If the specified ID does not exist in the index, a new document is inserted. If the specified ID exists, Elasticsearch Writer updates the fields that are specified in the column parameter; other fields in the document remain unchanged. In update mode, filter conditions are not supported, and the update operation can be performed only based on a specified ID. Each time Elasticsearch Writer updates a document, it must first obtain the information of the whole document, which significantly affects data synchronization performance.
    Note If you set the actionType parameter to update, you must configure the primaryKeyInfo parameter.
Required: No. Default value: index.
primaryKeyInfo
The value assignment method of the _id column, which is used as the primary key when data is written to Elasticsearch. Valid values:
  • Business primary key (pk): The value of the _id column is the value of a specific field.
    "parameter": {
        "primaryKeyInfo": {
            "type": "pk",
            "column": ["id"]
        }
    }
  • Composite primary key (specific): The value of the _id column is obtained by concatenating the values of specific fields. The delimiter that is used to concatenate the values is specified by the fieldDelimiter parameter.
    Note The fields whose values you want to concatenate must be fields to which Elasticsearch Writer writes data. When you configure the batch synchronization node by using the codeless UI, the system extracts only fields that exist in the index as candidates for the Primary key value method parameter.
    "parameter": {
        "primaryKeyInfo": {
            "type": "specific",
            "fieldDelimiter": ",",
            "column": ["col1","col2"]
        }
    }
  • No primary key (nopk): The value of the _id column is automatically generated when data is written to Elasticsearch.
    "primaryKeyInfo": {
        "type": "nopk"
    }
Required: Yes. Default value: specific.
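The three value assignment methods can be summarized in a small sketch. The function below mimics, in simplified form and with hypothetical names, how an _id value could be derived from a record for each primaryKeyInfo type; the real writer's behavior may differ in details such as NULL handling.

```python
import uuid

def build_doc_id(record, primary_key_info):
    """Derive the _id value for a record based on a primaryKeyInfo dict."""
    pk_type = primary_key_info["type"]
    if pk_type == "pk":  # business primary key: the value of one field
        return str(record[primary_key_info["column"][0]])
    if pk_type == "specific":  # composite key: concatenate field values
        delimiter = primary_key_info["fieldDelimiter"]
        return delimiter.join(str(record[c]) for c in primary_key_info["column"])
    if pk_type == "nopk":  # no primary key: an ID is generated automatically
        return str(uuid.uuid4())
    raise ValueError(f"unknown primaryKeyInfo type: {pk_type}")

record = {"id": 7, "col1": "a", "col2": "b"}
pk_id = build_doc_id(record, {"type": "pk", "column": ["id"]})
composite = build_doc_id(
    record,
    {"type": "specific", "fieldDelimiter": ",", "column": ["col1", "col2"]})
```

Because documents with the same _id overwrite each other, the choice of method determines whether re-running a synchronization node is idempotent (pk and specific) or appends duplicates (nopk).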
esPartitionColumn
Specifies whether to enable partitioning for the index when data is written to Elasticsearch. This parameter changes the routing setting of the Elasticsearch cluster. Valid values:
  • true: Enables partitioning. In this case, the value of the specified column determines the shard into which a document is inserted or in which a document is updated. If you set this parameter to true, you must specify the partition key column. Sample code:
    {
        "esPartitionColumn": [
            {
                "name": "col1",
                "comment": "xx",
                "type": "STRING"
            }
        ]
    }
  • false: Disables partitioning. If you leave this parameter empty, the values of the _id column are used to evenly distribute documents across shards, which prevents data skew.
Required: No. Default value: false.
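Conceptually, routing works as sketched below: Elasticsearch hashes a routing value and uses the result to pick a shard, so documents with equal routing values always land on the same shard. The real cluster uses a murmur3-based hash; this sketch uses CRC32 purely to illustrate the idea, not to reproduce Elasticsearch's actual shard placement.

```python
import zlib

def pick_shard(routing_value, number_of_shards):
    """Simplified illustration of shard routing: hash the routing value
    (CRC32 here, not Elasticsearch's actual murmur3 hash) and take it
    modulo the shard count. Equal values always map to the same shard."""
    return zlib.crc32(routing_value.encode("utf-8")) % number_of_shards

# With esPartitionColumn set to col1, the value of col1 acts as the
# routing value, so all documents that share a col1 value share a shard.
shard_a = pick_shard("beijing", 5)
shard_b = pick_shard("beijing", 5)
assert shard_a == shard_b  # deterministic: same value, same shard
```

This also shows why a skewed partition key column can overload a single shard, which is the risk the default _id-based distribution avoids.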
enableWriteNull
Specifies whether source fields whose values are NULL can be synchronized to Elasticsearch. Valid values:
  • true: Source fields whose values are NULL are synchronized to Elasticsearch. After synchronization, the values of these fields in Elasticsearch are also NULL.
  • false: Source fields whose values are NULL are not synchronized to Elasticsearch, and the fields do not appear in Elasticsearch.
Required: No. Default value: false.

Configure Elasticsearch Writer by using the code editor

For information about how to configure a data synchronization node by using the code editor, see Configure a batch synchronization node by using the code editor.

In the following code, a data synchronization node is configured to write data to Elasticsearch. For more information about the parameters, see the preceding parameter description.
{
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true, // Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
            "concurrent": 1, // The maximum number of parallel threads.
            "mbps": "12" // The maximum transmission rate.
        }
    },
    "steps": [
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {

            },
            "stepType": "stream"
        },
        {
            "category": "writer",
            "name": "Writer",
            "parameter": {
                "datasource": "xxx",
                "index": "test-1",
                "type": "default",
                "cleanup": true,
                "settings": {
                        "number_of_shards": 1,
                        "number_of_replicas": 0
                },
                "discovery": false,
                "primaryKeyInfo": {
                    "type": "pk",
                    "fieldDelimiter": ",",
                    "column": []
                },
                "batchSize": 1000,
                "dynamic": false,
                "esPartitionColumn": [
                    {
                        "name": "col1",
                        "comment": "xx",
                        "type": "STRING"
                    }
                ],
                "column": [
                    {
                        "name": "pk",
                        "type": "id"
                    },
                    {
                        "name": "col_ip",
                        "type": "ip"
                    },
                    {
                        "name": "col_array",
                        "type": "long",
                        "array": true
                    },
                    {
                        "name": "col_double",
                        "type": "double"
                    },
                    {
                        "name": "col_long",
                        "type": "long"
                    },
                    {
                        "name": "col_integer",
                        "type": "integer"
                    },
                    {
                        "name": "col_keyword",
                        "type": "keyword"
                    },
                    {
                        "name": "col_text",
                        "type": "text",
                        "analyzer": "ik_max_word",
                        "other_params": {
                            "doc_values": false
                        }
                    },
                    {
                        "name": "col_geo_point",
                        "type": "geo_point"
                    },
                    {
                        "name": "col_date",
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    {
                        "name": "col_nested1",
                        "type": "nested"
                    },
                    {
                        "name": "col_nested2",
                        "type": "nested"
                    },
                    {
                        "name": "col_object1",
                        "type": "object"
                    },
                    {
                        "name": "col_object2",
                        "type": "object"
                    },
                    {
                        "name": "col_integer_array",
                        "type": "integer",
                        "array": true
                    },
                    {
                        "name": "col_geo_shape",
                        "type": "geo_shape",
                        "tree": "quadtree",
                        "precision": "10m"
                    }
                ]
            },
            "stepType": "elasticsearch"
        }
    ],
    "type": "job",
    "version": "2.0"
}
Note A connection failure may occur if you use the shared resource group for Data Integration to connect to an Elasticsearch cluster that is deployed in a virtual private cloud (VPC). To write data to an Elasticsearch cluster that is deployed in a VPC, use an exclusive resource group for Data Integration. For more information, see Create and use an exclusive resource group for Data Integration.

Configure Elasticsearch Writer by using the codeless UI

Open the created data synchronization node and configure the node. For information about how to configure a data synchronization node by using the codeless UI, see Configure a batch synchronization node by using the codeless UI. In this example, a data synchronization node that is used to synchronize data to Elasticsearch is configured by using the codeless UI.

  1. Configure data sources. Configure Source and Target for the data synchronization node.
    • Connection: The name of the data source to which you want to write data. This parameter is equivalent to the datasource parameter that is described in the Parameters section.
    • Index: The name of the index to which you want to write data. This parameter is equivalent to the index parameter that is described in the Parameters section.
    • Delete the original index: Specifies whether to delete the existing data in the index. This parameter is equivalent to the cleanup parameter that is described in the Parameters section.
    • Write Mode: The write mode. Valid values: index and update. This parameter is equivalent to the actionType parameter that is described in the Parameters section.
    • ElasticSearch auto mapping: Specifies whether to use the dynamic mapping mechanism of Elasticsearch to establish mappings. This parameter is equivalent to the dynamic parameter that is described in the Parameters section.
    • Primary key value method: The value assignment method of the _id column that is used as the primary key to write data to Elasticsearch. This parameter is equivalent to the primaryKeyInfo parameter that is described in the Parameters section.
    • Write batch size: The number of data records to write at a time. This parameter is equivalent to the batchSize parameter that is described in the Parameters section.
    • Enable partition: Specifies whether to enable partitioning for the index when data is written to Elasticsearch. This parameter is equivalent to the esPartitionColumn parameter that is described in the Parameters section.
    • Enable node discovery: Specifies whether to enable node discovery. This parameter is equivalent to the discovery parameter that is described in the Parameters section.
    • Settings: The settings of the index. This parameter is equivalent to the settings parameter that is described in the Parameters section.
  2. Configure field mappings. This operation is equivalent to setting the column parameter that is described in the Parameters section. Fields in the source on the left have a one-to-one mapping with fields in the destination on the right.
  3. Configure channel control policies.
    • Expected Maximum Concurrency: The maximum number of parallel threads that the data synchronization node uses to read data from the source and write data to the destination. You can configure the parallelism for the node on the codeless UI.
    • Bandwidth Throttling: Specifies whether to enable throttling. You can enable throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source.
    • Dirty Data Records Allowed: The maximum number of dirty data records that are allowed.
    • Distributed Execution: Specifies whether to enable the distributed execution mode, which splits the node into slices and distributes the slices to multiple Elastic Compute Service (ECS) instances for parallel execution. This speeds up synchronization. However, if you use a large number of parallel threads in this mode, excessive access requests are sent to the data sources. Therefore, evaluate the access load on the data sources before you enable the distributed execution mode. This mode is available only if you use an exclusive resource group for Data Integration to run your node. For more information, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration.

Appendix: Write data to Elasticsearch as arrays

You can use one of the following methods to configure Elasticsearch Writer to write data to Elasticsearch as arrays.

  • Enable Elasticsearch Writer to parse source data in the JSON format

    For example, the value of a source field is "[1,2,3,4,5]". If you want to enable Elasticsearch Writer to write the value to Elasticsearch as an array, you can set the json_array parameter to true in the code of the node. This way, Elasticsearch Writer writes the source data record to Elasticsearch as an array.

    "parameter": {
      "column": [
        {
          "name": "docs_1",
          "type": "keyword",
          "json_array": true
        }
      ]
    }
  • Enable Elasticsearch Writer to parse source data based on a specified delimiter

    For example, the value of a source field is "1,2,3,4,5". If you want to enable Elasticsearch Writer to write the value to Elasticsearch as an array, you can set the splitter parameter to a comma (,) in the code of the node. This way, Elasticsearch Writer parses the value based on the delimiter and writes the value to Elasticsearch as an array.

    Note A data synchronization node supports only one type of delimiter. You cannot specify different delimiters for different fields that you want to write to Elasticsearch as arrays. For example, you cannot specify a comma (,) as a delimiter for the source field col1="1,2,3,4,5" and a hyphen (-) as a delimiter for the source field col2="6-7-8-9-10".
    "parameter": {
          "column": [
            {
              "name": "docs_2",
              "array": true,
              "type": "long"
            }
          ],
          "splitter": "," // You must configure the splitter parameter at the same level as the column parameter.
    }
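Both parsing behaviors can be sketched in a few lines. The function names below are hypothetical; the sketch only illustrates how a writer could turn a source string into an array, either by JSON parsing (the json_array setting) or by splitting on a single delimiter (the splitter setting).

```python
import json

def parse_json_array(value):
    """json_array-style parsing: the source value is a JSON array string,
    such as "[1,2,3,4,5]", and is decoded as-is."""
    result = json.loads(value)
    if not isinstance(result, list):
        raise ValueError("source value is not a JSON array")
    return result

def parse_delimited(value, splitter=",", cast=int):
    """splitter-style parsing: split the source string on one delimiter
    and cast each element to the destination field type (long here).
    Only one delimiter applies to the whole node, mirroring the Note above."""
    return [cast(part) for part in value.split(splitter)]

docs_1 = parse_json_array("[1,2,3,4,5]")
docs_2 = parse_delimited("1,2,3,4,5")
```

Both calls produce the same array for these inputs; the difference is only in how the source string encodes it.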