Canal is an open source product provided by Alibaba Group. Canal can parse incremental log data in MySQL and allows you to subscribe to and consume incremental data. You can use Canal to synchronize incremental data from a MySQL database to an Alibaba Cloud Elasticsearch cluster. This topic describes the procedure in detail. An ApsaraDB RDS for MySQL database is used in this topic.

Background information

Canal is an open source extract, transform, load (ETL) software provided in GitHub. For more information about work principles and other details about Canal, see Canal.

Procedure

  1. Make preparations
    Create an ApsaraDB RDS for MySQL instance, an Alibaba Cloud Elasticsearch cluster, and an Elastic Compute Service (ECS) instance that reside in the same virtual private cloud (VPC).
    • ApsaraDB RDS for MySQL instance: stores source data and incremental data.
    • Canal: used to parse database logs, obtain incremental data, and synchronize the incremental data to the Alibaba Cloud Elasticsearch cluster.
    • Alibaba Cloud Elasticsearch cluster: receives incremental data.
    • ECS instance: used to deploy the Canal server and Canal adapter.
  2. Step 1: Prepare the MySQL data source
    Prepare the data that you want to synchronize in the ApsaraDB RDS for MySQL instance.
  3. Step 2: Create an index and configure mappings for the index
    Create an index and configure mappings for the index in the Elasticsearch cluster. The field names and types that are defined in the mappings must be the same as those of the data that you want to synchronize.
  4. Step 3: Install the JDK
    Install the Java Development Kit (JDK) before you use Canal. The version of the JDK must be 1.8.0 or later.
  5. Step 4: Install and start the Canal server
    Install the Canal server and modify its configuration file to associate the Canal server with the ApsaraDB RDS for MySQL instance. For a MySQL cluster, the Canal server simulates a slave node in the cluster to obtain the binary logs on the master node in the cluster. Then, the Canal server sends the logs to the Canal adapter.
  6. Step 5: Install and start the Canal adapter
    Install the Canal adapter and modify its configuration file to associate the Canal adapter with the ApsaraDB RDS for MySQL instance and Elasticsearch cluster. Then, define the mappings between the fields in the ApsaraDB RDS for MySQL instance and those in the Elasticsearch cluster for data synchronization.
  7. Step 6: Verify the synchronization result of incremental data
    Add, modify, or delete data in the ApsaraDB RDS for MySQL instance and view the data synchronization result.

Make preparations

  • Create an ApsaraDB RDS for MySQL instance.
    For more information, see Create an ApsaraDB RDS for MySQL instance. In this example, an ApsaraDB RDS instance that run MySQL 5.7 is created. The following figure shows the configuration of the instance. ApsaraDB RDS for MySQL configuration
    Important In this example, an ApsaraDB RDS instance that run MySQL 5.7 is used. If you use an ApsaraDB RDS instance that run MySQL 8.0, you must replace the MySQL 5.7 driver with the MySQL 8.0 driver. For more information, see FAQ.
  • Create an Alibaba Cloud Elasticsearch cluster.

    For more information, see Create an Alibaba Cloud Elasticsearch cluster. An Elasticsearch V6.7 cluster of the Standard Edition is created in this topic.

  • Create an Alibaba Cloud ECS instance.

    For more information, see Create an instance by using the wizard. The ECS instance runs an image of CentOS 7.6 64-bit in this topic.

Step 1: Prepare the MySQL data source

Log on to the ApsaraDB RDS console, and create an ApsaraDB RDS for MySQL database and a table. For more information, see General workflow to use ApsaraDB RDS for MySQL.

In this topic, a table named es_test is created. The following figure shows the fields that are added to the table. Fields and indexes of es_test

