
ApsaraMQ for Kafka: Use Canal to synchronize data from a MySQL database to ApsaraMQ for Kafka

Last Updated: Sep 14, 2023

This topic describes how to use Canal to synchronize data from a MySQL database to ApsaraMQ for Kafka.

Background information

Canal parses the incremental data of a MySQL database and allows you to subscribe to and consume that data. Canal poses as a secondary MySQL database and sends a dump request to the primary MySQL database. After the primary database receives the request, it pushes its binary logs to Canal. Canal parses the binary logs to obtain the incremental data and then synchronizes the data. Because Canal can connect to ApsaraMQ for Kafka, you can use it to write the incremental data of a MySQL database to ApsaraMQ for Kafka for analysis. For more information about how Canal works, see the Canal documentation on GitHub.
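Because Canal reads binary logs the way a secondary database does, the source MySQL server must write binary logs that Canal can parse. Canal's QuickStart on GitHub enables log-bin, sets binlog-format=ROW, and sets server_id in the [mysqld] section of my.cnf. The following Python sketch checks my.cnf text for those three settings. The function names are hypothetical, the sketch inspects only the file you pass in (not server defaults or included files), and you should confirm the exact requirements for your MySQL version in the Canal documentation.

```python
from __future__ import annotations

import configparser

# Hypothetical helpers for illustration; they are not part of Canal or MySQL.


def _option(section: configparser.SectionProxy, name: str) -> tuple[bool, str | None]:
    """Look up a MySQL option that may be spelled with '-' or '_'."""
    for key in (name.replace("_", "-"), name.replace("-", "_")):
        if key in section:
            return True, section[key]
    return False, None


def check_binlog_settings(my_cnf_text: str) -> list[str]:
    """Return problems found in the [mysqld] section; an empty list means none."""
    parser = configparser.ConfigParser(
        allow_no_value=True,             # bare flags such as "skip-name-resolve"
        strict=False,                    # tolerate repeated keys
        inline_comment_prefixes=("#",),  # "log-bin=mysql-bin # comment"
        interpolation=None,              # "%" has no special meaning in my.cnf
    )
    parser.read_string(my_cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    section = parser["mysqld"]
    problems = []
    if not _option(section, "log-bin")[0]:
        problems.append("binary logging is not enabled (log-bin)")
    _, binlog_format = _option(section, "binlog-format")
    if (binlog_format or "").strip().upper() != "ROW":
        problems.append("binlog-format should be ROW")
    if not _option(section, "server-id")[0]:
        problems.append("server-id is not set")
    return problems
```

Only the file contents are checked; on a running server you would confirm the effective values with SHOW VARIABLES instead.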

Prerequisites

Before you use Canal to synchronize data, make sure that the following conditions are met:

Procedure

  1. Download the Canal package. This topic uses V1.1.5 as an example.
  2. Run the following command to create a directory. In the example, the /home/doc/tools/canal.deployer-1.1.5 directory is created.
    mkdir -p /home/doc/tools/canal.deployer-1.1.5
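  3. Copy the Canal package to the /home/doc/tools/canal.deployer-1.1.5 directory and decompress the package. In the following command, replace the package name with the name of the package that you downloaded.
    tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5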
  4. In the /home/doc/tools/canal.deployer-1.1.5 directory, run the following command to modify the instance.properties file:
    vi conf/example/instance.properties

    Set the parameters as described in Table 1 (Parameters in the instance.properties file).

    # Set the following parameters based on the information about your MySQL database. 
    #################################################
    ...
    # The address of the database, in the host:port format.
    canal.instance.master.address=192.168.XX.XX:3306
    # The username and password that are used to connect to the database.
    ...
    canal.instance.dbUsername=****
    canal.instance.dbPassword=****
    ...
    # mq config
    # The topic that you created in the ApsaraMQ for Kafka console.
    canal.mq.topic=mysql_test
    # Specify dynamic topics based on database names or table names.
    #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
    # The partition in the ApsaraMQ for Kafka topic to which data is synchronized.
    canal.mq.partition=0
    # The following two parameters cannot be used together with canal.mq.partition. If you set them, data is distributed across multiple partitions in the ApsaraMQ for Kafka topic.
    #canal.mq.partitionsNum=3
    # Format: database name.table name:primary key. Separate multiple settings with commas (,).
    #canal.mq.partitionHash=mytest.person:id,mytest.role:id
    #################################################
    Table 1. Parameters in the instance.properties file
    | Parameter | Required | Description |
    | --- | --- | --- |
    | canal.instance.master.address | Yes | The address of the MySQL database, in the host:port format. |
    | canal.instance.dbUsername | Yes | The username that is used to connect to the MySQL database. |
    | canal.instance.dbPassword | Yes | The password that is used to connect to the MySQL database. |
    | canal.mq.topic | Yes | The topic in the ApsaraMQ for Kafka instance. You can create a topic on the Topics page in the ApsaraMQ for Kafka console. For more information, see Step 3: Create resources. |
    | canal.mq.dynamicTopic | No | The regular expression that is used to match dynamic topics. Canal evaluates the expression to synchronize data from different tables in the MySQL database to different topics in the ApsaraMQ for Kafka instance. For more information, see Parameter description. |
    | canal.mq.partition | No | The partition in the ApsaraMQ for Kafka topic to which the database data is synchronized. |
    | canal.mq.partitionsNum | No | The number of partitions in the topic. Use this parameter together with canal.mq.partitionHash to distribute data across partitions in the ApsaraMQ for Kafka topic. |
    | canal.mq.partitionHash | No | The partition hash rule, in the database name.table name:primary key format. The database name.table name part is a regular expression. For more information, see Parameter description. |
  5. Run the following command to open the canal.properties file:
    vi conf/canal.properties

    Set the parameters as described in Table 2 (Parameters in the canal.properties file).

    • If Canal connects to ApsaraMQ for Kafka over the Internet, you must use the Secure Sockets Layer (SSL) endpoint, and the SASL_SSL protocol is used for authentication and encryption. For more information about endpoints, see Comparison among endpoints.
      # ...
      # Set the value to kafka. 
      canal.serverMode = kafka
      # ...
      # Configure the settings of ApsaraMQ for Kafka.
      # The SSL endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for Kafka console.
      kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
      # Set the parameters as required or retain the following default settings: 
      kafka.acks = all
      kafka.compression.type = none
      kafka.batch.size = 16384
      kafka.linger.ms = 1
      kafka.max.request.size = 1048576
      kafka.buffer.memory = 33554432
      kafka.max.in.flight.requests.per.connection = 1
      kafka.retries = 0
      
      # If a client connects to ApsaraMQ for Kafka over the Internet, the SASL_SSL protocol is used for authentication and encryption. You must specify the network protocol and identity authentication mechanism.
      kafka.ssl.truststore.location = ../conf/kafka.client.truststore.jks
      kafka.ssl.truststore.password= KafkaOnsClient
      kafka.security.protocol= SASL_SSL
      kafka.sasl.mechanism = PLAIN
      kafka.ssl.endpoint.identification.algorithm =
      Table 2. Parameters in the canal.properties file
      | Parameter | Required | Description |
      | --- | --- | --- |
      | canal.serverMode | Yes | The server mode of Canal. Set this parameter to kafka. |
      | kafka.bootstrap.servers | Yes | The endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console. |
      | kafka.ssl.truststore.location | Yes | The storage path of the root SSL certificate file kafka.client.truststore.jks. |
      | kafka.acks | Yes | The acknowledgment (ACK) level that the client requires from an ApsaraMQ for Kafka broker after the broker receives a message. Valid values: 0: the client does not wait for an ACK from the broker. 1: the client receives an ACK after the leader writes the message to its log, without waiting for the followers. all: the client receives an ACK after all in-sync replicas receive the message. |
      | kafka.compression.type | Yes | The algorithm that is used to compress data. By default, data is not compressed. Valid values: none, gzip, and snappy. |
      | kafka.batch.size | Yes | The maximum size of a batch of messages that the client accumulates before sending. Unit: byte. The client groups messages into batches to reduce the number of requests sent to brokers. A small batch size may lower throughput, whereas a large batch size may waste memory, because a buffer of this size is allocated for each batch. |
      | kafka.linger.ms | Yes | The maximum duration that the client waits for messages to accumulate in a batch. Unit: millisecond. When the waiting duration reaches this value, the client sends the batch to the broker, which reduces the number of requests. |
      | kafka.max.request.size | Yes | The maximum size of a request sent by the client. Unit: byte. |
      | kafka.buffer.memory | Yes | The total size of memory that the client can use to buffer messages that are waiting to be sent to a broker. Unit: byte. |
      | kafka.max.in.flight.requests.per.connection | Yes | The maximum number of unacknowledged requests that the client can send on a single connection. If this parameter is set to 1, the client does not send another request to a broker until the broker responds to the previous request. |
      | kafka.retries | Yes | The number of times the client resends a message that fails to be sent. If this parameter is set to 0, failed messages are not resent. If it is set to a value greater than 0, failed messages are resent. |
      | kafka.ssl.truststore.password | Yes | The password of the truststore of the root SSL certificate. Set this parameter to KafkaOnsClient. |
      | kafka.security.protocol | Yes | The security protocol. Set this parameter to SASL_SSL if the SASL_SSL protocol is used for authentication and encryption. |
      | kafka.sasl.mechanism | Yes | The Simple Authentication and Security Layer (SASL) mechanism that is used for identity authentication. Set this parameter to PLAIN if the SSL endpoint is used to connect to ApsaraMQ for Kafka. |

      Note
      • If a client connects to ApsaraMQ for Kafka over the Internet, authentication and encryption are required to secure message transmission. You must use the SSL endpoint to connect to ApsaraMQ for Kafka and use the SASL_SSL protocol for authentication and encryption. For more information, see Comparison among endpoints.
      • The kafka.batch.size and kafka.linger.ms parameters together control batch processing. A batch is ready to be sent when its size reaches kafka.batch.size or when the client has waited kafka.linger.ms for messages to accumulate, whichever comes first.

      If a client connects to ApsaraMQ for Kafka over the Internet, SASL is used for identity authentication. You must add a JAAS configuration option to the JAVA_OPTS variable in the bin/startup.sh file and specify the username and password of the SASL user for the ApsaraMQ for Kafka instance in the kafka_client_producer_jaas.conf file.

      1. Run the vi bin/startup.sh command and add the -Djava.security.auth.login.config option to JAVA_OPTS in the startup.sh file. The option must point to the kafka_client_producer_jaas.conf file that you edit in the next step.
        JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_producer_jaas.conf"
      2. Run the vi conf/kafka_client_producer_jaas.conf command and specify the username and password of the SASL user for the ApsaraMQ for Kafka instance in the kafka_client_producer_jaas.conf file.
        Note
        • If the access control list (ACL) feature is disabled for the ApsaraMQ for Kafka instance, you can obtain the default username and password of the SASL user on the Instance Details page in the ApsaraMQ for Kafka console.
        • If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user is of the PLAIN type and is authorized to send and consume messages. For more information, see Grant permissions to SASL users.
        KafkaClient {
          org.apache.kafka.common.security.plain.PlainLoginModule required
          username="The username of the SASL user"
          password="The password of the SASL user";
        };
    • If a client connects to ApsaraMQ for Kafka in a virtual private cloud (VPC), authentication and encryption are not required. Messages are transmitted over the PLAINTEXT protocol, and the default endpoint is used to connect to ApsaraMQ for Kafka. In this case, you need to set only the canal.serverMode and kafka.bootstrap.servers parameters. For more information about endpoints, see Comparison among endpoints.
      # ...
      # Set the value to kafka. 
      canal.serverMode = kafka
      # ...
      # Configure the settings of ApsaraMQ for Kafka.
      # The default endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for Kafka console.
      kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
      # Set the parameters as required or retain the following default settings: 
      kafka.acks = all
      kafka.compression.type = none
      kafka.batch.size = 16384
      kafka.linger.ms = 1
      kafka.max.request.size = 1048576
      kafka.buffer.memory = 33554432
      kafka.max.in.flight.requests.per.connection = 1
      kafka.retries = 0
  6. In the /home/doc/tools/canal.deployer-1.1.5 directory, run the following command to start Canal:
    sh bin/startup.sh
    • Check the log file /home/doc/tools/canal.deployer-1.1.5/logs/canal/canal.log to confirm that the Canal server is running.
      2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
      2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.XX.XX:11111]
      2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
    • Check the log file /home/doc/tools/canal.deployer-1.1.5/logs/example/example.log to confirm that a CanalInstance object is started.
      2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
      2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
      2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
      2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
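The rules in step 4 and Table 1 can be checked before you start Canal. The following Python sketch reads instance.properties with a simplified parser and reports missing required parameters and the canal.mq.partition conflict that the configuration comments describe. The function names are hypothetical and are not part of Canal. Canal itself reads the file with Java's properties parser, which also handles escape sequences and line continuations that this sketch ignores.

```python
from __future__ import annotations

# Hypothetical helpers for illustration; Canal does not ship them.

# Parameters marked "Yes" in Table 1.
REQUIRED = (
    "canal.instance.master.address",
    "canal.instance.dbUsername",
    "canal.instance.dbPassword",
    "canal.mq.topic",
)
HASH_KEYS = ("canal.mq.partitionsNum", "canal.mq.partitionHash")


def load_properties(text: str) -> dict[str, str]:
    """Simplified reader: key=value lines and # or ! comments only.

    Assumption: no Java escape sequences or line continuations are used.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def validate_instance(props: dict[str, str]) -> list[str]:
    """Return a list of configuration errors; an empty list means none."""
    errors = [f"missing required parameter {key}" for key in REQUIRED if not props.get(key)]

    address = props.get("canal.instance.master.address", "")
    if address and not address.rpartition(":")[2].isdigit():
        errors.append("canal.instance.master.address should be host:port")

    # canal.mq.partition selects one fixed partition; the hash keys spread data.
    hash_keys = [key for key in HASH_KEYS if key in props]
    if "canal.mq.partition" in props and hash_keys:
        errors.append("canal.mq.partition cannot be combined with " + ", ".join(hash_keys))
    if len(hash_keys) == 1:
        errors.append("set canal.mq.partitionsNum and canal.mq.partitionHash together")
    return errors
```

For example, validate_instance(load_properties(open("conf/example/instance.properties").read())) returns an empty list for the configuration shown in step 4 once the masked values are filled in.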
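Table 1 describes canal.mq.partitionHash as database name.table name:primary key rules that are used together with canal.mq.partitionsNum. The following Python sketch illustrates the idea: parse the rules, find the rule whose pattern matches a table, and map the row's key value to one of partitionsNum partitions. It is not Canal's implementation. zlib.crc32 stands in for Canal's own hash function, the fallback to partition 0 for unmatched tables is an assumption, and the function names are hypothetical.

```python
from __future__ import annotations

import re
import zlib

# Hypothetical helpers for illustration; not Canal's partitioning code.


def parse_partition_hash(rule: str) -> list[tuple[re.Pattern[str], str]]:
    """Split "db.table:pk,db2.table2:pk" into (table pattern, key column) pairs."""
    pairs = []
    for item in rule.split(","):
        item = item.strip()
        if not item:
            continue
        table_expr, sep, column = item.rpartition(":")
        if not sep:
            raise ValueError(f"expected 'database.table:column', got {item!r}")
        pairs.append((re.compile(table_expr), column))
    return pairs


def choose_partition(rules, schema: str, table: str, row: dict[str, object], partitions_num: int) -> int:
    """Pick a partition for one changed row.

    Assumptions: zlib.crc32 replaces Canal's hash function, and tables
    that match no rule go to partition 0.
    """
    name = f"{schema}.{table}"
    for pattern, column in rules:
        if pattern.fullmatch(name):
            return zlib.crc32(str(row[column]).encode("utf-8")) % partitions_num
    return 0
```

Hashing on the primary key sends every change to a given row to the same partition, so the changes to that row stay in order within that partition, while different rows are spread across partitions.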
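Step 5 describes two connection modes: over the Internet (SSL endpoint, SASL_SSL, PLAIN, and a truststore) and in a VPC (default endpoint, PLAINTEXT). The following Python sketch checks a dictionary of canal.properties values for the settings that each mode needs. The function name is hypothetical, and the port numbers are an assumption taken from the example endpoints (9093 for the SSL endpoint, 9092 for the default endpoint).

```python
from __future__ import annotations

# Hypothetical helper for illustration; not part of Canal.


def check_kafka_transport(props: dict[str, str]) -> list[str]:
    """Check canal.properties values against the two connection modes in step 5.

    Assumption: SSL endpoints use port 9093 and default (VPC) endpoints use
    port 9092, as in the example endpoints.
    """
    problems = []
    if props.get("canal.serverMode") != "kafka":
        problems.append("canal.serverMode must be kafka")
    servers = [s.strip() for s in props.get("kafka.bootstrap.servers", "").split(",") if s.strip()]
    if not servers:
        problems.append("kafka.bootstrap.servers is empty")
    ports = {server.rpartition(":")[2] for server in servers}

    # Kafka clients default to PLAINTEXT when security.protocol is not set.
    protocol = props.get("kafka.security.protocol", "PLAINTEXT")
    if protocol == "SASL_SSL":
        for key in ("kafka.ssl.truststore.location", "kafka.ssl.truststore.password"):
            if not props.get(key):
                problems.append(f"{key} is required with SASL_SSL")
        if props.get("kafka.sasl.mechanism") != "PLAIN":
            problems.append("kafka.sasl.mechanism should be PLAIN")
        if ports - {"9093"}:
            problems.append("SASL_SSL expects the SSL endpoint (port 9093)")
    elif protocol == "PLAINTEXT":
        if ports - {"9092"}:
            problems.append("PLAINTEXT expects the default endpoint (port 9092)")
    else:
        problems.append(f"unexpected kafka.security.protocol {protocol!r}")
    return problems
```

A common mistake this catches is pairing the SSL endpoint with the VPC configuration, or the default endpoint with SASL_SSL.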
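Table 2 states that a batch is ready to be sent when its size reaches kafka.batch.size or when the client has waited kafka.linger.ms for messages to accumulate. The following Python sketch is a toy model of that rule for a single batch, using the hypothetical class name BatchSimulator. The real Kafka producer keeps one batch per partition, counts record overhead in the batch size, and sends from a background thread; none of that is modeled here.

```python
from __future__ import annotations

from dataclasses import dataclass, field

# Hypothetical toy model; not the Kafka producer's RecordAccumulator.


@dataclass
class BatchSimulator:
    """A batch is sent when it reaches batch_size bytes or has waited linger_ms."""

    batch_size: int = 16384
    linger_ms: int = 1
    pending: list[bytes] = field(default_factory=list)
    opened_at: float | None = None
    sent: list[list[bytes]] = field(default_factory=list)

    def _pending_bytes(self) -> int:
        return sum(len(message) for message in self.pending)

    def _flush(self) -> None:
        if self.pending:
            self.sent.append(self.pending)
        self.pending = []
        self.opened_at = None

    def tick(self, now_ms: float) -> None:
        """Advance the clock; send the batch if it has lingered long enough."""
        if self.opened_at is not None and now_ms - self.opened_at >= self.linger_ms:
            self._flush()

    def append(self, message: bytes, now_ms: float) -> None:
        self.tick(now_ms)
        # Send the current batch first if the new message would not fit.
        if self.pending and self._pending_bytes() + len(message) > self.batch_size:
            self._flush()
        if self.opened_at is None:
            self.opened_at = now_ms
        self.pending.append(message)
        # A full batch (or one oversized message) is sent immediately.
        if self._pending_bytes() >= self.batch_size:
            self._flush()
```

With the example defaults (16384 bytes, 1 ms), small change events are usually sent because the linger time expires rather than because the batch fills up.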
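The JVM reads SASL credentials from the file named by the -Djava.security.auth.login.config option, so that path must match the JAAS file that you edit in step 5. The following Python sketch extracts the path from a JAVA_OPTS line and renders a KafkaClient section in the same layout as the JAAS example in step 5. The helper names are hypothetical, and the sketch rejects, rather than escapes, quotes and backslashes in the credentials.

```python
from __future__ import annotations

import re

# Hypothetical helpers for illustration; not part of Canal or Kafka.
LOGIN_CONFIG_OPTION = re.compile(r'-Djava\.security\.auth\.login\.config=([^\s"]+)')


def login_config_path(startup_sh: str) -> str | None:
    """Return the JAAS file path passed to the JVM in startup.sh, if any."""
    match = LOGIN_CONFIG_OPTION.search(startup_sh)
    return match.group(1) if match else None


def render_jaas(username: str, password: str) -> str:
    """Render a KafkaClient section for the PLAIN mechanism."""
    # Assumption: credentials contain no double quotes or backslashes,
    # because this sketch does not escape them.
    for value in (username, password):
        if '"' in value or "\\" in value:
            raise ValueError("quotes and backslashes are not handled by this sketch")
    return (
        "KafkaClient {\n"
        "  org.apache.kafka.common.security.plain.PlainLoginModule required\n"
        f'  username="{username}"\n'
        f'  password="{password}";\n'
        "};\n"
    )
```

To write the file, pass the result to pathlib.Path(path).write_text(...), where path is the value returned by login_config_path for your startup.sh.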
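The two log checks in step 6 can be scripted. The following Python sketch looks for the marker text that appears in the sample log lines ("the canal server is running now" in canal.log and "start successful" in example.log). The function names are hypothetical, and a marker match only shows that Canal started, not that data has reached ApsaraMQ for Kafka.

```python
from __future__ import annotations

from pathlib import Path

# Hypothetical helpers for illustration. Marker text is copied from the
# sample log lines in step 6.
STARTUP_MARKERS = {
    "logs/canal/canal.log": "the canal server is running now",
    "logs/example/example.log": "start successful",
}


def read_startup_logs(canal_home: str) -> dict[str, str]:
    """Read the startup log files that exist under the Canal directory."""
    texts = {}
    for rel_path in STARTUP_MARKERS:
        path = Path(canal_home) / rel_path
        if path.is_file():
            texts[rel_path] = path.read_text(encoding="utf-8", errors="replace")
    return texts


def check_startup_logs(texts: dict[str, str]) -> dict[str, bool]:
    """Report, per log file, whether its startup marker appears."""
    return {
        rel_path: marker in texts.get(rel_path, "")
        for rel_path, marker in STARTUP_MARKERS.items()
    }
```

For example: check_startup_logs(read_startup_logs("/home/doc/tools/canal.deployer-1.1.5")).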

Test the configurations

After Canal is started, perform a data synchronization test.

  1. In the MySQL database named mysql, create a table named T_Student and insert data into it. The following example queries the data in the table:
    mysql> select * from T_Student;
    +--------+---------+------+------+
    | stuNum | stuName | age  | sex  |
    +--------+---------+------+------+
    |      1 | Wang    |   18 | girl |
    |      2 | Zhang   |   17 | boy  |
    +--------+---------+------+------+
    2 rows in set (0.00 sec)
    Check the log file /home/doc/tools/canal.deployer-1.1.5/logs/example/meta.log to confirm that Canal has collected the data. Each time an insert, delete, or update operation is performed in the database, a record is generated in the meta.log file.
    tail -f /home/doc/tools/canal.deployer-1.1.5/logs/example/meta.log
    2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306]
    2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/192.168.XX.XX:3306]
    2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/192.168.XX.XX:3306]
  2. Log on to the ApsaraMQ for Kafka console and query messages to check whether the incremental data of the MySQL database is synchronized to the ApsaraMQ for Kafka instance. For more information about how to query messages in the console, see Query messages.
  3. After data is synchronized, run the following command to stop Canal:
    sh bin/stop.sh
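The cursor field in each meta.log record can be read to confirm that Canal's position in the binary log moves forward as you change data. The following Python sketch parses lines like the meta.log sample in the test steps. It assumes that the first three cursor fields are the binary log file name, the position in that file, and the event time in milliseconds since the epoch; these field meanings and the function names are assumptions, not documented Canal behavior.

```python
from __future__ import annotations

import re
from datetime import datetime, timezone
from typing import NamedTuple

# Hypothetical helpers for illustration; the field meanings are assumed.
CURSOR = re.compile(r"cursor:\[([^,\]]+),(\d+),(\d+),")


class Cursor(NamedTuple):
    binlog_file: str
    position: int
    event_time: datetime  # assumption: third field is epoch milliseconds


def parse_meta_log(lines) -> list[Cursor]:
    """Extract cursors from meta.log lines, skipping lines without one."""
    cursors = []
    for line in lines:
        match = CURSOR.search(line)
        if match:
            binlog_file, position, ts_ms = match.groups()
            event_time = datetime.fromtimestamp(int(ts_ms) / 1000, tz=timezone.utc)
            cursors.append(Cursor(binlog_file, int(position), event_time))
    return cursors


def cursor_advanced(cursors: list[Cursor]) -> bool:
    """True if the last cursor is past the first one.

    Comparing (file name, position) works when binary log file names share
    a zero-padded numeric suffix, such as log.000001.
    """
    if len(cursors) < 2:
        return False
    first, last = cursors[0], cursors[-1]
    return (last.binlog_file, last.position) > (first.binlog_file, first.position)
```

If the position stays the same after you insert, update, or delete rows, Canal has not received new binary log events.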
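When you query messages in the ApsaraMQ for Kafka console, each message body is a change record produced by Canal. Assuming that canal.mq.flatMessage is set to true in canal.properties, Canal writes each change as a JSON object with fields such as database, table, type, isDdl, sql, and data (a list of changed rows). The following Python sketch turns such a message into readable lines. The field names are an assumption based on Canal's flat message format, and the function name is hypothetical.

```python
from __future__ import annotations

import json

# Hypothetical helper for illustration; the field names are assumed.


def summarize_flat_message(raw: str | bytes) -> list[str]:
    """Turn one Canal flat message (JSON) into readable lines.

    Assumption: the message uses the field names database, table, type,
    isDdl, sql, and data.
    """
    message = json.loads(raw)
    table = f"{message.get('database')}.{message.get('table')}"
    if message.get("isDdl"):
        return [f"DDL on {table}: {message.get('sql')}"]
    # One line per changed row; row values are printed as they appear in the JSON.
    return [f"{message.get('type')} {table} {row}" for row in message.get("data") or []]
```

For an INSERT into T_Student, you would expect one line per inserted row with the table name mysql.T_Student.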