This topic describes how to use Canal to synchronize incremental data from ApsaraDB RDS MySQL to Alibaba Cloud Elasticsearch.

Prerequisites

Before you use Canal, make sure that the following prerequisites are met:
Notice When you activate ApsaraDB RDS MySQL, Alibaba Cloud Elasticsearch, and Elastic Compute Service (ECS), make sure that you have specified the same region, zone, VPC network, VSwitch, and security group for them.
  • ApsaraDB RDS MySQL.
    ApsaraDB RDS MySQL is used to store the source data and incremental data that you want to synchronize. For more information about how to activate ApsaraDB RDS MySQL, see Create an ApsaraDB RDS for MySQL instance. The following figure shows the ApsaraDB RDS MySQL configuration used in this topic.ApsaraDB RDS for MySQL configuration
  • canal.adapter-1.1.4.tar.gz and canal.deployer-1.1.4.tar.gz.

    The Canal packages. Canal is a GitHub open-source extract-transform-load (ETL) tool that is used to parse database logs and retrieve incremental data for data synchronization. For more information, see Canal.

  • Alibaba Cloud Elasticsearch.
    Alibaba Cloud Elasticsearch is used to receive the synchronized incremental data. The following figure shows the Elasticsearch configuration used in this topic.Elasticsearch configuration
  • Alibaba Cloud ECS.
    Alibaba Cloud ECS is used to connect ApsaraDB RDS MySQL and Alibaba Cloud Elasticsearch. In addition, both Canal deployer and Canal adapter are deployed on Alibaba Cloud ECS. For more information about how to activate Alibaba Cloud ECS, see Create an ECS instance. The following figure shows the ECS configuration used in this topic.ECS configuration

Create tables, fields, and indexes

  1. Create a table in ApsaraDB RDS MySQL and add fields to the table.

    In this topic, table es_test is created. The following figure shows the fields that are added to the table.

     es_test table and fields
  2. Create an index on the Elasticsearch instance and configure mappings.
    Log on to the Kibana console, and run the following command in the Console on the Dev Tools page to create the index and configure mappings.
    Notice You must make sure that the table name, field names, and field types specified in the following command are the same as those in the MySQL table.
    PUT es_test? include_type_name=true
    {
    
        "settings" : {
          "index" : {
            "number_of_shards" : "5",
            "number_of_replicas" : "1"
          }
        },
        "mappings" : {
            "_doc" : {
                "properties" : {
                  "count": {          
                       "type": "text"       
                   },
                  "id": {
                       "type": "long"
                   },
                    "name" : {
                        "type" : "text"                    
                    },
                    "color" : {
                        "type" : "text"                    
                    }
                }
            }
        }
    }
    If the table and mappings are successfully created, the following result is returned:
    {
      "acknowledged" : true,
      "shards_acknowledged" : true,
      "index" : "es_test"
    }

Install MySQL

  1. Connect to your Alibaba Cloud ECS instance.
  2. Download the MySQL source installation package.
    wget http://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm
  3. Install the MySQL source.
    yum -y install mysql57-community-release-el7-11.noarch.rpm
  4. Check whether the MySQL source has been installed successfully.
    yum repolist enabled | grep mysql. *
    If the MySQL source is successfully installed, the following result is returned:MySQL source installed successfully
  5. Install the MySQL server.
    yum install mysql-community-server
  6. Start the MySQL service and check the status of the MySQL service.
    systemctl start mysqld.service
    systemctl status mysqld.service
    If the MySQL service is started, the following result is returned:Start MySQL and check its status
  7. Connect to the ApsaraDB RDS MySQL database.
    Notice
    • Before you run the following command to connect to the ApsaraDB RDS MySQL database, you must add the private IP address of the ECS instance to the ApsaraDB RDS MySQL whitelist. For more information, see Configure a whitelist for an RDS MySQL instance.
    • To use Canal, you must enable the MySQL binlog mode. By default, this mode is enabled for ApsaraDB RDS MySQL. You can run the following command to query the status of the binlog mode.
      show variables like '%log_bin%';
      If the binlog mode is enabled, the following result is returned:Enable the MySQL binlog mode
    mysql -h<hostname> -P<port> -u<username> -p<password> -D<database>
    Variable Description
    <hostname> The internal endpoint of the ApsaraDB RDS MySQL instance. You can check the internal endpoint information on the Basic Information page of the instance.
    <port> The internal port of the ApsaraDB RDS MySQL instance. The default port number is 3306. You can check the internal port information on the Basic Information page of the instance.
    <username> The username of the ApsaraDB RDS MySQL database. You can check the account information on the Accounts page of the instance. If no account is available, you must create an account. For more information, see Create accounts and databases for an RDS MySQL instance.
    <database> The name of the ApsaraDB RDS MySQL database. You can check the database name on the Databases page of the instance. If no database is available, you must create a database. For more information, see Create accounts and databases for an RDS MySQL instance.
    <password> The password of the ApsaraDB RDS MySQL database.
    Sample command:
    mysql -hrm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com -P3306 -ues -pmima -Delasticsearch
    If you are connected to ApsaraDB RDS MySQL, the following result is returned:Connect to ApsaraDB RDS for MySQL

