When your application needs real-time visibility into MySQL data changes -- for analytics dashboards, search indexing, or downstream event processing -- you need a Change Data Capture (CDC) pipeline. Canal captures incremental changes from the MySQL binary log and streams them to ApsaraMQ for Kafka, so downstream consumers receive row-level INSERT, UPDATE, and DELETE events.
How Canal works
Canal emulates a MySQL secondary database. It sends a dump request to the primary MySQL database, receives binary log events, parses them into structured change records, and forwards the records to ApsaraMQ for Kafka.

For more information, see Canal on GitHub.
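The Python sketch below shows what a downstream consumer might do with one of these structured change records. It assumes Canal publishes flat JSON messages (the canal.mq.flatMessage option in conf/canal.properties; confirm that it is enabled in your version). It also assumes each message carries the fields database, table, type, isDdl, data, and old. Check these field names against the messages you actually receive. The function name iter_row_changes is hypothetical.

```python
import json
from typing import Any, Iterator


def iter_row_changes(raw: str) -> Iterator[dict[str, Any]]:
    """Yield one change record per row in a Canal flat (JSON) message.

    Assumed field names: database, table, type, isDdl, data, old.
    """
    msg = json.loads(raw)
    if msg.get("isDdl"):
        return  # DDL messages carry no row images
    op = msg["type"]
    old_rows = msg.get("old") or []
    for i, row in enumerate(msg.get("data") or []):
        before = None
        if op == "DELETE":
            before = row  # a deleted row appears in "data" as its last image
        elif op == "UPDATE" and i < len(old_rows):
            # Assumed: "old" lists only the changed columns, so overlay them on the new row
            before = {**row, **old_rows[i]}
        yield {
            "op": op,
            "table": f'{msg["database"]}.{msg["table"]}',
            "before": before,
            "after": None if op == "DELETE" else row,
        }


if __name__ == "__main__":
    update = ('{"database": "mysql", "table": "T_Student", "type": "UPDATE", "isDdl": false,'
              ' "data": [{"stuNum": "1", "age": "19"}], "old": [{"age": "18"}]}')
    for change in iter_row_changes(update):
        # prints: UPDATE mysql.T_Student {'stuNum': '1', 'age': '18'} -> {'stuNum': '1', 'age': '19'}
        print(change["op"], change["table"], change["before"], "->", change["after"])
```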
What you will do
This tutorial walks through five steps:
Configure MySQL binary logging
Download and install Canal
Configure the MySQL connection
Configure the Kafka connection
Start Canal and verify data flow
Prerequisites
Before you begin, make sure that you have:
A MySQL database with binary logging enabled in ROW format (see Configure MySQL binary logging)
An ApsaraMQ for Kafka instance with at least one topic created (see Step 3: Create resources)
Network connectivity between the Canal host, the MySQL database, and the ApsaraMQ for Kafka instance
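You can check the network prerequisite from the Canal host with a plain TCP connection attempt. This rules out routing and firewall problems before you start configuring anything. The helper below, can_reach, is a hypothetical sketch. It verifies reachability only and says nothing about credentials, SASL, or SSL.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused connections, timeouts, and DNS failures all derive from OSError
        return False
```

Call it with your MySQL host and port 3306, and with each ApsaraMQ for Kafka endpoint on port 9093 (Internet) or 9092 (VPC).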
Configure MySQL binary logging
Canal relies on row-level binary logging to capture changes. Add the following settings to your MySQL configuration file (typically my.cnf or my.ini), then restart MySQL:
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
| Parameter | Description |
|---|---|
server-id | A unique integer across all MySQL servers and replication clients in the cluster |
log_bin | Base name for binary log files |
binlog_format | Must be ROW. Canal cannot parse STATEMENT or MIXED format logs |
binlog_row_image | Must be FULL to capture complete before-and-after row images |
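After restarting MySQL, you can confirm the settings with SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image', 'server_id');. MySQL reports log_bin as ON or OFF and spells the server ID variable server_id. The sketch below checks those name/value pairs. The function name is hypothetical. With a DB-API driver, dict(cursor.fetchall()) on that query would produce a suitable input, but fetching the rows is left to you.

```python
def check_binlog_settings(variables: dict[str, str]) -> list[str]:
    """Return problems found in SHOW VARIABLES output; an empty list means OK for Canal."""
    # Normalize names and values so 'row' and 'ROW' compare equal
    v = {name.lower(): str(value).upper() for name, value in variables.items()}
    problems = []
    if v.get("log_bin") != "ON":
        problems.append("log_bin is not ON, so binary logging is disabled")
    if v.get("binlog_format") != "ROW":
        problems.append(f"binlog_format is {v.get('binlog_format')}, expected ROW")
    if v.get("binlog_row_image") != "FULL":
        problems.append(f"binlog_row_image is {v.get('binlog_row_image')}, expected FULL")
    if v.get("server_id") in (None, "0"):
        # A server with server_id 0 refuses connections from replication clients such as Canal
        problems.append("server_id is missing or 0")
    return problems


if __name__ == "__main__":
    rows = {"log_bin": "ON", "binlog_format": "MIXED", "binlog_row_image": "FULL", "server_id": "1"}
    print(check_binlog_settings(rows))  # ['binlog_format is MIXED, expected ROW']
```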
Create a MySQL user for Canal with replication privileges:
CREATE USER 'canal'@'%' IDENTIFIED BY '<your-password>';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
For more information, see the Canal QuickStart.
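To double-check the grant, run SHOW GRANTS FOR 'canal'@'%'; and compare the output with the privileges Canal needs. The sketch below considers only global (ON *.* TO) grants. It looks only at the privilege list, because the quoting of the account name differs across MySQL versions. The helper name missing_canal_privileges is hypothetical.

```python
import re

REQUIRED = {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}
# Matches global grants such as: GRANT SELECT, REPLICATION SLAVE ON *.* TO `canal`@`%`
GLOBAL_GRANT = re.compile(r"GRANT (.+?) ON \*\.\* TO ", re.IGNORECASE)


def missing_canal_privileges(grant_lines: list[str]) -> set[str]:
    """Return the privileges Canal needs that are absent from SHOW GRANTS output."""
    granted: set[str] = set()
    for line in grant_lines:
        match = GLOBAL_GRANT.match(line)
        if not match:
            continue  # ignore grants scoped to a single database or table
        privileges = {p.strip().upper() for p in match.group(1).split(",")}
        if "ALL PRIVILEGES" in privileges:
            return set()
        granted |= privileges
    return REQUIRED - granted
```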
Download and install Canal
This guide uses Canal V1.1.5. Download the package from GitHub.
# Create the installation directory
mkdir -p /home/doc/tools/canal.deployer-1.1.5
# Extract the package
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5
Configure the MySQL connection
Run the commands in the rest of this guide from the installation directory (/home/doc/tools/canal.deployer-1.1.5). Edit conf/example/instance.properties to point Canal at your MySQL database:
vi conf/example/instance.properties
Set the following parameters:
# MySQL connection
canal.instance.master.address = 192.168.XX.XX:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = <your-password>
# Kafka topic mapping
canal.mq.topic = mysql_test
# Partition routing (choose one approach)
# Option A: Send all changes to a single partition
canal.mq.partition = 0
# Option B: Distribute changes across partitions by primary key
# canal.mq.partitionsNum = 3
# canal.mq.partitionHash = mytest.person:id,mytest.role:id
instance.properties parameter reference
| Parameter | Required | Description |
|---|---|---|
canal.instance.master.address | Yes | MySQL database address in host:port format |
canal.instance.dbUsername | Yes | MySQL username with replication privileges |
canal.instance.dbPassword | Yes | Password for the MySQL user |
canal.mq.topic | Yes | Target topic in ApsaraMQ for Kafka. Create topics on the Topics page in the ApsaraMQ for Kafka console. For more information, see Step 3: Create resources |
canal.mq.dynamicTopic | No | Regex pattern that routes changes from different tables to different Kafka topics. For syntax details, see Canal MQ parameter reference |
canal.mq.partition | No | Fixed partition index for all messages. Cannot be used together with canal.mq.partitionsNum or canal.mq.partitionHash |
canal.mq.partitionsNum | No | Total number of partitions for hash-based routing. Use with canal.mq.partitionHash |
canal.mq.partitionHash | No | Hash routing rules in database.table:column format. Separate multiple rules with commas. For syntax details, see Canal MQ parameter reference |
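The Python sketch below illustrates what Option B does conceptually. It parses canal.mq.partitionHash rules and maps each row's key column to one of canal.mq.partitionsNum partitions. The sketch makes several simplifications:

- It handles only the simple database.table:column form.
- It uses zlib.crc32 as a stand-in hash. Canal's own hash function differs, so the partition numbers will not match what Canal produces.
- Its fallback for unlisted tables is an assumption of this sketch, not documented Canal behavior.

The function names are hypothetical.

```python
import zlib


def parse_partition_hash(rules: str) -> dict[str, str]:
    """Parse simple 'database.table:column' rules separated by commas."""
    mapping: dict[str, str] = {}
    for rule in (r.strip() for r in rules.split(",")):
        if not rule:
            continue
        table, sep, column = rule.partition(":")
        if not sep or not column:
            raise ValueError(f"rule {rule!r} needs a column after ':'")
        mapping[table] = column
    return mapping


def pick_partition(row: dict[str, object], table: str, mapping: dict[str, str],
                   partitions_num: int, default_partition: int = 0) -> int:
    """Map a row to a partition by hashing its key column (illustrative hash only)."""
    column = mapping.get(table)
    if column is None:
        # Assumption of this sketch: unlisted tables go to a fixed partition
        return default_partition
    key = str(row[column]).encode("utf-8")
    # crc32 is stable across runs, unlike Python's salted built-in hash() for str
    return zlib.crc32(key) % partitions_num
```

The design point holds regardless of the hash function. Rows with the same key always land in the same partition, so changes to one row stay in order, while different keys spread across partitions.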
Configure the Kafka connection
Edit conf/canal.properties to connect Canal to your ApsaraMQ for Kafka instance:
vi conf/canal.properties
The configuration differs depending on your network access method. Choose one of the following:
Internet access (SASL_SSL) -- requires authentication and encryption
VPC access (PLAINTEXT) -- no authentication required
For endpoint details, see Comparison among endpoints.
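The two access methods use different ports: 9093 for the Internet (SSL) endpoint and 9092 for the VPC endpoint. You can therefore tell which of the following sections applies from your endpoint list. The sketch below shows this check. The helper name access_mode is hypothetical.

```python
def access_mode(bootstrap_servers: str) -> str:
    """Classify an endpoint list as 'internet' (port 9093) or 'vpc' (port 9092)."""
    ports = set()
    for server in bootstrap_servers.split(","):
        host, sep, port = server.strip().rpartition(":")
        if not sep or not host:
            raise ValueError(f"expected host:port, got {server!r}")
        ports.add(port)
    if ports == {"9093"}:
        return "internet"  # SASL_SSL endpoint
    if ports == {"9092"}:
        return "vpc"  # PLAINTEXT default endpoint
    raise ValueError(f"unexpected or mixed ports: {sorted(ports)}")
```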
Internet access (SASL_SSL)
When connecting over the Internet, Canal authenticates with ApsaraMQ for Kafka through SASL_SSL. You need to modify three files.
File 1: conf/canal.properties
# Set Canal to use Kafka as the output
canal.serverMode = kafka
# SSL endpoint of your ApsaraMQ for Kafka instance
# Get this from the Endpoint Information section on the Instance Details page
kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
# Kafka producer settings (adjust as needed)
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
# SASL_SSL authentication
kafka.ssl.truststore.location = ../conf/kafka.client.truststore.jks
kafka.ssl.truststore.password = KafkaOnsClient
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN
# An empty value disables server hostname verification
kafka.ssl.endpoint.identification.algorithm =
File 2: bin/startup.sh
Add the JAAS configuration path to the JAVA_OPTS variable:
JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_producer_jaas.conf"
File 3: conf/kafka_client_producer_jaas.conf
Specify the SASL credentials for your ApsaraMQ for Kafka instance:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<your-sasl-username>"
password="<your-sasl-password>";
};
Replace the placeholders with your actual values:
| Placeholder | Description | Where to find it |
|---|---|---|
<your-sasl-username> | SASL username | Instance Details page in the ApsaraMQ for Kafka console |
<your-sasl-password> | SASL password | Instance Details page in the ApsaraMQ for Kafka console |
If the Access Control List (ACL) feature is disabled for your instance, you can obtain the default SASL username and password on the Instance Details page in the ApsaraMQ for Kafka console.
If the ACL feature is enabled for your instance, the SASL user must be of the PLAIN type and have permissions to send and consume messages. For more information, see Grant permissions to SASL users.
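If you generate kafka_client_producer_jaas.conf from a script, a sketch like the one below keeps the format exact. This includes the semicolon after the password line and the closing brace. The helper render_jaas is hypothetical. Rather than guess at JAAS escaping rules, it rejects credentials that contain quotes, backslashes, or newlines.

```python
def render_jaas(username: str, password: str) -> str:
    """Build the KafkaClient section for kafka_client_producer_jaas.conf."""
    for value in (username, password):
        # Reject characters that would need escaping inside a quoted JAAS value
        if '"' in value or "\\" in value or "\n" in value:
            raise ValueError("credential contains a quote, backslash, or newline")
    return (
        "KafkaClient {\n"
        "  org.apache.kafka.common.security.plain.PlainLoginModule required\n"
        f'  username="{username}"\n'
        f'  password="{password}";\n'
        "};\n"
    )
```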
Download the SSL truststore file kafka.client.truststore.jks and save it to the conf/ directory.
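Before you start Canal, you can sanity-check the Internet (SASL_SSL) settings in conf/canal.properties. The sketch below uses a deliberately minimal parser. It handles only key = value lines and # comments, and does not support the line continuations or : separators of the full .properties format. It then checks the values that this guide prescribes. The function names are hypothetical.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Minimal parser: 'key = value' lines only; skips blank lines and '#' or '!' comments."""
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_sasl_ssl(props: dict[str, str]) -> list[str]:
    """Compare Internet-access settings against the values this guide prescribes."""
    expected = {
        "canal.serverMode": "kafka",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.ssl.truststore.password": "KafkaOnsClient",
    }
    problems = [
        f"{key} should be {want}, found {props.get(key)!r}"
        for key, want in expected.items()
        if props.get(key) != want
    ]
    servers = [s.strip() for s in props.get("kafka.bootstrap.servers", "").split(",") if s.strip()]
    if not servers or not all(s.endswith(":9093") for s in servers):
        problems.append("kafka.bootstrap.servers should list SSL endpoints on port 9093")
    if not props.get("kafka.ssl.truststore.location"):
        problems.append("kafka.ssl.truststore.location is not set")
    return problems
```

For example, run check_sasl_ssl(parse_properties(Path("conf/canal.properties").read_text())) from the installation directory, with Path imported from pathlib. An empty list means that no prescribed value is missing.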
VPC access (PLAINTEXT)
When connecting from within a VPC, no authentication or encryption is required. Set canal.serverMode and kafka.bootstrap.servers in conf/canal.properties:
# Set Canal to use Kafka as the output
canal.serverMode = kafka
# Default (VPC) endpoint of your ApsaraMQ for Kafka instance
# Get this from the Endpoint Information section on the Instance Details page
kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
# Kafka producer settings (adjust as needed)
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
canal.properties parameter reference
| Parameter | Required | Description |
|---|---|---|
canal.serverMode | Yes | Output destination type. Set to kafka |
kafka.bootstrap.servers | Yes | ApsaraMQ for Kafka endpoint. Get this from the Endpoint Information section on the Instance Details page in the ApsaraMQ for Kafka console. Use port 9093 for Internet (SSL) or port 9092 for VPC |
kafka.acks | Yes | Acknowledgment level from the broker. 0: no acknowledgment. 1: leader acknowledgment only. all: all in-sync replicas must acknowledge |
kafka.compression.type | Yes | Message compression algorithm. Valid values: none, gzip, snappy |
kafka.batch.size | Yes | Maximum batch size in bytes. Larger values improve throughput; smaller values reduce latency. Default: 16384 |
kafka.linger.ms | Yes | Maximum time in milliseconds to wait for additional messages before sending a batch. Default: 1 |
kafka.max.request.size | Yes | Maximum size of a single produce request in bytes. Default: 1048576 |
kafka.buffer.memory | Yes | Total memory in bytes available for buffering unsent messages. Default: 33554432 |
kafka.max.in.flight.requests.per.connection | Yes | Maximum number of unacknowledged requests per connection. Set to 1 to guarantee ordering. Default: 1 |
kafka.retries | Yes | Number of retry attempts for failed sends. 0 disables retries. Default: 0 |
kafka.ssl.truststore.location | Internet only | Path to the SSL truststore file (kafka.client.truststore.jks) |
kafka.ssl.truststore.password | Internet only | Truststore password. Set to KafkaOnsClient |
kafka.security.protocol | Internet only | Security protocol. Set to SASL_SSL for Internet access |
kafka.sasl.mechanism | Internet only | SASL authentication mechanism. Set to PLAIN |
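Several of these producer settings interact. The sketch below flags combinations that conflict with the guidance in the table. For example, when retries are enabled and more than one request can be in flight, a retried batch may be written after a later one, which reorders change events. The sketch also accepts -1, which Kafka treats as equivalent to all. The helper review_producer_settings is hypothetical, and it expects the kafka.* values as already-parsed strings.

```python
VALID_ACKS = {"0", "1", "all", "-1"}  # Kafka treats -1 the same as all
VALID_COMPRESSION = {"none", "gzip", "snappy"}  # values listed in the reference table


def review_producer_settings(props: dict[str, str]) -> list[str]:
    """Return warnings for kafka.* producer settings taken from canal.properties."""
    warnings = []
    acks = props.get("kafka.acks", "")
    if acks not in VALID_ACKS:
        warnings.append(f"kafka.acks={acks!r} is not 0, 1, or all")
    compression = props.get("kafka.compression.type", "none")
    if compression not in VALID_COMPRESSION:
        warnings.append(f"kafka.compression.type={compression!r} is not none, gzip, or snappy")
    in_flight = int(props.get("kafka.max.in.flight.requests.per.connection", "1"))
    retries = int(props.get("kafka.retries", "0"))
    if in_flight > 1 and retries > 0:
        # A failed batch that is retried can be written after a later batch,
        # so change events for the same row may arrive out of order
        warnings.append("retries > 0 with more than one in-flight request can reorder messages")
    return warnings
```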
Start Canal and verify data flow
Start Canal from the installation directory:
sh bin/startup.sh
Verify that Canal started
Check the Canal server log:
tail -f logs/canal/canal.log
Expected output:
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.XX.XX:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
Check the Canal instance log:
tail -f logs/example/example.log
Expected output:
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
Test with a sample table
In a MySQL database named mysql, create a test table named T_Student and insert sample data. Querying the table returns the following:
mysql> SELECT * FROM T_Student;
+--------+---------+------+------+
| stuNum | stuName | age  | sex  |
+--------+---------+------+------+
|      1 | Wang    |   18 | girl |
|      2 | Zhang   |   17 | boy  |
+--------+---------+------+------+
2 rows in set (0.00 sec)
Confirm that Canal captured the changes by checking the meta log. Each INSERT, UPDATE, or DELETE generates a new entry:
tail -f logs/example/meta.log
Expected output:
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306]
Each line shows the binary log file (log.000001), the byte offset (for example, 29723), and the source MySQL address. Together they confirm that Canal is reading changes.
To verify that messages arrived in ApsaraMQ for Kafka, open the ApsaraMQ for Kafka console and query messages for your topic. For more information, see Query messages.
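To script the meta.log check, you can extract the binary log file and byte offset from each cursor entry and test whether the position moved forward, as in the sketch below. It assumes the cursor:[file,offset,...] layout shown in the sample meta.log output. It compares file names as strings, which orders zero-padded names such as log.000001 correctly. The names Cursor, parse_cursor, and has_advanced are hypothetical.

```python
import re
from typing import NamedTuple, Optional

# Matches the first two cursor fields: binary log file name and byte offset
CURSOR_RE = re.compile(r"cursor:\[(?P<file>[^,\]]+),(?P<offset>\d+),")


class Cursor(NamedTuple):
    file: str
    offset: int


def parse_cursor(line: str) -> Optional[Cursor]:
    """Extract (file, offset) from one meta.log line, or None if the line has no cursor."""
    match = CURSOR_RE.search(line)
    if match is None:
        return None
    return Cursor(match["file"], int(match["offset"]))


def has_advanced(lines: list[str]) -> bool:
    """True if the last cursor is past the first one (tuple order: file, then offset)."""
    cursors = [c for c in map(parse_cursor, lines) if c is not None]
    return len(cursors) >= 2 and cursors[-1] > cursors[0]
```

To use it, pass Path("logs/example/meta.log").read_text().splitlines() from the installation directory, with Path imported from pathlib.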

After you finish testing, stop Canal:
sh bin/stop.sh
Troubleshooting
| Symptom | Possible cause | Solution |
|---|---|---|
| Canal starts but no data flows to Kafka | MySQL binary logging is not in ROW format | Set binlog_format = ROW in my.cnf and restart MySQL. Canal cannot parse STATEMENT or MIXED format logs |
Authentication failed in Canal logs | Incorrect SASL credentials or missing JAAS configuration | Verify the username and password in kafka_client_producer_jaas.conf. Make sure the JAAS path is set correctly in startup.sh |
Connection refused when connecting to Kafka | Wrong endpoint or port | Use port 9093 for Internet (SSL) access and port 9092 for VPC access. Verify the endpoint on the Instance Details page |
No topic error | Topic does not exist in ApsaraMQ for Kafka | Create the topic in the ApsaraMQ for Kafka console before starting Canal |
Canal log shows start successful but meta.log has no entries | No changes occurred in the MySQL database after Canal started | Canal only captures incremental changes. Run an INSERT, UPDATE, or DELETE statement in MySQL to generate binary log events |