All Products
Search
Document Center

Elasticsearch:Use DataWorks to synchronize data from MaxCompute to Alibaba Cloud Elasticsearch

Last Updated:Jan 16, 2024

If you want to perform operations such as information retrieval, multi-dimensional queries, and statistical analysis on large volumes of data in MaxCompute, you can use the Data Integration service of DataWorks to synchronize the data to Alibaba Cloud Elasticsearch. The data can be synchronized within minutes or a longer period of time. This topic describes how to use the Data Integration service to synchronize data from MaxCompute to Alibaba Cloud Elasticsearch in offline mode.

Background information

DataWorks is an end-to-end big data development and governance platform based on big data compute engines. DataWorks provides features such as data development, task scheduling, and data management. The Data Integration service of DataWorks can collect offline data at a minimum interval of 5 minutes. You can create batch synchronization tasks in DataWorks to rapidly synchronize data from various data sources to Alibaba Cloud Elasticsearch in offline mode.

  • The following types of data sources are supported:

    • Alibaba Cloud databases: ApsaraDB RDS for MySQL, ApsaraDB RDS for PostgreSQL, ApsaraDB RDS for SQL Server, ApsaraDB for MongoDB, and ApsaraDB for HBase

    • Alibaba Cloud PolarDB for Xscale (PolarDB-X) (formerly DRDS)

    • Alibaba Cloud MaxCompute

    • Alibaba Cloud Object Storage Service (OSS)

    • Alibaba Cloud Tablestore

    • Self-managed databases: HDFS, Oracle, FTP, Db2, MySQL, PostgreSQL, SQL Server, MongoDB, and HBase

  • The following synchronization scenarios are supported:

    • Synchronize big data to Alibaba Cloud Elasticsearch in offline mode.

    • Synchronize all data in a table to Alibaba Cloud Elasticsearch.

Prerequisites

Note
  • You can synchronize data only to Alibaba Cloud Elasticsearch. Self-managed Elasticsearch is not supported.

  • The MaxCompute project, Elasticsearch cluster, and DataWorks workspace must reside in the same region.

  • The MaxCompute project, Elasticsearch cluster, and DataWorks workspace must be in the same time zone. Otherwise, if you synchronize time-related data, the data in the source and the data in the destination after the synchronization may have a time zone difference.

Billing

Procedure

Step 1: Prepare source data

Create a table in MaxCompute and import data into the table. For more information, see Create tables and Import data to tables.

The following figures show the table schema and a part of the table data.

  • Table schema表结构

  • Table data表数据

Step 2: Create and configure an exclusive resource group for Data Integration

Create an exclusive resource group for Data Integration, and associate the resource group with a virtual private cloud (VPC) and the created workspace. Exclusive resource groups ensure fast and stable data transmission.

  1. Log on to the DataWorks console.

  2. In the top navigation bar, select a region. In the left-side navigation pane, click Resource Groups.

  3. On the Exclusive Resource Groups tab of the Resource Groups page, click Create Resource Group for Data Integration.

  4. On the DataWorks Exclusive Resources page, set Type to Exclusive Resource Groups for Data Integration, configure Resource Group Name, and then click Buy Now.

    For more information, see Create an exclusive resource group for Data Integration.

  5. On the Exclusive Resource Groups tab, find the created resource group and click Network Settings in the Actions column to associate the resource group with a VPC. For more information, see Associate the exclusive resource group for Data Integration with a VPC.

    Note

    In this example, an exclusive resource group for Data Integration is used to synchronize data over a VPC. For information about how to use an exclusive resource group for Data Integration to synchronize data over the Internet, see Add the EIP or CIDR block of an exclusive resource group for Data Integration to an IP address whitelist of a data source.

    The exclusive resource group must be connected to the VPC where the Elasticsearch cluster resides. This way, data can be synchronized based on the exclusive resource group. Therefore, you must associate the exclusive resource group with the VPC, zone, and vSwitch of the Elasticsearch cluster. For information about how to view the VPC, zone, and vSwitch of the Elasticsearch cluster, see View the basic information of a cluster.

    Important

    After you associate the exclusive resource group with the VPC, you need to add the CIDR block of the vSwitch to which the Elasticsearch cluster belong to a private IP address whitelist of the Elasticsearch cluster. For more information, see Configure a public or private IP address whitelist for an Elasticsearch cluster.

  6. Click the back icon in the upper-left corner of the page to return to the Resource Groups page.

  7. On the Exclusive Resource Groups tab, find the resource group and click Change Workspace in the Actions column to associate the resource group with the created workspace.

    For more information, see Associate the exclusive resource group for Data Integration with a workspace.

Step 3: Add data sources

