全部产品
Search
文档中心

云消息队列 RocketMQ 版:使用Canal和RocketMQ实现数据库变更订阅处理

更新时间:Mar 31, 2026

本文以MySQL数据库为例介绍如何使用Canal接入云消息队列 RocketMQ 版,实现MySQL数据库Binlog数据的变更处理。

背景信息

CDC(Change Data Capture)是一种监测并捕获数据库变更的典型技术方案,常应用于异构数据源之间的数据同步。Canal作为一款轻量级的CDC工具,可基于数据库增量日志解析,提供增量变更数据的订阅和消费能力。Canal可以将变更记录可靠地投递到云消息队列 RocketMQ 版中,借助云消息队列 RocketMQ 版丰富的消息处理策略实现多样化的业务逻辑。

Canal是一个开源项目,仓库地址请参见Canal

应用场景

基于Binlog日志实现增量订阅和消费的典型业务场景如下:

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务Cache刷新

  • 带业务逻辑的增量数据处理

方案介绍

基于Canal和云消息队列 RocketMQ 版的CDC方案如下:

CDC方案

如上图所示,Canal将自己伪装成库,监听并接收数据库的Binlog,并同步到云消息队列 RocketMQ 版等存储或其他中间件系统。

具体操作步骤如下:

  1. 配置MySQL:开启MySQL的Binlog功能,创建测试需要的数据库和表。

  2. 部署Canal:部署一个canal-deployer(server),监听并接收MySQL数据库的Binlog。

  3. 测试验证:验证数据变动后消息发送的情况。

环境要求

  • 资源要求

  • 网络要求

    • 部署canal-deployer(server)的节点可以连接数据库和云消息队列 RocketMQ 版实例,一般位于VPC内的ECS和容器都可以连接。

  • 版本要求

    服务

    版本

    说明

    Canal

    1.1.6

    其他版本请参见Canal Release

    MySQL

    8.0

    支持源端MySQL版本包括5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。

    云消息队列 RocketMQ 版

    5.x

    支持4.x和5.x版本,推荐使用5.x版本实例。

    重要

    暂不支持5.x版本的Serverless实例。

1.配置MySQL

1.1 开启Binlog功能

阿里云RDS MySQL

阿里云RDS MySQL默认已开启Binlog功能,并且账号默认具有Binlog dump权限,可以直接跳过这一步。

自建MySQL

  • 开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

    [mysqld]
    log-bin=mysql-bin  # 开启binlog
    binlog-format=ROW  # 选择ROW模式
    server_id=1  # 配置MySQL replaction需要定义,不要和canal的slaveId重复
  • 创建用户canal并授权MySQL slave 的权限。

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;

1.2 创建数据库

执行下面的SQL,创建一个名为canal的数据库。

CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

1.3 创建表

执行下面的SQL,在canal数据库创建一个名为students的表。

CREATE TABLE students (
    id INT AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    age INT,
    gender VARCHAR(10),
    PRIMARY KEY (id)
);

2.部署Canal

2.1 安装JDK

执行下面的命令,安装JDK。

sudo yum install java-1.8.0-openjdk

2.2 下载canal-deployer

执行下面的命令,下载canal-deployer安装包。

sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

2.3 解压缩安装包

执行下面的命令,创建目录canal-server,并将下载的安装包解压缩到canal-server目录中。

# 创建目录canal-server
sudo mkdir -p /usr/local/canal-server

# 将下载的安装包解压缩到canal-server目录中
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server

2.4 修改配置

  1. 执行下面的命令,配置canal.properties。

sudo vi /usr/local/canal-server/conf/canal.properties
# 服务端模式
canal.serverMode = rocketMQ
# 云消息队列 RocketMQ 版实例的用户名。在控制台访问控制页面智能身份识别中获取。
canal.aliyun.accessKey = 6W0xz2uPf******
# 云消息队列 RocketMQ 版实例的密码。在控制台访问控制页面智能身份识别中获取。
canal.aliyun.secretKey = sK56k1DrGx******
# 消息队列接入的方式
canal.mq.accessChannel = cloud
# 消息发送格式。云消息队列 RocketMQ 版不支持批量发送,canal.mq.flatMessage需要设置成false;消费端获取到的消息body后需反序列化body内容;Java语言可使用com.alibaba.otter.canal.client.CanalMessageDeserializer#deserializer(byte[]) 方法转化,其他语言需参照该方法自行实现。
canal.mq.flatMessage = false
# 云消息队列 RocketMQ 版实例中Group名
rocketmq.producer.group = canal_test
# 是否开启消息轨迹
rocketmq.enable.message.trace = false
# message trace的topic
rocketmq.customized.trace.topic = 
# 云消息队列 RocketMQ 版实例的命名空间。云消息队列 RocketMQ 版5.x实例无需填写该参数。
rocketmq.namespace =
# 云消息队列 RocketMQ 版实例的接入点。在云消息队列 RocketMQ 版控制台实例详情页面获取。
rocketmq.namesrv.addr = rmq-cn-xxx.{$RegionId}.rmq.aliyuncs.com:8080
# 重试次数
rocketmq.retry.times.when.send.failed = 0
# 是否启用VIP Netty通道发送消息
rocketmq.vip.channel.enabled = false
# 消息的tag配置
rocketmq.tag = 
  1. 执行下面的命令,配置instance.properties。

