This topic describes how to use the data synchronization feature of DataWorks to migrate data from an Elasticsearch cluster to MaxCompute.

Prerequisites

  • MaxCompute is activated.
  • DataWorks is activated.
  • A workflow is created in the DataWorks console. In this example, the workflow is created in a DataWorks workspace in basic mode. For more information, see Create a workflow.
  • An Elasticsearch cluster is created.

    Before data migration, make sure that your Elasticsearch cluster works properly. For more information about how to create an Elasticsearch cluster, see Quick start to Elasticsearch.

    This topic uses an Elasticsearch cluster with the following configurations as an example:
    • Region: China (Shanghai)
    • Zone: Zone B
    • Version: Elasticsearch 5.5.3 with Commercial Feature

Background information

Elasticsearch is a Lucene-based search server. It provides a distributed, multi-tenant search engine that supports full-text search. Elasticsearch is an open-source product that complies with the Apache open standards and is a mainstream enterprise-class search engine.

Alibaba Cloud Elasticsearch is available in multiple versions, such as Elasticsearch 5.5.3 with Commercial Feature, Elasticsearch 6.3.2 with Commercial Feature, and Elasticsearch 6.7.0 with Commercial Feature. It also includes the X-Pack plug-in. You can use Alibaba Cloud Elasticsearch to search for and analyze data. Based on open-source Elasticsearch, Alibaba Cloud Elasticsearch provides enterprise-class access control, security monitoring and alerting, and automatic reporting.

Procedure

  1. Create a source table in Elasticsearch. For more information, see Use DataWorks to synchronize data from MaxCompute to an Alibaba Cloud Elasticsearch cluster.
  2. Create a destination table in MaxCompute.
    1. Log on to the DataWorks console.
    2. Right-click the workflow that you created and choose New > MaxCompute > Table.
    3. On the Create Table page, select the engine type and enter a table name.
    4. On the table editing page, click DDL Statement.
    5. In the DDL Statement dialog box that appears, enter the following statement and click Generate Table Schema:
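      create table elastic2mc_bankdata
      (
      age             string,
      job             string,
      marital         string,
      education       string,
      default         string,
      housing         string,
      loan            string,
      contact         string,
      month           string,
      day_of_week     string,
      duration        string,
      campaign        string,
      pdays           string,
      previous        string,
      poutcome        string,
      emp_var_rate    string,
      cons_price_idx  string,
      cons_conf_idx   string,
      euribor3m       string,
      nr_employed     string,
      y               string
      );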
    6. Click Submit to Production Environment.
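The destination table needs one column for each field that the synchronization script writes. The following Python sketch shows one way to generate such a DDL statement from a column list so that the table and the script stay in step. It is an illustration only and not part of DataWorks: the helper name build_ddl is hypothetical, the column names are taken from the synchronization script in this topic, and every column is assumed to be of the string type.

```python
# Column names as they appear in the "column" lists of the
# synchronization script in this topic.
COLUMNS = [
    "age", "job", "marital", "education", "default", "housing", "loan",
    "contact", "month", "day_of_week", "duration", "campaign", "pdays",
    "previous", "poutcome", "emp_var_rate", "cons_price_idx",
    "cons_conf_idx", "euribor3m", "nr_employed", "y",
]


def build_ddl(table, columns):
    """Return a CREATE TABLE statement that types every column as string.

    Hypothetical helper for illustration; it only builds text and does not
    connect to MaxCompute.
    """
    width = max(len(name) for name in columns)
    # Pad the names so that the types line up, as in the statement above.
    body = ",\n".join(f"{name.ljust(width)} string" for name in columns)
    return f"create table {table}\n(\n{body}\n);"


if __name__ == "__main__":
    print(build_ddl("elastic2mc_bankdata", COLUMNS))
```

You can paste the printed statement into the DDL Statement dialog box. If a column needs a type other than string, adjust the statement by hand.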
  3. Synchronize data.
    1. Go to the Data Analytics page. Right-click the workflow and choose New > Data Integration > Offline Synchronization.
    2. In the Create Node dialog box, enter a node name and click Submit.
    3. In the top navigation bar, click the Conversion Script icon.
    4. In script mode, click the import template icon in the toolbar.
    5. In the Import Template dialog box, select the source type, the source data source, the destination type, and the destination data source, and then click Confirm.
    6. Enter the script content.
      In this example, enter the following content. For more information about the parameters in the script content, see Configure Elasticsearch Reader.
      {
          "type": "job",
          "steps": [
              {
                  "stepType": "elasticsearch",
                  "parameter": {
                      "retryCount": 3,
                      "column": [
                          "age",
                          "job",
                          "marital",
                          "education",
                          "default",
                          "housing",
                          "loan",
                          "contact",
                          "month",
                          "day_of_week",
                          "duration",
                          "campaign",
                          "pdays",
                          "previous",
                          "poutcome",
                          "emp_var_rate",
                          "cons_price_idx",
                          "cons_conf_idx",
                          "euribor3m",
                          "nr_employed",
                          "y"
                      ],
                      "scroll": "1m",
                      "index": "es_index",
                      "pageSize": 1,
                      "sort": {
                          "age": "asc"
                      },
                      "type": "elasticsearch",
                      "connTimeOut": 1000,
                      "retrySleepTime": 1000,
                      "endpoint": "http://es-cn-xxxx.xxxx.xxxx.xxxx.com:9200",
                      "password": "xxxx",
                      "search": {
                          "match_all": {}
                      },
                      "readTimeOut": 5000,
                      "username": "xxxx"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "odps",
                  "parameter": {
                      "partition": "",
                      "truncate": true,
                      "compress": false,
                      "datasource": "odps_first",
                      "column": [
                          "age",
                          "job",
                          "marital",
                          "education",
                          "default",
                          "housing",
                          "loan",
                          "contact",
                          "month",
                          "day_of_week",
                          "duration",
                          "campaign",
                          "pdays",
                          "previous",
                          "poutcome",
                          "emp_var_rate",
                          "cons_price_idx",
                          "cons_conf_idx",
                          "euribor3m",
                          "nr_employed",
                          "y"
                      ],
                      "emptyAsNull": false,
                      "table": "elastic2mc_bankdata"
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "version": "2.0",
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          },
          "setting": {
              "errorLimit": {
                  "record": "0"
              },
              "speed": {
                  "throttle": false,
                  "concurrent": 1,
                  "dmu": 1
              }
          }
      }
      Note To view the public IP address and port number of the created Elasticsearch cluster, follow these steps: Log on to the Elasticsearch console, go to the Basic Information page of the cluster, and view the values of Public IP and Public Port.
    7. Click the Run icon to run the created node.
    8. View the running result of the node on the Log tab.
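In the script in this example, the reader and the writer list the same columns in the same order. Before you run the node, you may find it useful to check a script locally for this. The following Python sketch is one possible check and is not part of DataWorks: the function name check_job and the shortened sample script are hypothetical, and the check assumes that the two column lists are meant to be identical, as they are in this example.

```python
import json


def check_job(script_text):
    """Return a list of problems found in a script-mode job definition.

    Hypothetical local check: it only parses the JSON text and compares
    the reader and writer column lists.
    """
    job = json.loads(script_text)
    steps = {step["category"]: step for step in job["steps"]}

    problems = [
        f"missing {category} step"
        for category in ("reader", "writer")
        if category not in steps
    ]
    if problems:
        return problems

    reader_cols = steps["reader"]["parameter"]["column"]
    writer_cols = steps["writer"]["parameter"]["column"]
    if len(reader_cols) != len(writer_cols):
        problems.append(
            f"reader has {len(reader_cols)} columns, "
            f"writer has {len(writer_cols)}"
        )
    # Compare the lists position by position.
    for index, (src, dst) in enumerate(zip(reader_cols, writer_cols)):
        if src != dst:
            problems.append(f"column {index}: reader {src!r} != writer {dst!r}")
    return problems


# Shortened sample script with the same layout as the script in this topic.
SAMPLE = json.dumps({
    "type": "job",
    "steps": [
        {"stepType": "elasticsearch", "category": "reader", "name": "Reader",
         "parameter": {"column": ["age", "job", "day_of_week"]}},
        {"stepType": "odps", "category": "writer", "name": "Writer",
         "parameter": {"column": ["age", "job", "day_of_week"],
                       "table": "elastic2mc_bankdata"}},
    ],
})

if __name__ == "__main__":
    print(check_job(SAMPLE) or "no problems found")
```

An empty list means that the two column lists match. The sketch does not contact Elasticsearch or MaxCompute, so it cannot tell whether the index or the table actually contains these fields.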
  4. View the data synchronization result.
    1. Right-click the workflow and choose New > MaxCompute > ODPS SQL.
    2. In the Create Node dialog box, enter a node name and click Submit.
    3. On the configuration tab of the ODPS SQL node, enter the following statement:
      SELECT * FROM elastic2mc_bankdata;
    4. Click the Run icon to run the code.
    5. View the results on the Operation Log tab.