This topic describes how to use Canal to synchronize incremental data from an ApsaraDB RDS for MySQL database to an Alibaba Cloud Elasticsearch cluster.

Prerequisites

Notice Make sure that you have specified the same region, zone, Virtual Private Cloud (VPC), subnet, and security group for the required services. These services include ApsaraDB RDS for MySQL, Alibaba Cloud Elasticsearch, and Alibaba Cloud Elastic Compute Service (ECS).
  • ApsaraDB RDS for MySQL is activated.
    ApsaraDB RDS for MySQL stores data that you want to synchronize. For more information about how to activate ApsaraDB RDS MySQL, see Create an ApsaraDB RDS for MySQL instance. The following figure shows the ApsaraDB RDS for MySQL configuration that is used in this topic.ApsaraDB RDS for MySQL configuration
  • canal.adapter-1.1.4.tar.gz and canal.deployer-1.1.4.tar.gz are prepared.

    The Canal packages. Canal is a GitHub open-source extract, transform, and load (ETL) tool. It is used to parse database logs and retrieve incremental data for data synchronization. For more information, see Canal.

  • Alibaba Cloud Elasticsearch is activated.

    Alibaba Cloud Elasticsearch receives the synchronized incremental data. For more information about how to activate Alibaba Cloud Elasticsearch, see Create an Elasticsearch cluster. This topic uses an Alibaba Cloud Elasticsearch V6.7 cluster of the Standard Edition as an example.

  • Alibaba Cloud ECS is activated.

    Alibaba Cloud ECS connects ApsaraDB RDS for MySQL and Elasticsearch. In addition, both Canal deployer and Canal adapter are deployed on an Alibaba Cloud ECS instance. For more information about how to activate Alibaba Cloud ECS, see Step 1: Create an ECS instance. The image of the ECS instance is a CentOS 7.6 64-bit image.

Create a table and add fields

  1. Create a table in an ApsaraDB RDS for MySQL instance and add fields to the table.

    In this topic, table es_test is created. The following figure shows the fields that are added to the table.

    Fields and indexes in the es_test table
  2. Create an index on the Elasticsearch cluster and configure mappings.
    Log on to the Kibana console. In the left-side navigation pane, click Dev Tools. On the Console tab of the page that appears, create an index and configure mappings.
    Notice Make sure that the field names and field types specified in the following command are the same as those in the created table.
    PUT es_test?include_type_name=true
    {
    
        "settings" : {
          "index" : {
            "number_of_shards" : "5",
            "number_of_replicas" : "1"
          }
        },
        "mappings" : {
            "_doc" : {
                "properties" : {
                  "count": {          
                       "type": "text"       
                   },
                  "id": {
                       "type": "integer"
                   },
                    "name" : {
                        "type" : "text"                    
                    },
                    "color" : {
                        "type" : "text"                    
                    }
                }
            }
        }
    }
    If the index is created and mappings are configured, the following result is returned:
    {
      "acknowledged" : true,
      "shards_acknowledged" : true,
      "index" : "es_test"
    }

Install MySQL

  1. Connect to the ECS instance.
  2. Download the MySQL source package.
    wget http://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm
  3. Install the MySQL source.
    yum -y install mysql57-community-release-el7-11.noarch.rpm
  4. Check whether the MySQL source is successfully installed.
    yum repolist enabled | grep mysql.* 
    If the MySQL source is successfully installed, the result shown in the following figure is returned.MySQL source installed successfully
  5. Install MySQL.
    yum install mysql-community-server
  6. Start the MySQL service and check the service status.
    systemctl start mysqld.service
    systemctl status mysqld.service
    If MySQL is successfully started, the result shown in the following figure is returned.Start MySQL and check its status
  7. Connect to an ApsaraDB RDS for MySQL database.
    Notice
    • Before you run the required command to connect to an ApsaraDB RDS for MySQL database, you must add the private IP address of your ECS instance to the whitelist of the corresponding ApsaraDB RDS for MySQL instance. For more information, see Configure a whitelist for an ApsaraDB RDS for MySQL instance.
    • To use Canal, you must enable the MySQL binlog mode. By default, this mode is enabled for ApsaraDB RDS for MySQL. You can run the following command to query the status of the binlog mode:
      show variables like '%log_bin%';
      If the binlog mode is enabled, the result shown in the following figure is returned.Enable the MySQL binlog mode
    mysql -h<Hostname> -P<Port> -u<Username> -p<Password> -D<Database>
    Variable Description
    <Hostname> The internal endpoint of the ApsaraDB RDS for MySQL instance. You can query the internal endpoint on the Basic Information page of the ApsaraDB RDS for MySQL instance.
    <Port> The internal port of the ApsaraDB RDS for MySQL instance. The default port is 3306. You can query the internal port on the Basic Information page of the ApsaraDB RDS for MySQL instance.
    <Username> The username that is used to log on to the ApsaraDB RDS for MySQL database. You can query the username on the Accounts page of the ApsaraDB RDS for MySQL instance. If no account is available, you must create an account. For more information, see Create databases and accounts for an ApsaraDB RDS for MySQL instance.
    <Database> The name of the ApsaraDB RDS for MySQL database. You can query the database name on the Databases page of the ApsaraDB RDS for MySQL instance. If no database is available, you must create a database. For more information, see Create databases and accounts for an ApsaraDB RDS for MySQL instance.
    <Password> The password that is used to log on to the ApsaraDB RDS for MySQL database.
    Example command:
    mysql -hrm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com -P3306 -ues -pmima -Delasticsearch
    If ApsaraDB RDS for MySQL is successfully connected, the result shown in the following figure is returned.Connect to ApsaraDB RDS for MySQL

