All Products
Search
Document Center

Send messages (using multiple threads)

Last Updated: Mar 29, 2019

The RocketMQ consumer and producer client objects are thread-safe and can be shared among multiple threads.

You can deploy multiple producer and consumer instances on one or more servers. A single producer or consumer instance can also be used from multiple threads to send or receive messages, which increases the sending or receiving throughput (TPS). Do not create a separate client instance for every thread.

The following sample code shares one producer instance among multiple threads:

  import com.aliyun.openservices.ons.api.Message;
  import com.aliyun.openservices.ons.api.ONSFactory;
  import com.aliyun.openservices.ons.api.Producer;
  import com.aliyun.openservices.ons.api.PropertyKeyConst;
  import com.aliyun.openservices.ons.api.SendResult;
  import java.nio.charset.StandardCharsets;
  import java.util.Date;
  import java.util.Properties;
  7. public class SharedProducer {
  8. public static void main(String[] args) {
  9. // Initialize the producer configuration.
  10. Properties properties = new Properties();
  11. // The group ID you created in the console.
  12. properties.put(PropertyKeyConst.GROUP_ID, "XXX");
  13. // The AccessKeyId you created in the Alibaba Cloud console for identity authentication.
  14. properties.put(PropertyKeyConst.AccessKey,"XXX");
  15. // The AccessKeySecret you created in the Alibaba Cloud console for identity authentication.
  16. properties.put(PropertyKeyConst.SecretKey, "XXX");
  17. // Set the message transmission timeout period (in milliseconds).
  18. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
  19. // Set the TCP endpoint: Go to the **Instances** page in the RocketMQ console, and view the endpoint in the **Endpoint Information** area.
  20. properties.put(PropertyKeyConst.NAMESRV_ADDR,
  21. "XXX ");
  22. final Producer producer = ONSFactory.createProducer(properties);
  23. //Before sending a message, call the start() method once to start the producer.
  24. producer.start();
  25. // The created producer and consumer objects are thread-secure and can be shared among multiple threads. Do not create a client instance for every thread.
  26. // Two threads share the producer object and concurrently send messages to RocketMQ.
  27. Thread thread = new Thread(new Runnable() {
  28. @Override
  29. public void run() {
  30. try {
  31. Message msg = new Message( //
  32. // The topic of the message.
  33. "TopicTestMQ",
  34. // The message tag, which is similar to a Gmail tag. It is used to sort messages, enabling the consumer to filter messages on the RocketMQ broker based on the specified criteria.
  35. "TagA",
  36. // The message body in any binary format. RocketMQ does not process the message body.
  37. // The producer and consumer must negotiate the consistent serialization and deserialization methods.
  38. "Hello MQ".getBytes());
  39. SendResult sendResult = producer.send(msg);
  40. // Send the message in synchronous mode. The message is sent if no exception is thrown.
  41. if (sendResult ! = null) {
  42. System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
  43. }
  44. } catch (Exception e) {
  45. // The message failed to be sent and requires a retry. The system can resend the message or store message data persistently.
  46. System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
  47. e.printStackTrace();
  48. }
  49. }
  50. });
  51. thread.start();
  52. Thread anotherThread = new Thread(new Runnable() {
  53. @Override
  54. public void run() {
  55. try {
  56. Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
  57. SendResult sendResult = producer.send(msg);
  58. // Send the message in synchronous mode. The message is sent if no exception is thrown.
  59. if (sendResult ! = null) {
  60. System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
  61. }
  62. } catch (Exception e) {
  63. // The message failed to be sent and requires a retry. The system can resend the message or store message data persistently.
  64. System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
  65. e.printStackTrace();
  66. }
  67. }
  68. });
  69. anotherThread.start();
  70. // If the producer instance is no longer used, disable it to release resources.
  71. // producer.shutdown();
  72. }
  73. }