sudo vi /usr/local/canal-server/conf/example/instance.properties
# 阿里云RDS MySQL数据库的连接地址
canal.instance.master.address=rm-uf62****.rwlb.rds.aliyuncs.com:3306
# 阿里云RDS MySQL数据库的账号
canal.instance.dbUsername=xxx
# 阿里云RDS MySQL数据库的密码
canal.instance.dbPassword=xxx
# mysql 数据解析关注的表,Perl正则表达式,canal\\..*表示canal schema下所有表 
canal.instance.filter.regex=canal\\..*
# 云消息队列 RocketMQ 版实例的topic名称
canal.mq.topic=canal_topic

2.5 启动canal-deployer

执行下面的命令启动canal-deployer。

/usr/local/canal-server/bin/startup.sh

2.6 验证启动

  1. 执行下面的命令查看canal.log日志文件,确认Canal成功启动。

sudo vi /usr/local/canal-server/logs/canal/canal.log  
2024-07-15 17:24:12.154 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-07-15 17:24:12.202 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-07-15 17:24:12.497 [main] INFO  c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
2024-07-15 17:24:12.799 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-07-15 17:24:12.984 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
2024-07-15 17:24:16.208 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
  1. 执行下面的命令查看example.log日志文件,确认Canal Instance成功启动。

sudo vi /usr/local/canal-server/logs/example/example.log 
2024-07-15 18:22:15.667 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-07-15 18:22:16.030 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

3.测试验证

3.1 向MYSQL数据库中添加数据

执行下面的SQL,向步骤1.3 创建表所创建的表students添加一条数据。

INSERT INTO`students` (`name`, `age`, `gender`)VALUES('Tome', 18, 'male');

3.2 查看Canal发送的消息

登录云消息队列 RocketMQ 版控制台,找到部署时配置的实例,在消息查询页面查看消息如下:

image

常见问题

操作过程中有哪些注意事项?

  1. Java版本需要为1.8。

  2. canal-deployer安装包建议下载最新版。

  3. 如果从服务器下载canal-deployer安装包慢,可以下载到本地后上传到服务器。

  4. canal.properties文件:

    设置canal.mq.flatMessage = false后,默认在控制台看到的不是明文,接收到消息后可以通过反序列化获取到明文,对于Java,可参考如下代码:

                    <dependency>
    			<groupId>com.alibaba.otter</groupId>
    			<artifactId>canal.client</artifactId>
    			<version>1.1.8</version>
    		</dependency>
    		<dependency>
    			<groupId>com.alibaba.otter</groupId>
    			<artifactId>canal.protocol</artifactId>
    			<version>1.1.8</version>
    		</dependency>
    说明

    以下为grpc版本的反序列化,remoting版本的会有些差异。

    PushConsumer pushConsumer0 = provider.newPushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    //设置消费者分组。
    .setConsumerGroup(consumerGroup)
    //设置预绑定的订阅关系;可以订阅多个topic;保证订阅关系一致性
    .setSubscriptionExpressions(tags)
    //设置消费监听器。
    .setMessageListener(messageView -> {
        // todo 处理消息
        //处理消息并返回消费结果。
        // logger.info("normal Consume message={}", messageView);
        byte[] copy = new byte[messageView.getBody().remaining()];
        // 注意:如果要监听指定多个表时,可设置canal.instance.filter.regex=test_db\\.user_table,test_db\\.order_table;\\. 是分隔符,可以用 . 和 * 来写正则
        // 填充copy
        messageView.getBody().get(copy);
        com.alibaba.otter.canal.protocol.Message normalMessage = com.alibaba.otter.canal.client.CanalMessageDeserializer.deserializer(copy);
        normalMessage.getEntries().forEach(entry -> {
            if (entry.getEntryType() == com.alibaba.otter.canal.protocol.CanalEntry.EntryType.ROWDATA) {
                try {
                    com.alibaba.otter.canal.protocol.CanalEntry.RowChange rowChange = com.alibaba.otter.canal.protocol.CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    // 事件类型
                    CanalEntry.EventType eventType = com.alibaba.otter.canal.protocol.CanalEntry.RowChange.parseFrom(entry.getStoreValue()).getEventType();
                    // 事件详情
                    List<CanalEntry. RowData> rowDataList = CanalEntry.RowChange.parseFrom(entry.getStoreValue()).getRowDatasList();
                    for(CanalEntry. RowData rowData : rowDataList){
                        List<CanalEntry. Column> columns = rowData.getAfterColumnsList();
                    }
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        logger.info("normal Consume normalMessage={}", normalMessage);
        return ConsumeResult.SUCCESS;
    })
    .build();
    //如果不需要再使用PushConsumer,可关闭该进程。
    //pushConsumer.close();
    
  5. instance.properties文件:

    1. dbUsername使用高权限账户;

    2. canal.instance.filter.regex为需要关注的表,如果是关注指定数据库下所有表,可设置为:{databaseName}\\..*;如果是关注指定数据库下指定表,可设置为:{databaseName}\\.{table1Name},{databaseName}\\.{table2Name};其中\\. 是分隔符,可以用 . 和 * 来写正则表达式。

  6. 启动canal-deployer如果报权限问题,可以加上sudo来启动。

  7. 部署canal的服务器需要能够telnet连通数据库及mq的地址。

  8. RocketMQ版本为5.0-rmq-20250721-1(当前只有该版本可用)时,可以设置canal.mq.flatMessage=true,则在mq控制台查询消息可以看到明文JSON。