本文以MySQL数据库为例介绍如何使用Canal接入云消息队列 RocketMQ 版,实现MySQL数据库Binlog数据的变更处理。
背景信息
CDC(Change Data Capture)是一种监测并捕获数据库变更的典型技术方案,常应用于异构数据源之间的数据同步。Canal作为一款轻量级的CDC工具,可基于数据库增量日志解析,提供增量变更数据的订阅和消费能力。Canal可以将变更记录可靠地投递到云消息队列 RocketMQ 版中,借助云消息队列 RocketMQ 版丰富的消息处理策略实现多样化的业务逻辑。
Canal是一个开源项目,仓库地址请参见Canal。
应用场景
基于Binlog日志实现增量订阅和消费的典型业务场景如下:
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务Cache刷新
带业务逻辑的增量数据处理
方案介绍
基于Canal和云消息队列 RocketMQ 版的CDC方案如下:

如上图所示,Canal将自己伪装成库,监听并接收数据库的Binlog,并同步到云消息队列 RocketMQ 版等存储或其他中间件系统。
具体操作步骤如下:
配置MySQL:开启MySQL的Binlog功能,创建测试需要的数据库和表。
部署Canal:部署一个canal-deployer(server),监听并接收MySQL数据库的Binlog。
测试验证:验证数据变动后消息发送的情况。
环境要求
资源要求
处于运行中状态的云消息队列 RocketMQ 版实例。实例创建,请参见创建云消息队列 RocketMQ 版实例。
处于运行中的MySQL实例。本文以阿里云RDS MySQL为例,实例创建,请参见创建RDS MySQL实例。
用于部署运行Canal相关组件的机器。本文以ECS为例,实例创建和使用,请参见通过控制台使用ECS实例(快捷版)。
网络要求
部署canal-deployer(server)的节点可以连接数据库和云消息队列 RocketMQ 版实例,一般位于VPC内的ECS和容器都可以连接。
版本要求
服务
版本
说明
Canal
1.1.6
其他版本请参见Canal Release。
MySQL
8.0
支持源端MySQL版本包括5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。
云消息队列 RocketMQ 版
5.x
支持4.x和5.x版本,推荐使用5.x版本实例。
重要暂不支持5.x版本的Serverless实例。
1.配置MySQL
1.1 开启Binlog功能
阿里云RDS MySQL
阿里云RDS MySQL默认已开启Binlog功能,并且账号默认具有Binlog dump权限,可以直接跳过这一步。
自建MySQL
开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld] log-bin=mysql-bin # 开启binlog binlog-format=ROW # 选择ROW模式 server_id=1 # 配置MySQL replaction需要定义,不要和canal的slaveId重复创建用户canal并授权MySQL slave 的权限。
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
1.2 创建数据库
执行下面的SQL,创建一个名为canal的数据库。
CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;1.3 创建表
执行下面的SQL,在canal数据库创建一个名为students的表。
CREATE TABLE students (
id INT AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
age INT,
gender VARCHAR(10),
PRIMARY KEY (id)
);2.部署Canal
2.1 安装JDK
执行下面的命令,安装JDK。
sudo yum install java-1.8.0-openjdk2.2 下载canal-deployer
执行下面的命令,下载canal-deployer安装包。
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz2.3 解压缩安装包
执行下面的命令,创建目录canal-server,并将下载的安装包解压缩到canal-server目录中。
# 创建目录canal-server
sudo mkdir -p /usr/local/canal-server
# 将下载的安装包解压缩到canal-server目录中
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server2.4 修改配置
执行下面的命令,配置canal.properties。
sudo vi /usr/local/canal-server/conf/canal.properties# 服务端模式
canal.serverMode = rocketMQ
# 云消息队列 RocketMQ 版实例的用户名。在控制台访问控制页面智能身份识别中获取。
canal.aliyun.accessKey = 6W0xz2uPf******
# 云消息队列 RocketMQ 版实例的密码。在控制台访问控制页面智能身份识别中获取。
canal.aliyun.secretKey = sK56k1DrGx******
# 消息队列接入的方式
canal.mq.accessChannel = cloud
# 消息发送格式。云消息队列 RocketMQ 版不支持批量发送,canal.mq.flatMessage需要设置成false;消费端获取到的消息body后需反序列化body内容;Java语言可使用com.alibaba.otter.canal.client.CanalMessageDeserializer#deserializer(byte[]) 方法转化,其他语言需参照该方法自行实现。
canal.mq.flatMessage = false
# 云消息队列 RocketMQ 版实例中Group名
rocketmq.producer.group = canal_test
# 是否开启消息轨迹
rocketmq.enable.message.trace = false
# message trace的topic
rocketmq.customized.trace.topic =
# 云消息队列 RocketMQ 版实例的命名空间。云消息队列 RocketMQ 版5.x实例无需填写该参数。
rocketmq.namespace =
# 云消息队列 RocketMQ 版实例的接入点。在云消息队列 RocketMQ 版控制台实例详情页面获取。
rocketmq.namesrv.addr = rmq-cn-xxx.{$RegionId}.rmq.aliyuncs.com:8080
# 重试次数
rocketmq.retry.times.when.send.failed = 0
# 是否启用VIP Netty通道发送消息
rocketmq.vip.channel.enabled = false
# 消息的tag配置
rocketmq.tag = 执行下面的命令,配置instance.properties。
sudo vi /usr/local/canal-server/conf/example/instance.properties# 阿里云RDS MySQL数据库的连接地址
canal.instance.master.address=rm-uf62****.rwlb.rds.aliyuncs.com:3306
# 阿里云RDS MySQL数据库的账号
canal.instance.dbUsername=xxx
# 阿里云RDS MySQL数据库的密码
canal.instance.dbPassword=xxx
# mysql 数据解析关注的表,Perl正则表达式,canal\\..*表示canal schema下所有表
canal.instance.filter.regex=canal\\..*
# 云消息队列 RocketMQ 版实例的topic名称
canal.mq.topic=canal_topic2.5 启动canal-deployer
执行下面的命令启动canal-deployer。
/usr/local/canal-server/bin/startup.sh2.6 验证启动
执行下面的命令查看canal.log日志文件,确认Canal成功启动。
sudo vi /usr/local/canal-server/logs/canal/canal.log 2024-07-15 17:24:12.154 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-07-15 17:24:12.202 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-07-15 17:24:12.497 [main] INFO c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
2024-07-15 17:24:12.799 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-07-15 17:24:12.984 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
2024-07-15 17:24:16.208 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......执行下面的命令查看example.log日志文件,确认Canal Instance成功启动。
sudo vi /usr/local/canal-server/logs/example/example.log 2024-07-15 18:22:15.667 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-07-15 18:22:16.030 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....3.测试验证
3.1 向MYSQL数据库中添加数据
执行下面的SQL,向步骤1.3 创建表所创建的表students添加一条数据。
INSERT INTO`students` (`name`, `age`, `gender`)VALUES('Tome', 18, 'male');3.2 查看Canal发送的消息
登录云消息队列 RocketMQ 版控制台,找到部署时配置的实例,在消息查询页面查看消息如下:

