DataWorks: Elasticsearch

Last Updated: Mar 27, 2026

The Elasticsearch data source connects DataWorks Data Integration to Alibaba Cloud Elasticsearch for bidirectional data synchronization. Use it to read data from Elasticsearch indexes into downstream systems, write data from upstream sources into Elasticsearch, or both.

Supported sync modes:

Mode                                 | Direction
-------------------------------------|--------------------------------------
Offline read (Elasticsearch Reader)  | Elasticsearch → downstream
Offline write (Elasticsearch Writer) | Upstream → Elasticsearch
Real-time write                      | Upstream → Elasticsearch (streaming)

Supported versions and resource groups

DataWorks supports Alibaba Cloud Elasticsearch 5.x, 6.x, 7.x, and 8.x. Self-managed Elasticsearch data sources are not supported.

Version support varies by resource group:

Resource group                                | Supported versions
----------------------------------------------|----------------------------------
Public resource group                         | Elasticsearch 5.x
Serverless resource group (recommended)       | Elasticsearch 5.x, 6.x, 7.x, 8.x
Exclusive resource group for Data Integration | Elasticsearch 5.x, 6.x, 7.x, 8.x

For setup instructions, see Use a serverless resource group and Use exclusive resource groups for Data Integration.

Limitations

  • Elasticsearch Reader fetches shard information from the server before syncing. All shards must be active during synchronization — inactive shards cause data inconsistency.

  • For Elasticsearch 6.x or later, use a serverless resource group or an exclusive resource group for Data Integration. Public resource groups do not support Elasticsearch 6.x and later.

  • Fields of the scaled_float type cannot be synchronized.

  • Indexes with $ref in field names cannot be synchronized.

Key concepts

Elasticsearch maps to a relational database as follows:

Elasticsearch            | Relational database
-------------------------|--------------------
Elasticsearch (instance) | Database (instance)
Index                    | Database
Type                     | Table
Document                 | Row
Field                    | Column

Supported field types

Type                    | Offline read  | Offline write | Real-time write
------------------------|---------------|---------------|----------------
binary                  | Supported     | Supported     | Supported
boolean                 | Supported     | Supported     | Supported
keyword                 | Supported     | Supported     | Supported
constant_keyword        | Not supported | Not supported | Not supported
wildcard                | Not supported | Not supported | Not supported
long                    | Supported     | Supported     | Supported
integer                 | Supported     | Supported     | Supported
short                   | Supported     | Supported     | Supported
byte                    | Supported     | Supported     | Supported
double                  | Supported     | Supported     | Supported
float                   | Supported     | Supported     | Supported
half_float              | Not supported | Not supported | Not supported
scaled_float            | Not supported | Not supported | Not supported
unsigned_long           | Not supported | Not supported | Not supported
date                    | Supported     | Supported     | Supported
date_nanos              | Not supported | Not supported | Not supported
alias                   | Not supported | Not supported | Not supported
object                  | Supported     | Supported     | Supported
flattened               | Not supported | Not supported | Not supported
nested                  | Supported     | Supported     | Supported
join                    | Not supported | Not supported | Not supported
integer_range           | Supported     | Supported     | Supported
float_range             | Supported     | Supported     | Supported
long_range              | Supported     | Supported     | Supported
double_range            | Supported     | Supported     | Supported
date_range              | Supported     | Supported     | Supported
ip_range                | Not supported | Supported     | Supported
ip                      | Supported     | Supported     | Supported
version                 | Supported     | Supported     | Supported
murmur3                 | Not supported | Not supported | Not supported
aggregate_metric_double | Not supported | Not supported | Not supported
histogram               | Not supported | Not supported | Not supported
text                    | Supported     | Supported     | Supported
annotated-text          | Not supported | Not supported | Not supported
completion              | Supported     | Not supported | Not supported
search_as_you_type      | Not supported | Not supported | Not supported
token_count             | Supported     | Not supported | Not supported
dense_vector            | Not supported | Not supported | Not supported
rank_feature            | Not supported | Not supported | Not supported
rank_features           | Not supported | Not supported | Not supported
geo_point               | Supported     | Supported     | Supported
geo_shape               | Supported     | Supported     | Supported
point                   | Not supported | Not supported | Not supported
shape                   | Not supported | Not supported | Not supported
percolator              | Not supported | Not supported | Not supported
string                  | Supported     | Supported     | Supported

How it works

Elasticsearch Reader uses the _search scroll slice API to read data. The slice mechanism works alongside the multi-threaded sharding in Data Integration tasks, so multiple threads read from different slices in parallel. Data types are converted based on the mapping configuration in Elasticsearch.

For more information about the scroll API, see the official Elasticsearch documentation.
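To make the slice mechanism concrete, here is a minimal Python sketch (not part of DataWorks; `sliced_scroll_bodies` and its parameters are hypothetical names chosen for illustration) that builds one `_search` request body per slice, the way parallel scroll readers divide an index:

```python
import json

def sliced_scroll_bodies(query: dict, max_slices: int, page_size: int) -> list:
    """Build one _search request body per slice. Each worker thread would
    POST its body to <endpoint>/<index>/_search?scroll=5m and then page
    through its slice with the returned _scroll_id."""
    return [
        {"slice": {"id": slice_id, "max": max_slices},
         "size": page_size,
         "query": query}
        for slice_id in range(max_slices)
    ]

bodies = sliced_scroll_bodies({"range": {"gmt_modified": {"gte": 0}}},
                              max_slices=3, page_size=5000)
print(json.dumps(bodies[0], indent=2))
```

Each slice covers a disjoint subset of the index's documents, so three threads with `"max": 3` read the whole index exactly once between them.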

Advanced features

Elasticsearch Reader supports two advanced data extraction modes beyond standard field-level reads:

  • Full document pull — read an entire Elasticsearch document as a single JSON string field. See Scenario 1: Full data pull.

  • Semi-structured to structured extraction — flatten nested JSON, split arrays into rows, deduplicate array values, merge multiple properties, or selectively pick the first non-null property. See Scenarios 2–6 for configuration details.

Add a data source

Before configuring a sync task, add the Elasticsearch data source in DataWorks. Follow the instructions in Data source management. Parameter descriptions are available directly in the DataWorks console when you add the data source.

Develop a data synchronization task

Configure an offline sync task (single table)

Configure a real-time write task (single table)

See Configure a real-time synchronization task in DataStudio.

Configure a full-database offline or real-time sync task

See Configure a real-time full-database synchronization task.

Basic configuration

The following example shows a minimal Reader-to-Writer configuration. Remove all comments before running.

{
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  },
  "setting": {
    "errorLimit": {
      "record": "0"         // Maximum number of error records allowed.
    },
    "jvmOption": "",
    "speed": {
      "concurrent": 3,      // Number of concurrent threads.
      "throttle": true,
      "mbps": "12"          // Throttle rate. 1 Mbps = 1 MB/s.
    }
  },
  "steps": [
    {
      "category": "reader",
      "name": "Reader",
      "parameter": {
        "column": [         // Columns to read.
          "id",
          "name"
        ],
        "endpoint": "",     // Elasticsearch endpoint.
        "index": "",        // Index name.
        "password": "",
        "scroll": "",       // Scroll context duration (e.g., "5m").
        "search": "",       // Query body — same format as Elasticsearch _search API.
        "type": "default",
        "username": ""
      },
      "stepType": "elasticsearch"
    },
    {
      "category": "writer",
      "name": "Writer",
      "parameter": {
        "column": [         // Columns to write, with type definitions.
          {
            "name": "id",
            "type": "integer"
          },
          {
            "name": "name",
            "type": "text"
          }
        ],
        "index": "test",            // Destination index.
        "indexType": "",            // Leave blank for Elasticsearch 7.x.
        "actionType": "index",      // Write mode: index or update.
        "cleanup": false,           // If true, deletes and recreates the index before writing.
        "datasource": "test",       // Data source name configured in DataWorks.
        "primaryKeyInfo": {
          "fieldDelimiterOrigin": ",",
          "column": ["id"],
          "type": "specific",
          "fieldDelimiter": ","
        },
        "dynamic": false,           // If false, mappings are generated from column config.
        "batchSize": 1024           // Documents written per batch.
      },
      "stepType": "elasticsearch"
    }
  ],
  "type": "job",
  "version": "2.0"
}
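The `//` comments above make the sample invalid JSON, which is why they must be removed before running. If you keep annotated copies of your configurations, a small sketch like the following (a hypothetical helper, not a DataWorks feature) can strip line comments while leaving `//` inside string values such as endpoint URLs intact:

```python
import json

def strip_line_comments(text: str) -> str:
    """Remove // comments that fall outside JSON string literals, so an
    annotated sample can be fed to a standard JSON parser."""
    out = []
    in_string = False
    escaped = False
    i = 0
    while i < len(text):
        ch = text[i]
        if in_string:
            out.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
            out.append(ch)
        elif ch == "/" and text[i:i + 2] == "//":
            while i < len(text) and text[i] != "\n":  # skip to end of line
                i += 1
            continue
        else:
            out.append(ch)
        i += 1
    return "".join(out)

# A miniature annotated config; the "//" inside the endpoint must survive.
sample = '{"endpoint": "http://example.com:9200", "concurrent": 3  // threads\n}'
parsed = json.loads(strip_line_comments(sample))
```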

Appendix 1: Script demos and parameter descriptions

Reader script demo

{
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  },
  "setting": {
    "errorLimit": {
      "record": "0"
    },
    "jvmOption": "",
    "speed": {
      "concurrent": 3,
      "throttle": false
    }
  },
  "steps": [
    {
      "category": "reader",
      "name": "Reader",
      "parameter": {
        "column": [
          "id",
          "name"
        ],
        "endpoint": "http://es-cn-xxx.elasticsearch.aliyuncs.com:9200",
        "index": "aliyun_es_xx",
        "password": "*******",
        "multiThread": true,
        "scroll": "5m",
        "pageSize": 5000,
        "connTimeOut": 600000,
        "readTimeOut": 600000,
        "retryCount": 30,
        "retrySleepTime": "10000",
        "search": {
          "range": {
            "gmt_modified": {
              "gte": 0
            }
          }
        },
        "type": "doc",
        "username": "aliyun_di"
      },
      "stepType": "elasticsearch"
    },
    {
      "category": "writer",
      "name": "Writer",
      "parameter": {},
      "stepType": "stream"
    }
  ],
  "type": "job",
  "version": "2.0"
}

Reader script parameters

Parameter | Description | Required | Default value
----------|-------------|----------|--------------
datasource | Name of the data source as configured in DataWorks. Must match the added data source name. | Yes | None
index | Elasticsearch index name. | Yes | None
type | Elasticsearch index type. | No | Index name
search | Elasticsearch query body. Uses the same format as the _search API. | Yes | None
pageSize | Number of records to read per page. | No | 100
scroll | Scroll context duration. Controls how long the scroll context stays alive between page reads. If set too low, the scroll expires when idle time between pages is too long, causing data loss. If set too high, concurrent queries may exceed the server's max_open_scroll_context limit, causing query errors. | Yes | None
strictMode | If true and a shard.failed error occurs, the read stops immediately to prevent partial data from being synchronized. | No | true
sort | Field to sort results by. | No | None
retryCount | Number of retries after a failure. | No | 300
connTimeOut | Client connection timeout in milliseconds. | No | 600,000
readTimeOut | Client read timeout in milliseconds. | No | 600,000
multiThread | Whether to use multiple threads for HTTP requests. | No | true
preemptiveAuth | Whether to use preemptive authentication for HTTP requests. | No | false
retrySleepTime | Interval between retries in milliseconds. | No | 1,000
discovery | Node discovery mode. true: connects to a random cluster node and periodically refreshes the server list. false: sends all requests to the configured endpoint. | No | false
compression | Whether to compress request bodies with GZIP. Requires http.compression to be enabled on the Elasticsearch node. | No | false
dateFormat | Required if a date field has no format in its mapping. Specify all date formats used in the field, for example: "yyyy-MM-dd||yyyy-MM-dd HH:mm:ss". | No | None
full | If true, reads the entire document as a single JSON string field. See Scenario 1: Full data pull. | No | None
multi | Enables advanced multi-property extraction. Has two sub-properties: multi.multi and multi.key. See Advanced features for the five supported use cases. | No | None

Writer script demo

An Elasticsearch instance inside a Virtual Private Cloud (VPC) cannot be reached from a public resource group due to network isolation. Use a serverless resource group or an exclusive resource group for Data Integration to access instances in a VPC.
{
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  },
  "setting": {
    "errorLimit": {
      "record": "0"
    },
    "speed": {
      "throttle": true,
      "concurrent": 1,
      "mbps": "12"
    }
  },
  "steps": [
    {
      "category": "reader",
      "name": "Reader",
      "parameter": {},
      "stepType": "stream"
    },
    {
      "category": "writer",
      "name": "Writer",
      "parameter": {
        "datasource": "xxx",
        "index": "test-1",
        "type": "default",
        "cleanup": true,
        "settings": {
          "number_of_shards": 1,
          "number_of_replicas": 0
        },
        "discovery": false,
        "primaryKeyInfo": {
          "type": "pk",
          "fieldDelimiter": ",",
          "column": []
        },
        "batchSize": 1000,
        "dynamic": false,
        "esPartitionColumn": [
          {
            "name": "col1",
            "comment": "xx",
            "type": "STRING"
          }
        ],
        "column": [
          { "name": "pk", "type": "id" },
          { "name": "col_ip", "type": "ip" },
          { "name": "col_array", "type": "long", "array": true },
          { "name": "col_double", "type": "double" },
          { "name": "col_long", "type": "long" },
          { "name": "col_integer", "type": "integer" },
          { "name": "col_keyword", "type": "keyword" },
          {
            "name": "col_text",
            "type": "text",
            "analyzer": "ik_max_word",
            "other_params": { "doc_values": false }
          },
          { "name": "col_geo_point", "type": "geo_point" },
          { "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
          { "name": "col_nested1", "type": "nested" },
          { "name": "col_nested2", "type": "nested" },
          { "name": "col_object1", "type": "object" },
          { "name": "col_object2", "type": "object" },
          { "name": "col_integer_array", "type": "integer", "array": true },
          {
            "name": "col_geo_shape",
            "type": "geo_shape",
            "tree": "quadtree",
            "precision": "10m"
          }
        ]
      },
      "stepType": "elasticsearch"
    }
  ],
  "type": "job",
  "version": "2.0"
}

Writer script parameters

Parameter | Description | Required | Default value
----------|-------------|----------|--------------
datasource | Elasticsearch data source name in DataWorks. If not yet created, see Configure an Elasticsearch data source. | Yes | None
index | Destination Elasticsearch index name. | Yes | None
indexType | Elasticsearch index type. | No | Elasticsearch
cleanup | Pre-write index behavior. true: deletes the existing index and recreates it before writing (all existing data is lost). false: retains existing data in the index. | No | false
batchSize | Number of documents to insert per batch. | No | 1,000
trySize | Number of retries after a write failure. | No | 30
timeout | Client timeout in milliseconds. | No | 600,000
discovery | Node discovery mode. true: connects to a random cluster node and periodically refreshes the server list. false: connects directly to the Elasticsearch cluster endpoint. | No | false
compression | Whether to compress HTTP request bodies. | No | true
multiThread | Whether to use multiple threads for HTTP requests. | No | true
ignoreWriteError | If true, write errors are skipped without retrying and the task continues. | No | false
ignoreParseError | If true, data format parsing errors are skipped and the task continues. | No | true
alias | Alias to create for the index after data is imported. An alias works like a database view — operations on the alias apply to the underlying index. | No | None
aliasMode | How the alias is attached after import. append: adds the current index to the alias, which can point to multiple indexes. exclusive: removes the existing alias mapping first, then adds the current index — the alias points to exactly one index. Aliases support index migration and unified multi-index queries. | No | append
settings | Index settings applied when creating the index. Follows the standard Elasticsearch index settings format. | No | None
column | Field definitions for the documents to write. Each field requires at minimum name and type. See the column type reference below. | Yes | None
dynamic | Controls whether Elasticsearch dynamic mapping is used for fields not in column. true: Elasticsearch automatically maps unmapped fields. false: mappings are generated and updated only from the column configuration. For Elasticsearch 7.x, set "esVersion": "7" in the code editor when using dynamic mapping. Important: enable dynamic only to work around field mapping errors — it may produce unexpected field types or data exceptions, so evaluate your data structure before enabling it. | No | false
actionType | Write action type. index: uses the Index.Builder of the Elasticsearch SDK. If no _id is specified, Elasticsearch generates one automatically. If _id is specified, the entire document is replaced. update: updates specific fields in an existing document by _id. If the _id does not exist, the document is inserted. Each update retrieves the full document to modify specific fields — this can significantly affect performance. If actionType is update, primaryKeyInfo is required. | No | index
primaryKeyInfo | Defines how the document _id is set. See the primary key configuration reference below. | Yes | specific
esPartitionColumn | Routing configuration for partitioned writes. Concatenates the values of the specified columns (with an empty string separator) and sets the result as the Elasticsearch routing value, directing documents to a specific shard. If not specified, _id is used as the routing key to distribute documents evenly across shards. | No | false
enableWriteNull | Controls whether null fields from the source are written to Elasticsearch. true: null fields are written; the corresponding Elasticsearch field is null. false: null fields are not written; the field is absent in the Elasticsearch document. | No | true
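To illustrate the esPartitionColumn behavior described above, here is a minimal sketch (`routing_value` is a hypothetical helper name) of how a routing value is derived by concatenating the configured column values with an empty separator:

```python
def routing_value(doc: dict, partition_columns: list) -> str:
    """Concatenate the configured column values with an empty separator;
    the result would be sent as the routing parameter of the write request,
    directing the document to a specific shard."""
    return "".join(str(doc[name]) for name in partition_columns)

doc = {"col1": "us-east", "col2": 7, "body": "payload"}
routing = routing_value(doc, ["col1", "col2"])
```

Documents with the same routing value land on the same shard, which is why routing on a low-cardinality column can skew shard sizes.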

Column type reference

The column parameter supports the following field types:

id          // Maps to _id in Elasticsearch. Documents with the same ID are overwritten, not re-indexed.
string
text
keyword
long
integer
short
byte
double
float
date
boolean
binary
integer_range
float_range
long_range
double_range
date_range
geo_point
geo_shape
ip
token_count
array
object
nested

Type-specific configuration:

  • text — supports additional properties such as analyzer, norms, and index_options:

    {
      "name": "col_text",
      "type": "text",
      "analyzer": "ik_max_word"
    }
  • date — two methods for parsing source data:

    • Method 1 (write as-is): Set "origin": true to write the field value directly to Elasticsearch without conversion. Also configure "format" so the Writer sets the correct format in the mapping:

      {
        "name": "col_date",
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss",
        "origin": true
      }

    • Method 2 (timezone conversion): Omit origin and add "Timezone" so Data Integration converts the timezone before writing:

      {
        "name": "col_date",
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss",
        "Timezone": "UTC"
      }

  • geo_shape — supports tree (geohash or quadtree) and precision:

    {
      "name": "col_geo_shape",
      "type": "geo_shape",
      "tree": "quadtree",
      "precision": "10m"
    }
  • other_params — use this property inside any column entry to set Elasticsearch mapping properties not covered by the standard field types:

    {
      "name": "guid",
      "type": "text",
      "other_params": {
        "doc_values": false
      }
    }

Primary key configuration reference

The primaryKeyInfo parameter supports three modes:

  • Business primary key (`pk`) — sets _id to the value of a specific field:

    "primaryKeyInfo": {
      "type": "pk",
      "column": ["id"]
    }
  • Composite primary key (`specific`) — sets _id by concatenating multiple field values, separated by fieldDelimiter:

    In the codeless UI, the Primary key column configuration only lists fields that already exist in the Elasticsearch index.
    "primaryKeyInfo": {
      "type": "specific",
      "fieldDelimiter": ",",
      "column": ["col1", "col2"]
    }
  • No primary key (`nopk`) — Elasticsearch generates _id automatically:

    "primaryKeyInfo": {
      "type": "nopk"
    }
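The three modes can be summarized in a short sketch (`build_id` is a hypothetical helper, not part of the Writer; it mirrors the behavior described above under that assumption):

```python
def build_id(doc: dict, primary_key_info: dict):
    """Derive the document _id for each primaryKeyInfo mode. None means
    'let Elasticsearch auto-generate the _id' (the nopk mode)."""
    mode = primary_key_info["type"]
    if mode == "nopk":
        return None
    if mode == "pk":
        # business primary key: the value of one field
        return str(doc[primary_key_info["column"][0]])
    if mode == "specific":
        # composite key: field values joined by fieldDelimiter
        delimiter = primary_key_info.get("fieldDelimiter", ",")
        return delimiter.join(str(doc[name]) for name in primary_key_info["column"])
    raise ValueError("unknown primaryKeyInfo type: %s" % mode)

doc = {"col1": "a", "col2": 2}
```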

Appendix 2: Write data to Elasticsearch in an array format

Two methods are available for writing source data to Elasticsearch as an array.

Parse as JSON

If the source data is already a JSON array string such as "[1,2,3,4,5]", set "json_array": true to parse it:

{
  "name": "docs_1",
  "type": "keyword",
  "json_array": true
}

Parse with a separator

If the source data is a delimited string such as "1,2,3,4,5", set "array": true on the column and configure splitter at the parameter level:

{
  "parameter": {
    "column": [
      {
        "name": "docs_2",
        "array": true,
        "type": "long"
      }
    ],
    "splitter": ","    // Must be at the same level as "column", not inside it.
  }
}
splitter is a global parameter — a task supports only one separator. If multiple array fields use different delimiters (for example, col1="1,2,3" and col2="6-7-8"), you cannot configure separate splitters for each field.
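The difference between the two methods can be sketched as follows (a hypothetical `parse_array_column` helper written for illustration; the real Writer performs this parsing internally):

```python
import json

def parse_array_column(value: str, column: dict, splitter: str = ","):
    """Turn a source string into an array the way the two flags are
    described: json_array parses a JSON array string, array splits on
    the task-level splitter."""
    if column.get("json_array"):
        return json.loads(value)              # "[1,2,3]" -> [1, 2, 3]
    if column.get("array"):
        parts = value.split(splitter)         # "1,2,3" -> ["1", "2", "3"]
        if column.get("type") in ("long", "integer", "short", "byte"):
            return [int(part) for part in parts]
        return parts
    return value
```

Note that `splitter` is one value per task, matching the global-parameter limitation described above.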

Appendix 3: Scenarios

Scenario 1: Full data pull

Pull an entire Elasticsearch document as a single JSON string field. Set "full": true in the Reader parameter.

Input — raw Elasticsearch document:

"hits": [
  {
    "_index": "mutiltest_1",
    "_type": "_doc",
    "_id": "IXgdO4MB4GR_1DmrjTXP",
    "_score": 1.0,
    "_source": {
      "feature1": "value1",
      "feature2": "value2",
      "feature3": "value3"
    }
  }
]

Reader configuration:

"parameter": {
  "column": ["content"],
  "full": true
}

Output — one row, one column:

{"_index":"mutiltest_1","_type":"_doc","_id":"IXgdO4MB4GR_1DmrjTXP","_source":{"feature1":"value1","feature2":"value2","feature3":"value3"},"sort":["IXgdO4MB4GR_1DmrjTXP"]}
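Conceptually, the full pull collapses each hit into one serialized string column. A minimal sketch (hypothetical `full_pull_row` helper, assuming the hit structure shown above):

```python
import json

def full_pull_row(hit: dict) -> str:
    """Collapse one search hit into a single JSON-string column value,
    keeping the document metadata alongside _source."""
    return json.dumps(hit, separators=(",", ":"))

hit = {"_index": "mutiltest_1", "_type": "_doc", "_id": "IXgdO4MB4GR_1DmrjTXP",
       "_source": {"feature1": "value1", "feature2": "value2"}}
row = full_pull_row(hit)
```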

Scenario 2: Synchronize properties of nested or object fields

Use dot notation and array index notation to extract specific properties from object or nested fields:

  • property — top-level property

  • property.sub-property — nested sub-property

  • property[0].sub-property — element at a specific array index

Configuration is not available in the codeless UI for this scenario. Use the code editor.

Reader configuration:

"multi": {
  "multi": true
}

Input — raw Elasticsearch document:

"hits": [
  {
    "_index": "mutiltest_1",
    "_type": "_doc",
    "_id": "7XAOOoMB4GR_1Dmrrust",
    "_score": 1.0,
    "_source": {
      "level1": {
        "level2": [
          { "level3": "testlevel3_1" },
          { "level3": "testlevel3_2" }
        ]
      }
    }
  }
]

Column configuration:

"column": [
  "level1",
  "level1.level2",
  "level1.level2[0]",
  "level1.level2.level3"
]

Output — one row, four columns:

column1 (level1):               {"level2":[{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]}
column2 (level1.level2):        [{"level3":"testlevel3_1"},{"level3":"testlevel3_2"}]
column3 (level1.level2[0]):     {"level3":"testlevel3_1"}
column4 (level1.level2.level3): null
Note:

  • If a parent node in the path is an array, the result is null. For example, level1.level2.level3 returns null because level2 is an array. Use level1.level2[0].level3 or level1.level2[1].level3 instead. The level1.level2[*].level3 wildcard is not currently supported.

  • Keys that contain a period (.) are not supported. For example, {"level1.level2": {"level3": "value"}} returns null for this path.
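The path resolution rules, including the null result when a parent is an array without an explicit index, can be sketched like this (`extract_path` is a hypothetical re-implementation for illustration, not the Reader's actual code):

```python
import re

def extract_path(source: dict, path: str):
    """Resolve a column path like "level1.level2[0].level3" against _source.
    Returns None when the path cannot be resolved, e.g. when a parent node
    is an array and no explicit index is given."""
    current = source
    for part in path.split("."):
        match = re.fullmatch(r"(\w+)\[(\d+)\]", part)
        name, index = (match.group(1), int(match.group(2))) if match else (part, None)
        if not isinstance(current, dict) or name not in current:
            return None
        current = current[name]
        if index is not None:
            if not isinstance(current, list) or index >= len(current):
                return None
            current = current[index]
    return current

source = {"level1": {"level2": [{"level3": "testlevel3_1"},
                                {"level3": "testlevel3_2"}]}}
```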

Scenario 3: Split array properties into multiple rows

Expand a one-to-many array into separate rows using the [*] wildcard in the column path.

Configuration: property[*].sub-property

In the codeless UI, set Split multi-row array column name to generate the equivalent script configuration.

Reader configuration:

"multi": {
  "multi": true,
  "key": "headers"
}
The value of key must be an array field. If it is not, an error occurs.

Input — two Elasticsearch documents:

[
  {
    "_source": {
      "headers": [
        { "remoteip": "192.0.2.1" },
        { "remoteip": "192.0.2.2" }
      ]
    }
  },
  {
    "_source": {
      "headers": [
        { "remoteip": "192.0.2.3" },
        { "remoteip": "192.0.2.4" }
      ]
    }
  }
]

Column configuration:

"column": ["headers[*].remoteip"]

Output — four rows:

192.0.2.1
192.0.2.2
192.0.2.3
192.0.2.4
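
The one-to-many expansion above can be sketched in a few lines (hypothetical `split_rows` helper, assuming a simple `field[*].sub` path as in the example):

```python
def split_rows(source: dict, column: str) -> list:
    """Expand a "headers[*].remoteip" style path into one value per row."""
    array_field, _, sub_property = column.partition("[*].")
    return [item[sub_property] for item in source[array_field]]

source = {"headers": [{"remoteip": "192.0.2.1"}, {"remoteip": "192.0.2.2"}]}
rows = split_rows(source, "headers[*].remoteip")
```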

Scenario 4: Deduplicate and merge array properties

Deduplicate and merge an array field into a single comma-separated string. Include [] in the column name to activate deduplication. Deduplication uses the toString result as the comparison key.

Configuration: property[]

Configuration is not available in the codeless UI. Use the code editor.

Reader configuration:

"multi": {
  "multi": true
}

Input:

"_source": {
  "feature1": ["value1", "value1", "value2", "value2", "value3"]
}

Column configuration:

"column": ["feature1[]"]

Output — one row, one column:

"value1,value2,value3"
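
The documented behavior, deduplicating by toString result and joining with commas, can be sketched as (hypothetical `dedupe_merge` helper):

```python
def dedupe_merge(values: list) -> str:
    """Deduplicate array values by their string form (order-preserving)
    and merge them into one comma-separated column value."""
    seen = []
    for value in values:
        text = str(value)
        if text not in seen:
            seen.append(text)
    return ",".join(seen)
```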

Scenario 5: Merge and synchronize multiple properties

Merge multiple properties into a single column. All named properties are concatenated into one value.

Configuration: property1,property2,...

Configuration is not available in the codeless UI. Use the code editor.

Reader configuration:

"multi": {
  "multi": true
}

Input:

"_source": {
  "feature1": "feature1",
  "feature2": [1, 2, 3],
  "feature3": { "child": "feature3" }
}

Column configuration:

"column": ["feature1,feature2,feature3"]

Output — one row, one column:

"feature1,[1,2,3],{"child":"feature3"}"
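
A sketch of the merge, with non-scalar property values serialized as JSON as in the output above (`merge_properties` is a hypothetical helper name):

```python
import json

def merge_properties(source: dict, column: str) -> str:
    """Concatenate several properties ("feature1,feature2,feature3") into
    one column value; arrays and objects are serialized as JSON."""
    parts = []
    for name in column.split(","):
        value = source[name]
        parts.append(value if isinstance(value, str)
                     else json.dumps(value, separators=(",", ":")))
    return ",".join(parts)

source = {"feature1": "feature1", "feature2": [1, 2, 3],
          "feature3": {"child": "feature3"}}
```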

Scenario 6: Selectively synchronize multiple properties

Return the first non-null value from a list of properties. If none of the specified properties have a value, null is written.

Configuration: property1|property2|...

Configuration is not available in the codeless UI. Use the code editor.

Reader configuration:

"multi": {
  "multi": true
}

Input:

"_source": {
  "feature1": "feature1",
  "feature2": [1, 2, 3],
  "feature3": { "child": "feature3" }
}

Column configuration:

"column": ["feature1|feature2|feature3"]

Output — one row, one column (first non-null value):

"feature1"
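
The first-non-null selection can be sketched as (hypothetical `first_non_null` helper):

```python
def first_non_null(source: dict, column: str):
    """Return the first property in an "a|b|c" list that has a value;
    None is written when no listed property has a value."""
    for name in column.split("|"):
        value = source.get(name)
        if value is not None:
            return value
    return None

source = {"feature1": "feature1", "feature2": [1, 2, 3],
          "feature3": {"child": "feature3"}}
```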

References

Data Integration supports additional data sources. For a full list, see Data sources and synchronization.