All Products
Search
Document Center

Subscribe to messages

Last Updated: Mar 29, 2019

This topic describes how to subscribe to messages by using the .NET SDK of Aliware RocketMQ.

Note:

  • Keep the subscriptions of all consumer instances that share the same group ID consistent. For more information, see Subscription consistency.

Subscription modes

RocketMQ supports the following two message subscription modes:

  • Clustering subscription: The consumers identified by the same group ID share the messages evenly among themselves, so each message is consumed by only one of them. For example, a topic contains nine messages and a group contains three consumer instances. In this case, each instance consumes three messages.

    1. // The configuration of clustering subscription (default mode).
    2. factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
  • Broadcasting subscription: Every consumer identified by the same group ID consumes each message once. For example, a topic contains nine messages and a group contains three consumer instances. In this case, each instance consumes all nine messages.

    1. // The configuration of broadcasting subscription.
    2. factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);

Sample code

  1. using System;
  2. using System.Threading;
  3. using System.Text;
  4. using ons;
  /// <summary>
  /// Message listener whose <c>consume</c> callback is executed when a message is
  /// pulled from the broker. It prints the message body to the console and
  /// commits the message.
  /// </summary>
  /// <remarks>
  /// No constructor or finalizer is declared: the class holds no state of its own,
  /// and an empty finalizer would only add finalization overhead.
  /// </remarks>
  public class MyMsgListener : MessageListener
  {
      /// <summary>
      /// Writes the body of <paramref name="value"/> to standard output.
      /// </summary>
      /// <param name="value">The message that was delivered.</param>
      /// <param name="context">The consume context supplied by the SDK (unused here).</param>
      /// <returns>Always <c>ons.Action.CommitMessage</c>.</returns>
      public override ons.Action consume(Message value, ConsumeContext context)
      {
          // Re-encode the body with the system default encoding and decode the
          // resulting bytes as UTF-8.
          // NOTE(review): presumably the SDK surfaces UTF-8 payload bytes as a string
          // decoded with the default code page, and this round trip recovers the
          // original text - confirm against the SDK before relying on it.
          byte[] rawBody = Encoding.Default.GetBytes(value.getBody());
          Console.WriteLine(Encoding.UTF8.GetString(rawBody));

          return ons.Action.CommitMessage;
      }
  }
  /// <summary>
  /// Demo program: configures a push consumer, subscribes it to one topic,
  /// consumes for five minutes, and then shuts the consumer down.
  /// </summary>
  public class ConsumerExampleForEx
  {
      /// <summary>
      /// Program entry point.
      /// </summary>
      /// <param name="args">Command-line arguments (unused).</param>
      static void Main(string[] args)
      {
          // Configure your account according to the following settings. You can obtain these settings in the console.
          ONSFactoryProperty factoryInfo = new ONSFactoryProperty();

          // The AccessKeyId you created in the Alibaba Cloud console for identity authentication.
          factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
          // The AccessKeySecret you created in the Alibaba Cloud console for identity authentication.
          factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
          // The group ID you created in the console.
          factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
          // The topic you created in the console.
          factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
          // Set the TCP endpoint: go to the Instances page in the RocketMQ console and
          // view the endpoint in the Endpoint Information area.
          factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
          // Set the log path.
          factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

          // Clustering consumption (uncomment to select it explicitly):
          // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
          // Broadcasting consumption:
          // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);

          // Create the consumer instance.
          PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

          // Keep the listener in a local so that managed code holds a reference to it
          // for as long as the consumer may call it back.
          // NOTE(review): the SDK presumably keeps only a native reference to the
          // listener; without a managed root the GC could collect it mid-run - confirm.
          MyMsgListener listener = new MyMsgListener();

          // Subscribe to the configured topic with the "*" subscription expression.
          consumer.subscribe(factoryInfo.getPublishTopics(), "*", listener);

          // Start the instance at the client.
          consumer.start();

          // This wait (300000 ms = 5 minutes) is only used in the demo. In an actual
          // production environment the process keeps running instead of exiting.
          Thread.Sleep(300000);

          // Shut down the consumer instance when the process is about to exit.
          consumer.shutdown();

          // Guarantees the listener is considered reachable up to this point.
          GC.KeepAlive(listener);
      }
  }