Alibaba Cloud provides a wide array of cloud storage and database services. You can use Data Integration of DataWorks to synchronize data from storage or database services to Alibaba Cloud Elasticsearch (ES), and then query or analyze the data. Data Integration allows you to synchronize data at a minimum interval of five minutes.

Notice Data synchronization may incur Internet data transfer fees.

Overview

Follow these steps to analyze and search data stored in a database:

  1. Create a database. You can use an ApsaraDB for RDS database or create a database on your own server. In this topic, an ApsaraDB RDS for MySQL database is used. Two MySQL tables are joined, and the joined data is synchronized to Elasticsearch. The following figures show the two MySQL tables.
    Figure 1. Table 1
    Table 1
    Figure 2. Table 2
    Table 2
  2. Purchase an Elastic Compute Service (ECS) instance that can be connected to your Elasticsearch instance through the VPC network. The ECS instance is used to read data from the MySQL database and run a synchronization task to write data to Elasticsearch. The synchronization task is dispatched by Data Integration.
  3. Activate Data Integration, and add the ECS instance to Data Integration as a resource to run the synchronization task.
  4. Create a data synchronization script and run the script periodically.
  5. Create an Elasticsearch instance to store the data synchronized by Data Integration.
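
The steps above can be sketched in miniature. The following Python sketch emulates the join-and-index flow with hypothetical sample rows (the field names mirror the join query and index fields used later in this topic); the actual synchronization is performed by Data Integration, not by this script.

```python
# Minimal sketch of the sync flow: join a "student" table with a "score"
# table and turn each joined row into an Elasticsearch-style document.
# All sample data and field names are hypothetical.

students = [
    {"id": 1, "name": "Alice", "sex": "F", "birth": 2001,
     "department": "CS", "address": "Hangzhou"},
]
scores = [
    {"stu_id": 1, "c_name": "Math", "grade": 95},
]

def join_and_build_docs(students, scores):
    """Emulate: SELECT ... FROM student JOIN score ON student.id = score.stu_id."""
    by_id = {s["id"]: s for s in students}
    docs = []
    for sc in scores:
        st = by_id[sc["stu_id"]]
        docs.append({
            "_id": st["id"],           # becomes the Elasticsearch document ID
            "name": st["name"],
            "sex": st["sex"],
            "birth": st["birth"],
            "quyu": st["department"],  # "quyu" is the index field for department
            "address": st["address"],
            "cname": sc["c_name"],
            "grades": sc["grade"],
        })
    return docs
```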

Preparation

  1. Create a VPC network and a VSwitch.
  2. Create an Alibaba Cloud Elasticsearch instance.
    Notice The region, VPC network, and VSwitch of the Elasticsearch instance must be the same as those specified in step 1.
  3. Purchase an ECS instance that is connected to the same VPC network as your Elasticsearch instance, and assign a public IP address or Elastic IP address (EIP) to the instance.
    Note
    • You can also use an existing ECS instance.
    • We recommend that you choose CentOS 6, CentOS 7, or AliyunOS for your ECS instance.
    • If you want to run MaxCompute or synchronization tasks on the ECS instance, verify that the ECS instance runs Python V2.6 or V2.7. Note that CentOS 5 ships with Python V2.4, which is not supported, whereas later CentOS versions and other supported operating systems ship with Python V2.6 or later.
    • Make sure that your ECS instance is assigned a public IP address.

Create a resource group and add a data source

  1. Log on to the DataWorks console, and click the Workspaces tab.
    • The following page is displayed if you have activated Data Integration.

      Data Integration activated
    • The following page is displayed if you have not activated Data Integration. Follow the steps on the page to activate Data Integration. Data Integration is a paid service. Estimate the costs based on the billing items shown on the page.

      Data Integration not activated
  2. Click Data Integration in the Actions column for the purchased workspace.
  3. On the Data Integration page, choose Resource Groups in the left-side navigation pane, and then click Add Resource Group in the upper-right corner.
  4. Add a resource group.
    1. Add Server.

      Enter a resource group name, and specify the server information. In this example, add the purchased ECS instance. The server information is as follows.

      Add Resource Group
      • ECS UUID: Log on to the ECS instance, run dmidecode | grep UUID, and enter the returned value into this field.
      • Server IP Address/Server CPU (Cores)/Server RAM (GB): The public IP address, CPU cores, and memory size of your ECS instance. To find this information, log on to the ECS console and click the name of the ECS instance. The information is listed in the Configuration Information section.
    2. Install the Security Center agent.

      Follow the instructions on the page to install the agent. In step 5, open port 8000 of the ECS instance. You can also skip this step and use the default setting.

    3. Test Connection.
  5. Configure the MySQL database and Elasticsearch whitelists.

    Add the IP address of the ECS instance to the whitelists of the MySQL database and Elasticsearch instance so that the ECS instance can communicate with both of them.

  6. In the left-side navigation pane, choose Connections, and click Add Connection.
  7. Select MySQL. On the Add MySQL Connection page, enter the required information.

    Specify connection information

    Connect To: This example uses an ApsaraDB RDS for MySQL database. You can choose between User-created Data Store with Public IP Addresses and User-created Data Store without Public IP Addresses. For more information about the parameters, see Configure a MySQL connection.

    Notice If your ECS instance fails to connect to the database, check the whitelist of the database.

Create a synchronization task

  1. Log on to the DataWorks console as a node owner.
  2. On the Workspaces tab, click Data Analytics in the Actions column.
  3. On the Data Studio page, choose Create > Workflow.
  4. In the Create Workflow dialog box, specify the Workflow Name and Description, and then click Create.
  5. Expand the created workflow in the left-side workflow list, right-click Data Integration, and choose Create Data Integration Node > Sync.
  6. In the Create Node dialog box, specify the Node Name, and click Commit.
  7. On the node tab, click the Switch to Code Editor icon in the top toolbar.
  8. Confirm the operation to switch to the code editor.

    For more information about how to use the code editor, see Create a sync node by using the code editor.

    The following script is an example that retrieves information about students and their exam scores from two tables.
    {
       "type": "job",
       "steps": [
           {
               "stepType": "mysql",
               "parameter": {
                   "column": [
                       "id",
                       "name",
                       "sex",
                       "birth",
                       "department",
                        "address"
                   ],
                   "connection": [
                       {
                           "querysql":["SELECT student.id,name,sex,birth,department,address,c_name,grade FROM student JOIN score on student.id=score.stu_id;"],
                           "datasource": "zl_****_rdsmysql",
                           "table": [
                               "score"
                           ]
                       }
                   ],
                   "where": "",
                   "splitPk": "",
                   "encoding": "UTF-8"
               },
               "name": "Reader",
               "category": "reader"
           },
           {
               "stepType": "elasticsearch",
               "parameter": {
                   "accessId": "elastic",
                   "endpoint": "http://es-cn-0p*********2dpxtx.elasticsearch.aliyuncs.com:9200",
                   "indexType": "score",
                   "accessKey": "******",
                   "cleanup": true,
                   "discovery": false,
                   "column": [
                        {
                            "name": "student_id",
                            "type": "id"
                        },
                        {
                            "name": "name",
                            "type": "text"
                        },
                        {
                            "name": "sex",
                            "type": "text"
                        },
                       {
                           "name": "birth",
                           "type": "integer"
                       },
                       {
                           "name": "quyu",
                           "type": "text"
                       },
                       {
                           "name": "address",
                           "type": "text"
                       },
                       {
                           "name": "cname",
                           "type": "text"
                       },
                       {
                           "name": "grades",
                           "type": "integer"
                       }
                   ],
                   "index": "mysqljoin",
                   "batchSize": 1000,
                   "splitter": ","
               },
               "name": "Writer",
               "category": "writer"
           }
       ],
       "version": "2.0",
       "order": {
           "hops": [
               {
                   "from": "Reader",
                   "to": "Writer"
               }
           ]
       },
       "setting": {
           "jvmOption": "-Xms1024m -Xmx1024m",
           "errorLimit": {
               "record": ""
           },
           "speed": {
               "throttle": false,
               "concurrent": 1
           }
       }
    }

    The script includes three parts:

    • Reader: This part is used to Configure MySQL Reader. querysql defines an SQL statement that retrieves data based on specified conditions. If querysql is configured, MySQL Reader ignores the table, column, where, and splitPk settings, because querysql takes precedence over them. The datasource setting is still required when querysql is used, because it provides the username and password for the database connection.
    • Writer: This part is used to Configure Elasticsearch Writer.
      • endpoint: The public or private network endpoint of the Elasticsearch instance. To connect to your Elasticsearch instance, you must configure the public or private network whitelist on the Security page of your Elasticsearch instance.
      • accessId/accessKey: The username and password of the Elasticsearch instance. The default username is elastic.
      • index: The name of the index on the Elasticsearch instance. To access the data stored in the index, you must specify the index name.
      • The columns in the Reader and Writer must be defined in the same order. This is because the Reader reads data from the specified columns and saves the data in arrays. The Writer then retrieves the data from the arrays and writes the data into the defined columns in sequence.
    • setting: This part configures the error record (dirty data) limit and the maximum concurrency of the synchronization task.
  9. After you configure the script, click Resource Group in the upper-right corner, select the created resource group, and click the Run icon in the top toolbar to synchronize the MySQL data to Elasticsearch.

    Select the resource group
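
The reader passes each row to the writer as a plain array, and the writer assigns array positions to its column definitions in order (see the explanation in step 8). The following Python sketch illustrates this positional mapping with a hypothetical row; the column list below follows the order of the columns returned by the join query:

```python
# The reader emits each row as a plain array; the writer maps array
# positions to its column definitions in order. If the two orders differ,
# values land in the wrong index fields.

writer_columns = [
    {"name": "student_id", "type": "id"},
    {"name": "name", "type": "text"},
    {"name": "sex", "type": "text"},
    {"name": "birth", "type": "integer"},
    {"name": "quyu", "type": "text"},
    {"name": "address", "type": "text"},
    {"name": "cname", "type": "text"},
    {"name": "grades", "type": "integer"},
]

def row_to_doc(row, columns):
    """Map a reader row (array) to an Elasticsearch document by position."""
    doc = {}
    for value, col in zip(row, columns):
        if col["type"] == "id":
            doc["_id"] = value      # used as the document ID, not as a field
        else:
            doc[col["name"]] = value
    return doc

# Hypothetical row in the order returned by the join query.
row = [1, "Alice", "F", 2001, "CS", "Hangzhou", "Math", 95]
```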

Verify the synchronization result

  1. Log on to the Kibana console of your Elasticsearch instance.
  2. In the left-side navigation pane, choose Dev Tools.
  3. On the Console tab, run the following command to query the synchronized data.
    POST /mysqljoin/_search?pretty
    {
      "query": { "match_all": {} }
    }

    The string mysqljoin is the name of the index that stores the synchronized data.
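
The same match_all query can also be issued outside Kibana, for example from the ECS instance in the resource group. The following Python sketch builds the request with the standard library; the endpoint and password are hypothetical placeholders that you must replace with your own values before sending:

```python
import base64
import json
import urllib.request

def build_match_all_request(endpoint, index, user, password):
    """Prepare (but do not send) a match_all search request."""
    url = "{}/{}/_search?pretty".format(endpoint.rstrip("/"), index)
    body = json.dumps({"query": {"match_all": {}}}).encode("utf-8")
    token = base64.b64encode("{}:{}".format(user, password).encode()).decode()
    return urllib.request.Request(
        url,
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": "Basic " + token,  # basic auth with the elastic user
        },
        method="POST",
    )

# Hypothetical endpoint and password; replace with your own values.
req = build_match_all_request(
    "http://es-cn-example.elasticsearch.aliyuncs.com:9200",
    "mysqljoin", "elastic", "your_password")
# Send with: urllib.request.urlopen(req)
```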

FAQ

  • Q: How can I resolve a database connection error?

    A: Check whether the public or private IP address of the ECS instance in the resource group is added to the whitelist of your database. If not, add the IP address to the whitelist.

  • Q: How can I resolve an Elasticsearch instance connection error?

    A: Follow these steps to locate the cause:

    1. Click Resource Group in the upper-right corner of the code editor and check whether the resource group created in the preceding step is selected.
      • Yes. Proceed.
      • No. Click Resource Group and select the resource group that you have created. Then click the Run icon to run the script.
    2. Check whether the IP address of the ECS instance in the resource group is added to the whitelist of the Elasticsearch instance.
      • Yes. Proceed.
      • No. Add the IP address of the ECS instance to the whitelist of the Elasticsearch instance.
        Notice If the private IP address of the ECS instance is used, go to the Security page of the Elasticsearch instance, and add the IP address to the Elasticsearch system whitelist. If the public IP address of the ECS instance is used, go to the Security page of the Elasticsearch instance, and add the IP address to the public network whitelist.
    3. Check whether the configuration of the script is correct. The fields that you must check include endpoint (the public or private network endpoint of the Elasticsearch instance), accessId (the username of the Elasticsearch instance. The default username is elastic), and accessKey (the password of the Elasticsearch instance).
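
A quick sanity check of the writer settings can catch the most common configuration mistakes before you rerun the node. The following Python sketch applies a few heuristic checks; the rules are assumptions based on the fields described above, not an official checklist:

```python
def check_writer_config(cfg):
    """Return a list of likely problems in an Elasticsearch writer config.

    The checks are heuristics for the fields covered in this topic:
    endpoint, accessId, accessKey, and index.
    """
    problems = []
    endpoint = cfg.get("endpoint", "")
    if not endpoint.startswith(("http://", "https://")):
        problems.append("endpoint must start with http:// or https://")
    if ":9200" not in endpoint:
        problems.append("endpoint usually includes port 9200")
    if not cfg.get("accessId"):
        problems.append("accessId (username) is empty; the default is 'elastic'")
    if not cfg.get("accessKey"):
        problems.append("accessKey (password) is empty")
    if not cfg.get("index"):
        problems.append("index name is empty")
    return problems
```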