Add the MaxCompute project and Elasticsearch cluster to Data Integration as data sources.

  1. Go to the Data Integration page.

    1. Log on to the DataWorks console.

    2. In the left-side navigation pane, click Workspaces.

    3. Find the workspace and choose Shortcuts > Data Integration in the Actions column.

  2. In the left-side navigation pane of the Data Integration page, click Data Source.

  3. Add a MaxCompute data source.

    1. On the Data Source List page, click Add Data Source.

    2. In the Add data source dialog box, search for and select MaxCompute.

    3. In the Add MaxCompute data source dialog box, configure the parameters related to basic information.

      For more information, see Add a MaxCompute data source.

    4. Find the resource group in the resource group list in the lower part of the dialog box and click Test Connectivity. If Connected is displayed, the resource group is connected to the ApsaraDB RDS for MySQL data source.

    5. Click Complete.

  4. Add an Elasticsearch data source in the same way. For more information, see Add an Elasticsearch data source.

Step 4: Configure and run a batch synchronization task

The exclusive resource group is used to run the batch synchronization task. The resource group obtains data from the source and writes the data to the Elasticsearch cluster.

Note

You can use the codeless UI or code editor to configure the batch synchronization task. In this example, the codeless UI is used. For information about how to use the code editor to configure the batch synchronization task, see Configure a batch synchronization task by using the code editor and Elasticsearch Writer.

  1. Go to the DataStudio page of DataWorks.

    1. Log on to the DataWorks console.

    2. In the left-side navigation pane, click Workspaces.

    3. Find the workspace and choose ShortcutsData Integration Shortcuts > Data Integration in the Actions column.

  2. Create a batch synchronization task.

    1. In the left-side navigation pane, choose Create > Create Workflow to create a workflow.

    2. Right-click the name of the newly created workflow and choose Create Node > Offline synchronization.

    3. In the Create Node dialog box, configure the Name parameter and click Confirm.

  3. Configure the network and resources.

    1. For the source part, set Source to MaxCompute(ODPS) and Data Source Name to the name of the added MaxCompute data source.

    2. For the resource group part, select the created exclusive resource group.

    3. For the destination part, set Destination to Elasticsearch and Data Source Name to the name of the added Elasticsearch data source.

  4. Click Next.

  5. Configure the task.

    1. In the Source section, select the table whose data you want to synchronize.

    2. In the Destination section, configure the parameters.

    3. In the Field Mapping section, configure mappings between source fields and destination fields.

    4. In the Channel Control section, configure the parameters.

    For more information, see Configure a batch synchronization task by using the codeless UI.

  6. Run the task.

    1. (Optional) Configure scheduling properties for the task. In the right-side navigation pane, click Properties. On the Properties tab, configure the parameters based on your business requirements. For more information about the parameters, see Scheduling configuration.

    2. In the upper-left corner, click the Save icon to save the task.

    3. In the upper-left corner, click the Submit icon to submit the task.

      If you configure scheduling properties for the task, the task is automatically run on a regular basis. You can also click the Run icon in the upper-left corner to run the task immediately.

      If Shell run successfully! is displayed in operational logs, the task runs successfully. The following code shows a part of operational logs:

      2023-10-31 16:52:35 INFO Exit code of the Shell command 0
      2023-10-31 16:52:35 INFO --- Invocation of Shell command completed ---
      2023-10-31 16:52:35 INFO Shell run successfully!
      2023-10-31 16:52:35 INFO Current task status: FINISH
      2023-10-31 16:52:35 INFO Cost time is: 33.106s

Step 5: Verify the data synchronization result

In the Kibana console, view the synchronized data and search for data based on specific conditions.

  1. Log on to the Kibana console of the Elasticsearch cluster.

    For more information, see Log on to the Kibana console.

  2. Click the 菜单.png icon in the upper-left corner of the page that appears and select Dev Tools.

  3. On the Console tab, run the following command to view the synchronized data:

    POST /odps_index/_search?pretty
    {
    "query": { "match_all": {}}
    }
    Note

    Change odps_index to the value that you specified for the Index parameter when you configure the task.

    If the data is synchronized, the result shown in the following figure is returned.查看同步的数据

  4. Run the following command to search for the category and brand fields in the data:

    POST /odps_index/_search?pretty
    {
    "query": { "match_all": {} },
    "_source": ["category", "brand"]
    }
  5. Run the following command to search for documents in which the value of the category field is fresh:

    POST /odps_index/_search?pretty
    {
    "query": { "match": {"category":"fresh"} }
    }
  6. Run the following command to sort documents based on the trans_num field:

    POST /odps_index/_search?pretty
    {
    "query": { "match_all": {} },
    "sort": { "trans_num": { "order": "desc" } }
    }

    For more information about other commands and their use scenarios, see open source Elasticsearch documentation.