順序訊息(FIFO訊息)是雲訊息佇列 RocketMQ 版提供的一種嚴格按照順序來發布和消費的訊息類型。本文提供使用TCP協議下的社區版Java SDK收發順序訊息的範例程式碼供您參考。
背景資訊
順序訊息分為兩類:
全域順序:對於指定的一個Topic,所有訊息按照嚴格的先入先出FIFO(First In First Out)的順序進行發布和消費。
分區順序:對於指定的一個Topic,所有訊息根據Sharding Key進行區塊分區。同一個分區內的訊息按照嚴格的FIFO順序進行發布和消費。Sharding Key是順序訊息中用來區分不同分區的關鍵字段,和普通訊息的Key是完全不同的概念。
更多資訊,請參見順序訊息。
前提條件
您已完成以下操作:
下載4.5.2或以上版本的社區版Java SDK。
準備工作。
擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey。
發送順序訊息
重要
雲訊息佇列 RocketMQ 版服務端判定訊息產生的順序性是參照單一生產者、單一線程並發下訊息發送的時序。如果發送方有多個生產者或者有多個線程並發發送訊息,則此時只能以到達雲訊息佇列 RocketMQ 版服務端的時序作為訊息順序的依據,和業務側的發送順序未必一致。
發送順序訊息的範例程式碼如下。
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQOrderProducer {
private static RPCHook getAclRPCHook()
{
/**
* 替換為您阿里雲帳號的AccessKey ID和AccessKey Secret。
* 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
*/
return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
/**
* 建立Producer,並開啟訊息軌跡。
* 如果不想開啟訊息軌跡,可以按照以下方式建立:
* DefaultMQProducer producer = new DefaultMQProducer("YOUR ORDER GROUP ID", getAclRPCHook());
*/
DefaultMQProducer producer = new DefaultMQProducer("YOUR ORDER GROUP ID", getAclRPCHook(), true, null);
/**
* 設定使用接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項,如果不開啟訊息軌跡功能,則運行不設定此項。
*/
producer.setAccessChannel(AccessChannel.CLOUD);
/**
* 設定為您從阿里雲控制台擷取的存取點資訊,類似“http://MQ_INST_XXXX.aliyuncs.com:80”。
*/
producer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
producer.start();
for (int i = 0; i < 128; i++) {
try {
int orderId = i % 10;
Message msg = new Message("YOUR ORDER TOPIC",
"YOUR MESSAGE TAG",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
/**
* 注意!!!請務必設定該配置項,順序訊息才能均勻分布到各隊列中。
* 如果SDK版本為5.x以上,可以按照以下方式設定:
* msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
*/
msg.putUserProperty("__SHARDINGKEY", orderId + "");
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 選擇適合自己的分區選擇演算法,保證同一個參數得到的結果相同。
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}訂閱順序訊息
訂閱順序訊息的範例程式碼如下。
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQOrderConsumer {
private static RPCHook getAclRPCHook()
{
/**
* 替換為您阿里雲帳號的AccessKey ID和AccessKey Secret。
* 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
*/
return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
/**
* 建立Consumer,並開啟訊息軌跡。
* 如果不想開啟訊息軌跡,可以按照如下方式建立:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR ORDER GROUP ID", getAclRPCHook(), null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR ORDER GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
/**
* 設定使用接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項,如果不開啟訊息軌跡功能,則運行不設定此項。
*/
consumer.setAccessChannel(AccessChannel.CLOUD);
/**
* 設定為您從阿里雲控制台擷取的存取點資訊,類似“http://MQ_INST_XXXX.aliyuncs.com:80”。
*/
consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
consumer.subscribe("YOUR ORDER TOPIC", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;// 消費失敗則掛起重試返回:ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}