edit-icon download-icon

Send messages - multiple threads

Last Updated: Jun 19, 2018

MQ consumer and producer client objects are thread-safe and can be shared between multiple threads.

You can deploy multiple producer and consumer instances on one server (or across multiple servers), or you can use multiple threads within a single producer or consumer instance to send or receive messages, in order to increase message sending and receiving TPS. Do not create a client instance for every thread.

Note: For information on TCP access point domain names, see TCP access instructions.

The sample code for sharing a producer between multiple threads is as follows:

  import com.aliyun.openservices.ons.api.Message;
  import com.aliyun.openservices.ons.api.ONSFactory;
  import com.aliyun.openservices.ons.api.Producer;
  import com.aliyun.openservices.ons.api.PropertyKeyConst;
  import com.aliyun.openservices.ons.api.SendResult;
  import java.nio.charset.StandardCharsets;
  import java.util.Date;
  import java.util.Properties;
  7. public class SharedProducer {
  8. public static void main(String[] args) {
  9. // Initialize producer instance configurations
  10. Properties properties = new Properties();
  11. //The producer ID you created on the console
  12. properties.put(PropertyKeyConst.ProducerId, "XXX");
  13. // AccessKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  14. properties.put(PropertyKeyConst.AccessKey,"XXX");
  15. // SecretKey, Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
  16. properties.put(PropertyKeyConst.SecretKey, "XXX");
  17. //Set transmission timeout (ms) period
  18. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
  19. // Set a TCP access domain name (the following uses public cloud production environment as an example)
  20. properties.put(PropertyKeyConst.ONSAddr,
  21. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  22. final Producer producer = ONSFactory.createProducer(properties);
  23. // Before sending messages, the start method must be called once to start the producer.
  24. producer.start();
  25. // The producer and consumer objects are thread-safe, which can be shared between multiple threads. Do not create an instance for every thread.
  26. final Message msg = new Message( //
  27. // The topic of the message
  28. "TopicTestMQ",
  29. //Message tag, which is similar to tag in Gmail, and is used to classify messages. Consumers can then set filtering conditions for messages to be filtered in MQ broker.
  30. "TagA",
  31. // Message body, which can be any data in binary format.
  32. // Serialization and deserialization methods need to be negotiated and remain consistent between the producer and the consumer.
  33. "Hello MQ".getBytes());
  34. // Share producer object between a thread and another thread, and the two send messages concurrently to the MQ.
  35. Thread thread = new Thread(new Runnable() {
  36. @Override
  37. public void run() {
  38. try {
  39. SendResult sendResult = producer.send(message);
  40. // Synchronous message sending will succeed as long as no exception is thrown.
  41. if (sendResult != null) {
  42. System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
  43. }
  44. } catch (Exception e) {
  45. // If the message sending fails and a retry is needed, the message can be re-sent or persisted for compensated processing.
  46. System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
  47. e.printStackTrace();
  48. }
  49. }
  50. });
  51. thread.start();
  52. Thread anotherThread = new Thread(new Runnable() {
  53. @Override
  54. public void run() {
  55. try {
  56. SendResult sendResult = producer.send(message);
  57. // Synchronous message sending will succeed as long as no exception is thrown.
  58. if (sendResult != null) {
  59. System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
  60. }
  61. } catch (Exception e) {
  62. // If the message sending fails and a retry is needed, the message can be re-sent or persisted for compensated processing.
  63. System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
  64. e.printStackTrace();
  65. }
  66. }
  67. });
  68. anotherThread.start();
  69. // If the producer instance will not be used any more, it can be shut down to release resources.
  70. // producer.shutdown();
  71. }
  72. }
Thank you! We've received your feedback.