本文介绍Spring Boot环境下如何使用 rocketmq-spring-boot-starter 以及 rocketmq-v5-client-spring-boot-starter 快速接入云消息队列 RocketMQ 版5.0实例。
背景信息
Spring Boot的starter包封装了创建producer和consumer的逻辑,实际上依赖的也是Remoting协议的rocketmq-client或者gRPC协议的rocketmq-client-java。
rocketmq-spring-boot-starter
引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>{请替换成实际版本号}</version>
</dependency>说明
实际依赖的是
rocketmq-client。版本号请参见Maven仓库。
项目配置文件
#轨迹功能
rocketmq.access-channel=CLOUD
rocketmq.name-server={控制台获取的接入点}
rocketmq.consumer.access-key={控制台获取的用户名}
rocketmq.consumer.secret-key={控制台获取的密码}
rocketmq.producer.access-key={控制台获取的用户名}
rocketmq.producer.secret-key={控制台获取的密码}
#rocketmq.producer.namespaceV2={实例id}
rocketmq.producer.group=test说明
注意将上述变量替换为实际值,不要保留{}:
接入点不要添加协议头
http://。如果是Serverless实例公网连接,需要填写namespaceV2配置。
发送代码示例
@Autowired
private RocketMQTemplate rocketMQTemplate;Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
// 发送普通消息,注意topic要先创建好
SendResult sendResult = rocketMQTemplate.syncSend("TEST_TOPIC:mytag", msg); // TEST_TOPIC为topic名称,mytag为tag名称
// 发送延迟消息,注意topic创建时选择类型为“定时/延时消息”
SendResult delaySendResult = rocketMQTemplate.syncSendDelayTimeMills("delay:mytag", msg, 6000);
消费代码示例
@Component
@RocketMQMessageListener(topic = "TEST_TOPIC",selectorExpression = "*", consumerGroup = "GID_test",enableMsgTrace = true,messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY,accessChannel = "CLOUD")
public class MyMQListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.println("msg id is " + message.getMsgId() + " , msg body is " + new String(message.getBody()));
}
}说明
如果是Serverless公网连接,注意RocketMQMessageListener属性中还需要填写namespaceV2={实例id}。
rocketmq-v5-client-spring-boot-starter
引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
<version>{请替换成实际版本号}</version>
</dependency>说明
实际依赖的是rocketmq-client-java。
版本号请参见Maven仓库。
项目配置文件
rocketmq.access-channel=CLOUD
rocketmq.push-consumer.endpoints={控制台获取的接入点}
rocketmq.push-consumer.access-key={控制台获取的用户名}
rocketmq.push-consumer.secret-key={控制台获取的密码}
rocketmq.producer.access-key={控制台获取的用户名}
rocketmq.producer.secret-key={控制台获取的密码}
rocketmq.producer.namespace={实例id}
rocketmq.producer.endpoints={控制台获取的接入点}发送代码示例
@Autowired
private RocketMQClientTemplate rocketMQClientTemplate;User user = new User();
user.setName(body);
user.setAge(18);
SendReceipt springbootv5 = rocketMQClientTemplate.syncSendNormalMessage("TEST_TOPIC:mytag", user); // TEST_TOPIC为topic名称,mytag为tag名称消费代码示例
@Component
@RocketMQMessageListener(topic = "data", consumerGroup = "GID_test", namespace = "{实例id}", tag = "*")
public class MyConsumer implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getMessageId() + ": body is " + StandardCharsets.UTF_8.decode(messageView.getBody()));
return ConsumeResult.SUCCESS;
}
}说明
如果是Serverless实例,属性中namespace需要配置实例id。
由于消息body是
ByteBuffer,要注意转码,这里使用StandardCharsets.UTF_8.decode(messageView.getBody())。如果还需要开启轨迹,可以添加属性
enableMsgTrace = true。
相关信息获取
获取接入点信息
进入RocketMQ实例详情页面,在基本信息页签的TCP 协议接入点区域获取接入点和网络信息,注意区分VPC专有网络和公网。

获取访问用户名和密码
进入RocketMQ实例详情页面,单击左侧导航栏中的访问控制,选择智能身份识别,记录用户名和密码。