常见问题
操作过程中有哪些注意事项?
Java版本需要为1.8。
canal-deployer安装包建议下载最新版。
如果从服务器下载canal-deployer安装包慢,可以下载到本地后上传到服务器。
canal.properties文件:
设置
canal.mq.flatMessage = false后,默认在控制台看到的不是明文,接收到消息后可以通过反序列化获取到明文,对于Java,可参考如下代码:<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.8</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.8</version> </dependency>说明以下为grpc版本的反序列化,remoting版本的会有些差异。
PushConsumer pushConsumer0 = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) //设置消费者分组。 .setConsumerGroup(consumerGroup) //设置预绑定的订阅关系;可以订阅多个topic;保证订阅关系一致性 .setSubscriptionExpressions(tags) //设置消费监听器。 .setMessageListener(messageView -> { // todo 处理消息 //处理消息并返回消费结果。 // logger.info("normal Consume message={}", messageView); byte[] copy = new byte[messageView.getBody().remaining()]; // 注意:如果要监听指定多个表时,可设置canal.instance.filter.regex=test_db\\.user_table,test_db\\.order_table;\\. 是分隔符,可以用 . 和 * 来写正则 // 填充copy messageView.getBody().get(copy); com.alibaba.otter.canal.protocol.Message normalMessage = com.alibaba.otter.canal.client.CanalMessageDeserializer.deserializer(copy); normalMessage.getEntries().forEach(entry -> { if (entry.getEntryType() == com.alibaba.otter.canal.protocol.CanalEntry.EntryType.ROWDATA) { try { com.alibaba.otter.canal.protocol.CanalEntry.RowChange rowChange = com.alibaba.otter.canal.protocol.CanalEntry.RowChange.parseFrom(entry.getStoreValue()); // 事件类型 CanalEntry.EventType eventType = com.alibaba.otter.canal.protocol.CanalEntry.RowChange.parseFrom(entry.getStoreValue()).getEventType(); // 事件详情 List<CanalEntry. RowData> rowDataList = CanalEntry.RowChange.parseFrom(entry.getStoreValue()).getRowDatasList(); for(CanalEntry. RowData rowData : rowDataList){ List<CanalEntry. Column> columns = rowData.getAfterColumnsList(); } } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } } }); logger.info("normal Consume normalMessage={}", normalMessage); return ConsumeResult.SUCCESS; }) .build(); //如果不需要再使用PushConsumer,可关闭该进程。 //pushConsumer.close();instance.properties文件:
dbUsername使用高权限账户;
canal.instance.filter.regex为需要关注的表,如果是关注指定数据库下所有表,可设置为:
{databaseName}\\..*;如果是关注指定数据库下指定表,可设置为:{databaseName}\\.{table1Name},{databaseName}\\.{table2Name};其中\\. 是分隔符,可以用 . 和 * 来写正则表达式。
启动canal-deployer如果报权限问题,可以加上sudo来启动。
部署canal的服务器需要能够telnet连通数据库及mq的地址。
RocketMQ版本为5.0-rmq-20250721-1(当前只有该版本可用)时,可以设置
canal.mq.flatMessage=true,则在mq控制台查询消息可以看到明文JSON。