All Products
Search
Document Center

MaxCompute:Migrate data from Elasticsearch to MaxCompute

Last Updated:Mar 26, 2026

Use the DataWorks data synchronization feature to copy data from an Alibaba Cloud Elasticsearch cluster into MaxCompute for offline analytics.

Prerequisites

Before you begin, make sure that you have:

The Elasticsearch cluster used in this guide has the following configuration:

ParameterValue
RegionChina (Shanghai)
ZoneZone B
VersionElasticsearch 5.5.3 with Commercial Feature

Background information

Alibaba Cloud Elasticsearch is available in the following versions: Elasticsearch 5.5.3 with Commercial Feature, Elasticsearch 6.3.2 with Commercial Feature, and Elasticsearch 6.7.0 with Commercial Feature. It includes the commercial X-Pack plug-in and supports use cases such as data analysis and search. Compared with open source Elasticsearch, Alibaba Cloud Elasticsearch additionally provides enterprise-class access control, security monitoring and alerting, and automatic reporting.

How it works

  1. Populate the Elasticsearch index with source data by running an existing synchronization task.

  2. Create a destination table in MaxCompute that matches the Elasticsearch field schema.

  3. Configure and run an offline synchronization node in DataWorks. The node uses the Elasticsearch Reader to scroll through the index and the MaxCompute Writer to load rows into the destination table.

  4. Query the destination table in MaxCompute to verify the migration.

Step 1: Create a source table in Elasticsearch

Follow Use DataWorks to synchronize data from MaxCompute to Alibaba Cloud Elasticsearch to populate the Elasticsearch index that you will migrate from.

Step 2: Create a destination table in MaxCompute

  1. Log on to the DataWorks console. In the top navigation bar, select a region. In the left-side navigation pane, choose Data Development and O&M > Data Development. Select your workspace from the drop-down list and click Go to Data Development.

  2. In the Scheduled Workflow pane of DataStudio, right-click your workflow name and choose Create Table > MaxCompute > Table.

  3. In the Create Table dialog box, enter a name in the Name field and click Create.

    Note

    If multiple MaxCompute data sources are associated with DataStudio, select the data source you want to use.

  4. In the toolbar of the table configuration tab, click DDL.

  5. In the DDL dialog box, enter the following table creation statement and click Generate Table Schema:

    create table elastic2mc_bankdata
    (
    age             string,
    job             string,
    marital         string,
    education       string,
    default         string,
    housing         string,
    loan            string,
    contact         string,
    month           string,
    day of week     string
    );
  6. Click Submit to Production Environment.

Step 3: Synchronize data

  1. In the Scheduled Workflow pane, right-click your workflow and choose new > data integration > offline synchronization.

  2. In the Create Node dialog box, enter a name in the Name field and click Confirm.

  3. In the toolbar of the node configuration tab, click the conversion script icon to switch to script mode.

  4. In script mode, click the template import icon.

  5. In the import Template dialog box, select the source type, data source, target type, and data source, and click confirm.

  6. Replace the generated script with the following configuration:

    {
      "type": "job",
      "steps": [
        {
          "stepType": "elasticsearch",
          "parameter": {
            "retryCount": 3,
            "column": [
              "age", "job", "marital", "education", "default", "housing",
              "loan", "contact", "month", "day_of_week", "duration",
              "campaign", "pdays", "previous", "poutcome", "emp_var_rate",
              "cons_price_idx", "cons_conf_idx", "euribor3m", "nr_employed", "y"
            ],
            "scroll": "1m",
            "index": "es_index",
            "pageSize": 1,
            "sort": { "age": "asc" },
            "type": "elasticsearch",
            "connTimeOut": 1000,
            "retrySleepTime": 1000,
            "endpoint": "http://es-cn-xxxx.xxxx.xxxx.xxxx.com:9200",
            "password": "xxxx",
            "search": { "match_all": {} },
            "readTimeOut": 5000,
            "username": "xxxx"
          },
          "name": "Reader",
          "category": "reader"
        },
        {
          "stepType": "odps",
          "parameter": {
            "partition": "",
            "truncate": true,
            "compress": false,
            "datasource": "odps_source",
            "column": [
              "age", "job", "marital", "education", "default", "housing",
              "loan", "contact", "month", "day_of_week", "duration",
              "campaign", "pdays", "previous", "poutcome", "emp_var_rate",
              "cons_price_idx", "cons_conf_idx", "euribor3m", "nr_employed", "y"
            ],
            "emptyAsNull": false,
            "table": "elastic2mc_bankdata"
          },
          "name": "Writer",
          "category": "writer"
        }
      ],
      "version": "2.0",
      "order": {
        "hops": [{ "from": "Reader", "to": "Writer" }]
      },
      "setting": {
        "errorLimit": { "record": "0" },
        "speed": {
          "throttle": false,
          "concurrent": 1,
          "dmu": 1
        }
      }
    }

    The key Reader parameters are:

    ParameterDescriptionValue in this example
    endpointPublic endpoint and port of the Elasticsearch cluster. Find these on the Basic Information page of your cluster.http://es-cn-xxxx.xxxx.xxxx.xxxx.com:9200
    indexName of the Elasticsearch index to read from.es_index
    scrollDuration each scroll context is kept alive on the Elasticsearch server. Increase this value if your dataset is large and each batch takes longer to process.1m
    pageSizeNumber of documents fetched per scroll request. Increase this value for large datasets to improve throughput.1
    sortField and order used to sort documents before reading.{ "age": "asc" }
    retryCountNumber of retries on a failed request.3
    connTimeOutConnection timeout in milliseconds.1000
    readTimeOutRead timeout in milliseconds. Increase this value if scroll batches are slow to return.5000

    The key Writer parameters are:

    ParameterDescriptionValue in this example
    datasourceName of the MaxCompute data source configured in DataStudio.odps_source
    tableName of the destination table in MaxCompute.elastic2mc_bankdata
    truncateWhether to clear the destination table before loading. Set to false for incremental loads.true
    emptyAsNullWhether to write empty strings as NULL.false

    For the full parameter reference, see Elasticsearch Reader.

  7. Click the run icon in the toolbar to execute the synchronization job.

  8. On the Runtime Logs tab, confirm that the job finishes without errors.

Step 4: Verify the migration

  1. In the Scheduled Workflow pane, right-click your workflow and choose new > MaxCompute > ODPS SQL.

  2. In the create a node dialog box, enter a name in the node name field and click submit.

  3. In the ODPS SQL node editor, enter:

    SELECT * FROM elastic2mc_bankdata;
  4. Click the run icon to execute the query.

  5. On the Operation Log tab, review the results.