All Products
Search
Document Center

Send and receive ordered messages

Last Updated: Sep 13, 2019

Use Java SDK 1.2.7 or later to send and subscribe to ordered messages.

Ordered messages are delivered and consumed in order. MQ provides this type of messages for applications that require message delivery and consumption in strict FIFO order. For more information, see Ordered messages.

Globally and partitionally ordered messages are sent and received in similar ways. See the following sample code for reference.

Send ordered messages

The sample code for sending messages is as follows:

  1. package com.aliyun.openservices.ons.example.order;
  2. import com.aliyun.openservices.ons.api.Message;
  3. import com.aliyun.openservices.ons.api.ONSFactory;
  4. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  5. import com.aliyun.openservices.ons.api.SendResult;
  6. import com.aliyun.openservices.ons.api.order.OrderProducer;
  7. import java.util.Properties;
  8. public class ProducerClient {
  9. public static void main(String[] args) {
  10. Properties properties = new Properties();
  11. // The group ID you created in the console.
  12. properties.put(PropertyKeyConst.GROUP_ID, "XXX");
  13. // The AccessKeyId you created in the Alibaba Cloud console for identity authentication.
  14. properties.put(PropertyKeyConst.AccessKey, "XXX");
  15. // The AccessKeySecret you created in the Alibaba Cloud console for identity authentication.
  16. properties.put(PropertyKeyConst.SecretKey, "XXX");
  17. // Set the TCP endpoint: Go to the **Instances** page in the MQ console, and view the endpoint in the **Endpoint Information** area.
  18. properties.put(PropertyKeyConst.NAMESRV_ADDR,
  19. "XXX ");
  20. OrderProducer producer = ONSFactory.createOrderProducer(properties);
  21. //Before sending a message, call the start() method once to start the producer.
  22. producer.start();
  23. for (int i = 0; i < 1000; i++) {
  24. String orderId = "biz_" + i % 10;
  25. Message msg = new Message(//
  26. // The topic of the message.
  27. "Order_global_topic ",
  28. // The message tag, which is similar to a Gmail tag. It is used to sort messages, enabling the consumer to filter messages on the MQ broker based on the specified criteria.
  29. "TagA",
  30. // The message body in any binary format. MQ does not process the message body. The producer and consumer must negotiate the consistent serialization and deserialization methods.
  31. "send order global msg".getBytes()
  32. );
  33. // Set a key service property representing the message, that is, the message key, and try to keep it globally unique.
  34. // A unique identifier enables you to query a message and resend it in the console if you fail to receive the message.
  35. // Note: Messages can still be sent and received if you do not set this attribute.
  36. msg.setKey(orderId);
  37. // The key field that identifies the partition of partitionally ordered messages. This sharding key is different from the key of normal messages.
  38. // This field can be set to any non-empty string for globally ordered messages.
  39. String shardingKey = String.valueOf(orderId);
  40. try {
  41. SendResult sendResult = producer.send(msg, shardingKey);
  42. // The message is sent if no exception is thrown.
  43. if (sendResult ! = null) {
  44. System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
  45. }
  46. }
  47. catch (Exception e) {
  48. // The message failed to be sent and requires a retry. The system can resend the message or store message data persistently.
  49. System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
  50. e.printStackTrace();
  51. }
  52. }
  53. // Destroy the producer object before exiting from the application.
  54. // Note: You can choose not to destroy the producer object.
  55. producer.shutdown();
  56. }
  57. }

Subscribe to ordered messages

The sample code for subscribing to ordered messages is as follows:

  1. package com.aliyun.openservices.ons.example.order;
  2. import com.aliyun.openservices.ons.api.Message;
  3. import com.aliyun.openservices.ons.api.ONSFactory;
  4. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  5. import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
  6. import com.aliyun.openservices.ons.api.order.MessageOrderListener;
  7. import com.aliyun.openservices.ons.api.order.OrderAction;
  8. import com.aliyun.openservices.ons.api.order.OrderConsumer;
  9. import java.util.Properties;
  10. public class ConsumerClient {
  11. public static void main(String[] args) {
  12. Properties properties = new Properties();
  13. // The group ID you created in the console.
  14. properties.put(PropertyKeyConst.GROUP_ID, "XXX");
  15. // The AccessKeyId you created in the Alibaba Cloud console for identity authentication.
  16. properties.put(PropertyKeyConst.AccessKey, "XXX");
  17. // The AccessKeySecret you created in the Alibaba Cloud console for identity authentication.
  18. properties.put(PropertyKeyConst.SecretKey, "XXX");
  19. // Set the TCP endpoint: Go to the **Instances** page in the MQ console, and view the endpoint in the **Endpoint Information** area.
  20. properties.put(PropertyKeyConst.NAMESRV_ADDR,
  21. "XXX ");
  22. // Set the delay time (in milliseconds) before the retry upon an ordered message consumption failure. Value range: 10~1800.
  23. properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
  24. // Set the maximum number of retries upon a message consumption failure.
  25. properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
  26. // Before message subscription, call the start() method once to start the consumer.
  27. OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
  28. consumer.subscribe(
  29. // The topic of the message.
  30. "Jodie_Order_Topic ",
  31. // Subscribe to message tags under the specified topic.
  32. // 1. * indicates the subscription of all messages.
  33. // 2. TagA || TagB || TagC indicates the subscription of messages with TagA, TagB, or TagC.
  34. "*",
  35. new MessageOrderListener() {
  36. /**
  37. * 1. OrderAction.Suspend is returned if a message fails to be consumed or an exception occurs during message processing.<br>
  38. * 2. OrderAction.Success is returned if a message is processed.
  39. */
  40. @Override
  41. public OrderAction consume(Message message, ConsumeOrderContext context) {
  42. System.out.println(message);
  43. return OrderAction.Success;
  44. }
  45. });
  46. consumer.start();
  47. }
  48. }