edit-icon download-icon

Send and receive ordered messages

Last Updated: Jun 19, 2018

In order to send and receive ordered messages, use Java SDK 1.2.7 or a later version.

Ordered messages are a type of message provided by MQ that are delivered and consumed in strict first-in-first-out (FIFO) order. Use them in scenarios where every message must be delivered and consumed in exactly the order in which it was sent. For detailed information, see Ordered messages.

The sending and receiving methods for globally ordered messages and partitionally ordered messages are similar. See the following sample code for reference.

Note: For information on TCP access point domain names, see TCP access instructions.

Send Ordered Messages

The sample code for sending messages is as follows:

  1. package com.aliyun.openservices.ons.example.order;
  2. import com.aliyun.openservices.ons.api.Message;
  3. import com.aliyun.openservices.ons.api.ONSFactory;
  4. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  5. import com.aliyun.openservices.ons.api.SendResult;
  6. import com.aliyun.openservices.ons.api.order.OrderProducer;
  7. import java.util.Properties;
  8. /**
  9. * Sample producer that sends 1000 ordered messages to the topic "Order_global_Topic".
  10. * Each message uses its order ID ("biz_0" through "biz_9") as both the message key and the sharding key.
  11. */
  12. public class ProducerClient {
  13. public static void main(String[] args) {
  14. Properties properties = new Properties();
  15. // The producer ID you created on the console
  16. properties.put(PropertyKeyConst.ProducerId, "XXX");
  17. // AccessKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  18. properties.put(PropertyKeyConst.AccessKey, "XXX");
  19. // SecretKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  20. properties.put(PropertyKeyConst.SecretKey, "XXX");
  21. // Set a TCP access domain name (the following uses public cloud production environment as an example)
  22. properties.put(PropertyKeyConst.ONSAddr,
  23. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  24. OrderProducer producer = ONSFactory.createOrderProducer(properties);
  25. // Before sending messages, the start method must be called once to start the producer.
  26. producer.start();
  27. for (int i = 0; i < 1000; i++) {
  28. String orderId = "biz_" + (i % 10);
  29. Message msg = new Message(
  30. // The topic of the message
  31. "Order_global_Topic",
  32. // Message tag, which is similar to a tag in Gmail and is used to classify messages. Consumers can then set filtering conditions so that messages are filtered in the MQ broker.
  33. "TagA",
  34. // Message body, which can be any data in binary format. The producer and the consumer must agree on the serialization and deserialization methods.
  35. "send order global msg".getBytes()
  36. );
  37. // The key represents the key service property of the message, so please set it as globally unique as possible.
  38. // You can query a message and resend it through the MQ console when you cannot receive the message properly.
  39. // Note: Message sending and receiving is not affected if you do not configure this setting.
  40. msg.setKey(orderId);
  41. // Sharding key is a key field used in ordered messages to distinguish different shards, which is completely different from the key in normal messages.
  42. // In globally ordered messages, the field can be set to any string that is not null.
  43. String shardingKey = orderId;
  44. try {
  45. SendResult sendResult = producer.send(msg, shardingKey);
  46. // Synchronous message sending will succeed as long as no exception is thrown.
  47. if (sendResult != null) {
  48. System.out.println(new java.util.Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
  49. }
  50. } catch (Exception e) {
  51. // If the message sending fails and a retry is needed, the message can be re-sent or persisted for compensated processing.
  52. System.out.println(new java.util.Date() + " Send mq message failed. Topic is:" + msg.getTopic());
  53. e.printStackTrace();
  54. }
  55. }
  56. // Destroy the producer object before exiting the application.
  57. // Note: It's ok if the producer object is not destroyed.
  58. producer.shutdown();
  59. }
  60. }

Receive Ordered Messages

The sample code for subscribing to ordered messages is as follows:

  1. package com.aliyun.openservices.ons.example.order;
  2. import com.aliyun.openservices.ons.api.Message;
  3. import com.aliyun.openservices.ons.api.ONSFactory;
  4. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  5. import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
  6. import com.aliyun.openservices.ons.api.order.MessageOrderListener;
  7. import com.aliyun.openservices.ons.api.order.OrderAction;
  8. import com.aliyun.openservices.ons.api.order.OrderConsumer;
  9. import java.util.Properties;
  10. /**
  11. * Sample consumer that subscribes to all tags of the topic "Order_global_Topic" and prints every ordered message it receives.
  12. */
  13. public class ConsumerClient {
  14. public static void main(String[] args) {
  15. Properties properties = new Properties();
  16. // The consumer ID you created on the console
  17. properties.put(PropertyKeyConst.ConsumerId, "XXX");
  18. // AccessKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  19. properties.put(PropertyKeyConst.AccessKey, "XXX");
  20. // SecretKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  21. properties.put(PropertyKeyConst.SecretKey, "XXX");
  22. // Set a TCP access domain name (the following uses public cloud production environment as an example)
  23. properties.put(PropertyKeyConst.ONSAddr,
  24. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  25. // The waiting time before retry after the ordered message consumption fails (ms)
  26. properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
  27. // The max retry times when message consumption fails
  28. properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
  29. OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
  30. consumer.subscribe(
  31. // The topic to subscribe to; this is the same topic the producer sample sends to
  32. "Order_global_Topic",
  33. // Tag filter expression:
  34. // 1. * means subscribing to all messages
  35. // 2. TagA || TagB || TagC means subscribing to messages with TagA, TagB, or TagC
  36. "*",
  37. new MessageOrderListener() {
  38. /**
  39. * 1. When message consumption fails or a processing exception occurs, return OrderAction.Suspend
  40. * 2. When message processing succeeds, return OrderAction.Success
  41. */
  42. @Override
  43. public OrderAction consume(Message message, ConsumeOrderContext context) {
  44. System.out.println(message);
  45. return OrderAction.Success;
  46. }
  47. });
  48. // Before receiving messages, the start method must be called once to start the consumer.
  49. consumer.start();
  50. }
  51. }
Thank you! We've received your feedback.