edit-icon download-icon

Send messages - three types

Last Updated: Jun 25, 2018

There are three ways to send MQ messages: reliable synchronous transmission, reliable asynchronous transmission, and one-way transmission. This topic describes implementation principles of each transmission method, their scenarios, and differences between the three methods, as well as code examples.

Note: Only reliable synchronous transmission is supported for ordered messages.

  • Reliable synchronous transmission

    Concept: The sender of a message sends a second data packet only when it receives a response from the receiver for receiving the first packet.sync-send

Scenario: This method can be used in many scenarios, for example, important email notifications, registration SMS notifications, and marketing SMS systems.

  • Reliable asynchronous transmission

    Concept: The sender of a message sends another packet before receiving the response of the message from the receiver. This method requires users to implement the SendCallback interface. The sender proceeds to send a second message before receiving a server response for the first message. It receives the server response through the SendCallback interface and processes it asynchronously.

    async_send

    Scenario: This method is generally used in time consuming links for business scenarios that are sensitive to response time (RT). For example, after a user uploads a video, a notification is sent immediately to enable the transcoding service.After transcoding is complete, a notification is sent to push transcoding results.

  • One-way transmission

    Concept: The sender simply sends messages and does not wait for a server response, and no callback function is triggered. With this method, messages are sent within microseconds.

    oneway

    Scenario: This method applies to scenarios where messages need to be sent with the least time-consumption but requirement for reliability is low, for example, log collection.

The following table summarizes the features and main differences between the three methods.

Transmission Method Transmission TPS Transmission Result Feedback Reliability
Synchronous Fast Yes No message loss
Asynchronous Fast Yes No message loss
One-way Fastest No Possible message loss

Sample code

For TCP access point domain names, refer to TCP access instructions.