Install the JDK

  1. Connect to the ECS instance and query the available JDK packages.
    yum search java | grep -i --color JDK
  2. Select a version and then install the JDK. In this topic, java-1.8.0-openjdk-devel.x86_64 is selected.
    yum install java-1.8.0-openjdk-devel.x86_64
  3. Configure environment variables.
    1. Open the profile file in the etc folder.
      vi /etc/profile
    2. Add the following environment variables to the file.
      export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.71-2.b15.el7_2.x86_64
      export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
      export PATH=$PATH:$JAVA_HOME/bin
    3. Enter :wq to save the file and quit the vi mode, and then run the following command to apply the changes.
      source /etc/profile
  4. Run the following commands to check whether the JDK has been successfully installed.
    • java
    • javac
    • java -version
      If the JDK is installed successfully, the following result is returned:JDK installed successfully

Install and start the Canal server

  1. Connect to the ECS instance, and download and extract the Canal deployer package. In this topic, Canal deployer V1.1.4 is used.
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
  2. Extract the canal.deployer-1.1.4.tar.gz package.
    tar -zxvf canal.deployer-1.1.4.tar.gz
  3. Modify the conf/example/instance.properties file.
    vi conf/example/instance.properties
    Modify the conf/example/instance.properties file
    Parameter Description
    canal.instance.master.address <ApsaraDB RDS MySQL instance internal endpoint>:<internal port>. You can check the required information on the Basic Information page of the ApsaraDB RDS MySQL instance. Example: rm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com:3306.
    canal.instance.dbUsername The username of the ApsaraDB RDS MySQL database. You can check the account information on the Accounts page of the ApsaraDB RDS MySQL instance.
    canal.instance.dbPassword The password of the ApsaraDB RDS MySQL database.
  4. Enter :wq to save the file and quit the vi mode.
  5. Start the Canal server, and then display the Canal log.
    ./bin/startup.sh
    cat logs/canal/canal.log
    Start the Canal server

Install and start the Canal adapter

  1. Connect to the ECS instance, and then download and extract the Canal adapter package. In this topic, Canal adapter V1.1.4 is used.
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
  2. Extract the canal.adapter-1.1.4.tar.gz package.
    tar -zxvf canal.adapter-1.1.4.tar.gz
  3. Modify the conf/application.yml file.
    vi conf/application.yml
    Modify the conf/application.yml file
    Parameter Description
    canal.conf.canalServerHost The endpoint of the Canal deployer. Use the default setting: 127.0.0.1:11111.
    canal.conf.srcDataSources.defaultDS.url jdbc:mysql://<ApsaraDB RDS MySQL instance internal endpoint>:<internal port>/<database name>? useUnicode=true. You can check the required information on the Basic Information page of the ApsaraDB RDS MySQL instance. Example: jdbc:mysql://rm-bp1xxxxxxxxxnd6ph.mysql.rds.aliyuncs.com:3306/elasticsearch? useUnicode=true.
    canal.conf.srcDataSources.defaultDS.username The username of the ApsaraDB RDS MySQL database. You can check the account information on the Basic Information page of the ApsaraDB RDS MySQL instance.
    canal.conf.srcDataSources.defaultDS.password The password of the ApsaraDB RDS MySQL database.
    canal.conf.canalAdapters.groups.outerAdapters.hosts Find name:es, and replace hosts with <Alibaba Cloud Elasticsearch instance internal network address>:<internal network port>. You can check the required information on the View the basic information of a cluster page of the Elasticsearch instance. Example: es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200.
    canal.conf.canalAdapters.groups.outerAdapters.mode Set the value to rest.
    canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth Set the value to <Alibaba Cloud Elasticsearch instance username>:<password>. Example: elastic:es_password.
    canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name The ID of the Alibaba Cloud Elasticsearch instance. You can check the ID on the View the basic information of a cluster page of the Elasticsearch instance. Example: es-cn-v64xxxxxxxxx3medp.
  4. Enter :wq to save the file and exit the vi mode.
  5. Follow the same procedure to modify the conf/es/*.yml file to specify the fields to be mapped from ApsaraDB RDS MySQL to Alibaba Cloud Elasticsearch.
    Modify the conf/es/*.yml file
    Parameter Description
    esMapping._index Set this value to the name of the index created on the Elasticsearch instance in the Create tables, fields, and indexes section. In this topic, the value is set to es_test.
    esMapping._type Set this value to the type of the index created on the Elasticsearch instance in the Create tables, fields, and indexes section. In this topic, the value is set to _doc.
    esMapping._id The ID of the document to be synchronized to the Elasticsearch instance. You can specify the ID of your own document. In this topic, the value is set to _id.
    esMapping.sql The SQL statement that is used to retrieve the fields to be synchronized to the Elasticsearch instance. In this topic, the select t.id as _id,t.id,t.count,t.name,t.color from es_test t; statement is used.
  6. Start the Canal adapter service and display the Canal log.
    ./bin/startup.sh
    cat logs/adapter/adapter.log
    If the Canal adapter service is started, the following result is returned:Canal adapter service log

Verify the incremental data synchronization function of Canal

  1. In the ApsaraDB RDS MySQL database, add, modify, or delete data in the es_test table.
    insert `elasticsearch`.`es_test`(`count`,`id`,`name`,`color`) values('11',2,'canal_test2','red');
  2. Log on to the Alibaba Cloud Elasticsearch console, and then Log on to the Kibana console.
  3. In the Kibana console, run the following command in the Console on the Dev Tools page to query the synchronized incremental data.
    GET /es_test/_search
    If the incremental data is successfully synchronized, the following result is returned:Successful data synchronization result