Step 2: Create an index and configure mappings for the index

  1. Log on to the Kibana console of your Elasticsearch cluster and go to the homepage of the Kibana console as prompted.
    For more information about how to log on to the Kibana console, see Log on to the Kibana console.
    Note In this example, an Elasticsearch V6.7.0 cluster is used. Operations on clusters of other versions may differ. The actual operations in the console prevail.
  2. In the left-side navigation pane of the page that appears, click Dev Tools.
  3. On the Console tab of the page that appears, run the following command to create an index.
    In this example, an index named es_test is created. The index contains the following fields: count, id, name, and color.
    Important The field names and types defined in the mappings in the command must be the same as those in the table created in Step 1: Prepare the MySQL data source.
    PUT es_test?include_type_name=true
    {
    
        "settings" : {
          "index" : {
            "number_of_shards" : "5",
            "number_of_replicas" : "1"
          }
        },
        "mappings" : {
            "_doc" : {
                "properties" : {
                  "count": {          
                       "type": "text"       
                   },
                  "id": {
                       "type": "integer"
                   },
                   "name": {
                        "type" : "text",
                        "analyzer": "ik_smart"                   
                    },
                    "color" : {
                        "type" : "text"                    
                    }
                }
            }
        }
    }
    After the index is created and the mappings are configured, the following result is returned:
    {
      "acknowledged" : true,
      "shards_acknowledged" : true,
      "index" : "es_test"
    }

Step 3: Install the JDK

  1. Connect to the ECS instance.
    For more information, see Connect to a Linux instance by using a password or key.
    Note In this example, a common user is used.
  2. View available JDK packages.
    sudo yum search java | grep -i --color JDK
  3. Install the JDK of the required version.
    In this example, java-1.8.0-openjdk-devel.x86_64 is used.
    sudo yum install java-1.8.0-openjdk-devel.x86_64
  4. Configure environment variables.
    1. Open the profile file in the etc folder.
      vim ~/.bash_profile
    2. Add the following environment variables to the file:
      export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.71-2.b15.el7_2.x86_64
      export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
      export PATH=$PATH:$JAVA_HOME/bin
      Important You must replace JAVA_HOME in the preceding variables with the path in which the JDK is installed. You can run the find / -name 'java' command to query the path.
    3. Press Esc, and run the :wq command to save the file and exit from the vi mode. Then, run the following command to apply the configuration:
      source ~/.bash_profile
  5. Run the following commands to check whether the JDK is installed:
    • java
    • javac
    • java -version
    If the JDK is installed, the result shown in the following figure is returned. JDK installed

Step 4: Install and start the Canal server

  1. Download the Canal server package.
    A Canal 1.1.4 server is used in this topic.
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
    Note Canal 1.1.5 supports only Alibaba Cloud Elasticsearch V7.0 clusters. If your Elasticsearch is of V7.0, you must download the Canal 1.1.5 package. For more information, see Canal release note.
  2. Run the following command to decompress the package:
    tar -zxvf canal.deployer-1.1.4.tar.gz
  3. Run the following command to modify the conf/example/instance.properties file:
    vi conf/example/instance.properties
    Modify the conf/example/instance.properties file
    ParameterDescription
    canal.instance.master.addressConfigure this parameter in the format of <Internal endpoint of the ApsaraDB RDS for MySQL instance>:<Internal port>. You can obtain the required information on the Basic Information page of the ApsaraDB RDS for MySQL instance. Example: rm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com:3306.
    canal.instance.dbUsernameThe username that is used to log on to the ApsaraDB RDS for MySQL database. You can obtain the username on the Accounts tab of the ApsaraDB RDS for MySQL instance.
    canal.instance.dbPasswordThe password that is used to log on to the ApsaraDB RDS for MySQL database.
  4. Press Esc, and run the :wq command to save the file and exit from the vi mode.
  5. Start the Canal server and query the log.
    ./bin/startup.sh
    cat logs/canal/canal.log
    Start the Canal server

