Elasticsearch: Use DTS to synchronize data from MySQL to Alibaba Cloud Elasticsearch in real time

Last Updated: Aug 23, 2023

If you want to search and analyze the production data in your ApsaraDB RDS for MySQL database by using Alibaba Cloud Elasticsearch, you can use Data Transmission Service (DTS) to synchronize the data to an Elasticsearch cluster in real time. The synchronization is implemented based on a real-time synchronization task. This topic describes how to create a real-time synchronization task to synchronize data from an ApsaraDB RDS for MySQL database to an Alibaba Cloud Elasticsearch cluster in real time. This topic also describes how to verify the synchronization results of full data and incremental data.

Background information

  • DTS is a data transmission service that integrates data migration, data subscription, and real-time data synchronization. For more information, see DTS. DTS supports synchronization of data changes generated by insert, delete, and update operations. For information about the versions of data sources from which DTS can synchronize data, see Overview of data synchronization scenarios.

  • You can use DTS to synchronize full or incremental data from MySQL to Elasticsearch. This solution is suitable for scenarios that require high-performance real-time synchronization or the synchronization of full or incremental data from a relational database to an Alibaba Cloud Elasticsearch cluster.

Prerequisites

An ApsaraDB RDS for MySQL instance and an Alibaba Cloud Elasticsearch cluster are created. We recommend that you create the instance and cluster in the same virtual private cloud (VPC).

Limits

  • You cannot use DTS to synchronize data to an Alibaba Cloud Elasticsearch V7.16 or V8.X cluster.

  • DTS does not synchronize data changes generated by DDL operations. If a DDL operation is performed on a table in the source database during data synchronization, you must perform the following operations: Remove the table from the data synchronization task, remove the index for the table from the Elasticsearch cluster, and then add the table to the data synchronization task again. For more information, see Remove an object from a data synchronization task and Add an object to a data synchronization task.

Precautions

  • DTS uses read and write resources of the source and destination databases during initial full data synchronization. This may increase the loads of the database servers. If the database performance is poor, the specifications are low, or the data volume is large, database services may become unavailable. For example, DTS occupies a large amount of read and write resources in the following cases: a large number of slow SQL queries are performed on the source database, the tables have no primary keys, or a deadlock occurs in the destination database. Before you synchronize data, evaluate the impact of data synchronization on the performance of the source and destination databases. We recommend that you synchronize data during off-peak hours. For example, you can synchronize data when the CPU utilization of the source and destination databases is less than 30%.
  • If you want to add columns to the source table, modify the mappings of the index that corresponds to the table. Then, perform the related DDL operation on the source table, pause the data synchronization task, and start the task again.

Procedure

Step 1: Prepare environments

  1. Enable the Auto Indexing feature for the Elasticsearch cluster.

    For more information, see Configure the YML file. In this topic, an Elasticsearch V6.7 cluster is used.

    Note

    To ensure data security, Alibaba Cloud Elasticsearch disables Auto Indexing by default. When you use DTS to synchronize data to an Elasticsearch cluster, you must create indexes in the Elasticsearch cluster by submitting data instead of calling the create index API. Therefore, before you use DTS to synchronize data, you must enable Auto Indexing for the Elasticsearch cluster.
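
    On Alibaba Cloud Elasticsearch, you enable Auto Indexing through the YML configuration in the console, as described above. If you want to confirm the current value of the underlying Elasticsearch setting (action.auto_create_index), you can run the following Kibana console request. This is only a verification sketch and does not change the setting:

      GET _cluster/settings?include_defaults=true&filter_path=**.auto_create_index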

  2. Create a database and a table. Then, insert data into the table.

    You can use an ApsaraDB RDS database or a self-managed database that is created on your on-premises machine. In this example, an ApsaraDB RDS for MySQL database is used. For information about how to create an ApsaraDB RDS for MySQL database and create a table in the database, see General workflow to use ApsaraDB RDS for MySQL.

    Important

    We recommend that you use an ApsaraDB RDS for MySQL instance in the same region as the Elasticsearch cluster. If the instance and cluster reside in different regions, network connectivity between them cannot be ensured.

    In this example, the following ApsaraDB RDS for MySQL database is created, and the following SQL statements are executed to create a table in the database and insert data into the table.

    • Create an ApsaraDB RDS for MySQL database

      test_logstash

    • Create a table in the database and insert data into the table

      -- create table
      CREATE TABLE `es_test` (
          `id` bigint(32) NOT NULL,
          `name` varchar(32) NULL,
          `age` bigint(32) NULL,
          `hobby` varchar(32) NULL,
          PRIMARY KEY (`id`)
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
      
      -- insert data
      INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (1,'user1',22,'music');
      INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (2,'user2',23,'sport');
      INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (3,'user3',43,'game');
      INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (4,'user4',24,'run');
      INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (5,'user5',42,'basketball');

Step 2: Configure a data synchronization task

  1. Log on to the DTS console.

  2. In the left-side navigation pane, click Data Synchronization.

    Note

    The operations described in this topic are performed in the DTS console of the new version. For information about operations in the DTS console of the old version, see Manage a data synchronization task.

  3. On the Data Synchronization page, click Create Task. Then, create and configure the data synchronization task as prompted.

    You must configure the source and destination databases, the objects from which you want to synchronize data, the field mappings, the advanced settings, and the names of the fields that are synchronized. For more information, see the topics in Synchronize data from a MySQL database and the Synchronize data from a PolarDB-X 2.0 instance to an Elasticsearch cluster topic. The following tables describe the parameters that are configured in this example.

    1. Configure the source database and destination database.

      Task Name

        DTS automatically generates a task name. You do not need to use a unique task name. We recommend that you use an informative name for easy identification.

      Source Database

        • Database Type: The type of the source database. Select MySQL.

        • Access Method: The access method for the source database. Select Alibaba Cloud Instance.

        • Instance Region: The region in which the source database resides.

        • Replicate Data Across Alibaba Cloud Accounts: In this example, No is selected because data is synchronized within the same Alibaba Cloud account.

        • RDS Instance ID: The ID of the source ApsaraDB RDS for MySQL instance.

        • Database Account: The account that is used to connect to the ApsaraDB RDS for MySQL database from which you want to synchronize data. The account must be granted read permissions on the database.

        • Database Password: The password of the account specified by Database Account.

        • Encryption: Specifies whether to encrypt the connection to the source ApsaraDB RDS for MySQL database. Valid values: Non-encrypted and SSL-encrypted. If you select SSL-encrypted, you must enable SSL encryption for the ApsaraDB RDS for MySQL instance before you configure the data synchronization task. For more information, see Configure SSL encryption for an ApsaraDB RDS for MySQL instance.

      Destination Database

        • Database Type: The type of the destination database. Select Elasticsearch.

        • Access Method: The access method for the destination Elasticsearch cluster. The value of this parameter is fixed as Alibaba Cloud Instance.

        • Instance Region: The region in which the destination Elasticsearch cluster resides. We recommend that you select the region in which the source ApsaraDB RDS for MySQL database resides.

        • Instance ID: The ID of the destination Elasticsearch cluster.

        • Database Account: The username that is used to connect to the destination Elasticsearch cluster. The default username is elastic.

        • Database Password: The password of the username specified by Database Account.
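
      Before you proceed, you can optionally confirm that the destination Elasticsearch cluster is available. The following Kibana console request is a simple sanity check; it assumes that you can log on to the Kibana console of the cluster, as described in Step 3 of this topic:

        GET _cluster/health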

    2. Configure the objects from which you want to synchronize data.

      Synchronization Type

        By default, Incremental Data Synchronization is selected. You must also select Schema Synchronization and Full Data Synchronization. After the precheck is complete, DTS synchronizes the historical data of the selected objects from the source instance to the destination instance. The historical data is the basis for subsequent incremental synchronization.

      Processing Mode of Conflicting Tables

        • Precheck and Report Errors: checks whether the destination database contains tables that have the same names as tables in the source database. If the source and destination databases do not contain tables that have identical names, the precheck is passed. Otherwise, an error is returned during the precheck and the data synchronization task cannot be started.

          Note

          You can use the object name mapping feature to rename the tables that are synchronized to the destination database. You can use this feature if the source and destination databases contain identical table names and the tables in the destination database cannot be deleted or renamed. For more information, see Map object names.

        • Ignore Errors and Proceed: skips the precheck for identical table names in the source and destination databases.

          Warning

          If you select Ignore Errors and Proceed, data inconsistency may occur, and your business may be exposed to potential risks.

          • If the source and destination databases have the same schemas, and a data record has the same primary key value as an existing data record in the destination database:

            • During full data synchronization, DTS does not synchronize the data record to the destination database. The existing data record in the destination database is retained.

            • During incremental data synchronization, DTS synchronizes the data record to the destination database. The existing data record in the destination database is overwritten.

          • If the source and destination databases have different schemas, data may fail to be initialized, only some columns may be synchronized, or the data synchronization task may fail. Proceed with caution.

      Index Name

        • Table Name: The system creates an index that has the same name as the source table in the Elasticsearch cluster. In this example, the index name is es_test.

        • Database Name_Table Name: The system creates an index whose name is in the format of Database name_Table name. In this example, the index name is test_logstash_es_test.

      Source Objects

        Select one or more objects in the Source Objects section and click the Rightwards arrow icon to add the objects to the Selected Objects section.

      Selected Objects

        • To rename an object that you want to synchronize to the destination instance, right-click the object in the Selected Objects section. For more information, see Map the name of a single object.

        • To rename multiple objects at a time, click Batch Edit in the upper-right corner of the Selected Objects section. For more information, see Map multiple object names at a time.
    3. Configure field mappings and change the names of fields that are synchronized.

      If you want to change the names of fields that are synchronized, you can click the name of the table to which the fields belong in the Selected Objects list, and specify the name of the index and the name of the index type in the Elasticsearch cluster for the table. After the configuration is complete, click OK. The following table describes the parameters that are configured in this example. You can retain the default values for other parameters that are not described in the following table. For more information, see Map the name of a single object.

      Index Name

        The name of the destination index. You can specify a name based on your business requirements. For information about the concept of an index, see Terms.

        Important

        The name that you enter must be unique in the Elasticsearch cluster. Otherwise, an "index already exists" error is reported.

      Type Name

        The name of the destination index type. You can specify a name based on your business requirements. For information about the concept of an index type, see Terms.

      Filter Conditions

        The SQL conditions that are used to filter data. Only data that meets the specified conditions is synchronized to the destination Elasticsearch cluster. For more information, see Use SQL conditions to filter data.

      Parameter

        The column parameter and parameter value. For more information, see Mapping parameters.

        Important

        If you set index to false for a field that you added, the field cannot be queried. For more information, see index.
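
      After full data synchronization is complete, you can confirm that the index and type were created with the names configured here. The following Kibana console request is a verification sketch; es_test_index is the index name used in Step 3 of this topic, so replace it with the name that you specified:

        GET /es_test_index/_mapping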

    4. Configure advanced parameters.

      In this example, default values are retained for the advanced parameters. The following table describes the advanced parameters.

      Select the dedicated cluster used to schedule the task

        • By default, DTS schedules the data synchronization task by using a public cluster. You do not need to specify a node in a public cluster for task scheduling.

        • If you want to use a dedicated cluster, you must purchase one first and make sure that the dedicated cluster runs as expected before you can select it.

        • You can purchase a dedicated cluster to run DTS tasks such as data migration tasks, data synchronization tasks, and subscription tasks. Dedicated clusters isolate the resources of your DTS tasks from the resources used by the DTS instances of other users. Compared with DTS instances in public clusters, DTS instances in dedicated clusters provide higher stability and better performance.

      Set Alerts

        Specifies whether to configure alerting for the data synchronization task. If the task fails or the synchronization latency exceeds the specified threshold, alert contacts receive notifications.

      Specify the retry time range for failed connections

        The retry time range for failed connections. If the source or destination database fails to be connected after the data synchronization task is started, DTS immediately retries a connection within the specified time range. Valid values: 10 to 1440. Unit: minutes. Default value: 720. We recommend that you set the parameter to a value greater than 30. If DTS reconnects to the source and destination databases within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.

        Note

        • If you set different retry time ranges for multiple DTS tasks that share the same source or destination database, the shortest retry time range takes precedence.

        • While DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time range based on your business requirements and release the DTS instance at the earliest opportunity after the source and destination instances are released.

      Shard Configuration

        The number of primary shards and the number of replica shards for each primary shard. You can specify the numbers based on the default shard configuration of the destination Elasticsearch cluster. By default, an index in an Elasticsearch cluster whose version is earlier than V7.X has five primary shards and one replica shard for each primary shard. By default, an index in an Elasticsearch cluster of V7.X or later has one primary shard and one replica shard.

        Important

        The number of shards and the size of each shard are important factors that affect the stability and performance of an Elasticsearch cluster. You must appropriately configure shards for the indexes in the cluster. For information about how to plan shards for indexes, see Evaluate specifications and storage capacity.

      String Index

        The method that is used to index strings that are synchronized to the destination Elasticsearch cluster. Valid values:

        • analyzed: The strings are analyzed before they are indexed. You must also select an analyzer. For information about analyzer types, see Built-in analyzer reference.

        • not analyzed: The strings are indexed with their original values.

        • no: The strings are not indexed.

      Time Zone

        The time zone that is used for date and time data types such as DATETIME and TIMESTAMP when data is synchronized to the destination Elasticsearch cluster.

        Note

        If the date and time data types in the destination Elasticsearch cluster do not require a time zone, you must specify the document type for the date and time data types.

      DOCID

        By default, the value of this parameter is the primary key of the source table. If the table does not have a primary key, the value is the ID column that is automatically generated by Elasticsearch.

      Configure ETL

        • The ETL configuration feature does not support changes to table schemas in the destination. If changes to table schemas are required, make the changes in the destination before the data synchronization task starts.

        • If you change the existing ETL-related configurations, the data synchronization task may be interrupted, or historical data may be changed. Make sure that your business is not affected before you change the existing ETL-related configurations.

        • If you change the existing ETL-related configurations and restart the data synchronization task, the new configurations take effect only for incremental data that is generated after the restart. They do not take effect for historical data.

        • For more information about ETL-related configurations, see Configure ETL in a data migration or data synchronization task.
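
      For example, to confirm the shard and replica settings that were applied to the destination index after it is created, you can run the following Kibana console request. This is only a verification sketch; es_test_index is the index name used in this example, so replace it with your own index name if it differs:

        GET /es_test_index/_settings?filter_path=**.number_of_shards,**.number_of_replicas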

    5. Configure the routing policy and the value of the _id field in the Elasticsearch cluster for the table from which you want to synchronize data.

      The following table describes the parameters that are configured in this example.

      Set _routing

        Specifies whether to store a document on a specific shard of the destination Elasticsearch cluster. For more information, see _routing.

        • If you select Yes, you can specify custom columns for routing.

        • If you select No, the _id value is used for routing.

        Note

        If the version of the destination Elasticsearch cluster is 7.x, you must select No.

      Value of _id

        • Primary key column: Multiple columns are merged into one composite primary key.

        • Business key: If you select a business key, you must also specify the business key column.
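
      Because the value of _id is taken from the primary key column in this example, each document in the destination index can be retrieved directly by the primary key value of the source row. The following Kibana console request is a sketch based on the index name, type name, and data used in this topic (es_test_index, es_test_type, and the row whose id is 1):

        GET /es_test_index/es_test_type/1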

  4. After the configuration is complete, save the data synchronization task, perform a pre-check on the task, and purchase a DTS instance to start the data synchronization task.

    After the DTS instance is purchased, the data synchronization task starts to run. You can view the data synchronization progress of the task on the Data Synchronization Tasks page. After the full data in the ApsaraDB RDS for MySQL database is synchronized, the synchronization of the incremental data in the database starts. During the synchronization of the incremental data, you can view the full data that is synchronized to the Elasticsearch cluster.

    Important

    The data types supported by the ApsaraDB RDS for MySQL instance and those supported by the Elasticsearch cluster are different. Therefore, the data types in the ApsaraDB RDS for MySQL instance and those in the Elasticsearch cluster do not have one-to-one mapping relationships. When DTS synchronizes data from a source to a destination, DTS establishes mappings between source fields and destination fields based on the data types supported by the destination. For more information, see Data type mappings for schema synchronization.

Step 3: Verify the data synchronization results

  1. Log on to the Kibana console of your Elasticsearch cluster and go to the homepage of the Kibana console as prompted.
    For more information about how to log on to the Kibana console, see Log on to the Kibana console.
    Note In this example, an Elasticsearch V6.7.0 cluster is used. Operations on clusters of other versions may differ. The actual operations in the console prevail.
  2. In the left-side navigation pane of the page that appears, click Dev Tools.
  3. On the Console tab, run the following command to query the full data that is synchronized to the destination Elasticsearch cluster:

    GET /es_test_index/es_test_type/_search

    If the command is successfully run, the result shown in the following figure is returned.
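
    You can also query the synchronized fields to confirm that they are searchable. The following request is a sketch that searches the name field for user3; it assumes the index and type names used in this example and that the field is indexed as configured in the String Index parameter:

    GET /es_test_index/es_test_type/_search
    {
      "query": {
        "match": {
          "name": "user3"
        }
      }
    }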

  4. Insert a data record into the source table and view the synchronization result of the data record in the destination Elasticsearch cluster.

    In this example, the following SQL statement is executed to insert a data record:

    INSERT INTO `test_logstash`.`es_test` (`id`,`name`,`age`,`hobby`) VALUES (6,'user6',30,'dance');

    If the synchronization of the data record is successful, the result shown in the following figure is returned.
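
    Alternatively, because the _id value is taken from the primary key in this example, you can retrieve the newly inserted record directly by its primary key value. The following request is a sketch that assumes the index and type names used in this topic:

    GET /es_test_index/es_test_type/6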