Elasticsearch: Use DataWorks to synchronize data from Hadoop to Alibaba Cloud Elasticsearch

Last Updated: Jan 16, 2024

When you use Hadoop to perform interactive big data analytics and queries, the process may be time-consuming. To address this issue, you can synchronize data from Hadoop to Alibaba Cloud Elasticsearch for analytics and queries. Elasticsearch can respond to queries, especially ad hoc queries, within seconds. This topic describes how to use the Data Integration service of DataWorks to quickly synchronize data from Hadoop to Alibaba Cloud Elasticsearch.

Background information

DataWorks is an end-to-end big data development and governance platform based on big data compute engines. DataWorks provides features such as data development, task scheduling, and data management. The Data Integration service of DataWorks can collect offline data at a minimum interval of 5 minutes. You can create batch synchronization tasks in DataWorks to rapidly synchronize data from various data sources to Alibaba Cloud Elasticsearch in offline mode.

  • The following types of data sources are supported:

    • Alibaba Cloud databases: ApsaraDB RDS for MySQL, ApsaraDB RDS for PostgreSQL, ApsaraDB RDS for SQL Server, ApsaraDB for MongoDB, and ApsaraDB for HBase

    • Alibaba Cloud PolarDB for Xscale (PolarDB-X) (formerly DRDS)

    • Alibaba Cloud MaxCompute

    • Alibaba Cloud Object Storage Service (OSS)

    • Alibaba Cloud Tablestore

    • Self-managed databases: HDFS, Oracle, FTP, Db2, MySQL, PostgreSQL, SQL Server, MongoDB, and HBase

  • The following synchronization scenarios are supported:

    • Synchronize big data to Alibaba Cloud Elasticsearch in offline mode.

    • Synchronize all data in a table to Alibaba Cloud Elasticsearch.

Prerequisites

  • A Hadoop cluster is created.

    In this example, the Alibaba Cloud E-MapReduce (EMR) service is used to automatically create a Hadoop cluster. For more information, see Create a cluster.

    The following configurations are used for the EMR Hadoop cluster in this example:

    • Cluster Type: Hadoop

    • EMR Version: EMR-3.26.3

    • Assign Public IP Address: turned on

  • An Alibaba Cloud Elasticsearch cluster is created, and the Auto Indexing feature is enabled for the cluster. For more information, see Create an Alibaba Cloud Elasticsearch cluster and Configure the YML file.

  • A DataWorks workspace is created. For more information, see Create a workspace.

Note
  • You can synchronize data only to Alibaba Cloud Elasticsearch. Self-managed Elasticsearch is not supported.

  • The EMR Hadoop cluster, Elasticsearch cluster, and DataWorks workspace must reside in the same region.

  • The EMR Hadoop cluster, Elasticsearch cluster, and DataWorks workspace must use the same time zone. Otherwise, when time-related data is synchronized, the data in the destination may be offset from the data in the source by the time zone difference.

Procedure

Step 1: Prepare source data

  1. Log on to the EMR console.

  2. In the top navigation bar, select a region.

  3. In the left-side navigation pane, click Data Development (Old).

  4. On the page that appears, click Create Project to create a data development project. In this step, set Select Resource Group to default resource group.

    For more information, see Manage projects.

  5. In the Projects section, find the newly created project and click Edit Job in the Actions column to create a job.

    For more information, see Edit jobs. In this step, set Job Type to Hive.

  6. Create a data table and insert data into the table.

    1. In the code editor, enter an SQL statement to create a Hive table. Then, click Run.

      In this example, the following statement is used:

      CREATE TABLE IF NOT EXISTS hive_esdoc_good_sale (
        create_time TIMESTAMP,
        category STRING,
        brand STRING,
        buyer_id STRING,
        trans_num BIGINT,
        trans_amount DOUBLE,
        click_cnt BIGINT
      )
      PARTITIONED BY (pt STRING)
      ROW FORMAT DELIMITED
        FIELDS TERMINATED BY ','
        LINES TERMINATED BY '\n';
    2. In the Run Job dialog box, configure the parameters and click OK.

      • Set Select Resource Group to default resource group.

      • Set Target Cluster to the cluster that you created.

    3. Create another job. In the code editor, enter the following SQL statement to insert test data.

      You can import test data from Object Storage Service (OSS) or other data sources to the table. You can also manually insert test data into the table. In this example, data is manually inserted.

      INSERT INTO hive_esdoc_good_sale PARTITION (pt = 1) VALUES
        ('2018-08-21','Coat','Brand A','lilei',3,500.6,7),
        ('2018-08-22','Fresh','Brand B','lilei',1,303,8),
        ('2018-08-22','Coat','Brand C','hanmeimei',2,510,2),
        ('2018-08-22','Bathroom','Brand A','hanmeimei',1,442.5,1),
        ('2018-08-22','Fresh','Brand D','hanmeimei',2,234,3),
        ('2018-08-23','Coat','Brand B','jimmy',9,2000,7),
        ('2018-08-23','Fresh','Brand A','jimmy',5,45.1,5),
        ('2018-08-23','Coat','Brand E','jimmy',5,100.2,4),
        ('2018-08-24','Fresh','Brand G','peiqi',10,5560,7),
        ('2018-08-24','Bathroom','Brand F','peiqi',1,445.6,2),
        ('2018-08-24','Coat','Brand A','ray',3,777,3),
        ('2018-08-24','Bathroom','Brand G','ray',3,122,3),
        ('2018-08-24','Coat','Brand C','ray',1,62,7);
  7. Check whether the data is inserted.

    1. Create a job for an ad hoc query.

      For more information, see Perform ad hoc queries.

    2. Enter the following SQL statement and click Run.

      SELECT * FROM hive_esdoc_good_sale WHERE pt = 1;
    3. In the lower part of the page, click the Records tab. On this tab, click Details in the Action column. The Scheduling Center tab appears.

    4. On the Scheduling Center tab, click the Execution Result tab.

      Then, you can check whether the data is inserted into the Hive table of the Hadoop cluster for synchronization. The following figure shows the inserted data.
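Optionally, you can sanity-check the source data with an aggregate query in the same way before you synchronize it. The following statement is a minimal example that is not part of the original procedure; it sums the transaction amount and the number of clicks for each brand:

    -- Optional check: total transaction amount and clicks per brand
    SELECT brand,
           SUM(trans_amount) AS total_amount,
           SUM(click_cnt)    AS total_clicks
    FROM hive_esdoc_good_sale
    WHERE pt = 1
    GROUP BY brand;

If the totals match your expectations, the table is ready for synchronization.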

Step 2: Create an exclusive resource group for Data Integration

Create an exclusive resource group for Data Integration, and associate the resource group with a virtual private cloud (VPC) and the created workspace. Exclusive resource groups ensure fast and stable data transmission.

  1. Log on to the DataWorks console.

  2. In the top navigation bar, select a region. In the left-side navigation pane, click Resource Groups.

  3. On the Exclusive Resource Groups tab of the Resource Groups page, click Create Resource Group for Data Integration.

  4. On the DataWorks Exclusive Resources page, set Type to Exclusive Resource Groups for Data Integration, configure Resource Group Name, and then click Buy Now.

    For more information, see Create an exclusive resource group for Data Integration.

  5. On the Exclusive Resource Groups tab, find the created resource group and click Network Settings in the Actions column to associate the resource group with a VPC. For more information, see Associate the exclusive resource group for Data Integration with a VPC.

    Note

    In this example, an exclusive resource group for Data Integration is used to synchronize data over a VPC. For information about how to use an exclusive resource group for Data Integration to synchronize data over the Internet, see Add the EIP or CIDR block of an exclusive resource group for Data Integration to an IP address whitelist of a data source.

    The exclusive resource group must be connected to the VPC where the EMR Hadoop cluster resides and the VPC where the Elasticsearch cluster resides. This way, data can be synchronized based on the exclusive resource group. Therefore, you must associate the exclusive resource group with the VPC, zone, and vSwitch of the EMR Hadoop cluster and with those of the Elasticsearch cluster. For information about how to view the VPC, zone, and vSwitch of the Elasticsearch cluster, see View the basic information of a cluster.

    Important

    After you associate the exclusive resource group with the VPCs, you must add the CIDR blocks of the vSwitches to which the EMR Hadoop cluster and the Elasticsearch cluster belong to the private IP address whitelists of the EMR Hadoop cluster and the Elasticsearch cluster. For more information, see Configure a public or private IP address whitelist for an Elasticsearch cluster. A hypothetical example of such whitelist entries is shown after this procedure.

  6. Click the back icon in the upper-left corner of the page to return to the Resource Groups page.

  7. On the Exclusive Resource Groups tab, find the resource group and click Change Workspace in the Actions column to associate the resource group with the created workspace.

    For more information, see Associate the exclusive resource group for Data Integration with a workspace.
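For example, if the vSwitch associated with the EMR Hadoop cluster uses the CIDR block 192.168.1.0/24 and the vSwitch associated with the Elasticsearch cluster uses 192.168.2.0/24 (both values are hypothetical), the whitelist entries would be:

    192.168.1.0/24    # add to the private IP address whitelist of the EMR Hadoop cluster
    192.168.2.0/24    # add to the private IP address whitelist of the Elasticsearch cluster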

Step 3: Add data sources

  1. Go to the Data Integration page.

    1. Log on to the DataWorks console.

    2. In the left-side navigation pane, click Workspaces.

    3. Find the workspace and choose Shortcuts > Data Integration in the Actions column.

  2. In the left-side navigation pane of the Data Integration page, click Data Source.

  3. Add a Hadoop data source.

    1. On the Data Source List page, click Add Data Source.

    2. In the Add data source dialog box, search for and select HDFS.

    3. In the Add HDFS data source dialog box, configure the parameters.

      For more information, see Add an HDFS data source.

    4. Find the resource group in the resource group list in the lower part of the dialog box and click Test Connectivity. If Connected is displayed, the resource group is connected to the Hadoop data source.

    5. Click Complete.

  4. Add an Elasticsearch data source in the same way. For more information, see Add an Elasticsearch data source.
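The exact connection parameters depend on your environment. As a purely hypothetical illustration of what the values can look like (replace every value with your own; the NameNode port in particular varies by cluster):

    DefaultFS (HDFS data source):         hdfs://emr-header-1:9000
    Endpoint (Elasticsearch data source): http://es-cn-xxxx.elasticsearch.aliyuncs.com:9200
    Username (Elasticsearch data source): elastic
    Password (Elasticsearch data source): the password specified when the cluster was created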

Step 4: Configure and run a batch synchronization task

The exclusive resource group is used to run the batch synchronization task. The resource group obtains data from the source and writes the data to the Elasticsearch cluster.

Note

You can use the codeless UI or code editor to configure the batch synchronization task. In this example, the codeless UI is used. For information about how to use the code editor to configure the batch synchronization task, see Configure a batch synchronization task by using the code editor and Elasticsearch Writer.
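For reference, a script-mode (code editor) configuration pairs an HDFS Reader with an Elasticsearch Writer in a single JSON document. The following is only a sketch: every address, path, credential, port, and column definition is a hypothetical placeholder, and you should verify the parameter names and supported values against the HDFS Reader and Elasticsearch Writer documentation before use.

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "name": "Reader",
          "category": "reader",
          "stepType": "hdfs",
          "parameter": {
            "defaultFS": "hdfs://emr-header-1:9000",
            "path": "/user/hive/warehouse/hive_esdoc_good_sale/pt=1",
            "fileType": "text",
            "fieldDelimiter": ",",
            "encoding": "UTF-8",
            "column": [
              { "index": 0, "type": "string" },
              { "index": 1, "type": "string" },
              { "index": 2, "type": "string" },
              { "index": 3, "type": "string" },
              { "index": 4, "type": "long" },
              { "index": 5, "type": "double" },
              { "index": 6, "type": "long" }
            ]
          }
        },
        {
          "name": "Writer",
          "category": "writer",
          "stepType": "elasticsearch",
          "parameter": {
            "endpoint": "http://es-cn-xxxx.elasticsearch.aliyuncs.com:9200",
            "accessId": "elastic",
            "accessKey": "<your-password>",
            "index": "hive_esdoc_good_sale",
            "cleanup": true,
            "batchSize": 1000,
            "column": [
              { "name": "create_time", "type": "date" },
              { "name": "category", "type": "text" },
              { "name": "brand", "type": "text" },
              { "name": "buyer_id", "type": "keyword" },
              { "name": "trans_num", "type": "long" },
              { "name": "trans_amount", "type": "double" },
              { "name": "click_cnt", "type": "long" }
            ]
          }
        }
      ],
      "setting": {
        "errorLimit": { "record": "0" },
        "speed": { "throttle": false, "concurrent": 2 }
      },
      "order": {
        "hops": [ { "from": "Reader", "to": "Writer" } ]
      }
    }

If your writer version supports the cleanup parameter, setting it to true deletes and re-creates the destination index before data is written; set it to false to append to an existing index instead.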

  1. Go to the DataStudio page of DataWorks.

    1. Log on to the DataWorks console.

    2. In the left-side navigation pane, click Workspaces.

    3. Find the workspace and choose Shortcuts > Data Development in the Actions column.

  2. Create a batch synchronization task.

    1. In the left-side navigation pane, choose Create > Create Workflow to create a workflow.

    2. Right-click the name of the newly created workflow and choose Create Node > Offline synchronization.

    3. In the Create Node dialog box, configure the Name parameter and click Confirm.

  3. Configure the network and resources.

    1. For the source part, set Source to HDFS and Data Source Name to the name of the added Hadoop data source.

    2. For the resource group part, select the created exclusive resource group.

    3. For the destination part, set Destination to Elasticsearch and Data Source Name to the name of the added Elasticsearch data source.

  4. Click Next.

  5. Configure the task.

    1. In the Source section, select the table whose data you want to synchronize.

    2. In the Destination section, configure the parameters.

    3. In the Field Mapping section, configure mappings between source fields and destination fields.

    4. In the Channel Control section, configure the parameters.

    For more information, see Configure a batch synchronization task by using the codeless UI.

  6. Run the task.

    1. (Optional) Configure scheduling properties for the task. In the right-side navigation pane, click Properties. On the Properties tab, configure the parameters based on your business requirements. For more information about the parameters, see Scheduling configuration.

    2. In the upper-left corner, click the Save icon to save the task.

    3. In the upper-left corner, click the Submit icon to submit the task.

      If you configure scheduling properties for the task, the task is automatically run on a regular basis. You can also click the Run icon in the upper-left corner to run the task immediately.

      If Shell run successfully! is displayed in the operational logs, the task has run successfully.

Step 5: Verify the data synchronization result

  1. Log on to the Kibana console of the Elasticsearch cluster.

    For more information, see Log on to the Kibana console.

  2. In the left-side navigation pane, click Dev Tools.

  3. On the Console tab, run the following command to view the synchronized data:

    POST /hive_esdoc_good_sale/_search?pretty
    {
      "query": { "match_all": {} }
    }
    Note

    Replace hive_esdoc_good_sale with the value that you specified for the Index parameter when you configured the synchronization task.

    If the data is synchronized, the result shown in the following figure is returned.

  4. Run the following command to search for all documents that contain Brand A:

    POST /hive_esdoc_good_sale/_search?pretty
    {
      "query": { "match_phrase": { "brand":"Brand A" } }
    }

    All documents whose brand is Brand A are returned.

  5. Run the following command to sort the products of each brand by the number of clicks. You can use the result to determine the popularity of the products.

    POST /hive_esdoc_good_sale/_search?pretty
    {
      "query": { "match_all": {} },
      "sort": { "click_cnt": { "order": "desc" } },
      "_source": ["category", "brand", "click_cnt"]
    }

    The results are sorted by the number of clicks in descending order.

    For more information about other commands and their use scenarios, see Alibaba Cloud Elasticsearch documentation and open source Elasticsearch documentation.
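    For example, the following request is an additional illustration that is not part of the original procedure. Assuming that the Auto Indexing feature's default dynamic mapping created a brand.keyword subfield, it calculates the total transaction amount for each brand:

    POST /hive_esdoc_good_sale/_search?pretty
    {
      "size": 0,
      "aggs": {
        "amount_by_brand": {
          "terms": { "field": "brand.keyword" },
          "aggs": {
            "total_amount": { "sum": { "field": "trans_amount" } }
          }
        }
      }
    }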