Install the JDK

  1. Connect to the ECS instance and query available JDK packages.
    yum search java | grep -i --color JDK
  2. Install the JDK of the required version.
    java-1.8.0-openjdk-devel.x86_64 is used in this topic.
    yum install java-1.8.0-openjdk-devel.x86_64
  3. Configure environment variables.
    1. Open the profile file in the etc folder.
      vi /etc/profile
    2. Add the following environment variables to the file:
      export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.71-2.b15.el7_2.x86_64
      export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
      export PATH=$PATH:$JAVA_HOME/bin
    3. Enter :wq to save the file and quit the vi mode. Then, run the following command for the modification to take effect:
      source /etc/profile
  4. Run the following commands to check whether the JDK is successfully installed:
    • java
    • javac
    • java -version
      If the JDK is successfully installed, the result shown in the following figure is returned.JDK installed successfully

Install and start the Canal server

  1. Connect to the ECS instance. Then, download and decompress the Canal deployer package.
    Canal deployer 1.1.4 is used in this topic.
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
  2. Decompress the canal.deployer-1.1.4.tar.gz package.
    tar -zxvf canal.deployer-1.1.4.tar.gz
  3. Modify the conf/example/instance.properties file.
    vi conf/example/instance.properties
    Modify the conf/example/instance.properties file
    Parameter Description
    canal.instance.master.address The parameter is in the format of <Internal endpoint of the ApsaraDB RDS for MySQL instance>:<Internal port>. You can query the required information on the Basic Information page of the ApsaraDB RDS for MySQL instance. Example: rm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com:3306.
    canal.instance.dbUsername The username that is used to log on to the ApsaraDB RDS for MySQL database. You can query the username on the Accounts page of the ApsaraDB RDS for MySQL instance.
    canal.instance.dbPassword The password that is used to log on to the ApsaraDB RDS for MySQL database.
  4. Enter :wq to save the file and quit the vi mode.
  5. Start the Canal server and query the log.
    ./bin/startup.sh
    cat logs/canal/canal.log
    Start the Canal server

Install and start the Canal adapter

  1. Connect to the ECS instance. Then, download and decompress the Canal adapter package.
    Canal adapter 1.1.4 is used in this topic.
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
  2. Decompress the canal.adapter-1.1.4.tar.gz package.
    tar -zxvf canal.adapter-1.1.4.tar.gz
  3. Modify the conf/application.yml file.
    vi conf/application.yml
    Modify the conf/application.yml file
    Parameter Description
    canal.conf.canalServerHost The endpoint of the Canal deployer. Retain the default value: 127.0.0.1:11111.
    canal.conf.srcDataSources.defaultDS.url jdbc:mysql://<Internal endpoint of the ApsaraDB RDS for MySQL instance>:<Internal port>/<Database name>?useUnicode=true. You can query the required information on the Basic Information page of the ApsaraDB RDS for MySQL instance. Example: jdbc:mysql://rm-bp1xxxxxxxxxnd6ph.mysql.rds.aliyuncs.com:3306/elasticsearch?useUnicode=true.
    canal.conf.srcDataSources.defaultDS.username The username that is used to log on to the ApsaraDB RDS for MySQL database. You can query the username on the Accounts page of the ApsaraDB RDS for MySQL instance.
    canal.conf.srcDataSources.defaultDS.password The password that is used to log on to the ApsaraDB RDS for MySQL database.
    canal.conf.canalAdapters.groups.outerAdapters.hosts Find name:es and set hosts to a value in the format of <Internal endpoint of the Elasticsearch cluster>:<Internal port>. You can query the required information on the Basic Information page of the Elasticsearch cluster. Example: es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200.
    canal.conf.canalAdapters.groups.outerAdapters.mode Set the value to rest.
    canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth The parameter is in the format of <Username of the Elasticsearch cluster>:<Password>. Example: elastic:es_password.
    canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name The ID of the Elasticsearch cluster. You can query the cluster ID on the Basic Information page of the Elasticsearch cluster. Example: es-cn-v64xxxxxxxxx3medp.
  4. Enter :wq to save the file and exit the vi mode.
  5. Repeat the preceding steps to modify the conf/es/*.yml file and specify the fields that you want to map from an ApsaraDB RDS for MySQL database to an Elasticsearch cluster.
    Modify the conf/es/*.yml file
    Parameter Description
    esMapping._index Set the value to the name of the index created on the Elasticsearch cluster in the Create a table and add fields section. es_test is used in this topic.
    esMapping._type Set the value to the type of the index created on the Elasticsearch cluster in the Create a table and add fields section. _doc is used in this topic.
    esMapping._id The ID of the document that you want to synchronize to the Elasticsearch cluster. This parameter is user-defined. _id is used in this topic.
    esMapping.sql The SQL statement that is used to query the fields to be synchronized to the Elasticsearch cluster. The select t.id as _id,t.id,t.count,t.name,t.color from es_test t; statement is used in this topic.
  6. Start the Canal adapter and query the log.
    ./bin/startup.sh
    cat logs/adapter/adapter.log
    If the Canal adapter is successfully started, the result shown in the following figure is returned.Canal adapter log

Verify the incremental data synchronization result

  1. In an ApsaraDB RDS for MySQL database, add, modify, or delete data in the es_test table.
    insert `elasticsearch`.`es_test`(`count`,`id`,`name`,`color`) values('11',2,'canal_test2','red');
  2. Log on to the Kibana console of the Elasticsearch cluster.
    For more information, see Log on to the Kibana console.
  3. In the left-side navigation pane, click Dev Tools. On the Console tab of the pane that appears, run the following command to query synchronized data:
    GET /es_test/_search
    If the incremental data is successfully synchronized, the result shown in the following figure is returned.Successful data synchronization