This topic describes how to use Canal to synchronize data from a MySQL database to Message Queue for Apache Kafka.

Background information

Canal can parse the incremental data of a MySQL database and allows you to subscribe to and consume the incremental data. Canal assumes the role of a secondary MySQL database and sends a dump request to the primary MySQL database. After the primary database receives the dump request, it pushes its binary logs to Canal. Canal parses the binary logs to obtain the incremental data and then synchronizes the data. Canal can connect to Message Queue for Apache Kafka and write the updated data of a MySQL database to Message Queue for Apache Kafka for analysis. For more information about how Canal works, visit the Canal official website.
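
If Canal runs in flat-message mode (canal.mq.flatMessage = true, the default in recent Canal versions), each row change is written to Message Queue for Apache Kafka as a JSON record. The following sketch shows the general shape of such a record. The database, table, and values are illustrative, and fields such as mysqlType and sqlType are omitted for brevity:

    {
        "data": [{"id": "1", "name": "Wang"}],
        "database": "mytest",
        "table": "person",
        "type": "INSERT",
        "isDdl": false,
        "pkNames": ["id"],
        "es": 1595985825000,
        "ts": 1595985826000
    }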

Prerequisites

Before you use Canal to synchronize data, make sure that the following conditions are met:

Procedure

  1. Download the Canal package. In this example, V1.1.5 is used.
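    You can download the package from the Canal releases page on GitHub. The following command assumes the official canal.deployer-1.1.5 release artifact; adjust the file name if you use a different build, such as a snapshot:
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz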
  2. Run the following command to create a directory. In this example, the /home/doc/tools/canal.deployer-1.1.5 directory is created.
    mkdir -p /home/doc/tools/canal.deployer-1.1.5
  3. Copy the Canal package to the /home/doc/tools/canal.deployer-1.1.5 directory and decompress the package.
    tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5
  4. In the /home/doc/tools/canal.deployer-1.1.5 directory, run the following command to modify the instance.properties file:
    vi conf/example/instance.properties

    Set the parameters as described in Table 1.

    # Set the following parameters based on the information about your MySQL database. 
    #################################################
    ...
    # The URL of the database. 
    canal.instance.master.address=192.168.XX.XX:3306
    # The username and password that are used to connect to the database. 
    ...
    canal.instance.dbUsername=****
    canal.instance.dbPassword=****
    ...
    # mq config
    # The topic that you created in the Message Queue for Apache Kafka console.
    canal.mq.topic=mysql_test
    # Specify a dynamic topic based on the database name or table name.
    #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
    # The partition in the Message Queue for Apache Kafka topic to which data is synchronized.
    canal.mq.partition=0
    # The following two parameters cannot be set together with the canal.mq.partition parameter. If you set the following two parameters, data is synchronized to different partitions in the Message Queue for Apache Kafka topic.
    #canal.mq.partitionsNum=3
    # Format: database name.table name:unique primary key. Separate the settings for multiple tables with commas (,).
    #canal.mq.partitionHash=mytest.person:id,mytest.role:id
    #################################################
    Table 1. Parameters in the instance.properties file
    Parameter | Required | Description
    canal.instance.master.address | Yes | The URL of the MySQL database.
    canal.instance.dbUsername | Yes | The username that is used to connect to the MySQL database.
    canal.instance.dbPassword | Yes | The password that is used to connect to the MySQL database.
    canal.mq.topic | Yes | The topic in the Message Queue for Apache Kafka instance. You can create a topic on the Topics page in the Message Queue for Apache Kafka console. For more information, see Step 3: Create resources.
    canal.mq.dynamicTopic | No | The regular expression that is used to match dynamic topics. After you specify the regular expression, Canal evaluates it to synchronize the data of different tables in the MySQL database to different topics in the Message Queue for Apache Kafka instance. For more information, see Parameter description.
    canal.mq.partition | No | The partition in the Message Queue for Apache Kafka topic to which the database data is synchronized.
    canal.mq.partitionsNum | No | The number of partitions in the topic. This parameter is used together with the canal.mq.partitionHash parameter to synchronize data to different partitions in the Message Queue for Apache Kafka topic.
    canal.mq.partitionHash | No | The rules that are used to hash data to partitions. For more information, see Parameter description.
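    To see how the routing parameters interact, consider the following sketch. It reuses the expression syntax from the sample file above, and the database, table, and column names are examples. Based on the Canal documentation, a table matched by canal.mq.dynamicTopic is synchronized to a topic that is named after the database and table, such as mytest_person:

    # Route each table in the mytest database to a topic named <database>_<table>.
    canal.mq.dynamicTopic=mytest\\..*
    # Hash rows by the id column across three partitions of each topic.
    canal.mq.partitionsNum=3
    canal.mq.partitionHash=mytest.person:id,mytest.role:id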
  5. Run the following command to open the canal.properties file:
    vi conf/canal.properties

    Set the parameters as described in Table 2.

    • If a client connects to Message Queue for Apache Kafka over the Internet, the SASL_SSL protocol is used for authentication and encryption, and the Secure Sockets Layer (SSL) endpoint of the instance is required. For more information about endpoints, see Comparison among endpoints.
      # ...
      # Set the value to kafka. 
      canal.serverMode = kafka
      # ...
      # Configure the settings of Message Queue for Apache Kafka. 
      # The SSL endpoint of the Message Queue for Apache Kafka instance. You can obtain the endpoint on the Instance Details page in the Message Queue for Apache Kafka console.
      kafka.bootstrap.servers = 192.168.XX.XX:9093,192.168.XX.XX:9093,192.168.XX.XX:9093
      # Set the parameters as required or retain the following default settings: 
      kafka.acks = all
      kafka.compression.type = none
      kafka.batch.size = 16384
      kafka.linger.ms = 1
      kafka.max.request.size = 1048576
      kafka.buffer.memory = 33554432
      kafka.max.in.flight.requests.per.connection = 1
      kafka.retries = 0
      
      # If a client connects to Message Queue for Apache Kafka over the Internet, the SASL_SSL protocol is used for authentication and encryption. You must specify the network protocol and identity authentication mechanism. 
      kafka.ssl.truststore.location= ../conf/kafka.client.truststore.jks
      kafka.ssl.truststore.password= KafkaOnsCl****
      kafka.security.protocol= SASL_SSL
      kafka.sasl.mechanism = PLAIN
      kafka.ssl.endpoint.identification.algorithm =
      Table 2. Parameters in the canal.properties file
      Parameter | Required | Description
      canal.serverMode | Yes | The server mode of Canal. Set the value to kafka.
      kafka.bootstrap.servers | Yes | The endpoint of the Message Queue for Apache Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console.
      kafka.ssl.truststore.location | Yes | The storage path of the root SSL certificate kafka.client.truststore.jks.
      Note If a client connects to Message Queue for Apache Kafka over the Internet, authentication and encryption are required to ensure the security of message transmission. In this case, you must use the SSL endpoint to connect to Message Queue for Apache Kafka and use the SASL_SSL protocol for authentication and encryption. For more information, see Comparison among endpoints.
      kafka.acks | Yes | The level of acknowledgment (ACK) that the client requires from a Message Queue for Apache Kafka broker after a message is sent. Valid values:
      • 0: The client does not wait for an ACK from the broker.
      • 1: The client receives an ACK after the leader writes the message to its local log. The leader responds without waiting for acknowledgment from all followers.
      • all: The client receives an ACK after all in-sync replicas acknowledge the message.
      kafka.compression.type | Yes | The algorithm that is used to compress data. By default, data is not compressed. Valid values:
      • none
      • gzip
      • snappy
      kafka.batch.size | Yes | The maximum number of bytes of messages that the client accumulates in a batch before the batch is sent. Unit: bytes.

      Each time the client sends a request to a broker, the data is grouped into batches, which reduces the number of requests that are sent. A small batch size may reduce throughput, whereas a large batch size may waste memory, because the client allocates a buffer of the specified size for each batch. An appropriate batch size helps improve the performance of both clients and brokers.

      Note The kafka.batch.size and kafka.linger.ms parameters together control batching. The messages in a batch are ready to be sent when the size of the batch or the duration that the client waits for messages to accumulate in the batch exceeds the specified threshold.
      kafka.linger.ms | Yes | The maximum duration that the client waits for more messages to accumulate in a batch. Unit: milliseconds.

      The client sends a request to a broker when the waiting duration reaches the specified value. This facilitates batch processing and reduces the number of requests that are sent.

      kafka.max.request.size | Yes | The maximum size of a request that the client can send. Unit: bytes.
      kafka.buffer.memory | Yes | The total size of memory that the client can use to buffer messages that are waiting to be sent to a broker. Unit: bytes.
      kafka.max.in.flight.requests.per.connection | Yes | The maximum number of unacknowledged requests that the client can send on a single connection. If this parameter is set to 1, the client does not send another request to the same broker until the broker responds to the current request.
      kafka.retries | Yes | The number of times the client resends a message that fails to be sent. If you set this parameter to 0, the client does not retry failed messages. If you set this parameter to a value greater than 0, the client resends a message when the message fails to be sent.
      kafka.ssl.truststore.password | Yes | The password of the truststore of the root SSL certificate.
      kafka.security.protocol | Yes | The security protocol that is used to transmit messages. Set this parameter to SASL_SSL if the SASL_SSL protocol is used for authentication and encryption.
      kafka.sasl.mechanism | Yes | The Simple Authentication and Security Layer (SASL) mechanism that is used for identity authentication. Set this parameter to PLAIN if the SSL endpoint is used to connect to Message Queue for Apache Kafka.

      If a client connects to Message Queue for Apache Kafka over the Internet, SASL is used for identity authentication. You must configure environment variables in the bin/startup.sh file and specify the username and password of the SASL user for the Message Queue for Apache Kafka instance in the kafka_client_producer_jaas.conf file.

      1. Run the vi bin/startup.sh command and configure environment variables in the startup.sh file:
        JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_producer_jaas.conf"
      2. Run the vi conf/kafka_client_producer_jaas.conf command and specify the username and password of the SASL user for the Message Queue for Apache Kafka instance in the kafka_client_producer_jaas.conf file.
        Note
        • If the access control list (ACL) feature is disabled for the Message Queue for Apache Kafka instance, you can obtain the default username and password of the SASL user on the Instance Details page in the Message Queue for Apache Kafka console.
        • If the ACL feature is enabled for the Message Queue for Apache Kafka instance, make sure that the SASL user to be used is of the PLAIN type and that the user is authorized to send and subscribe to messages. For more information, see Authorize SASL users.
        KafkaClient {
            org.apache.kafka.common.security.plain.PlainLoginModule required
            username="The username of the SASL user"
            password="The password of the SASL user";
        };
    • If a client connects to Message Queue for Apache Kafka in a virtual private cloud (VPC), authentication and encryption are not required. The PLAINTEXT protocol is used to transmit messages, and the default endpoint is used to connect to Message Queue for Apache Kafka. In this case, you need to set only the canal.serverMode and kafka.bootstrap.servers parameters. For more information about endpoints, see Comparison among endpoints.
      # ...
      # Set the value to kafka. 
      canal.serverMode = kafka
      # ...
      # Configure the settings of Message Queue for Apache Kafka. 
      # The default endpoint of the Message Queue for Apache Kafka instance. You can obtain the endpoint on the Instance Details page in the Message Queue for Apache Kafka console.
      kafka.bootstrap.servers = 192.168.XX.XX:9092,192.168.XX.XX:9092,192.168.XX.XX:9092
      # Set the parameters as required or retain the following default settings: 
      kafka.acks = all
      kafka.compression.type = none
      kafka.batch.size = 16384
      kafka.linger.ms = 1
      kafka.max.request.size = 1048576
      kafka.buffer.memory = 33554432
      kafka.max.in.flight.requests.per.connection = 1
      kafka.retries = 0
  6. In the /home/doc/tools/canal.deployer-1.1.5 directory, run the following command to start Canal:
    sh bin/startup.sh
    • Check the log file /home/doc/tools/canal.deployer-1.1.5/logs/canal/canal.log to confirm that Canal is connected to Message Queue for Apache Kafka and that Canal is running.
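      For example, run the following command to follow the log:
      tail -f /home/doc/tools/canal.deployer-1.1.5/logs/canal/canal.log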
      2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
      2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.XX.XX:11111]
      2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
    • Check the log file /home/doc/tools/canal.deployer-1.1.5/logs/example/example.log to confirm that a CanalInstance object is started.
      2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
      2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
      2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
      2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

Test the configurations

After Canal is started, perform a data synchronization test.

  1. In a MySQL database named mysql, create a table named T_Student. The following sample output shows a query of the data in the table:
    mysql> select * from T_Student;
    +--------+---------+------+------+
    | stuNum | stuName | age  | sex  |
    +--------+---------+------+------+
    |      1 | Wang    |   18 | girl |
    |      2 | Zhang   |   17 | boy  |
    +--------+---------+------+------+
    2 rows in set (0.00 sec)
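    For example, an insert such as the following illustrative statement, based on the sample table above, produces a binlog event that Canal captures:
    mysql> INSERT INTO T_Student (stuNum, stuName, age, sex) VALUES (3, 'Li', 19, 'girl');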
    Check the log file /home/doc/tools/canal.deployer-1.1.5/logs/example/meta.log. Each time an insert, delete, or update operation is performed in the database, a record is generated in the meta.log file. Check the records to confirm that Canal is collecting the incremental data.
    tail -f example/meta.log
    2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306]
    2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/192.168.XX.XX:3306]
    2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/192.168.XX.XX:3306]
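    You can also consume the topic with the kafka-console-consumer.sh script that ships with Apache Kafka to confirm that messages arrive. The endpoint and topic below are the examples used in this topic; if you connect over the Internet by using SASL_SSL, you must also pass a client configuration file by using the --consumer.config option:
    sh bin/kafka-console-consumer.sh --bootstrap-server 192.168.XX.XX:9092 --topic mysql_test --from-beginning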
  2. Log on to the Message Queue for Apache Kafka console and query messages to check whether the incremental data of the MySQL database is synchronized to the Message Queue for Apache Kafka instance. For more information about how to query messages in the console, see Query messages.
  3. After data is synchronized, run the following command to stop Canal:
    sh bin/stop.sh