Step 5: Install and start the Canal adapter

  1. Download the Canal adapter package.
    A Canal 1.1.4 server is used in this topic.
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
    Note Canal 1.1.5 supports only Alibaba Cloud Elasticsearch V7.0 clusters. If your Elasticsearch is of V7.0, you must download the Canal 1.1.5 package. For more information, see Canal release note.
  2. Run the following command to decompress the package:
    tar -zxvf canal.adapter-1.1.4.tar.gz
  3. Run the following command to modify the conf/application.yml file:
    vi conf/application.yml
    Modify the conf/application.yml file
    ParameterDescription
    canal.conf.canalServerHostThe address of the Canal deployer. Retain the default value 127.0.0.1:11111.
    canal.conf.srcDataSources.defaultDS.urlConfigure this parameter in the format of jdbc:mysql://<Internal endpoint of the ApsaraDB RDS for MySQL instance>:<Internal port>/<Database name>?useUnicode=true. You can obtain the required information on the Basic Information page of the ApsaraDB RDS for MySQL instance. Example: jdbc:mysql://rm-bp1xxxxxxxxxnd6ph.mysql.rds.aliyuncs.com:3306/elasticsearch?useUnicode=true.
    canal.conf.srcDataSources.defaultDS.usernameThe username that is used to log on to the ApsaraDB RDS for MySQL database. You can obtain the username on the Accounts tab of the ApsaraDB RDS for MySQL instance.
    canal.conf.srcDataSources.defaultDS.passwordThe password that is used to log on to the ApsaraDB RDS for MySQL database.
    canal.conf.canalAdapters.groups.outerAdapters.hostsFind name:es and replace hosts with <Internal endpoint of the Elasticsearch cluster>:<Internal port>. You can obtain the required information on the Basic Information page of the Elasticsearch cluster. Example: es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200.
    canal.conf.canalAdapters.groups.outerAdapters.modeSet the value to rest.
    canal.conf.canalAdapters.groups.outerAdapters.properties.security.authConfigure this parameter in the format of <Username of the Elasticsearch cluster>:<Password>. Example: elastic:es_password.
    canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.nameThe ID of the Elasticsearch cluster. You can obtain the cluster ID on the Basic Information page of the Elasticsearch cluster. Example: es-cn-v64xxxxxxxxx3medp.
  4. Press Esc, and run the :wq command to save the file and exit from the vi mode.
  5. Repeat the preceding steps to modify the conf/es/*.yml file and specify the fields that you want to map from the ApsaraDB RDS for MySQL database to the Elasticsearch cluster.
    Modify the conf/es/*.yml file
    ParameterDescription
    esMapping._indexSet the value to the name of the index created in the Elasticsearch cluster in Step 2: Create an index and configure mappings for the index. es_test is used in this topic.
    esMapping._typeSet the value to the type of the index created in the Elasticsearch cluster in Step 2: Create an index and configure mappings for the index. _doc is used in this topic.
    esMapping._idThe ID of the document generated for the fields that you want to synchronize to the Elasticsearch cluster. You can customize this parameter. _id is used in this topic.
    esMapping.sqlThe SQL statement that is used to query the fields that you want to synchronize to the Elasticsearch cluster. The select t.id as _id,t.id,t.count,t.name,t.color from es_test t statement is used in this topic.
  6. Start the Canal adapter and query logs.
    ./bin/startup.sh
    cat logs/adapter/adapter.log
    If the Canal adapter is started, the result shown in the following figure is returned. Canal adapter service logs

Step 6: Verify the synchronization result of incremental data

  1. In the ApsaraDB RDS for MySQL database, add, modify, or remove data in the es_test table.
    insert `elasticsearch`.`es_test`(`count`,`id`,`name`,`color`) values('11',2,'canal_test2','red');
  2. Log on to the Kibana console of your Elasticsearch cluster and go to the homepage of the Kibana console as prompted.
    For more information about how to log on to the Kibana console, see Log on to the Kibana console.
    Note In this example, an Elasticsearch V6.7.0 cluster is used. Operations on clusters of other versions may differ. The actual operations in the console prevail.
  3. In the left-side navigation pane of the page that appears, click Dev Tools.
  4. On the Console tab of the page that appears, run the following command to query the synchronized data:
    GET /es_test/_search
    If the incremental data is synchronized, the result shown in the following figure is returned. Successful data synchronization
    Important You can use Canal to synchronize only incremental data.

FAQ

Q: I want to synchronize data from an ApsaraDB RDS instance that run MySQL 8.0 to an Elasticsearch cluster. How do I change the existing MySQL driver to the MySQL 8.0 driver?
Note In this example, the root user is used.
A:
  1. Download the package of the MySQL 8.0 driver.
    wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.29.zip
  2. Decompress the package.
    unzip mysql-connector-java-8.0.29.zip
  3. Copy the obtained file to the lib directory of the Canal adapter.
    mv mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar lib/
  4. Add permissions.
    chmod 777 mysql-connector-java-8.0.29.jar
    chmod +st mysql-connector-java-8.0.29.jar
  5. Delete the MySQL 5.x driver.
    rm -rf lib/mysql-connector-java-5.1.40.jar