本文介紹Spring Boot環境下如何使用 rocketmq-spring-boot-starter 以及 rocketmq-v5-client-spring-boot-starter 快速接入雲訊息佇列 RocketMQ 版5.0執行個體。
背景資訊
Spring Boot的starter包封裝了建立producer和consumer的邏輯,實際上依賴的也是Remoting協議的rocketmq-client或者gRPC協議的rocketmq-client-java。
rocketmq-spring-boot-starter
引入依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>{請替換成實際版本號碼}</version>
</dependency>說明
實際依賴的是
rocketmq-client。版本號碼請參見Maven倉庫。
專案設定檔
#軌跡功能
rocketmq.access-channel=CLOUD
rocketmq.name-server={控制台擷取的存取點}
rocketmq.consumer.access-key={控制台擷取的使用者名稱}
rocketmq.consumer.secret-key={控制台擷取的密碼}
rocketmq.producer.access-key={控制台擷取的使用者名稱}
rocketmq.producer.secret-key={控制台擷取的密碼}
#rocketmq.producer.namespaceV2={執行個體id}
rocketmq.producer.group=test說明
注意將上述變數替換為實際值,不要保留{}:
存取點不要添加協議頭
http://。如果是Serverless執行個體公網串連,需要填寫namespaceV2配置。
發送程式碼範例
@Autowired
private RocketMQTemplate rocketMQTemplate;Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
// 發送普通訊息,注意topic要先建立好
SendResult sendResult = rocketMQTemplate.syncSend("TEST_TOPIC:mytag", msg); // TEST_TOPIC為topic名稱,mytag為tag名稱
// 發送延遲訊息,注意topic建立時選擇類型為“定時/延時訊息”
SendResult delaySendResult = rocketMQTemplate.syncSendDelayTimeMills("delay:mytag", msg, 6000);
消費程式碼範例
@Component
@RocketMQMessageListener(topic = "TEST_TOPIC",selectorExpression = "*", consumerGroup = "GID_test",enableMsgTrace = true,messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY,accessChannel = "CLOUD")
public class MyMQListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.println("msg id is " + message.getMsgId() + " , msg body is " + new String(message.getBody()));
}
}說明
如果是Serverless公網串連,注意RocketMQMessageListener屬性中還需要填寫namespaceV2={執行個體id}。
rocketmq-v5-client-spring-boot-starter
引入依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-v5-client-spring-boot-starter</artifactId>
<version>{請替換成實際版本號碼}</version>
</dependency>說明
實際依賴的是rocketmq-client-java。
版本號碼請參見Maven倉庫。
專案設定檔
rocketmq.access-channel=CLOUD
rocketmq.push-consumer.endpoints={控制台擷取的存取點}
rocketmq.push-consumer.access-key={控制台擷取的使用者名稱}
rocketmq.push-consumer.secret-key={控制台擷取的密碼}
rocketmq.producer.access-key={控制台擷取的使用者名稱}
rocketmq.producer.secret-key={控制台擷取的密碼}
rocketmq.producer.namespace={執行個體id}
rocketmq.producer.endpoints={控制台擷取的存取點}發送程式碼範例
@Autowired
private RocketMQClientTemplate rocketMQClientTemplate;User user = new User();
user.setName(body);
user.setAge(18);
SendReceipt springbootv5 = rocketMQClientTemplate.syncSendNormalMessage("TEST_TOPIC:mytag", user); // TEST_TOPIC為topic名稱,mytag為tag名稱消費程式碼範例
@Component
@RocketMQMessageListener(topic = "data", consumerGroup = "GID_test", namespace = "{執行個體id}", tag = "*")
public class MyConsumer implements RocketMQListener {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getMessageId() + ": body is " + StandardCharsets.UTF_8.decode(messageView.getBody()));
return ConsumeResult.SUCCESS;
}
}說明
如果是Serverless執行個體,屬性中namespace需要配置執行個體id。
由於訊息body是
ByteBuffer,要注意轉碼,這裡使用StandardCharsets.UTF_8.decode(messageView.getBody())。如果還需要開啟軌跡,可以添加屬性
enableMsgTrace = true。
相關資訊擷取
擷取存取點資訊
進入RocketMQ執行個體詳情頁面,在基本資料頁簽的TCP 協議存取點地區擷取存取點和網路資訊,注意區分VPC專用網路和公網。

擷取訪問使用者名稱和密碼
進入RocketMQ執行個體詳情頁面,單擊左側導覽列中的存取控制,選擇智能身份識別,記錄使用者名稱和密碼。
