The Elasticsearch data source connects DataWorks Data Integration to Alibaba Cloud Elasticsearch for bidirectional data synchronization. Use it to read data from Elasticsearch indexes into downstream systems, write data from upstream sources into Elasticsearch, or both.
Supported sync modes:
| Mode | Direction |
|---|---|
| Offline read (Elasticsearch Reader) | Elasticsearch → downstream |
| Offline write (Elasticsearch Writer) | Upstream → Elasticsearch |
| Real-time write | Upstream → Elasticsearch (streaming) |
Supported versions and resource groups
DataWorks supports Alibaba Cloud Elasticsearch 5.x, 6.x, 7.x, and 8.x. Self-managed Elasticsearch data sources are not supported.
Version support varies by resource group:
| Resource group | Supported versions |
|---|---|
| Public resource group | Elasticsearch 5.x |
| Serverless resource group (recommended) | Elasticsearch 5.x, 6.x, 7.x, 8.x |
| Exclusive resource group for Data Integration | Elasticsearch 5.x, 6.x, 7.x, 8.x |
For setup instructions, see Use a serverless resource group and Use exclusive resource groups for Data Integration.
Limitations
- Elasticsearch Reader fetches shard information from the server before syncing. All shards must be active during synchronization, because inactive shards cause data inconsistency.
- For Elasticsearch 6.x or later, use a serverless resource group or an exclusive resource group for Data Integration. Public resource groups do not support Elasticsearch 6.x and later.
- Fields of the `scaled_float` type cannot be synchronized.
- Indexes with `$ref` in field names cannot be synchronized.
Key concepts
Elasticsearch maps to a relational database as follows:
| Elasticsearch | Relational database |
|---|---|
| Elasticsearch (instance) | Relational DB (instance) |
| Index | Database |
| Type | Table |
| Document | Row |
| Field | Column |
Supported field types
| Type | Offline read | Offline write | Real-time write |
|---|---|---|---|
| binary | Supported | Supported | Supported |
| boolean | Supported | Supported | Supported |
| keyword | Supported | Supported | Supported |
| constant_keyword | Not supported | Not supported | Not supported |
| wildcard | Not supported | Not supported | Not supported |
| long | Supported | Supported | Supported |
| integer | Supported | Supported | Supported |
| short | Supported | Supported | Supported |
| byte | Supported | Supported | Supported |
| double | Supported | Supported | Supported |
| float | Supported | Supported | Supported |
| half_float | Not supported | Not supported | Not supported |
| scaled_float | Not supported | Not supported | Not supported |
| unsigned_long | Not supported | Not supported | Not supported |
| date | Supported | Supported | Supported |
| date_nanos | Not supported | Not supported | Not supported |
| alias | Not supported | Not supported | Not supported |
| object | Supported | Supported | Supported |
| flattened | Not supported | Not supported | Not supported |
| nested | Supported | Supported | Supported |
| join | Not supported | Not supported | Not supported |
| integer_range | Supported | Supported | Supported |
| float_range | Supported | Supported | Supported |
| long_range | Supported | Supported | Supported |
| double_range | Supported | Supported | Supported |
| date_range | Supported | Supported | Supported |
| ip_range | Not supported | Supported | Supported |
| ip | Supported | Supported | Supported |
| version | Supported | Supported | Supported |
| murmur3 | Not supported | Not supported | Not supported |
| aggregate_metric_double | Not supported | Not supported | Not supported |
| histogram | Not supported | Not supported | Not supported |
| text | Supported | Supported | Supported |
| annotated-text | Not supported | Not supported | Not supported |
| completion | Supported | Not supported | Not supported |
| search_as_you_type | Not supported | Not supported | Not supported |
| token_count | Supported | Not supported | Not supported |
| dense_vector | Not supported | Not supported | Not supported |
| rank_feature | Not supported | Not supported | Not supported |
| rank_features | Not supported | Not supported | Not supported |
| geo_point | Supported | Supported | Supported |
| geo_shape | Supported | Supported | Supported |
| point | Not supported | Not supported | Not supported |
| shape | Not supported | Not supported | Not supported |
| percolator | Not supported | Not supported | Not supported |
| string | Supported | Supported | Supported |
How it works
Elasticsearch Reader uses the _search scroll slice API to read data. The slice mechanism works alongside the multi-threaded sharding in Data Integration tasks, so multiple threads read from different slices in parallel. Data types are converted based on the mapping configuration in Elasticsearch.
For more information about the scroll API, see the official Elasticsearch documentation.
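The slice mechanism can be pictured as follows. This sketch only constructs the per-slice request bodies that a sliced scroll issues; the index name, query, and slice count are illustrative, and no connection to a cluster is made.

```python
import json

def sliced_scroll_requests(index, query, num_slices, scroll="5m", page_size=100):
    """Build the initial sliced-scroll _search request for each slice.

    Each request can then be driven by a separate thread, which is how the
    Reader parallelizes reads. A sketch of the protocol, not the Reader's code.
    """
    requests = []
    for slice_id in range(num_slices):
        requests.append({
            # `scroll` keeps the server-side context alive between page reads.
            "path": f"/{index}/_search?scroll={scroll}",
            "body": {
                "slice": {"id": slice_id, "max": num_slices},  # slice i of n
                "size": page_size,
                "query": query,
            },
        })
    return requests

reqs = sliced_scroll_requests(
    "aliyun_es_xx", {"range": {"gmt_modified": {"gte": 0}}}, num_slices=3
)
print(json.dumps(reqs[0]["body"], indent=2))
```

Subsequent pages for each slice are fetched by posting the returned `_scroll_id` to the `_search/scroll` endpoint until no hits remain.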
Advanced features
Elasticsearch Reader supports two advanced data extraction modes beyond standard field-level reads:
-
Full document pull — read an entire Elasticsearch document as a single JSON string field. See Scenario 1: Full data pull.
-
Semi-structured to structured extraction — flatten nested JSON, split arrays into rows, deduplicate array values, merge multiple properties, or selectively pick the first non-null property. See Scenarios 2–6 for configuration details.
Add a data source
Before configuring a sync task, add the Elasticsearch data source in DataWorks. Follow the instructions in Data source management. Parameter descriptions are available directly in the DataWorks console when you add the data source.
Develop a data synchronization task
Configure an offline sync task (single table)
-
For a step-by-step walkthrough, see Configure a task in the codeless UI or Configure a task in the code editor.
-
For the full parameter reference and script examples, see Appendix 1: Script demos and parameter descriptions.
Configure a real-time write task (single table)
See Configure a real-time synchronization task in DataStudio.
Configure a full-database offline or real-time sync task
See Configure a real-time full-database synchronization task.
Basic configuration
The following example shows a minimal Reader-to-Writer configuration. Remove all comments before running.
{
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0" // Maximum number of error records allowed.
},
"jvmOption": "",
"speed": {
"concurrent": 3, // Number of concurrent threads.
"throttle": true,
"mbps": "12" // Throttle rate. 1 Mbps = 1 MB/s.
}
},
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {
"column": [ // Columns to read.
"id",
"name"
],
"endpoint": "", // Elasticsearch endpoint.
"index": "", // Index name.
"password": "",
"scroll": "", // Scroll context duration (e.g., "5m").
"search": "", // Query body — same format as Elasticsearch _search API.
"type": "default",
"username": ""
},
"stepType": "elasticsearch"
},
{
"category": "writer",
"name": "Writer",
"parameter": {
"column": [ // Columns to write, with type definitions.
{
"name": "id",
"type": "integer"
},
{
"name": "name",
"type": "text"
}
],
"index": "test", // Destination index.
"indexType": "", // Leave blank for Elasticsearch 7.x.
"actionType": "index", // Write mode: index or update.
"cleanup": false, // If true, deletes and recreates the index before writing.
"datasource": "test", // Data source name configured in DataWorks.
"primaryKeyInfo": {
"fieldDelimiterOrigin": ",",
"column": ["id"],
"type": "specific",
"fieldDelimiter": ","
},
"dynamic": false, // If false, mappings are generated from column config.
"batchSize": 1024 // Documents written per batch.
},
"name": "Writer",
"category": "writer",
"stepType": "elasticsearch"
}
],
"type": "job",
"version": "2.0"
}
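Since JSON does not allow comments, the annotated sample above must have its `//` comments removed before submission. A minimal cleanup sketch in Python; note the naive pattern would also eat `//` inside string values (such as an `http://` endpoint), so quote or clear those fields first.

```python
import json
import re

def strip_line_comments(text):
    # Remove trailing "//" comments from an annotated config.
    # Caveat: also matches "//" inside string values, e.g. http:// URLs.
    return re.sub(r"\s*//[^\n]*", "", text)

annotated = """{
  "speed": {
    "concurrent": 3, // Number of concurrent threads.
    "mbps": "12" // Throttle rate.
  }
}"""
cfg = json.loads(strip_line_comments(annotated))
print(cfg["speed"])  # → {'concurrent': 3, 'mbps': '12'}
```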
Appendix 1: Script demos and parameter descriptions
Reader script demo
{
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"jvmOption": "",
"speed": {
"concurrent": 3,
"throttle": false
}
},
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {
"column": [
"id",
"name"
],
"endpoint": "http://es-cn-xxx.elasticsearch.aliyuncs.com:9200",
"index": "aliyun_es_xx",
"password": "*******",
"multiThread": true,
"scroll": "5m",
"pageSize": 5000,
"connTimeOut": 600000,
"readTimeOut": 600000,
"retryCount": 30,
"retrySleepTime": "10000",
"search": {
"range": {
"gmt_modified": {
"gte": 0
}
}
},
"type": "doc",
"username": "aliyun_di"
},
"stepType": "elasticsearch"
},
{
"category": "writer",
"name": "Writer",
"parameter": {},
"stepType": "stream"
}
],
"type": "job",
"version": "2.0"
}
Reader script parameters
| Parameter | Description | Required | Default value |
|---|---|---|---|
| `datasource` | Name of the data source as configured in DataWorks. Must match the added data source name. | Yes | None |
| `index` | Elasticsearch index name. | Yes | None |
| `type` | Elasticsearch index type. | No | Index name |
| `search` | Elasticsearch query body. Uses the same format as the `_search` API. | Yes | None |
| `pageSize` | Number of records to read per page. | No | 100 |
| `scroll` | Scroll context duration. Controls how long the scroll context stays alive between page reads. If set too low, the scroll expires when idle time between pages is too long, causing data loss. If set too high, concurrent queries may exceed the server's max_open_scroll_context limit, causing query errors. | Yes | None |
| `strictMode` | If `true` and a shard.failed error occurs, the read stops immediately to prevent partial data from being synchronized. | No | true |
| `sort` | Field to sort results by. | No | None |
| `retryCount` | Number of retries after a failure. | No | 300 |
| `connTimeOut` | Client connection timeout in milliseconds. | No | 600,000 |
| `readTimeOut` | Client read timeout in milliseconds. | No | 600,000 |
| `multiThread` | Whether to use multiple threads for HTTP requests. | No | true |
| `preemptiveAuth` | Whether to use preemptive authentication for HTTP requests. | No | false |
| `retrySleepTime` | Interval between retries in milliseconds. | No | 1,000 |
| `discovery` | Node discovery mode. `true`: connects to a random cluster node and periodically refreshes the server list. `false`: sends all requests to the configured endpoint. | No | false |
| `compression` | Whether to compress request bodies with GZIP. Requires http.compression to be enabled on the Elasticsearch node. | No | false |
| `dateFormat` | Required if a date field has no format in its mapping. Specify all date formats used in the field, for example: `"yyyy-MM-dd\|\|yyyy-MM-dd HH:mm:ss"`. | No | None |
| `full` | If `true`, reads the entire document as a single JSON string field. See Scenario 1: Full data pull. | No | None |
| `multi` | Enables advanced multi-property extraction. Has two sub-properties: `multi.multi` and `multi.key`. See Advanced features for the five supported use cases. | No | None |
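As an illustration of the `search` parameter, a hypothetical query body combining a term filter with an incremental range; the `status` field and its value are invented for this example.

```json
"search": {
  "bool": {
    "must": [
      { "term": { "status": "published" } },
      { "range": { "gmt_modified": { "gte": 1672531200000 } } }
    ]
  }
}
```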
Writer script demo
An Elasticsearch instance inside a Virtual Private Cloud (VPC) cannot be reached from a public resource group due to network isolation. Use a serverless resource group or an exclusive resource group for Data Integration to access instances in a VPC.
{
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": true,
"concurrent": 1,
"mbps": "12"
}
},
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {},
"stepType": "stream"
},
{
"category": "writer",
"name": "Writer",
"parameter": {
"datasource": "xxx",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"discovery": false,
"primaryKeyInfo": {
"type": "pk",
"fieldDelimiter": ",",
"column": []
},
"batchSize": 1000,
"dynamic": false,
"esPartitionColumn": [
{
"name": "col1",
"comment": "xx",
"type": "STRING"
}
],
"column": [
{ "name": "pk", "type": "id" },
{ "name": "col_ip", "type": "ip" },
{ "name": "col_array", "type": "long", "array": true },
{ "name": "col_double", "type": "double" },
{ "name": "col_long", "type": "long" },
{ "name": "col_integer", "type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{
"name": "col_text",
"type": "text",
"analyzer": "ik_max_word",
"other_params": { "doc_values": false }
},
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
{ "name": "col_nested1", "type": "nested" },
{ "name": "col_nested2", "type": "nested" },
{ "name": "col_object1", "type": "object" },
{ "name": "col_object2", "type": "object" },
{ "name": "col_integer_array", "type": "integer", "array": true },
{
"name": "col_geo_shape",
"type": "geo_shape",
"tree": "quadtree",
"precision": "10m"
}
]
},
"stepType": "elasticsearch"
}
],
"type": "job",
"version": "2.0"
}
Writer script parameters
| Parameter | Description | Required | Default value |
|---|---|---|---|
| `datasource` | Elasticsearch data source name in DataWorks. If not yet created, see Configure an Elasticsearch data source. | Yes | None |
| `index` | Destination Elasticsearch index name. | Yes | None |
| `indexType` | Elasticsearch index type. | No | Elasticsearch |
| `cleanup` | Pre-write index behavior. `true`: deletes the existing index and recreates it before writing (all existing data is lost). `false`: retains existing data in the index. | No | false |
| `batchSize` | Number of documents to insert per batch. | No | 1,000 |
| `trySize` | Number of retries after a write failure. | No | 30 |
| `timeout` | Client timeout in milliseconds. | No | 600,000 |
| `discovery` | Node discovery mode. `true`: connects to a random cluster node and periodically refreshes the server list. `false`: connects directly to the Elasticsearch cluster endpoint. | No | false |
| `compression` | Whether to compress HTTP request bodies. | No | true |
| `multiThread` | Whether to use multiple threads for HTTP requests. | No | true |
| `ignoreWriteError` | If `true`, write errors are skipped without retrying and the task continues. | No | false |
| `ignoreParseError` | If `true`, data format parsing errors are skipped and the task continues. | No | true |
| `alias` | Alias to create for the index after data is imported. An alias works like a database view — operations on the alias apply to the underlying index. | No | None |
| `aliasMode` | How the alias is attached after import. `append`: adds the current index to the alias, which can point to multiple indexes. `exclusive`: removes the existing alias mapping first, then adds the current index — the alias points to exactly one index. Aliases support index migration and unified multi-index queries. | No | append |
| `settings` | Index settings applied when creating the index. Follows the standard Elasticsearch index settings format. | No | None |
| `column` | Field definitions for the documents to write. Each field requires at minimum `name` and `type`. See the column type reference below. | Yes | None |
| `dynamic` | Controls whether Elasticsearch dynamic mapping is used for fields not in `column`. `true`: Elasticsearch automatically maps unmapped fields. `false`: mappings are generated and updated only from the `column` configuration. For Elasticsearch 7.x, set `"esVersion": "7"` in the code editor when using dynamic mapping. | No | false |
| `actionType` | Write action type. `index`: uses the Index.Builder of the Elasticsearch SDK. If no `_id` is specified, Elasticsearch generates one automatically. If `_id` is specified, the entire document is replaced. `update`: updates specific fields in an existing document by `_id`. If the `_id` does not exist, the document is inserted. Each update retrieves the full document to modify specific fields — this can significantly affect performance. If `actionType` is `update`, `primaryKeyInfo` is required. | No | index |
| `primaryKeyInfo` | Defines how the document `_id` is set. See the primary key configuration reference below. | Yes | specific |
| `esPartitionColumn` | Routing configuration for partitioned writes. Concatenates the values of the specified columns (with an empty string separator) and sets the result as the Elasticsearch routing value, directing documents to a specific shard. If not specified, `_id` is used as the routing key to distribute documents evenly across shards. | No | false |
| `enableWriteNull` | Controls whether null fields from the source are written to Elasticsearch. `true`: null fields are written; the corresponding Elasticsearch field is null. `false`: null fields are not written; the field is absent in the Elasticsearch document. | No | true |
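As an illustration of the interplay between `actionType` and `primaryKeyInfo`, a hypothetical update-mode Writer fragment; the field names are invented for this example.

```json
"parameter": {
  "actionType": "update",
  "primaryKeyInfo": {
    "type": "pk",
    "column": ["order_id"]
  },
  "column": [
    { "name": "order_id", "type": "id" },
    { "name": "order_status", "type": "keyword" }
  ]
}
```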
Column type reference
The column parameter supports the following field types:
id // Maps to _id in Elasticsearch. Documents with the same ID are overwritten, not re-indexed.
string
text
keyword
long
integer
short
byte
double
float
date
boolean
binary
integer_range
float_range
long_range
double_range
date_range
geo_point
geo_shape
ip
token_count
array
object
nested
Type-specific configuration:
- text — supports additional properties such as `analyzer`, `norms`, and `index_options`:
  { "name": "col_text", "type": "text", "analyzer": "ik_max_word" }
- date — two methods for parsing source data:
  - Method 1 (write as-is): Set `"origin": true` to write the field value directly to Elasticsearch without conversion. Also configure `"format"` so the Writer sets the correct format in the mapping:
    { "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss", "origin": true }
  - Method 2 (timezone conversion): Omit `origin` and add `"Timezone"` so Data Integration converts the timezone before writing:
    { "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss", "Timezone": "UTC" }
- geo_shape — supports `tree` (`geohash` or `quadtree`) and `precision`:
  { "name": "col_geo_shape", "type": "geo_shape", "tree": "quadtree", "precision": "10m" }
- other_params — use this property inside any `column` entry to set Elasticsearch mapping properties not covered by the standard field types:
  { "name": "guid", "type": "text", "other_params": { "doc_values": false } }
Primary key configuration reference
The primaryKeyInfo parameter supports three modes:
- Business primary key (`pk`) — sets `_id` to the value of a specific field:
  "primaryKeyInfo": { "type": "pk", "column": ["id"] }
- Composite primary key (`specific`) — sets `_id` by concatenating multiple field values, separated by `fieldDelimiter`:
  "primaryKeyInfo": { "type": "specific", "fieldDelimiter": ",", "column": ["col1", "col2"] }
  In the codeless UI, the Primary key column configuration only lists fields that already exist in the Elasticsearch index.
- No primary key (`nopk`) — Elasticsearch generates `_id` automatically:
  "primaryKeyInfo": { "type": "nopk" }
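The `specific` mode can be pictured as a plain join of the key-column values. A sketch of the observable behavior, not the actual Writer implementation:

```python
def composite_id(row, key_columns, field_delimiter=","):
    # Concatenate the primary-key column values, in order, to form the _id.
    return field_delimiter.join(str(row[c]) for c in key_columns)

row = {"col1": "CN", "col2": 1001, "payload": "x"}
print(composite_id(row, ["col1", "col2"]))  # → CN,1001
```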
Appendix 2: Write data to Elasticsearch in an array format
Two methods are available for writing source data to Elasticsearch as an array.
Parse as JSON
If the source data is already a JSON array string such as "[1,2,3,4,5]", set "json_array": true to parse it:
{
"name": "docs_1",
"type": "keyword",
"json_array": true
}
Parse with a separator
If the source data is a delimited string such as "1,2,3,4,5", set "array": true on the column and configure splitter at the parameter level:
{
"parameter": {
"column": [
{
"name": "docs_2",
"array": true,
"type": "long"
}
],
"splitter": "," // Must be at the same level as "column", not inside it.
}
}
`splitter` is a global parameter: a task supports only one separator. If multiple array fields use different delimiters (for example, `col1="1,2,3"` and `col2="6-7-8"`), you cannot configure separate splitters for each field.
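Both parsing methods can be sketched in a few lines of Python. This is an illustration of the observable behavior, not Data Integration's implementation; the integer conversion stands in for `"type": "long"`.

```python
import json

def parse_array_field(value, json_array=False, splitter=","):
    # json_array=True: the source value is already a JSON array string.
    # Otherwise: split a delimited string on the single, task-global splitter.
    if json_array:
        return json.loads(value)
    return [int(v) for v in value.split(splitter)]

print(parse_array_field("[1,2,3,4,5]", json_array=True))  # → [1, 2, 3, 4, 5]
print(parse_array_field("1,2,3,4,5"))                     # → [1, 2, 3, 4, 5]
```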
Appendix 3: Scenarios
Scenario 1: Full data pull
Pull an entire Elasticsearch document as a single JSON string field. Set "full": true in the Reader parameter.
Input — raw Elasticsearch document:
"hits": [
{
"_index": "mutiltest_1",
"_type": "_doc",
"_id": "IXgdO4MB4GR_1DmrjTXP",
"_score": 1.0,
"_source": {
"feature1": "value1",
"feature2": "value2",
"feature3": "value3"
}
}
]
Reader configuration:
"parameter": {
"column": ["content"],
"full": true
}
Output — one row, one column:
{"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}
Scenario 2: Synchronize properties of nested or object fields
Use dot notation and array index notation to extract specific properties from object or nested fields:
-
property— top-level property -
property.sub-property— nested sub-property -
property[0].sub-property— element at a specific array index
Configuration is not available in the codeless UI for this scenario. Use the code editor.
Reader configuration:
"multi": {
"multi": true
}
Input — raw Elasticsearch document:
"hits": [
{
"_index": "mutiltest_1",
"_type": "_doc",
"_id": "7XAOOoMB4GR_1Dmrrust",
"_score": 1.0,
"_source": {
"level1": {
"level2": [
{ "level3": "testlevel3_1" },
{ "level3": "testlevel3_2" }
]
}
}
}
]
Column configuration:
"column": [
"level1",
"level1.level2",
"level1.level2[0]",
"level1.level2.level3"
]
Output — one row, four columns:
column1 (level1): {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]}
column2 (level1.level2): [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]
column3 (level1.level2[0]): {"level3":"testlevel3_1"}
column4 (level1.level2.level3): null
If a parent node in the path is an array, the result is `null`. For example, `level1.level2.level3` returns `null` because `level2` is an array. Use `level1.level2[0].level3` or `level1.level2[1].level3` instead. The `level1.level2[*].level3` wildcard is not currently supported.
Keys that contain a period (.) are not supported. For example, `{"level1.level2": {"level3": "value"}}` returns `null` for this path.
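The path resolution rules above, including the null result when a parent node is an array, can be sketched as follows (an illustration, not the Reader's actual code):

```python
import re

def extract_path(doc, path):
    """Resolve a dot path with optional [i] index segments.

    Returns None when a parent node on the path is an array, mirroring the
    Reader behavior described above.
    """
    node = doc
    for segment in path.split("."):
        m = re.match(r"^(\w+)(?:\[(\d+)\])?$", segment)
        if m is None or not isinstance(node, dict) or m.group(1) not in node:
            return None
        node = node[m.group(1)]
        if m.group(2) is not None:
            if not isinstance(node, list) or int(m.group(2)) >= len(node):
                return None
            node = node[int(m.group(2))]
    return node

src = {"level1": {"level2": [{"level3": "testlevel3_1"}, {"level3": "testlevel3_2"}]}}
print(extract_path(src, "level1.level2[0].level3"))  # → testlevel3_1
print(extract_path(src, "level1.level2.level3"))     # → None (level2 is an array)
```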
Scenario 3: Split array properties into multiple rows
Expand a one-to-many array into separate rows using the [*] wildcard in the column path.
Configuration: property[*].sub-property
In the codeless UI, set Split multi-row array column name to generate the equivalent script configuration.
Reader configuration:
"multi": {
"multi": true,
"key": "headers"
}
The value of key must be an array field. If it is not, an error occurs.
Input — two Elasticsearch documents:
[
{
"_source": {
"headers": [
{ "remoteip": "192.0.2.1" },
{ "remoteip": "192.0.2.2" }
]
}
},
{
"_source": {
"headers": [
{ "remoteip": "192.0.2.3" },
{ "remoteip": "192.0.2.4" }
]
}
}
]
Column configuration:
"column": ["headers[*].remoteip"]
Output — four rows:
192.0.2.1
192.0.2.2
192.0.2.3
192.0.2.4
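The one-to-many expansion above can be sketched as (an illustration of the observable behavior):

```python
def split_rows(docs, array_field, sub_property):
    # Expand each document's array field into one output row per element,
    # mirroring the headers[*].remoteip configuration above.
    rows = []
    for doc in docs:
        for element in doc["_source"][array_field]:
            rows.append(element.get(sub_property))
    return rows

docs = [
    {"_source": {"headers": [{"remoteip": "192.0.2.1"}, {"remoteip": "192.0.2.2"}]}},
    {"_source": {"headers": [{"remoteip": "192.0.2.3"}, {"remoteip": "192.0.2.4"}]}},
]
print(split_rows(docs, "headers", "remoteip"))
```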
Scenario 4: Deduplicate and merge array properties
Deduplicate and merge an array field into a single comma-separated string. Include [] in the column name to activate deduplication. Deduplication uses the toString result as the comparison key.
Configuration: property[]
Configuration is not available in the codeless UI. Use the code editor.
Reader configuration:
"multi": {
"multi": true
}
Input:
"_source": {
"feature1": ["value1", "value1", "value2", "value2", "value3"]
}
Column configuration:
"column": ["feature1[]"]
Output — one row, one column:
"value1,value2,value3"
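The deduplication behavior (toString comparison, first-seen order, comma join) can be sketched as:

```python
def dedupe_merge(values):
    # Deduplicate by string representation, keep first-seen order, join with commas.
    return ",".join(dict.fromkeys(str(v) for v in values))

print(dedupe_merge(["value1", "value1", "value2", "value2", "value3"]))
# → value1,value2,value3
```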
Scenario 5: Merge and synchronize multiple properties
Merge multiple properties into a single column. All named properties are concatenated into one value.
Configuration: property1,property2,...
Configuration is not available in the codeless UI. Use the code editor.
Reader configuration:
"multi": {
"multi": true
}
Input:
"_source": {
"feature1": "feature1",
"feature2": [1, 2, 3],
"feature3": { "child": "feature3" }
}
Column configuration:
"column": ["feature1,feature2,feature3"]
Output — one row, one column:
"feature1,[1,2,3],{"child":"feature3"}"
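The merge behavior can be sketched as follows; non-scalar values are rendered as compact JSON, matching the output above (an illustration, not the Reader's code):

```python
import json

def merge_properties(source, names):
    # Concatenate the named properties into one comma-separated column value.
    parts = []
    for name in names:
        v = source[name]
        parts.append(v if isinstance(v, str) else json.dumps(v, separators=(",", ":")))
    return ",".join(parts)

src = {"feature1": "feature1", "feature2": [1, 2, 3], "feature3": {"child": "feature3"}}
print(merge_properties(src, ["feature1", "feature2", "feature3"]))
# → feature1,[1,2,3],{"child":"feature3"}
```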
Scenario 6: Selectively synchronize multiple properties
Return the first non-null value from a list of properties. If none of the specified properties have a value, null is written.
Configuration: property1|property2|...
Configuration is not available in the codeless UI. Use the code editor.
Reader configuration:
"multi": {
"multi": true
}
Input:
"_source": {
"feature1": "feature1",
"feature2": [1, 2, 3],
"feature3": { "child": "feature3" }
}
Column configuration:
"column": ["feature1|feature2|feature3"]
Output — one row, one column (first non-null value):
"feature1"
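The selection logic can be sketched as:

```python
def first_non_null(source, names):
    # Return the first listed property that exists and is non-null, else None.
    return next((source[n] for n in names if source.get(n) is not None), None)

src = {"feature1": "feature1", "feature2": [1, 2, 3], "feature3": {"child": "feature3"}}
print(first_non_null(src, ["feature1", "feature2", "feature3"]))  # → feature1
```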
References
Data Integration supports additional data sources. For a full list, see Data sources and synchronization.