
ApsaraMQ for Kafka: Synchronize MySQL data to ApsaraMQ for Kafka by using Canal

Last Updated: Mar 11, 2026

When your application needs real-time visibility into MySQL data changes -- for analytics dashboards, search indexing, or downstream event processing -- you need a Change Data Capture (CDC) pipeline. Canal captures incremental changes from the MySQL binary log and streams them to ApsaraMQ for Kafka, so downstream consumers receive row-level INSERT, UPDATE, and DELETE events.

How Canal works

Canal emulates a MySQL secondary database. It sends a dump request to the primary MySQL database, receives binary log events, parses them into structured change records, and forwards the records to ApsaraMQ for Kafka.

For more information, see Canal on GitHub.
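
To make "structured change records" concrete, the sketch below parses one JSON message of the kind a consumer might read from the topic. It is a sketch under assumptions: the sample payload and its field names (database, table, type, pkNames, data, old) are assumed to follow Canal's flat JSON message layout, which this tutorial does not show, and summarize_change is a hypothetical helper. Inspect a real message from your topic before relying on these names.

```python
import json

# Sample payload, ASSUMED to follow Canal's flat JSON message layout.
# Verify the field names against a real message from your topic.
SAMPLE = """
{
  "database": "mytest",
  "table": "person",
  "type": "UPDATE",
  "pkNames": ["id"],
  "data": [{"id": "1", "name": "Wang", "age": "19"}],
  "old": [{"age": "18"}]
}
"""


def summarize_change(raw):
    """Return one summary dict per changed row in a change record."""
    record = json.loads(raw)
    rows = record.get("data") or []
    # "old" is assumed to hold the previous values of changed columns;
    # it may be missing or null for INSERT and DELETE events.
    olds = record.get("old") or [{}] * len(rows)
    return [
        {
            "table": f'{record["database"]}.{record["table"]}',
            "type": record["type"],
            "after": row,
            "changed_from": old,
        }
        for row, old in zip(rows, olds)
    ]


if __name__ == "__main__":
    for change in summarize_change(SAMPLE):
        print(change)
```

A downstream consumer would typically branch on the type field to decide whether to upsert or delete the row in its own store.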

What you will do

This tutorial walks through five steps:

  1. Configure MySQL binary logging

  2. Download and install Canal

  3. Configure the MySQL connection

  4. Configure the Kafka connection

  5. Start Canal and verify data flow

Prerequisites

Before you begin, make sure that you have:

  • A MySQL database with binary logging enabled in ROW format (see Configure MySQL binary logging)

  • An ApsaraMQ for Kafka instance with at least one topic created -- see Step 3: Create resources

  • Network connectivity between the Canal host, the MySQL database, and the ApsaraMQ for Kafka instance
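
The connectivity prerequisite can be checked from the Canal host before installing anything. The following is a minimal sketch, not part of Canal: can_connect is a hypothetical helper that only tests whether a TCP connection can be opened, and the hosts and ports in the usage section are placeholders to replace with your MySQL address and ApsaraMQ for Kafka endpoints.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False


if __name__ == "__main__":
    # Placeholder targets: replace with your MySQL host and Kafka endpoints.
    targets = [("127.0.0.1", 3306), ("127.0.0.1", 9092)]
    for host, port in targets:
        status = "reachable" if can_connect(host, port) else "unreachable"
        print(f"{host}:{port} {status}")
```

A successful TCP connection shows only that the port is reachable; it does not confirm that credentials or the security protocol are configured correctly.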

Configure MySQL binary logging

Canal relies on row-level binary logging to capture changes. Add the following settings to your MySQL configuration file (typically my.cnf or my.ini), then restart MySQL:

[mysqld]
server-id         = 1
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL

| Parameter | Description |
| --- | --- |
| server-id | A unique integer across all MySQL servers and replication clients in the cluster |
| log_bin | Base name for binary log files |
| binlog_format | Must be ROW. Canal cannot parse STATEMENT or MIXED format logs |
| binlog_row_image | Must be FULL to capture complete before-and-after row images |

Create a MySQL user for Canal with replication privileges:

CREATE USER 'canal'@'%' IDENTIFIED BY '<your-password>';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

For more information, see the Canal QuickStart.
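
Before restarting MySQL, the binary logging settings described in this section can be sanity-checked in the configuration file. The sketch below is illustrative only: check_binlog_settings is a hypothetical helper that inspects the text of a my.cnf-style file, not the running server's effective values, and it does not follow !include or !includedir directives.

```python
import configparser

# Values this tutorial requires for Canal.
REQUIRED = {"binlog_format": "ROW", "binlog_row_image": "FULL"}


def check_binlog_settings(cnf_text):
    """Return a list of problems found in the [mysqld] section of cnf_text."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL accepts '-' and '_' interchangeably in option names.
    opts = {
        key.replace("-", "_"): (value or "").strip()
        for key, value in parser.items("mysqld")
    }
    problems = []
    for name in ("server_id", "log_bin"):
        if name not in opts:
            problems.append(f"{name} is not set")
    for name, expected in REQUIRED.items():
        actual = opts.get(name, "")
        if actual.upper() != expected:
            problems.append(
                f"{name} should be {expected}, found {actual or 'nothing'}"
            )
    return problems


if __name__ == "__main__":
    sample = "[mysqld]\nserver-id = 1\nlog_bin = mysql-bin\nbinlog_format = MIXED\n"
    for problem in check_binlog_settings(sample):
        print(problem)
```

An empty result means the file contains the four settings shown above; the server must still be restarted for them to take effect.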

Download and install Canal

This guide uses Canal V1.1.5. Download the package from GitHub.

# Create the installation directory
mkdir -p /home/doc/tools/canal.deployer-1.1.5

# Extract the package
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5

Configure the MySQL connection

Edit conf/example/instance.properties to point Canal at your MySQL database:

vi conf/example/instance.properties

Set the following parameters:

# MySQL connection
canal.instance.master.address = 192.168.XX.XX:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = <your-password>

# Kafka topic mapping
canal.mq.topic = mysql_test

# Partition routing (choose one approach)
# Option A: Send all changes to a single partition
canal.mq.partition = 0

# Option B: Distribute changes across partitions by primary key
# canal.mq.partitionsNum = 3
# canal.mq.partitionHash = mytest.person:id,mytest.role:id

instance.properties parameter reference

| Parameter | Required | Description |
| --- | --- | --- |
| canal.instance.master.address | Yes | MySQL database address in host:port format |
| canal.instance.dbUsername | Yes | MySQL username with replication privileges |
| canal.instance.dbPassword | Yes | Password for the MySQL user |
| canal.mq.topic | Yes | Target topic in ApsaraMQ for Kafka. Create topics on the Topics page in the ApsaraMQ for Kafka console. For more information, see Step 3: Create resources |
| canal.mq.dynamicTopic | No | Regex pattern that routes changes from different tables to different Kafka topics. For syntax details, see Canal MQ parameter reference |
| canal.mq.partition | No | Fixed partition index for all messages. Cannot be used together with canal.mq.partitionsNum or canal.mq.partitionHash |
| canal.mq.partitionsNum | No | Total number of partitions for hash-based routing. Use with canal.mq.partitionHash |
| canal.mq.partitionHash | No | Hash routing rules in database.table:column format. Separate multiple rules with commas. For syntax details, see Canal MQ parameter reference |
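
The idea behind hash-based routing (Option B) is that every change to the same row lands in the same partition, which preserves per-row ordering while spreading load. The sketch below illustrates that idea only. pick_partition is a hypothetical function, and it uses CRC32 as a stand-in hash; it is not the hash function Canal uses internally, so the partition numbers it produces will not match Canal's.

```python
import zlib


def pick_partition(key_value, partitions_num):
    """Map a primary-key value to a partition index in [0, partitions_num)."""
    if partitions_num <= 0:
        raise ValueError("partitions_num must be positive")
    # CRC32 is used only because it is stable across runs and processes.
    # It is NOT the hash function Canal uses internally.
    digest = zlib.crc32(str(key_value).encode("utf-8"))
    return digest % partitions_num


if __name__ == "__main__":
    # The repeated key 1 maps to the same partition both times.
    for pk in (1, 2, 3, 1):
        print(pk, "->", pick_partition(pk, 3))
```

With canal.mq.partition = 0 (Option A), all tables share one partition and global ordering is kept at the cost of parallelism.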

Configure the Kafka connection

Edit conf/canal.properties to connect Canal to your ApsaraMQ for Kafka instance:

vi conf/canal.properties

The configuration differs depending on your network access method. Choose one of the following:

  • Internet access (SASL_SSL) -- requires authentication and encryption

  • VPC access (PLAINTEXT) -- no authentication required

For endpoint details, see Comparison among endpoints.

Internet access (SASL_SSL)

When connecting over the Internet, Canal authenticates with ApsaraMQ for Kafka through SASL_SSL. You need to modify three files and download the SSL truststore file.

File 1: conf/canal.properties

# Set Canal to use Kafka as the output
canal.serverMode = kafka

# SSL endpoint of your ApsaraMQ for Kafka instance
# Get this from the Endpoint Information section on the Instance Details page
kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093

# Kafka producer settings (adjust as needed)
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

# SASL_SSL authentication
kafka.ssl.truststore.location = ../conf/kafka.client.truststore.jks
kafka.ssl.truststore.password = KafkaOnsClient
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN
kafka.ssl.endpoint.identification.algorithm =

File 2: bin/startup.sh

Add the JAAS configuration path to the JAVA_OPTS variable:

JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_producer_jaas.conf"

File 3: conf/kafka_client_producer_jaas.conf

Specify the SASL credentials for your ApsaraMQ for Kafka instance:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<your-sasl-username>"
  password="<your-sasl-password>";
};

Replace the placeholders with your actual values:

| Placeholder | Description | Where to find it |
| --- | --- | --- |
| <your-sasl-username> | SASL username | Instance Details page in the ApsaraMQ for Kafka console |
| <your-sasl-password> | SASL password | Instance Details page in the ApsaraMQ for Kafka console |

Note
  • If the Access Control List (ACL) feature is disabled for your instance, you can obtain the default SASL username and password on the Instance Details page in the ApsaraMQ for Kafka console.

  • If the ACL feature is enabled for your instance, the SASL user must be of the PLAIN type and have permissions to send and consume messages. For more information, see Grant permissions to SASL users.

Download the SSL truststore file kafka.client.truststore.jks and save it to the conf/ directory.

VPC access (PLAINTEXT)

When connecting from within a VPC, no authentication or encryption is required. Set canal.serverMode and kafka.bootstrap.servers in conf/canal.properties:

# Set Canal to use Kafka as the output
canal.serverMode = kafka

# Default (VPC) endpoint of your ApsaraMQ for Kafka instance
# Get this from the Endpoint Information section on the Instance Details page
kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092

# Kafka producer settings (adjust as needed)
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

canal.properties parameter reference

| Parameter | Required | Description |
| --- | --- | --- |
| canal.serverMode | Yes | Output destination type. Set to kafka |
| kafka.bootstrap.servers | Yes | ApsaraMQ for Kafka endpoint. Get this from the Endpoint Information section on the Instance Details page in the ApsaraMQ for Kafka console. Use port 9093 for Internet (SSL) or port 9092 for VPC |
| kafka.acks | Yes | Acknowledgment level from the broker. 0: no acknowledgment. 1: leader acknowledgment only. all: all in-sync replicas must acknowledge |
| kafka.compression.type | Yes | Message compression algorithm. Valid values: none, gzip, snappy |
| kafka.batch.size | Yes | Maximum batch size in bytes. Larger values improve throughput; smaller values reduce latency. Default: 16384 |
| kafka.linger.ms | Yes | Maximum time in milliseconds to wait for additional messages before sending a batch. Default: 1 |
| kafka.max.request.size | Yes | Maximum size of a single produce request in bytes. Default: 1048576 |
| kafka.buffer.memory | Yes | Total memory in bytes available for buffering unsent messages. Default: 33554432 |
| kafka.max.in.flight.requests.per.connection | Yes | Maximum number of unacknowledged requests per connection. Set to 1 to guarantee ordering. Default: 1 |
| kafka.retries | Yes | Number of retry attempts for failed sends. 0 disables retries. Default: 0 |
| kafka.ssl.truststore.location | Internet only | Path to the SSL truststore file (kafka.client.truststore.jks) |
| kafka.ssl.truststore.password | Internet only | Truststore password. Set to KafkaOnsClient |
| kafka.security.protocol | Internet only | Security protocol. Set to SASL_SSL for Internet access |
| kafka.sasl.mechanism | Internet only | SASL authentication mechanism. Set to PLAIN |
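
A common mistake is mixing the two access methods, for example using a 9093 endpoint without the SASL_SSL settings. The sketch below checks a canal.properties file for that kind of mismatch, based only on the port and protocol rules stated in this section. parse_properties and check_kafka_settings are hypothetical helpers, and the parser handles only simple key = value lines with # comments, not the full Java properties syntax.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_kafka_settings(text):
    """Return a list of mismatches between endpoint ports and security settings."""
    props = parse_properties(text)
    problems = []
    if props.get("canal.serverMode") != "kafka":
        problems.append("canal.serverMode is not set to kafka")
    raw_servers = props.get("kafka.bootstrap.servers", "")
    servers = [s.strip() for s in raw_servers.split(",") if s.strip()]
    if not servers:
        problems.append("kafka.bootstrap.servers is empty")
    ports = {s.rsplit(":", 1)[-1] for s in servers}
    protocol = props.get("kafka.security.protocol", "")
    if "9093" in ports and protocol != "SASL_SSL":
        problems.append(
            "port 9093 is used but kafka.security.protocol is not SASL_SSL"
        )
    if "9092" in ports and protocol == "SASL_SSL":
        problems.append("port 9092 is used together with SASL_SSL")
    if protocol == "SASL_SSL" and not props.get("kafka.ssl.truststore.location"):
        problems.append("kafka.ssl.truststore.location is missing")
    return problems


if __name__ == "__main__":
    with open("conf/canal.properties", encoding="utf-8") as handle:
        for problem in check_kafka_settings(handle.read()):
            print(problem)
```

Run it from the Canal installation directory; no output means the endpoint ports and security settings agree with each other.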

Start Canal and verify data flow

Start Canal from the installation directory:

sh bin/startup.sh

Verify that Canal started

Check the Canal server log:

tail -f logs/canal/canal.log

Expected output:

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.XX.XX:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

Check the Canal instance log:

tail -f logs/example/example.log

Expected output:

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

Test with a sample table

  1. In a MySQL database named mysql, create a test table named T_Student and insert sample data. Querying the table should then return rows similar to the following:

        mysql> SELECT * FROM T_Student;
        +--------+---------+------+------+
        | stuNum | stuName | age  | sex  |
        +--------+---------+------+------+
        |      1 | Wang    |   18 | girl |
        |      2 | Zhang   |   17 | boy  |
        +--------+---------+------+------+
        2 rows in set (0.00 sec)
  2. Confirm that Canal captured the changes by checking the meta log. Each INSERT, UPDATE, or DELETE generates a new entry:

        tail -f logs/example/meta.log

     Expected output:

        2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306]
        2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306]

     Each line shows the binary log file (log.000001), the byte offset (for example, 29723), and the source MySQL address, which confirms that Canal is reading changes.

  3. Verify that messages arrived in ApsaraMQ for Kafka. Open the ApsaraMQ for Kafka console and query messages for your topic. For more information, see Query messages.

  4. After you finish testing, stop Canal:

        sh bin/stop.sh
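
The meta.log entries from step 2 can also be checked by a script, for example to confirm that the byte offset advances after each change. The sketch below extracts the binary log file, byte offset, and source address from one line. parse_meta_line is a hypothetical helper, and the pattern is derived only from the two sample lines shown in this tutorial, so other Canal versions may log a different layout.

```python
import re

# Pattern derived from the sample meta.log lines in this tutorial.
# The fields after the byte offset are skipped because the tutorial
# does not describe them.
META_LINE = re.compile(
    r"clientId:(?P<client_id>\d+) "
    r"cursor:\[(?P<file>[^,\]]+),(?P<offset>\d+),[^\]]*\] "
    r"address\[(?P<address>[^\]]*)\]"
)


def parse_meta_line(line):
    """Return file, offset, and address from a meta.log line, or None."""
    match = META_LINE.search(line)
    if match is None:
        return None
    return {
        "client_id": int(match.group("client_id")),
        "file": match.group("file"),
        "offset": int(match.group("offset")),
        "address": match.group("address"),
    }


if __name__ == "__main__":
    sample = (
        "2020-07-29 09:21:05.110 - clientId:1001 "
        "cursor:[log.000001,29723,1591190230000,1,] "
        "address[/192.168.XX.XX:3306]"
    )
    print(parse_meta_line(sample))
```

Comparing the offset of consecutive entries for the same file is a quick way to see that new binary log events are being read.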

Troubleshooting

| Symptom | Possible cause | Solution |
| --- | --- | --- |
| Canal starts but no data flows to Kafka | MySQL binary logging is not in ROW format | Set binlog_format = ROW in my.cnf and restart MySQL. Canal cannot parse STATEMENT or MIXED format logs |
| Authentication failed in Canal logs | Incorrect SASL credentials or missing JAAS configuration | Verify the username and password in kafka_client_producer_jaas.conf. Make sure the JAAS path is set correctly in startup.sh |
| Connection refused when connecting to Kafka | Wrong endpoint or port | Use port 9093 for Internet (SSL) access and port 9092 for VPC access. Verify the endpoint on the Instance Details page |
| No topic error | Topic does not exist in ApsaraMQ for Kafka | Create the topic in the ApsaraMQ for Kafka console before starting Canal |
| Canal log shows start successful but meta.log has no entries | No changes occurred in the MySQL database after Canal started | Canal only captures incremental changes. Run an INSERT, UPDATE, or DELETE statement in MySQL to generate binary log events |
