Alibaba Cloud provides a variety of cloud storage and database services. To search and analyze the data that is stored in these services, you can use Data Integration to collect offline data at five-minute intervals and synchronize the data to an Alibaba Cloud Elasticsearch cluster. This topic uses an ApsaraDB RDS for MySQL data source as an example.

Notice Data synchronization may incur Internet traffic fees.

Overview

  1. Prepare a MySQL data source.
  2. Create an Alibaba Cloud Elastic Compute Service (ECS) instance that resides in the same Virtual Private Cloud (VPC) as your Elasticsearch cluster. This ECS instance collects data from data sources and writes the data to your Elasticsearch cluster. Data Integration issues the tasks to perform these operations.
  3. Activate Data Integration and add the ECS instance to it as a resource to run data synchronization tasks.
  4. Configure a data synchronization script and schedule it to run periodically.
  5. Create an Elasticsearch cluster to store the data synchronized by Data Integration.

Preparations

  1. Create a database.

    You can use an ApsaraDB for RDS database or create a database on your local server. In this topic, an ApsaraDB RDS for MySQL database is used. Two MySQL tables are joined, and the query result is synchronized to your Elasticsearch cluster. The following figures show the two MySQL tables, and an example schema for the two tables is provided at the end of this section. For more information, see Quick Start.

    Figure 1. Table 1
    Figure 2. Table 2
  2. Create a VPC and a VSwitch.
  3. Create an Elasticsearch cluster.
    Notice The Elasticsearch cluster must use the VPC and VSwitch that you created in the previous step and must reside in the same region as that VPC.
  4. Enable the Auto Indexing feature for the Elasticsearch cluster. For more information, see Enable auto indexing.
  5. Create an ECS instance that is in the same region, zone, and VPC as the Elasticsearch cluster. Assign a public IP address to the ECS instance or bind an Elastic IP address to the ECS instance.
    Note
    • You can also use an existing ECS instance.
    • We recommend that you choose CentOS 6.X, CentOS 7.X, or Aliyun Linux for your ECS instance.
    • If you want the ECS instance to run MaxCompute tasks or data synchronization tasks, make sure that Python 2.6 or 2.7 is installed on the instance. The default Python version in CentOS 5.X is 2.4, whereas the recommended operating systems ship with Python 2.6 or later.
    • Make sure that your ECS instance is assigned a public IP address.
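
The two tables shown in the figures in step 1 are only an example. The following sketch shows one possible schema for them, inferred from the querysql statement that is used in the synchronization script later in this topic. The column types and the placeholders, such as <rds-endpoint>, <mysql-user>, and <database>, are assumptions that you must adapt to your own environment.

    # Connect to the ApsaraDB RDS for MySQL database and create the two
    # example tables. The client prompts for the password.
    mysql -h <rds-endpoint> -u <mysql-user> -p <database> -e "
      CREATE TABLE student (
        id         INT PRIMARY KEY,
        name       VARCHAR(64),
        sex        VARCHAR(8),
        birth      INT,
        department VARCHAR(64),
        address    VARCHAR(256)
      );
      CREATE TABLE score (
        stu_id INT,
        c_name VARCHAR(64),
        grade  INT
      );"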

Create a resource group and add a data source

  1. Log on to the DataWorks console. In the left-side navigation pane, click Workspaces. On the Workspaces page, find the target workspace and click Data Integration in the Actions column.
    If you have activated Data Integration or DataWorks, the Overview page appears.

    If you have not activated Data Integration or DataWorks, activate it as prompted. Data Integration or DataWorks activation incurs fees. Read and understand the description of charges before activation.

  2. In the left-side navigation pane of the Data Integration console, click Custom Resource Group.
  3. In the upper-right corner of the page that appears, click Add Resource Group to configure the ECS instance that you have created in the VPC as a scheduling resource.
    1. Add a server.

      Enter a resource group name and specify the required parameters. In this example, the ECS instance that you created is added as the server. The following table describes the server parameters.

      Parameter            Description
      ECS UUID             Log on to the ECS instance, run the dmidecode | grep UUID command, and enter the returned value.
      Server IP            The public IP address of the ECS instance. To obtain it, log on to the ECS console and click the instance name. The IP address is displayed in the Configuration Information section.
      Server CPU (Cores)   The number of vCPUs of the ECS instance.
      Server RAM (GB)      The memory size of the ECS instance.
    2. Install an agent.

      Follow the instructions to install the agent. You can skip the step for enabling port 8000 and use the default settings.

    3. Check the connectivity.

      After the connection is established, the status changes to Available. If the status is Unavailable, log on to the ECS instance and run the tail -f /home/admin/alisatasknode/logs/heartbeat.log command to check whether the heartbeat between DataWorks and the ECS instance times out.

  4. Configure the whitelists of the MySQL database and Elasticsearch cluster.

    Add the IP address of the ECS instance to the whitelists of the MySQL database and the Elasticsearch cluster so that the ECS instance can communicate with both of them. For more information, see Configure a whitelist for an ApsaraDB RDS for MySQL instance and Configure a whitelist to access an Elasticsearch cluster over the Internet or a VPC. A minimal set of commands that you can use to verify the connectivity from the ECS instance is provided at the end of this section.

  5. Go to the Data Integration console. In the left-side navigation pane, click Connection. On the page that appears, click Add a Connection.
  6. In the Add Connection dialog box, click MySQL. In the Add MySQL Connection dialog box, specify the required parameters.

    Connect To: ApsaraDB for RDS is used in this example. You can also set Connect To to Connection Mode. For more information about the parameters, see Configure a MySQL connection.

    Notice If your ECS instance fails to connect to the database, check whether the IP address of the instance is added to the whitelist of the database.
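
After the whitelists are configured, you can verify the connectivity from the ECS instance before you create the synchronization task. The following commands are a minimal sketch; <rds-endpoint>, <mysql-user>, <password>, and <elasticsearch-endpoint> are placeholders that you must replace with your own values, and the mysql client and curl must be installed on the ECS instance.

    # Run these commands on the ECS instance that you added to the resource group.
    # Test the connection to the ApsaraDB RDS for MySQL instance.
    mysql -h <rds-endpoint> -P 3306 -u <mysql-user> -p -e "SELECT 1;"

    # Test the connection to the Elasticsearch cluster. A JSON response that
    # contains the cluster name indicates that the whitelist and the
    # credentials are configured correctly.
    curl -u elastic:<password> http://<elasticsearch-endpoint>:9200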

Create a data synchronization task

  1. Log on to the DataWorks console.
  2. On the Workspaces page, find the target workspace and click Data Analytics in the Actions column.
  3. In the DataStudio console, move the pointer over the Create icon and click Workflow.
  4. In the Create Workflow dialog box, set Workflow Name and Description. Then, click Create.
  5. In the navigation tree, right-click Data Integration and choose Create > Batch Synchronization.
  6. In the Create Node dialog box, set Node Name and click Commit.
  7. Click the Switch to Code Editor icon in the toolbar.
  8. Configure the data synchronization script.

    For more information, see Create a sync node by using the code editor.

    The following code provides an example script for retrieving information about students and their examinations from two tables:
    {
       "type": "job",
       "steps": [
           {
               "stepType": "mysql",
               "parameter": {
                   "column": [
                       "id",
                       "name",
                       "sex",
                       "birth",
                       "department",
                       "address"
                   ],
                   "connection": [
                       {
                           "querysql":["SELECT student.id,name,sex,birth,department,address,c_name,grade FROM student JOIN score on student.id=score.stu_id;"],
                           "datasource": "zl_test_rdsmysql",
                           "table": [
                               "score"
                           ]
                       }
                   ],
                   "where": "",
                   "splitPk": "",
                   "encoding": "UTF-8"
               },
               "name": "Reader",
               "category": "reader"
           },
           {
               "stepType": "elasticsearch",
               "parameter": {
                   "accessId": "elastic",
                   "endpoint": "http://es-cn-0p*********2dpxtx.elasticsearch.aliyuncs.com:9200",
                   "indexType": "score",
                   "accessKey": "******",
                   "cleanup": true,
                   "discovery": false,
                   "column": [
                       {
                           "name":"student_id",
                           "type":"id"
                       },
                        {
                            "name": "name",
                            "type": "text"
                        },
                        {
                            "name": "sex",
                            "type": "text"
                        },
                       {
                           "name": "birth",
                           "type": "integer"
                       },
                       {
                           "name": "quyu",
                           "type": "text"
                       },
                       {
                           "name": "address",
                           "type": "text"
                       },
                       {
                           "name": "cname",
                           "type": "text"
                       },
                       {
                           "name": "grades",
                           "type": "integer"
                       }
                   ],
                   "index": "mysqljoin",
                   "batchSize": 1000,
                   "splitter": ","
               },
               "name": "Writer",
               "category": "writer"
           }
       ],
       "version": "2.0",
       "order": {
           "hops": [
               {
                   "from": "Reader",
                   "to": "Writer"
               }
           ]
       },
       "setting": {
           "jvmOption": "-Xms1024m -Xmx1024m",
           "errorLimit": {
               "record": ""
           },
           "speed": {
               "throttle": false,
               "concurrent": 1
           }
       }
    }

    This script includes three parts:

    • Reader: used to configure the MySQL reader. querysql defines an SQL statement that retrieves data based on the specified conditions. If querysql is configured, it takes precedence over the table, column, where, and splitPk settings, and those settings are ignored. In this case, datasource is used only to parse the username and password of the data source.
    • Writer: used to configure the Elasticsearch writer.
      • endpoint: the internal or public endpoint of the Elasticsearch cluster. To connect to your Elasticsearch cluster, you must configure the public network whitelist or VPC whitelist on the Security page of your Elasticsearch cluster.
      • accessId: the username that is used to access the Elasticsearch cluster. Default value: elastic.
      • accessKey: the password that is used to access the Elasticsearch cluster.
      • index: the name of the index on the Elasticsearch cluster.
      Notice Reader reads data from the specified fields and saves the data in arrays. Writer then retrieves the data from the arrays and writes the data to the defined fields in sequence. Therefore, the fields in the Reader and Writer parts must be defined in the same order. A sketch that previews the output of the querysql statement to confirm the order is provided after this procedure.
    • setting: used to configure job-level settings, such as the maximum number of error records that are allowed (errorLimit) and the maximum concurrency (speed).
  9. After the synchronization script is configured, click Resource Group configuration and select the resource group that you created. Then, click the Run icon to synchronize data from the MySQL database to the Elasticsearch cluster.
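
Because Writer maps the values in each record to its fields by position, you can preview the output of the querysql statement before you run the task to confirm that the columns are returned in the order that Writer expects. The following sketch uses the mysql client; <rds-endpoint>, <mysql-user>, and <database> are placeholders.

    # Preview the join query that Reader runs. The columns must be returned in
    # the same order as the fields that are defined in Writer:
    # id, name, sex, birth, department, address, c_name, grade.
    mysql -h <rds-endpoint> -u <mysql-user> -p <database> -e "
      SELECT student.id, name, sex, birth, department, address, c_name, grade
      FROM student JOIN score ON student.id = score.stu_id
      LIMIT 10;"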

Verify the data synchronization result

  1. Log on to the Kibana console of the Elasticsearch cluster.

    For more information, see Log on to the Kibana console.

  2. In the left-side navigation pane, click Dev Tools.
  3. On the Console tab of the page that appears, run the following command to query the synchronized data:
    POST /mysqljoin/_search?pretty
    {
      "query": { "match_all": {} }
    }
    Note mysqljoin is the value that you configured for the index field in the data synchronization script. You can also run the same query by using curl, as shown after this procedure.
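
You can also query the index without Kibana. The following sketch uses curl from a host that is included in the whitelist of the cluster, such as the ECS instance in the resource group; <password> and <elasticsearch-endpoint> are placeholders.

    # Check that the mysqljoin index was created by the synchronization task.
    curl -u elastic:<password> 'http://<elasticsearch-endpoint>:9200/_cat/indices/mysqljoin?v'

    # Query the synchronized documents over the RESTful API.
    curl -u elastic:<password> -H 'Content-Type: application/json' \
      -X POST 'http://<elasticsearch-endpoint>:9200/mysqljoin/_search?pretty' \
      -d '{ "query": { "match_all": {} } }'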

FAQ

  • An error occurred during the connection to the MySQL database.

    Solution: Add the private and public IP addresses of the ECS instance in the resource group to the MySQL database whitelist.

  • An error occurred during the connection to the Elasticsearch cluster.

    Solution:

    1. Check whether you have selected the resource group that you created in the preceding step from Resource Group configuration.
      • If yes, go to the next step.
      • If no, click Resource Group configuration to select the correct resource group. Then, click the Run icon.
    2. Check whether the IP address of the ECS instance in the resource group is added to the whitelist of the Elasticsearch cluster.
      • If yes, go to the next step.
      • If no, add the IP address of the ECS instance to the whitelist.
        Notice If you use the internal endpoint of the Elasticsearch cluster, add the IP address of the ECS instance to the VPC Whitelist on the Security page of the cluster. If you use the public endpoint, add the IP address of the ECS instance to the Public Network Whitelist.
    3. Check whether the script is correctly configured. The fields that you must check include endpoint, accessId, and accessKey. The sketch after this list can help you distinguish a network or whitelist issue from a credential issue.
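
As a quick check, the following sketch, run on the ECS instance in the resource group, can help you distinguish a network or whitelist issue from a credential issue. <password> and <elasticsearch-endpoint> are placeholders.

    # A connection timeout or "connection refused" response usually indicates a
    # whitelist or network problem. An HTTP 401 response indicates that the
    # accessId or accessKey value in the script is incorrect.
    curl -v -u elastic:<password> http://<elasticsearch-endpoint>:9200

    # Check whether the heartbeat between DataWorks and the ECS instance in the
    # resource group times out.
    tail -f /home/admin/alisatasknode/logs/heartbeat.log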