The Elasticsearch data source provides a bidirectional channel to read data from and write data to Elasticsearch. This topic describes the data synchronization capabilities that DataWorks supports for Elasticsearch.
Usage notes
Elasticsearch 5.x is supported on public resource groups. Elasticsearch 5.x, 6.x, 7.x, and 8.x are supported on serverless resource groups (recommended) and exclusive resource groups for Data Integration.
For more information about serverless resource groups, see Use a serverless resource group.
For more information about exclusive resource groups for Data Integration, see Use an exclusive resource group for Data Integration.
Elasticsearch is a distributed search and data analysis engine based on Lucene. It is a mainstream, enterprise-level search engine released as an open source product under the Apache open source license. The following table maps the core concepts of Elasticsearch to those of a relational database.
Elasticsearch | Relational database |
Elasticsearch (instance) | Relational database (instance) |
Index | Database |
Type | Table |
Document | Row |
Field | Column |
An Elasticsearch instance can contain multiple indexes (databases). Each index can contain multiple types (tables). Each type can contain multiple documents (rows). Each document can contain multiple fields (columns). The Elasticsearch writer plug-in uses the REST API of Elasticsearch to write data read from a reader to Elasticsearch in batches.
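As a rough sketch of the underlying write path (not the exact requests that the plug-in sends), a batched write against the Elasticsearch _bulk REST API looks like the following. The index name test and the documents are illustrative only, and Elasticsearch versions earlier than 7.x also require a _type in each action line.
POST /_bulk
{ "index": { "_index": "test", "_id": "1" } }
{ "id": 1, "name": "doc1" }
{ "index": { "_index": "test", "_id": "2" } }
{ "id": 2, "name": "doc2" }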
Supported versions
DataWorks supports only Alibaba Cloud Elasticsearch 5.x, 6.x, 7.x, and 8.x data sources. You cannot add self-managed Elasticsearch data sources.
Limitations
The following limits apply when you perform offline read and write operations on an Elasticsearch data source:
Elasticsearch Reader fetches shard information from the server for data synchronization. You must ensure that the shards on the server are active during data synchronization. Otherwise, data inconsistency may occur.
If you use Elasticsearch 6.x or later, you must use serverless resource groups (recommended) or exclusive resource groups for Data Integration.
Fields of the scaled_float type cannot be synchronized.
Indexes whose field names contain the keyword $ref cannot be synchronized.
Supported field types
Type | Offline read (Elasticsearch Reader) | Offline write (Elasticsearch Writer) | Real-time write |
binary | Supported | Supported | Supported |
boolean | Supported | Supported | Supported |
keyword | Supported | Supported | Supported |
constant_keyword | Not supported | Not supported | Not supported |
wildcard | Not supported | Not supported | Not supported |
long | Supported | Supported | Supported |
integer | Supported | Supported | Supported |
short | Supported | Supported | Supported |
byte | Supported | Supported | Supported |
double | Supported | Supported | Supported |
float | Supported | Supported | Supported |
half_float | Not supported | Not supported | Not supported |
scaled_float | Not supported | Not supported | Not supported |
unsigned_long | Not supported | Not supported | Not supported |
date | Supported | Supported | Supported |
date_nanos | Not supported | Not supported | Not supported |
alias | Not supported | Not supported | Not supported |
object | Supported | Supported | Supported |
flattened | Not supported | Not supported | Not supported |
nested | Supported | Supported | Supported |
join | Not supported | Not supported | Not supported |
integer_range | Supported | Supported | Supported |
float_range | Supported | Supported | Supported |
long_range | Supported | Supported | Supported |
double_range | Supported | Supported | Supported |
date_range | Supported | Supported | Supported |
ip_range | Not supported | Supported | Supported |
ip | Supported | Supported | Supported |
version | Supported | Supported | Supported |
murmur3 | Not supported | Not supported | Not supported |
aggregate_metric_double | Not supported | Not supported | Not supported |
histogram | Not supported | Not supported | Not supported |
text | Supported | Supported | Supported |
annotated-text | Not supported | Not supported | Not supported |
completion | Supported | Not supported | Not supported |
search_as_you_type | Not supported | Not supported | Not supported |
token_count | Supported | Not supported | Not supported |
dense_vector | Not supported | Not supported | Not supported |
rank_feature | Not supported | Not supported | Not supported |
rank_features | Not supported | Not supported | Not supported |
geo_point | Supported | Supported | Supported |
geo_shape | Supported | Supported | Supported |
point | Not supported | Not supported | Not supported |
shape | Not supported | Not supported | Not supported |
percolator | Not supported | Not supported | Not supported |
string | Supported | Supported | Supported |
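For quick reference, the following hypothetical index mapping (written in Elasticsearch 7.x syntax) uses only field types that all three channels in the table above support. The index name products and the field names are illustrative only.
PUT /products
{
    "mappings": {
        "properties": {
            "id": { "type": "long" },
            "name": { "type": "keyword" },
            "summary": { "type": "text" },
            "price": { "type": "double" },
            "location": { "type": "geo_point" },
            "gmt_modified": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
            "attributes": { "type": "nested" }
        }
    }
}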
How it works
Elasticsearch Reader works as follows:
It uses the _search API of Elasticsearch together with scroll and slice to page through data. The slice feature works in conjunction with the multi-threaded splitting mechanism of Data Integration tasks to read data in parallel.
It transforms data types based on the mapping configuration in Elasticsearch.
For more information, see the official Elasticsearch documentation.
Elasticsearch Reader fetches shard information from the server for data synchronization. You must ensure that the shards on the server are active during data synchronization. Otherwise, data inconsistency may occur.
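The following sketch shows, at the REST level, roughly what a sliced scroll query looks like. The index name, slice count, scroll duration, and page size are illustrative, and the actual requests that Elasticsearch Reader issues may differ.
POST /my_index/_search?scroll=5m
{
    "slice": { "id": 0, "max": 2 },   // one of two parallel slices, each handled by a separate thread
    "query": { "match_all": {} },
    "size": 100                       // page size per scroll request
}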
Basic configuration
You must remove the comments before you run the code.
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" // The number of error records.
},
"jvmOption":"",
"speed":{
"concurrent":3,// The number of concurrent threads.
"throttle":true,//
"mbps":"12",// Throttling. 1 Mbps = 1 MB/s.
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ // The columns to read.
"id",
"name"
],
"endpoint":"", // The endpoint.
"index":"", // The index.
"password":"", // The password.
"scroll":"", // The scroll flag.
"search":"", // The query parameter. This is the same as the query content of Elasticsearch. The _search API is used and renamed to search.
"type":"default",
"username":"" // The username.
},
"stepType":"elasticsearch"
},
{
"stepType": "elasticsearch",
"parameter": {
"column": [ // The columns to write.
{
"name": "id",
"type": "integer"
},
{
"name": "name",
"type": "text"
}
],
"index": "test", // The destination index.
"indexType": "", // The type of the destination index. Leave this empty for Elasticsearch 7.x.
"actionType": "index", // The write mode.
"cleanup": false, // Specifies whether to reindex.
"datasource": "test", // The name of the data source.
"primaryKeyInfo": { // The method to obtain the primary key value.
"fieldDelimiterOrigin": ",",
"column": [
"id"
],
"type": "specific",
"fieldDelimiter": ","
},
"dynamic": false, // Dynamic mapping.
"batchSize": 1024 // The number of documents to write in a batch.
},
"name": "Writer",
"category": "writer"
}
],
"type":"job",
"version":"2.0" // The version number.
}
Advanced features
Full data pull
You can pull all content of a document in Elasticsearch as a single field. For more information about the configuration, see Scenario 1: Full data pull.
Extraction of semi-structured data to structured data
Category | Description | References |
Background | Data in Elasticsearch often has unfixed fields, Chinese field names, and deep nesting. To meet downstream business requirements for data computing and storage, DataWorks provides a solution that transforms this semi-structured data into structured data. | - |
How it works | The path retrieval feature of JSON tools flattens nested JSON data from Elasticsearch into a one-dimensional structure, which is then mapped to structured data tables. The composite data structure from Elasticsearch is split into multiple structured data tables. | - |
Solution | If the JSON data is nested, you can use a path to resolve it: property, property.sub-property, property[0].sub-property. | Scenario 2: Synchronize properties of nested or object fields |
Solution | If a one-to-many relationship exists, you can traverse the data and split it into different tables and rows: property[*].sub-property. | Scenario 3: Split array properties into multiple rows |
Solution | You can merge the content of a string array into a single property and remove duplicates: property[]. | Scenario 4: Deduplicate and merge array properties |
Solution | You can merge multiple properties into a single property: property1,property2. | Scenario 5: Merge and synchronize multiple properties |
Solution | You can selectively process multiple properties: property1|property2. | Scenario 6: Selectively synchronize multiple properties |
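As a quick illustration of the path syntax before the full scenarios in Appendix 3, the following hypothetical reader column configuration extracts several paths from a nested source document. The document and field names are illustrative only.
## Hypothetical source document
{ "user": { "name": "a", "tags": [ "x", "y" ] } }
## Reader column configuration
"parameter": {
    "column": [
        "user.name",       // resolves the nested property: "a"
        "user.tags[0]",    // takes the first array element: "x"
        "user.tags[]"      // merges and deduplicates the array: "x,y"
    ],
    "multi": { "multi": true }
}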
Add a data source
Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data Source Management. You can view the infotips of parameters in the DataWorks console to understand the meanings of the parameters when you add a data source.
Develop a data synchronization task
For information about the entry point and the procedure for configuring a synchronization task, see the following configuration guides.
Configure an offline synchronization task for a single table
For more information about the procedure, see Configure a batch synchronization task using the codeless UI and Configuration in the code editor.
For all parameters and a script demo for the code editor, see Appendix 1: Script demos and parameter descriptions.
Configure a real-time write task for a single table
For more information about the procedure, see Configure a real-time synchronization task in DataStudio.
Configure an offline write task for an entire database or a full/incremental real-time write task for a single table or an entire database
For more information about the procedure, see Configure a real-time synchronization task for an entire database.
Appendix 1: Script demos and parameter descriptions
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configuration in the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.
Reader script demo
{
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
},
"setting":{
"errorLimit":{
"record":"0" // The number of error records.
},
"jvmOption":"",
"speed":{
"concurrent":3,
"throttle":false
}
},
"steps":[
{
"category":"reader",
"name":"Reader",
"parameter":{
"column":[ // The columns to read.
"id",
"name"
],
"endpoint":"http://es-cn-xxx.elasticsearch.aliyuncs.com:9200", // The endpoint.
"index":"aliyun_es_xx", // The index.
"password":"*******", // The password.
"multiThread":true,
"scroll":"5m", // The scroll flag.
"pageSize":5000,
"connTimeOut":600000,
"readTimeOut":600000,
"retryCount":30,
"retrySleepTime":"10000",
"search":{
"range":{
"gmt_modified":{
"gte":0
}
}
}, // The query parameter. This is the same as the query content used with the Elasticsearch _search API; the parameter is named search here.
"type":"doc",
"username":"aliyun_di" // The username.
},
"stepType":"elasticsearch"
},
{
"category":"writer",
"name":"Writer",
"parameter":{ },
"stepType":"stream"
}
],
"type":"job",
"version":"2.0" // The version number.
}
Reader script parameters
Parameter | Description | Required | Default value |
datasource | The name of the data source. You can add a data source in the code editor. The value of this parameter must be the same as the name of the added data source. | Yes | None |
index | The name of the index in Elasticsearch. | Yes | None |
type | The index type in Elasticsearch. | No | Index name |
search | The query parameter of Elasticsearch. | Yes | None |
pageSize | The number of data entries to read at a time. | No | 100 |
scroll | The paging parameter of Elasticsearch. It specifies the duration for which the scroll context is kept alive. | Yes | None |
strictMode | Specifies whether to read data from Elasticsearch in strict mode. If a shard.failed error occurs in Elasticsearch, the read operation stops to prevent only part of the data from being read. | No | true |
sort | The field by which to sort the results. | No | None |
retryCount | The number of retries after a failure. | No | 300 |
connTimeOut | The connection timeout period for the client. | No | 600,000 |
readTimeOut | The read timeout period for the client. | No | 600,000 |
multiThread | Specifies whether to use multiple threads for HTTP requests. | No | true |
preemptiveAuth | Specifies whether to use preemptive authentication for HTTP requests. | No | false |
retrySleepTime | The interval between retries after a failure. | No | 1000 |
discovery | Specifies whether to enable node discovery. | No | false |
compression | Specifies whether to use GZIP to compress the request body. If you use this feature, you must enable the http.compression setting on the Elasticsearch node. | No | false |
dateFormat | If a field to be synchronized is of the date type and the mapping of this field does not include a format configuration, you must configure the dateFormat parameter to specify the date formats of the fields to be synchronized. | No | None |
full | Specifies whether to synchronize the full content of a document as a single field to the destination. The queried data from Elasticsearch is treated as a single field. For more information about the configuration, see Scenario 1: Full data pull. | No | None |
multi | An advanced feature for reading semi-structured data. It covers five usage scenarios and has two sub-properties: multi and key. For more information, see Appendix 3: Scenarios. | No | None |
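For reference, the following hypothetical reader parameter fragment combines several of the optional parameters described above. The endpoint, index name, column paths, and date formats are assumptions and must be adapted to your data.
"parameter": {
    "endpoint": "http://es-cn-xxx.elasticsearch.aliyuncs.com:9200",
    "index": "aliyun_es_xx",
    "column": [
        "gmt_modified",
        "detail.status"                                  // a nested sub-property resolved by path
    ],
    "multi": { "multi": true },                          // required when column contains path expressions
    "dateFormat": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd",     // assumed format string; adjust to the date formats in your index
    "scroll": "5m",
    "search": { "match_all": {} }
}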
Writer script demo
{
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,// If throttle is set to false, the mbps parameter does not take effect, which means no throttling. If throttle is set to true, throttling is enabled.
"concurrent":1, // The number of concurrent jobs.
"mbps":"12"// Throttling. 1 Mbps = 1 MB/s.
}
},
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {
},
"stepType": "stream"
},
{
"category": "writer",
"name": "Writer",
"parameter": {
"datasource":"xxx",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"discovery": false,
"primaryKeyInfo":{
"type":"pk",
"fieldDelimiter":",",
"column":[]
},
"batchSize": 1000,
"dynamic":false,
"esPartitionColumn":[
{
"name":"col1",
"comment":"xx",
"type":"STRING"
}
],
"column": [
{
"name": "pk",
"type": "id"
},
{
"name": "col_ip",
"type": "ip"
},
{
"name": "col_array",
"type": "long",
"array": true
},
{
"name": "col_double",
"type": "double"
},
{
"name": "col_long",
"type": "long"
},
{
"name": "col_integer",
"type": "integer"
},
{
"name": "col_keyword",
"type": "keyword"
},
{
"name": "col_text",
"type": "text",
"analyzer": "ik_max_word",
"other_params":
{
"doc_values": false
}
},
{
"name": "col_geo_point",
"type": "geo_point"
},
{
"name": "col_date",
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
{
"name": "col_nested1",
"type": "nested"
},
{
"name": "col_nested2",
"type": "nested"
},
{
"name": "col_object1",
"type": "object"
},
{
"name": "col_object2",
"type": "object"
},
{
"name": "col_integer_array",
"type": "integer",
"array": true
},
{
"name": "col_geo_shape",
"type": "geo_shape",
"tree": "quadtree",
"precision": "10m"
}
]
},
"stepType": "elasticsearch"
}
],
"type": "job",
"version": "2.0"
}
Note: An Elasticsearch instance in a VPC cannot be accessed from a default resource group due to network isolation. You must use a serverless resource group (recommended) or an exclusive resource group for Data Integration to access the VPC for data synchronization. For more information about how to add a resource group, see Serverless resource groups.
Writer script parameters
Parameter | Description | Required | Default value |
datasource | The Elasticsearch data source to which you want to synchronize data. If you have not created the data source in DataWorks, create one. For more information, see Configure an Elasticsearch data source. | Yes | None |
index | The name of the index in Elasticsearch. | Yes | None |
indexType | The type of the index in Elasticsearch. | No | Elasticsearch |
cleanup | Specifies whether to delete data from an existing index. | No | false |
batchSize | The number of documents to insert into Elasticsearch at a time for a synchronization task. | No | 1,000 |
trySize | The number of retries after a failed attempt to write data to Elasticsearch. | No | 30 |
timeout | The timeout period for the client. | No | 600,000 |
discovery | Specifies whether to enable the node discovery feature for the task. | No | false |
compression | Enables compression for HTTP requests. | No | true |
multiThread | Specifies whether to use multiple threads for HTTP requests. | No | true |
ignoreWriteError | Specifies whether to ignore write errors. If this feature is enabled, failed writes are not retried and the task continues to write. | No | false |
ignoreParseError | Specifies whether to ignore data format parsing errors and continue to write. | No | true |
alias | An alias in Elasticsearch is similar to a view in a database. For example, if you create an alias named my_index_alias for an index named my_index, operations on my_index_alias are the same as operations on my_index. If you configure an alias, an alias is created for the specified index after data is imported. | No | None |
aliasMode | The mode in which an alias is added after data is imported. Valid values: append and exclusive. Elasticsearch resolves the alias to the actual index name. Aliases can be used for index migration and unified queries across multiple indexes, and can provide functionality similar to database views. | No | append |
settings | The settings for creating an index. This is consistent with the official Elasticsearch settings. | No | None |
column | Configures the field information of the documents to be written. For each field, you can configure basic settings such as name and type, and extended settings such as analyzer, format, and array. For the field types that are supported, see the Supported field types table in this topic. To configure Elasticsearch properties other than type for a column, use the other_params parameter inside the column configuration; these properties are used when mappings are updated. If you want to write source data to Elasticsearch as an array, you can parse the source data in JSON format or by using a delimiter. For more information, see Appendix 2: Write data to Elasticsearch in an array format. | Yes | None |
dynamic | Specifies whether the synchronization task uses the dynamic mapping mechanism of Elasticsearch to add mappings for fields that do not exist in the destination index. For Elasticsearch 7.x, the default type is _doc. To use the automatic mappings of Elasticsearch, set the type to _doc and set esVersion to 7 by switching to the code editor and adding the version parameter. Important: If a field mapping error occurs, you can enable this parameter to try to resolve the issue. However, this method may cause field types to be inconsistent with expectations or cause data exceptions. Evaluate the risks based on your data structure before you decide whether to enable it. | No | false |
actionType | The action type for writing data to Elasticsearch. Data Integration supports index and update. | No | index |
primaryKeyInfo | Defines the method used to obtain the primary key value when data is written to Elasticsearch. | Yes | specific |
esPartitionColumn | Specifies whether to enable partitioning when data is written to Elasticsearch. This setting modifies the routing parameter in Elasticsearch. | No | false |
enableWriteNull | Specifies whether to synchronize null value fields from the source to Elasticsearch. Valid values: true and false. | No | true |
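For reference, the following hypothetical writer parameter fragment combines several of the parameters above to update existing documents by a business primary key. The data source name, index name, and columns are illustrative only.
"parameter": {
    "datasource": "my_es_datasource",                // assumed data source name
    "index": "orders",
    "actionType": "update",                          // update existing documents instead of indexing new ones
    "primaryKeyInfo": {
        "type": "specific",                          // use the listed business column values as the primary key
        "column": [ "order_id" ],
        "fieldDelimiter": ","
    },
    "cleanup": false,                                // keep the existing index and its data
    "batchSize": 1000,
    "dynamic": false,
    "column": [
        { "name": "order_id", "type": "keyword" },
        { "name": "amount", "type": "double" },
        { "name": "gmt_modified", "type": "date", "format": "yyyy-MM-dd HH:mm:ss" }
    ]
}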
Appendix 2: Write data to Elasticsearch in an array format
You can use the following two methods to write source data to Elasticsearch as an array.
Parse source data in JSON format
For example, if the source data is "[1,2,3,4,5]", set json_array=true to parse the data. The data is then written to Elasticsearch in an array format.
"parameter" : {
    "column": [
        {
            "name": "docs_1",
            "type": "keyword",
            "json_array": true
        }
    ]
}
Parse source data by using a separator
For example, if the source data is "1,2,3,4,5", set the separator splitter="," to parse the data. The data is then written to Elasticsearch in an array format.
Note: A task supports only one type of separator. The splitter parameter is globally unique. You cannot configure different separators for multiple array fields. For example, if the source fields are col1="1,2,3,4,5" and col2="6-7-8-9-10", you cannot configure a separate splitter for each field.
"parameter" : {
    "column": [
        {
            "name": "docs_2",
            "array": true,
            "type": "long"
        }
    ],
    "splitter": ","   // Note: The splitter configuration is at the same level as the column configuration.
}
Appendix 3: Scenarios
Scenario 1: Full data pull
Background: You can pull the query results of a document in Elasticsearch as a single field.
Example:
## Reader: Raw data in Elasticsearch
"hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "IXgdO4MB4GR_1DmrjTXP",
        "_score": 1.0,
        "_source": {
            "feature1": "value1",
            "feature2": "value2",
            "feature3": "value3"
        }
    }
]
## Data Integration Elasticsearch Reader plug-in configuration
"parameter": {
    "column": [
        "content"
    ],
    "full": true
}
## Writer result: One row and one column are synchronized to the destination.
{"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}
Scenario 2: Synchronize properties of nested or object fields
Background: When you synchronize properties of Object or nested fields, you can use a path to resolve them.
Configuration:
property
property.sub-property
property[0].sub-property
Script configuration:
"multi":{ "multi":true }NoteConfiguration is not currently available in the codeless UI.
Example:
## Reader: Raw data in Elasticsearch
"hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "7XAOOoMB4GR_1Dmrrust",
        "_score": 1.0,
        "_source": {
            "level1": {
                "level2": [
                    { "level3": "testlevel3_1" },
                    { "level3": "testlevel3_2" }
                ]
            }
        }
    }
]
## Data Integration Elasticsearch Reader plug-in configuration
"parameter": {
    "column": [
        "level1",
        "level1.level2",
        "level1.level2[0]",
        "level1.level2.level3"
    ],
    "multi": {
        "multi": true
    }
}
## Writer result: One row of data with four columns
column1(level1): {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]}
column2(level1.level2): [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]
column3(level1.level2[0]): {"level3":"testlevel3_1"}
column4(level1.level2.level3): null
Note: If a parent node of the retrieved node contains an array, the result is null. For example, retrieving `level1.level2.level3` does not report an error, but the synchronization result is null. You must configure the path as `level1.level2[0].level3` or `level1.level2[1].level3`. The `level1.level2[*].level3` format is not currently supported.
Data that contains a period (.) in the key is not supported, such as `{"level1.level2":{"level3":"testlevel3_1"}}`. In this case, the retrieval result for this data entry is null.
Scenario 3: Split array properties into multiple rows
Background: If a one-to-many relationship exists, you can split the array column into multiple rows.
Configuration: property[*].sub-property
Effect: Source data such as `{ "splitKey" :[1,2,3,4,5]}` is split and written to the destination as five separate rows.
Script configuration:
"multi":{ "multi":true, "key": "headers" }NoteIn the codeless UI, configuring Split Multi-row Array Column Name automatically generates an equivalent script configuration.
The value must be a list. Otherwise, an error occurs.
Example:
## Reader: Raw data in Elasticsearch
[
    {
        "_index": "lmtestjson",
        "_type": "_doc",
        "_id": "nhxmIYMBKDL4VkVLyXRN",
        "_score": 1.0,
        "_source": {
            "headers": [
                { "remoteip": "192.0.2.1" },
                { "remoteip": "192.0.2.2" }
            ]
        }
    },
    {
        "_index": "lmtestjson",
        "_type": "_doc",
        "_id": "wRxsIYMBKDL4VkVLcXqf",
        "_score": 1.0,
        "_source": {
            "headers": [
                { "remoteip": "192.0.2.3" },
                { "remoteip": "192.0.2.4" }
            ]
        }
    }
]
## Data Integration Elasticsearch Reader plug-in configuration
{
    "column": [
        "headers[*].remoteip"
    ],
    "multi": {
        "multi": true,
        "key": "headers"
    }
}
## Writer result: 4 rows
192.0.2.1
192.0.2.2
192.0.2.3
192.0.2.4
Scenario 4: Deduplicate and merge array properties
Background: You can deduplicate and merge an array property, and then write the result as a string property. The array property can be a sub-property, such as `name1.name2`. The `toString` result is used as the standard for deduplication.
Configuration: `property[]`.
If the column name contains `[]`, deduplication and merging are performed on this property.
Script configuration:
"multi":{ "multi":true }NoteThis parameter cannot be configured in the codeless UI.
Example:
## Reader: Raw data in Elasticsearch
"hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "4nbUOoMB4GR_1Dmryj8O",
        "_score": 1.0,
        "_source": {
            "feature1": [
                "value1",
                "value1",
                "value2",
                "value2",
                "value3"
            ]
        }
    }
]
## Data Integration Elasticsearch Reader plug-in configuration
"parameter": {
    "column": [
        "feature1[]"
    ],
    "multi": {
        "multi": true
    }
}
## Writer result: One row and one column of data
"value1,value2,value3"
Scenario 5: Merge and synchronize multiple properties
Background: You can merge multiple properties into a single property and write the result as one column.
Configuration: `property1,property2,...`
If the column name contains the comma (,) separator, the listed properties are merged into this column.
Script configuration:
"multi":{ "multi":true }NoteThis parameter cannot be configured in the codeless UI.
Example:
## Reader: Raw data in Elasticsearch
"hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "v3ShOoMB4GR_1DmrZN22",
        "_score": 1.0,
        "_source": {
            "feature1": "feature1",
            "feature2": [ 1, 2, 3 ],
            "feature3": { "child": "feature3" }
        }
    }
]
## Data Integration Elasticsearch Reader plug-in configuration
"parameter": {
    "column": [
        "feature1,feature2,feature3"
    ],
    "multi": {
        "multi": true
    }
}
## Writer result: One row and one column of data
"feature1,[1,2,3],{"child":"feature3"}"
Scenario 6: Selectively synchronize multiple properties
Background: You can selectively process multiple properties. The first property that has a value is returned. If none of the specified properties exist, null is written.
Configuration: `property1|property2|...`
If the column name contains the `|` separator, multiple properties are selected for this item.
Script configuration:
"multi":{ "multi":true }NoteThis parameter cannot be configured in the codeless UI.
Example:
## Reader: Raw data in Elasticsearch
"hits": [
    {
        "_index": "mutiltest_1",
        "_type": "_doc",
        "_id": "v3ShOoMB4GR_1DmrZN22",
        "_score": 1.0,
        "_source": {
            "feature1": "feature1",
            "feature2": [ 1, 2, 3 ],
            "feature3": { "child": "feature3" }
        }
    }
]
## Data Integration Elasticsearch Reader plug-in configuration
"parameter": {
    "column": [
        "feature1|feature2|feature3"
    ],
    "multi": {
        "multi": true
    }
}
## Writer result: One row and one column of data
"feature1"
References
Data Integration supports additional data sources. For more information, see Supported data source types and synchronization operations.