
Elasticsearch: Use DataWorks to synchronize data from PolarDB-X (DRDS) to Alibaba Cloud ES

Last Updated: Mar 30, 2026

Use DataWorks Data Integration to batch-synchronize large volumes of data from PolarDB for Xscale (PolarDB-X) to Alibaba Cloud Elasticsearch (ES) in minutes.

How it works

DataWorks Data Integration uses an exclusive resource group as the execution engine. The resource group connects to your PolarDB-X instance and Elasticsearch cluster over a virtual private cloud (VPC), retrieves records from the source table, and writes them to the target ES index in a single batch run. All three components (the PolarDB-X instance, the ES cluster, and the DataWorks workspace) must share the same region and time zone so that the resource group can reach both endpoints without cross-region latency or timestamp skew.

Limitations

  • Only Alibaba Cloud Elasticsearch clusters are supported as sync targets. Self-managed Elasticsearch clusters are not supported.

  • The PolarDB-X instance, ES cluster, and DataWorks workspace must be in the same region.

  • The PolarDB-X instance, ES cluster, and DataWorks workspace must be in the same time zone. A time zone mismatch causes timestamp fields in the synced data to shift.

Prerequisites

Before you begin, create a PolarDB-X instance, an Alibaba Cloud Elasticsearch cluster, and a DataWorks workspace in the same region and time zone.

Billing

Step 1: Prepare source data

Insert the data you want to sync into the PolarDB-X 1.0 instance. For SQL syntax, see Basic SQL operations.

The following figure shows the test data used in this tutorial.

Test data
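The source schema is implied by the destination field mapping in Step 4. The sketch below reconstructs a plausible version of the test table; it uses sqlite3 as a local stand-in for the PolarDB-X MySQL endpoint, and the table name drdstest and the sample row are assumptions for illustration only (against a real instance you would use a MySQL client with the instance's VPC endpoint and credentials).

```python
import sqlite3

# Local stand-in for the PolarDB-X source. Column names mirror the
# destination field mapping configured in Step 4 of this tutorial.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE drdstest (
        Name TEXT, Platform TEXT, Year_of_Release TEXT, Genre TEXT,
        Publisher TEXT, na_Sales REAL, EU_Sales REAL, JP_Sales REAL,
        Other_Sales REAL, Global_Sales REAL, Critic_Score INTEGER,
        Critic_Count INTEGER, User_Score REAL, User_Count INTEGER,
        Developer TEXT, Rating TEXT
    )
""")
# Illustrative row; the actual test data is shown in the figure above.
conn.execute(
    "INSERT INTO drdstest (Name, Platform, Year_of_Release, Genre, Publisher)"
    " VALUES (?, ?, ?, ?, ?)",
    ("Sample Game", "Wii", "2006", "Sports", "Nintendo"),
)
count = conn.execute("SELECT COUNT(*) FROM drdstest").fetchone()[0]
print(count)  # 1
```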

Step 2: Purchase and create an exclusive resource group

An exclusive resource group runs the batch sync job and ensures fast, stable data transfer. After purchasing the resource group, attach it to the VPCs of both the PolarDB-X and Elasticsearch instances, then associate it with your DataWorks workspace.

  1. Log on to the DataWorks console.

  2. In the top menu bar, select a region. In the left navigation pane, click Resource Group.

  3. On the Resource Groups tab, click Create Resource Group > Data Integration Resource Group.

  4. On the DataWorks Exclusive Resources (Subscription) page, set Resource Type to Exclusive Resource Group For Data Integration, enter a name, and click Buy Now. For details, see Step 1: Create an exclusive resource group for Data Integration.

  5. In the Actions column of the resource group you created, click Network Settings to attach a VPC. The resource group must connect to the VPCs of both the PolarDB-X and Elasticsearch instances. Attach the resource group to the VPC, Zone, and vSwitch of each instance. To look up VPC details for an instance, see View the basic information of an Elasticsearch instance. For details, see Attach a VPC.

    Important

    After attaching the VPC, add the CIDR block of the vSwitch to the private access whitelists of both the PolarDB-X and Elasticsearch instances. See Configure a public or private access whitelist for an Elasticsearch instance.

    This tutorial uses a VPC connection. To sync over the Internet instead, see Configure an IP address whitelist.
  6. In the upper-left corner, click the back icon to return to the Resource Groups page.

  7. Find the resource group and click Attach Workspace in the Actions column to associate it with your target workspace. For details, see Step 2: Associate the exclusive resource group for Data Integration with a workspace.

Step 3: Add data sources

Add the PolarDB-X and Elasticsearch data sources in DataWorks Data Integration.

  1. Go to the Data Integration page.

    1. Log on to the DataWorks console.

    2. In the left navigation pane, click Workspace.

    3. Find your workspace and choose Shortcuts > Data Integration in the Actions column.

  2. In the left navigation pane, click Data Source.

  3. Click Add Data Source.

  4. Search for and select DRDS.

  5. On the Add DRDS Data Source page, fill in the connection parameters and run the connectivity test. After the test passes, click Complete. For parameter details, see Add a PolarDB-X data source.

  6. Add the Elasticsearch data source the same way. See Add an Elasticsearch data source.

Step 4: Configure and run a batch synchronization task

The batch sync task runs on the exclusive resource group. The resource group reads data from the PolarDB-X source and writes it to the Elasticsearch index.

This tutorial uses the codeless UI in legacy Data Development (DataStudio). To use the code editor instead, see Configure a batch synchronization task using the code editor and Elasticsearch Writer.
  1. Go to the Data Development page.

    1. Log on to the DataWorks console.

    2. In the left navigation pane, click Workspaces.

    3. In the Actions column of your workspace, choose Quick Access > Data Development.

  2. Create a batch synchronization task.

    1. In the left navigation pane, go to the Data Development tab, choose New > Business Flow, and create a business flow as prompted.

    2. Right-click the business flow and choose Create Node > Batch Synchronization.

    3. In the Create Node dialog box, enter a name and click Confirm.

  3. Configure the network and resources.

    1. In the Source section, set Source to DRDS and Data Source to the name of your PolarDB-X data source.

    2. In the Resource Group section, select the exclusive resource group you created.

    3. In the Destination section, set Destination to Elasticsearch and Data Source to the name of your Elasticsearch data source.

  4. Click Next.

  5. Configure the sync task.

    1. In the Source section, select the source table.

    2. In the Destination section, configure the destination parameters.

    3. In the Field Mapping section, map Source Fields to Target Fields. In this example, the source fields are kept as-is and only the destination fields are customized. To the right of Destination Field, click the edit fields icon and enter the field definitions:

      {"name":"Name","type":"text"}
      {"name":"Platform","type":"text"}
      {"name":"Year_of_Release","type":"date"}
      {"name":"Genre","type":"text"}
      {"name":"Publisher","type":"text"}
      {"name":"na_Sales","type":"float"}
      {"name":"EU_Sales","type":"float"}
      {"name":"JP_Sales","type":"float"}
      {"name":"Other_Sales","type":"float"}
      {"name":"Global_Sales","type":"float"}
      {"name":"Critic_Score","type":"long"}
      {"name":"Critic_Count","type":"long"}
      {"name":"User_Score","type":"float"}
      {"name":"User_Count","type":"long"}
      {"name":"Developer","type":"text"}
      {"name":"Rating","type":"text"}

      For the full list of destination parameters, see Configure an offline sync task in codeless UI.
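Each destination field definition must be a standalone JSON object on its own line. A quick local sanity check, sketched below and not part of the DataWorks workflow itself, can catch malformed entries before you run the task (the subset of definitions and the list of allowed types shown here are illustrative):

```python
import json

# A few of the destination field definitions from above, one JSON object
# per line, exactly as entered in the Field Mapping dialog.
field_defs = """\
{"name":"Name","type":"text"}
{"name":"Year_of_Release","type":"date"}
{"name":"Global_Sales","type":"float"}
{"name":"Critic_Score","type":"long"}
"""

# Common Elasticsearch field types; extend as needed for your mapping.
allowed_types = {"text", "keyword", "date", "float", "double", "long",
                 "integer", "boolean"}

fields = [json.loads(line) for line in field_defs.splitlines()]
assert all(f.keys() == {"name", "type"} for f in fields)
assert all(f["type"] in allowed_types for f in fields)
print(len(fields))  # 4
```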

    4. In the Channel Control section, configure the channel parameters.

  6. Run the task. When the task completes successfully, the log contains the message Shell run successfully!.

    1. (Optional) Click Scheduling Configuration on the right side of the page to set up a recurring schedule. See Scheduling Configuration.

    2. In the toolbar, click the Save icon.

    3. Click the Submit icon to submit the task.

      • If you configured a schedule, the task runs automatically at the scheduled times.

      • To run the task immediately, click the Run icon in the toolbar.

Step 5: Verify the sync results

  1. Log on to the Kibana console of your Elasticsearch instance. See Log on to the Kibana console.

  2. In the left navigation pane, click Dev Tools.

  3. In the Console, run the following query to retrieve the synced documents. The hits.total value in the response is the document count; compare it against the row count in your source table to confirm that all records were transferred.

    GET drdstest/_search
    {
      "query": {
        "match_all": {}
      }
    }

    If the command is successful, the following result is returned.

    View the data volume at the destination
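In Elasticsearch 7.x and later, the document count appears in the search response under hits.total as an object with value and relation keys; GET drdstest/_count also returns the count directly. The sketch below reads that count from a response fragment whose numbers are placeholders, not results from this tutorial:

```python
# Illustrative search response fragment (placeholder values).
response = {
    "hits": {
        "total": {"value": 1000, "relation": "eq"},
        "hits": [],
    }
}

# hits.total.value is the number of documents matching the query;
# compare it against the source table's row count.
synced_docs = response["hits"]["total"]["value"]
print(synced_docs)  # 1000
```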

  4. Run the following query to filter documents by a specific field value:

    GET drdstest/_search
    {
      "query": {
        "term": {
          "Publisher.keyword": {
            "value": "Nintendo"
          }
        }
      }
    }

    If the command runs successfully, the following output is returned.

    Retrieve data by field
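The term query performs exact matching, so it targets the Publisher.keyword sub-field rather than the analyzed Publisher text field; this assumes the index mapping provides a keyword sub-field, as Elasticsearch dynamic mapping creates by default for string values. The same request body can be built programmatically, as this sketch shows:

```python
import json

# Exact-match filter on the raw (non-analyzed) value of Publisher.
query = {
    "query": {
        "term": {
            "Publisher.keyword": {"value": "Nintendo"}
        }
    }
}

# Serialize to the JSON body sent with GET drdstest/_search.
body = json.dumps(query)
print(body)
```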

What's next