Alibaba Cloud provides a variety of cloud storage and database services. To search for and analyze data stored in these services, you can use the Data Integration service provided by DataWorks to collect the offline data at a minimum interval of five minutes. Then, synchronize the data to Alibaba Cloud Elasticsearch. This topic uses ApsaraDB RDS for MySQL as an example.
Overview
- PreparationsPrepare a MySQL data source, create a DataWorks workspace, and create and configure an Alibaba Cloud Elasticsearch cluster.
- Step 1: Purchase and create an exclusive resource groupPurchase and create an exclusive resource group for data integration. Bind the exclusive resource group to a virtual private cloud (VPC) and the created workspace. Exclusive resource groups transmit data in a fast and stable manner.
- Step 2: Add data sourcesConnect the MySQL data source and Elasticsearch cluster to the Data Integration service provided by DataWorks.
- Step 3: Create and run a data synchronization taskConfigure a data synchronization script to import the data synchronized by Data Integration into the Elasticsearch cluster. The exclusive resource group is registered with Data Integration as a resource to run tasks. This resource group retrieves data from data sources and runs the task of writing data to the Elasticsearch cluster. The task is issued by Data Integration.
- Step 4: Verify the data synchronization resultView the synchronized data in the Kibana console of your Elasticsearch cluster.
Preparations
- Create a database.You can use an ApsaraDB RDS database or create a database on your on-premises machine. In this topic, an ApsaraDB RDS for MySQL database is used. Join two MySQL tables and then synchronize data to your Elasticsearch cluster. The following figures show the two MySQL tables. For more information, see Create an ApsaraDB RDS for MySQL instance.
Figure 1. Table 1 Figure 2. Table 2 - Create a DataWorks workspace.For more information, see Create a workspace. The workspace must reside in the same region as the ApsaraDB RDS for MySQL database.
- Create an Alibaba Cloud Elasticsearch cluster and enable the Auto Indexing feature
for the cluster.The Elasticsearch cluster must reside in the same virtual private cloud (VPC) as the ApsaraDB RDS for MySQL database. For more information, see Create an Alibaba Cloud Elasticsearch cluster and Enable the Auto Indexing feature.
Step 1: Purchase and create an exclusive resource group
- Log on to the DataWorks console.
- In the top navigation bar, select a region. In the left-side navigation pane, click Resource Groups.
- Purchase exclusive resources for data integration. For more information, see Purchase an exclusive resource group for data integration.Notice The exclusive resources for data integration must reside in the same region as the DataWorks workspace you created.
- Create an exclusive resource group for data integration. For more information, see
Add an exclusive resource group for data integration.The following figure shows the configuration used in this example. Resource Group Type is set to Exclusive Resource Groups for Data Integration.
- Click Add VPC Binding on the right side of the created exclusive resource group to bind the exclusive resource
group to a VPC. For more information, see Bind an exclusive resource group for data integration to a VPC.Exclusive resources are deployed in the VPC where DataWorks resides. DataWorks can be used to synchronize data from the ApsaraDB RDS for MySQL database to the Elasticsearch cluster only after it connects to the VPCs where the database and cluster reside. The ApsaraDB RDS for MySQL database and the Elasticsearch cluster reside in the same VPC. Therefore, when you bind the exclusive resource group to a VPC, you need only to select the VPC and VSwitch to which your Elasticsearch cluster belongs.
- Click Change Workspace in the Actions column that corresponds to the exclusive resource group to bind it
to the DataWorks workspace you created. For more information, see Change the workspace to which an exclusive resource group is bound.
Step 2: Add data sources
- Go to the Data Integration page.
- In the left-side navigation pane of the DataWorks console, click Workspaces.
- Find the workspace you created and click Data Integration in the Actions column.
- In the left-side navigation pane of the Data Integration page, click Connection.
- On the Data Source page, click New data source in the upper-right corner.
- In the Add data source dialog box, click MySQL. In the Add MySQL data source dialog box, configure the parameters.Connection Type: Alibaba Cloud instance mode is used in this example. You can also select Connection string mode. For more information about the parameters, see Configure a MySQL connection.Notice If you select Connection string mode, you can configure JDBC URL by using the public endpoint of the ApsaraDB RDS for MySQL instance. However, you must add the elastic IP address (EIP) of the exclusive resource group to the whitelist of the ApsaraDB RDS for MySQL instance. For more information, see Configure a whitelist for an ApsaraDB RDS for MySQL instance and Add the information about an exclusive resource group for Data Integration to the whitelist of a data store.After parameters are configured, you can test the connectivity to the exclusive resource group. If the connectivity test is passed, Connectable appears in the Connectivity status column.
- Click Complete.
- Add an Elasticsearch data source in the same way.
Parameter Description Endpoint The URL that is used to access the Elasticsearch cluster. Specify the URL in the following format: http://<Internal or public endpoint of the Elasticsearch cluster>:9200
. You can obtain the endpoint from the Basic Information page of the cluster. For more information, see View the basic information of a cluster.Notice If you use the public endpoint of the cluster, add the elastic IP address (EIP) of the exclusive resource group to the public IP address whitelist of the cluster. For more information, see Configure a whitelist to access an Elasticsearch cluster over the Internet or a VPC and Add the information about an exclusive resource group for Data Integration to the whitelist of a data store.User name The username that is used to access the Elasticsearch cluster. The default username is elastic. Password The password that is used to access the Elasticsearch cluster. The password of the elastic account is specified when you create the cluster. If you forget the password, you can reset it. For more information about the procedure and precautions for resetting a password, see Reset the access password for an Elasticsearch cluster. Note Configure the parameters that are not listed in the preceding table as required.
Step 3: Create and run a data synchronization task
- On the Data Development tab of the DataWorks console, create a business process.For more information, see Manage workflows.
- In the navigation tree, right-click Data Integration and choose New > Offline synchronization.
- In the New node dialog box, set Node name and click Submit.
- In the upper part of the page, click the
icon.
- Configure the data synchronization script.For more information, see Create a sync node by using the code editor.Note You can click the
icon in the upper part of the page to apply the required script configuration template. Then, modify the template as required.
The following code provides a sample script used to retrieve information about students and their examination scores from two tables:{ "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "10" }, "speed": { "concurrent": 1, "throttle": false } }, "steps": [ { "category": "reader", "name": "Reader", "parameter": { "column": [ "id", "name", "sex", "birth", "department", "address" ], "connection": [ { "datasource": "zl_test_rdsmysql", "querysql": [ "SELECT student.id,name,sex,birth,department,address,c_name,grade FROM student JOIN score on student.id=score.stu_id;" ], "table": [ "score" ] } ], "encoding": "UTF-8", "splitPk": "", "where": "" }, "stepType": "mysql" }, { "category": "writer", "name": "Writer", "parameter": { "batchSize": 1000, "cleanup": true, "column": [ { "name": "student_id", "type": "id" }, { "name": "sex", "type": "text" }, { "name": "name", "type": "text" }, { "name": "birth", "type": "integer" }, { "name": "quyu", "type": "text" }, { "name": "address", "type": "text" }, { "name": "cname", "type": "text" }, { "name": "grades", "type": "integer" } ], "datasource": "ES_data_source", "discovery": false, "index": "mysqljoin", "indexType": "_doc", "splitter": "," }, "stepType": "elasticsearch" } ], "type": "job", "version": "2.0" }
The synchronization script includes three parts.Parameter Description setting
Used to configure parameters related to packet loss and the maximum concurrency during synchronization. The default value of the record
field inerrorLimit
is 0. You can change it to a larger value, such as 10.Reader
Used to configure the MySQL reader. Use querysql
to define an SQL statement to retrieve data based on specific conditions. Ifquerysql
is configured, the MySQL reader ignores thetable
,column
,where
, andsplitPk
conditions.datasource
usesquerysql
to parse the username and password. For more information, see Configure MySQL Reader.Writer
Used to configure the Elasticsearch writer. For more information, see Elasticsearch Writer. The configuration items include: index
: the name of the index.indexType
: The index type of Elasticsearch clusters of V7.0 or later must be_doc
.
- Save the script. Then, in the right-side navigation pane, click the Scheduling configuration tab. In the pane that appears, configure parameters based on your business needs.For more information about the parameters, see Scheduling configuration.Notice
- Before you submit a task, you must set Dependent upstream node in the Scheduling dependency section of the Scheduling configuration pane. For more information, see Dependencies.
- If you want to schedule the task to run periodically, you must configure the parameters in the Time attribute section, such as Specific time, Scheduling cycle, Effective Date, and Rerun attribute.
- The configuration of a periodic task takes effect at 00:00 of the next day.
- Configure the resource group that is used to execute the synchronization task.
- Click the Data integration resource group configuration tab on the right side of the page.
- Set Programme to Exclusive data integration Resource Group.
- Set Exclusive data integration Resource Group to the exclusive resource group you created.
- Submit the data synchronization task.
- Save the current configurations and click the
icon.
- In the Submit New Version dialog box, enter your comments in the Change description field.
- Click OK.
- Save the current configurations and click the
- Click the
icon to run the task.
You can view the operational logs of the task when the task is running. After the task is successfully executed, the result shown in the following figure is returned.
Step 4: Verify the data synchronization result
- Log on to the Kibana console of your Elasticsearch cluster.For more information, see Log on to the Kibana console.
- In the left-side navigation pane, click Dev Tools.
- On the Console tab of the page that appears, run the following command to query the synchronized
data:
POST /mysqljoin/_search? pretty { "query": { "match_all": {}} }
Notemysqljoin
is the value that you configured for theindex
field in the data synchronization script.If the data is synchronized, the result shown in the following figure is returned.