Synchronous transmission

  1. import com.aliyun.openservices.ons.api.Message;
  2. import com.aliyun.openservices.ons.api.Producer;
  3. import com.aliyun.openservices.ons.api.SendResult;
  4. import com.aliyun.openservices.ons.api.ONSFactory;
  5. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  6. import java.util.Properties;
  7. public class ProducerTest {
  8. public static void main(String[] args) {
  9. Properties properties = new Properties();
  10. //The producer ID you created on the console
  11. properties.put(PropertyKeyConst.ProducerId, "XXX");
  12. // AccessKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  13. properties.put(PropertyKeyConst.AccessKey,"XXX");
  14. // SecretKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  15. properties.put(PropertyKeyConst.SecretKey, "XXX");
  16. //Set transmission timeout (ms) period
  17. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
  18. // Set a TCP access domain name (the following uses public cloud production environment as an example)
  19. properties.put(PropertyKeyConst.ONSAddr,
  20. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  21. Producer producer = ONSFactory.createProducer(properties);
  22. // Before sending messages, the start method must be called once to start the producer.
  23. producer.start();
  24. //Send messages in loop
  25. for (int i = 0; i < 100; i++){
  26. Message msg = new Message( //
  27. // The topic of the message
  28. "TopicTestMQ",
  29. //Message tag, which is similar to tag in Gmail, and is used to classify messages. Consumers can then set filtering conditions for messages to be filtered in MQ broker.
  30. "TagA",
  31. // Message body, which can be any data in binary format.
  32. // Consistent serialization and deserialization methods should be consistent between the producer and the consumer.
  33. "Hello MQ".getBytes());
  34. // The setting represents the key service property of the message, so please set it as globally unique as possible.
  35. // You can query a message and resend it through the MQ console when you cannot receive the message properly.
  36. // Note: Message sending and receiving is not affected if you do not configure this setting.
  37. msg.setKey("ORDERID_" + i);
  38. try {
  39. SendResult sendResult = producer.send(msg);
  40. // Synchronous message sending will succeed as long as an exception is not thrown.
  41. if (sendResult != null) {
  42. System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
  43. }
  44. }
  45. catch (Exception e) {
  46. // If the message sending fails and a retry is needed, the message can be re-sent or persisted for compensated processing.
  47. System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
  48. e.printStackTrace();
  49. }
  50. }
  51. // The producer object will be destroyed before exiting the application.
  52. // Note: It's ok if the producer object is not destroyed.
  53. producer.shutdown();
  54. }
  55. }

Asynchronous transmission

  1. import com.aliyun.openservices.ons.api.Message;
  2. import com.aliyun.openservices.ons.api.OnExceptionContext;
  3. import com.aliyun.openservices.ons.api.Producer;
  4. import com.aliyun.openservices.ons.api.SendCallback;
  5. import com.aliyun.openservices.ons.api.SendResult;
  6. import com.aliyun.openservices.ons.api.ONSFactory;
  7. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  8. import java.util.Properties;
  9. public static void main(String[] args) {
  10. Properties properties = new Properties();
  11. // AccessKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  12. properties.put(PropertyKeyConst.AccessKey, "DEMO_AK");
  13. // SecretKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  14. properties.put(PropertyKeyConst.SecretKey, "DEMO_SK");
  15. //The producer ID you have created on the console
  16. properties.put(PropertyKeyConst.ProducerId, "DEMO_PID");
  17. //Set transmission timeout (ms) period
  18. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
  19. // Set a TCP access domain name (the following uses public cloud production environment as an example)
  20. properties.put(PropertyKeyConst.ONSAddr,
  21. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  22. Producer producer = ONSFactory.createProducer(properties);
  23. // Before sending messages, the start method must be called once to start the producer.
  24. producer.start();
  25. Message msg = new Message(
  26. // The topic of the message
  27. "TopicTestMQ",
  28. //Message tag, which is similar to tag in Gmail, and is used to classify messages. Consumers can then set filtering conditions for messages to be filtered in MQ broker.
  29. "TagA",
  30. // Message body, which can be any data in binary format. Consistent serialization and deserialization methods should be consistent between the producer and the consumer.
  31. "Hello MQ".getBytes());
  32. // The setting represents the key service property of the message, so please set it as globally unique as possible. // You can query a message and resend it through the MQ console when you cannot receive the message properly.
  33. // Note: Message sending and receiving is not affected if you do not configure this setting.
  34. msg.setKey("ORDERID_100");
  35. // Asynchronously sending the message, and the result will be returned to the client through callback.
  36. producer.sendAsync(msg, new SendCallback() {
  37. @Override
  38. public void onSuccess(final SendResult sendResult) {
  39. // Message sent successfully
  40. System.out.println("send message success. Topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
  41. }
  42. @Override
  43. public void onException(OnExceptionContext context) {
  44. // If the message sending fails and a retry is needed, the message can be re-sent or persisted for compensated processing.
  45. System.out.println("send message failed. Topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
  46. }
  47. });
  48. // msgId can be obtained before callback returns the result.
  49. System.out.println("send message async. Topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());
  50. // The producer object will be destroyed before exiting the application. // Note: It's ok if the producer object is not destroyed.
  51. producer.shutdown();
  52. }

One-way transmission

  1. import com.aliyun.openservices.ons.api.Message;
  2. import com.aliyun.openservices.ons.api.Producer;
  3. import com.aliyun.openservices.ons.api.ONSFactory;
  4. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  5. import java.util.Properties;
  6. public static void main(String[] args) {
  7. Properties properties = new Properties();
  8. // AccessKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  9. properties.put(PropertyKeyConst.AccessKey, "DEMO_AK");
  10. // SecretKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  11. properties.put(PropertyKeyConst.SecretKey, "DEMO_SK");
  12. //The producer ID you have created on the console
  13. properties.put(PropertyKeyConst.ProducerId, "DEMO_PID");
  14. //Set transmission timeout (ms) period
  15. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
  16. // Set a TCP access domain name (the following uses public cloud production environment as an example)
  17. properties.put(PropertyKeyConst.ONSAddr,
  18. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  19. Producer producer = ONSFactory.createProducer(properties);
  20. // Before sending messages, the start method must be called once to start the producer.
  21. producer.start();
  22. //Send messages in loop
  23. for (int i = 0; i < 100; i++){
  24. Message msg = new Message(
  25. // The topic of the message
  26. "TopicTestMQ",
  27. //Message tag is similar to tag in Gmail, and is used to classify messages. Consumers can then set filtering conditions for messages to be filtered in MQ broker.
  28. "TagA",
  29. // Message body, which can be any data in binary format. Consistent serialization and deserialization methods should be consistent between the producer and the consumer.
  30. "Hello MQ".getBytes());
  31. // The setting represents the key service property of the message, so please set it as globally unique as possible.
  32. // You can query a message and resend it through the MQ console when you cannot receive the message properly.
  33. // Note: Message sending and receiving is not affected if you do not configure this setting.
  34. msg.setKey("ORDERID_" + i);
  35. // As there is no request for response processing during one-way transmission, once the message fails to be sent, the data will be lost because no retry will be triggered. If the data cannot be lost, it is recommended to use reliable synchronous or reliable asynchronous transmission method.
  36. producer.sendOneway(msg);
  37. }
  38. // The producer object will be destroyed before exiting the application.
  39. // Note: It's ok if the producer object is not destroyed.
  40. producer.shutdown();
  41. }
Thank you! We've received your feedback.