
Subscribe to messages

Last Updated: Jun 19, 2018

This topic describes how to subscribe to messages by using the .NET SDK.

Note:

  • Make sure the subscription relationships of all consumer instances under the same consumer ID are consistent. For detailed information, see Subscription consistency.
  • For information on TCP access point domain names, see TCP access instruction.

Subscription Methods

MQ supports the following two subscription methods:

  • Clustering subscription: All consumer instances with the same consumer ID share the messages evenly. For example, if a topic has 9 messages and 3 consumer instances share the same consumer ID, each instance consumes 3 of the 9 messages.

    // Clustering subscription settings (the default if no message model is configured)
    factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
  • Broadcasting subscription: Every consumer instance with the same consumer ID consumes each message once. For example, if a topic has 9 messages and 3 consumer instances share the same consumer ID, each instance consumes all 9 messages.

    // Broadcasting subscription settings
    factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
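
The difference between the two methods can be illustrated with a small, self-contained sketch that does not use the MQ SDK at all. The class name SubscriptionModelSketch and the Distribute method are hypothetical, and the round-robin assignment is only an assumption made for illustration; the actual broker balances messages across instances in its own way.

```csharp
using System;
using System.Collections.Generic;

// Illustration only: this does not call the MQ SDK. It models how many
// messages each consumer instance receives under the two subscription methods.
public static class SubscriptionModelSketch
{
    // Returns, for each consumer instance, the message numbers it receives.
    // Round-robin assignment is an assumption of this sketch, not the
    // broker's actual load-balancing strategy.
    public static List<List<int>> Distribute(int messageCount, int consumerCount, bool broadcasting)
    {
        var received = new List<List<int>>();
        for (int c = 0; c < consumerCount; c++)
        {
            received.Add(new List<int>());
        }

        for (int m = 0; m < messageCount; m++)
        {
            if (broadcasting)
            {
                // Broadcasting: every instance receives every message.
                foreach (var inbox in received)
                {
                    inbox.Add(m);
                }
            }
            else
            {
                // Clustering: each message goes to exactly one instance.
                received[m % consumerCount].Add(m);
            }
        }
        return received;
    }

    public static void Main()
    {
        foreach (bool broadcasting in new[] { false, true })
        {
            var received = Distribute(9, 3, broadcasting);
            Console.WriteLine(broadcasting ? "Broadcasting:" : "Clustering:");
            for (int c = 0; c < received.Count; c++)
            {
                Console.WriteLine("  instance " + c + " received " + received[c].Count + " messages");
            }
        }
    }
}
```

With 9 messages and 3 instances, the sketch reports 3 messages per instance for clustering and 9 messages per instance for broadcasting, matching the examples above.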

Sample Code

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Runtime.InteropServices;
    using ons;

    namespace ons
    {
        // After the push consumer pulls a message, it invokes the consume method of this listener instance.
        public class MyMsgListener : MessageListener
        {
            public MyMsgListener()
            {
            }

            ~MyMsgListener()
            {
            }

            public override Action consume(Message value, ConsumeContext context)
            {
                // value is the message being consumed; call getBody() to obtain the message body.
                Console.WriteLine(value.getBody());
                // For questions about Chinese character encoding, see the documentation included in the SDK package.
                return ons.Action.CommitMessage;
            }
        }

        class onscsharp
        {
            static void Main(string[] args)
            {
                // Mandatory parameters for creating and running the push consumer
                ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
                factoryInfo.setFactoryProperty(factoryInfo.ConsumerId, "XXX");     // The consumer ID you created in the MQ console
                factoryInfo.setFactoryProperty(factoryInfo.PublishTopics, "XXX"); // The topic you created in the MQ console
                factoryInfo.setFactoryProperty(factoryInfo.AccessKey, "xx");      // AccessKey for Alibaba Cloud identity verification, created in the Alibaba Cloud Management Console
                factoryInfo.setFactoryProperty(factoryInfo.SecretKey, "xxxx");    // SecretKey for Alibaba Cloud identity verification, created in the Alibaba Cloud Management Console

                // Clustering subscription (default)
                // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
                // Broadcasting subscription
                // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);

                // Create a consumer for message consumption
                ONSFactory onsfactory = new ONSFactory();
                PushConsumer pConsumer = onsfactory.getInstance().createPushConsumer(factoryInfo);

                // Register the message callback; it is triggered whenever a message arrives.
                MessageListener msgListener = new MyMsgListener();
                pConsumer.subscribe(factoryInfo.getPublishTopics(), "*", msgListener);

                // Start the consumer
                pConsumer.start();

                // Once the consumer is started, messages are pulled asynchronously. For each pulled message,
                // MQ calls the consume method of the MyMsgListener instance and passes the message as a parameter.
                // Put your own service logic here. As a placeholder (not part of the SDK), this sample
                // blocks the main thread until Enter is pressed so that the consumer keeps running.
                Console.ReadLine();

                // When consumption is finished, call shutdown to release resources.
                // shutdown must also be called before the application exits.
                pConsumer.shutdown();
            }
        }
    }