Alibaba Cloud provides a variety of cloud storage and database services. To search and analyze the data stored in these services, you can use Data Integration to collect offline data at five-minute intervals and synchronize it to an Alibaba Cloud Elasticsearch cluster. This topic describes how to synchronize data from MaxCompute to an Alibaba Cloud Elasticsearch cluster.

Notice Data synchronization may incur Internet traffic fees.

Overview

  1. Prepare the MaxCompute data source.
  2. Create an Alibaba Cloud Elasticsearch cluster to store the data synchronized by Data Integration.
  3. Create an Alibaba Cloud Elastic Compute Service (ECS) instance that resides in the same Virtual Private Cloud (VPC) as the Elasticsearch cluster. This ECS instance collects data from data sources and writes the data to your Elasticsearch cluster. Data Integration issues the tasks to perform these operations.
  4. Activate Data Integration and add the ECS instance to it as a resource to run data synchronization tasks.
  5. Configure a data synchronization script and schedule it to run periodically.

Preparations

  1. Activate MaxCompute, create a table, and then import data into the table.

    For more information, see Activate MaxCompute, Create and view a table, and Import data.

    The following figures show the table schema and fields used in this example. A DDL sketch that reconstructs this table appears after this list.
    Figure 1. Table schema
    Figure 2. Table fields
    Note The provided data is for testing only. You can migrate data from Hadoop to MaxCompute and then synchronize the data to your Elasticsearch cluster. For more information, see Best practices of migrating data from Hadoop to MaxCompute.
  2. Create a VPC and a VSwitch.
  3. Create an Elasticsearch cluster.
    Notice Make sure that the VPC you specify for the Elasticsearch cluster is the VPC created in the previous step.
  4. Enable the Auto Indexing feature for the Elasticsearch cluster. For more information, see Enable auto indexing. A quick way to check this setting appears after this list.
  5. Create an ECS instance that is in the same region, zone, and VPC as the Elasticsearch cluster. Assign a public IP address to the ECS instance or bind an Elastic IP address to the ECS instance.
    Note
    • You can also use an existing ECS instance.
    • We recommend that you choose CentOS 6.X, CentOS 7.X, or Aliyun Linux for your ECS instance.
    • If you want the ECS instance to execute MaxCompute tasks or data synchronization tasks, make sure that the ECS instance runs Python 2.6 or 2.7. The default Python version in CentOS 5.X is 2.4, which does not meet this requirement; the recommended operating systems ship with Python 2.6 or later.
    • Make sure that your ECS instance is assigned a public IP address.
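
The table schema shown in the preceding figures can be reconstructed from the column list that the synchronization script reads later in this topic. The following is a minimal sketch that creates a comparable table, assuming the MaxCompute client (odpscmd) is installed and configured. The column types and the pt partition column are inferred from the script and are assumptions, not a copy of the original table definition.

    # Minimal sketch: create a partitioned MaxCompute table whose columns match
    # the reader configuration in the synchronization script. Column types and
    # the pt partition column are inferred from the script (assumptions).
    odpscmd -e "
    CREATE TABLE IF NOT EXISTS hive_doc_good_sale (
      create_time  STRING,
      category     STRING,
      brand        STRING,
      buyer_id     STRING,
      trans_num    BIGINT,
      trans_amount DOUBLE,
      click_cnt    BIGINT
    )
    PARTITIONED BY (pt STRING);
    ALTER TABLE hive_doc_good_sale ADD IF NOT EXISTS PARTITION (pt='1');
    "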
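
The Auto Indexing feature in step 4 corresponds to the open-source action.auto_create_index setting (an assumption based on the feature's behavior). The following minimal sketch confirms the effective value from any host that can reach the cluster; the endpoint and password are placeholders that you must replace with your own values.

    # Minimal sketch: confirm that automatic index creation is allowed.
    # The endpoint and password are placeholders; replace them with your own.
    ES_ENDPOINT="http://es-cn-mpXXXXXXX.elasticsearch.aliyuncs.com:9200"
    ES_PASSWORD="your_password"

    # Returns the effective value of action.auto_create_index.
    curl -u "elastic:$ES_PASSWORD" \
      "$ES_ENDPOINT/_cluster/settings?include_defaults=true&filter_path=**.auto_create_index&pretty"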

Create a resource group and add a data source

  1. Log on to the DataWorks console. In the left-side navigation pane, click Workspaces. On the Workspaces page, find the target workspace and click Data Integration in the Actions column.
    If you have activated Data Integration or DataWorks, the Overview page appears.

    If you have not activated Data Integration or DataWorks, activate it as prompted. Data Integration or DataWorks activation incurs fees. Read and understand the description of charges before activation.

  2. In the left-side navigation pane of the Data Integration console, click Custom Resource Group.
  3. In the upper-right corner of the page that appears, click Add Resource Group to configure the ECS instance that you have created in the VPC as a scheduling resource.
    1. Add a server.

      Enter a resource group name and specify the required parameters. In this example, add the ECS instance that you created. The following list describes the server parameters.

      • ECS UUID: Connect to the ECS instance, run the dmidecode | grep UUID command, and enter the returned value in this field.
      • Server IP, Server CPU (Cores), and Server RAM (GB): The public IP address, number of vCPUs, and memory size of the ECS instance. To obtain this information, log on to the ECS console and click the name of the ECS instance. The information is displayed in the Configuration Information section.
    2. Install an agent.

      Follow the instructions to install the agent. You can skip the step for enabling port 8000 and use the default settings.

    3. Check the connectivity.

      After the connection is successfully established, the status changes to Available. If the status is Unavailable, log on to the ECS instance and run the tail -f /home/admin/alisatasknode/logs/heartbeat.log command to check whether the heartbeat between DataWorks and the ECS instance has timed out.

  4. Go to the Data Integration console. In the left-side navigation pane, click Connection. On the page that appears, click Add a Connection.
  5. In the Add Connection dialog box, click ODPS. In the Add ODPS Connection dialog box, specify the required parameters.

    • ODPS Endpoint: The endpoint of MaxCompute, which varies by region. For more information, see Configure endpoints.
    • MaxCompute Project Name: To obtain the project name, log on to the DataWorks console. In the left-side navigation pane, click Compute Engines and then MaxCompute.
    • AccessKey ID and AccessKey Secret: To obtain this information, move the pointer over your profile picture and click AccessKey.

    Notice Specify the parameters that are not listed above as required.
  6. Click Test Connection.

    After the connection is successfully established, proceed with the following operations.

Create a data synchronization task

  1. Log on to the DataWorks console.
  2. On the Workspaces page, find the target workspace and click Data Analytics in the Actions column.
  3. In the DataStudio console, move the pointer over the Create icon and click Workflow.
  4. In the Create Workflow dialog box, set Workflow Name and Description. Then, click Create.
  5. In the navigation tree, right-click Data Integration and choose Create > Batch Synchronization.
  6. In the Create Node dialog box, set Node Name and click Commit.
  7. Click the Switch to Code Editor icon in the toolbar.
  8. Configure the data synchronization script.
    For more information, see Create a sync node by using the code editor.
    Note You can click the Apply Template icon in the toolbar to apply the required script configuration template. Then, modify the template as required.
    The following code provides a sample script:
    {
      "configuration": {
        "reader": {
          "plugin": "odps",
          "parameter": {
            "partition": "pt=1",
            "datasource": "odps_es",
            "column": [
              "create_time",
              "category",
              "brand",
              "buyer_id",
              "trans_num",
              "trans_amount",
              "click_cnt"
            ],
            "table": "hive_doc_good_sale"
          }
        },
        "writer": {
          "plugin": "elasticsearch",
          "parameter": {
            "accessId": "elastic",
            "endpoint": "http://es-cn-mpXXXXXXX.elasticsearch.aliyuncs.com:9200",
            "indexType": "elasticsearch",
            "accessKey": "XXXXXX",
            "cleanup": true,
            "discovery": false,
            "column": [
              { "name": "create_time", "type": "string" },
              { "name": "category", "type": "string" },
              { "name": "brand", "type": "string" },
              { "name": "buyer_id", "type": "string" },
              { "name": "trans_num", "type": "long" },
              { "name": "trans_amount", "type": "double" },
              { "name": "click_cnt", "type": "long" }
            ],
            "index": "es_index",
            "batchSize": 1000,
            "splitter": ","
          }
        },
        "setting": {
          "errorLimit": {
            "record": "0"
          },
          "speed": {
            "throttle": false,
            "concurrent": 1,
            "mbps": "1",
            "dmu": 1
          }
        }
      },
      "type": "job",
      "version": "1.0"
    }

    This script includes three parts:

    • reader: used to configure MaxCompute as the reader. If your MaxCompute table is a partitioned table, you must configure the partition information in the partition field. The partition information in this example is pt=1.
    • writer: used to configure the Elasticsearch cluster as the writer.
      • endpoint: the internal or public endpoint of the Elasticsearch cluster. To connect to your Elasticsearch cluster, you must configure the public network whitelist or VPC whitelist on the Security page of your Elasticsearch cluster. In this example, the internal endpoint of the Elasticsearch cluster is used, and the cluster is in the same VPC as the ECS instance in the resource group. Therefore, no whitelist is required.
        Notice If you use the public endpoint of the Elasticsearch cluster, or if the cluster and the ECS instance in the resource group are in different VPCs, you must configure a whitelist on the Security page of the cluster. For more information, see Configure a whitelist to access an Elasticsearch cluster over the Internet or a VPC.
      • accessId: the username that is used to access the Elasticsearch cluster. Default value: elastic.
      • accessKey: the password that is used to access the Elasticsearch cluster.
      • index: the name of the index on the Elasticsearch cluster.
    • setting: used to configure job settings, such as the maximum number of error records allowed (errorLimit) and the concurrency and rate limits (speed).
  9. After the synchronization script is configured, click Resource Group configuration and select the resource group that you created. Then, click the Run icon to synchronize data from MaxCompute to your Elasticsearch cluster.

Verify the data synchronization result

  1. Log on to the Kibana console of your Elasticsearch cluster.
  2. In the left-side navigation pane, click Dev Tools.
  3. On the Console tab of the page that appears, run the following command to query the synchronized data:
    POST /es_index/_search?pretty
    {
      "query": { "match_all": {} }
    }
    Note es_index is the value that you configured for the index field in the data synchronization script.
  4. Run the following command to query the category and brand fields in the data:
    POST /es_index/_search?pretty
    {
      "query": { "match_all": {} },
      "_source": ["category", "brand"]
    }
  5. Run the following command to query data entries where the category field is set to fresh:
    POST /es_index/_search?pretty
    {
      "query": { "match": { "category": "fresh" } }
    }
  6. Run the following command to sort the data based on the trans_num field:
    POST /es_index/_search?pretty
    {
      "query": { "match_all": {} },
      "sort": { "trans_num": { "order": "desc" } }
    }

    For more information, see open-source Elastic documentation.
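
If you prefer to verify the result from a shell on the ECS instance instead of Kibana, the following minimal sketch issues equivalent requests with curl. The endpoint and password are placeholders that you must replace with your own values.

    # Minimal sketch: verify the synchronized data from the ECS instance.
    # The endpoint and password are placeholders; replace them with your own.
    ES_ENDPOINT="http://es-cn-mpXXXXXXX.elasticsearch.aliyuncs.com:9200"
    ES_PASSWORD="your_password"

    # Count the documents that were written to es_index.
    curl -u "elastic:$ES_PASSWORD" "$ES_ENDPOINT/es_index/_count?pretty"

    # Inspect the mapping that the writer created for es_index.
    curl -u "elastic:$ES_PASSWORD" "$ES_ENDPOINT/es_index/_mapping?pretty"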

FAQ

Problem description: An error occurred during the connection to the Elasticsearch cluster.

Solution

  1. Check whether you have selected the resource group that you created in the preceding step from Resource Group configuration.
    • If yes, go to the next step.
    • If no, click Resource Group configuration and select the correct resource group. Then, click the Run icon.
  2. If the Elasticsearch cluster whose endpoint is configured in the script resides in a different VPC from the ECS instance in the resource group, check whether the IP address of the ECS instance is added to the whitelist of the Elasticsearch cluster.
    • If yes, go to the next step.
    • If no, add the IP address of the ECS instance to the whitelist.
      Notice If you use the internal endpoint of the Elasticsearch cluster, log on to the Elasticsearch cluster and add the IP address of the ECS instance to VPC Whitelist on the Security page of the cluster. If you use the public endpoint of the Elasticsearch cluster, add the IP address of the ECS instance to Public Network Whitelist.
  3. Check whether the script is correctly configured. The fields that you must check include endpoint, accessId, and accessKey.
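
To narrow down a connection error, you can also test whether the Elasticsearch endpoint is reachable from the ECS instance in the resource group. The following is a minimal sketch with a placeholder endpoint and password. A timeout typically points to a missing whitelist entry or a VPC mismatch, whereas an authentication error points to the accessId or accessKey fields in the script.

    # Minimal sketch: test connectivity from the ECS instance in the resource
    # group. The endpoint and password are placeholders; replace them.
    ES_ENDPOINT="http://es-cn-mpXXXXXXX.elasticsearch.aliyuncs.com:9200"
    ES_PASSWORD="your_password"

    # A successful request returns the cluster information as JSON.
    curl --connect-timeout 10 -u "elastic:$ES_PASSWORD" "$ES_ENDPOINT"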