This topic describes how to subscribe to messages by using the .NET SDK of Message Queue for Apache RocketMQ.

Note: Make sure that all consumer instances that use the same group ID have consistent subscriptions. For more information, see Subscription consistency.

Message Queue for Apache RocketMQ supports the following subscription modes:

  • Clustering subscription

    All consumers identified by the same group ID share the consumption of messages evenly. For example, if a topic contains nine messages and a group contains three consumer instances, each instance consumes three messages.

        // The configuration of clustering subscription (default mode).
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);              
  • Broadcasting subscription

    Each consumer identified by the same group ID consumes every message once. For example, if a topic contains nine messages and a group contains three consumer instances, each instance consumes all nine messages.

        // The configuration of broadcasting subscription.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);                

Sample code

using System;
using System.Threading;
using System.Text;
using ons;

/// <summary>
/// Message listener whose <c>consume</c> callback prints the body of each
/// message it is handed and then acknowledges the message.
/// </summary>
/// <remarks>
/// The class intentionally declares no finalizer: it owns no unmanaged state
/// of its own, and an empty finalizer would only add finalization overhead
/// to every instance.
/// </remarks>
public class MyMsgListener : MessageListener
{
    public MyMsgListener()
    {
    }

    /// <summary>
    /// Writes the message body to the console and reports the message as consumed.
    /// </summary>
    /// <param name="value">The message whose body is printed.</param>
    /// <param name="context">The consume context supplied by the SDK; not used here.</param>
    /// <returns>Always <c>ons.Action.CommitMessage</c>.</returns>
    public override ons.Action consume(Message value, ConsumeContext context)
    {
        // The body string is turned back into bytes with the system default
        // encoding and then decoded as UTF-8 before it is printed.
        // NOTE(review): presumably this undoes the way the SDK marshals a
        // UTF-8 body into a managed string - confirm against the SDK docs.
        byte[] rawBody = Encoding.Default.GetBytes(value.getBody());
        Console.WriteLine(Encoding.UTF8.GetString(rawBody));
        return ons.Action.CommitMessage;
    }
}

/// <summary>
/// Sample console program: configures a push consumer, subscribes it to a
/// topic with <c>MyMsgListener</c>, lets it run for a fixed time, and then
/// shuts it down.
/// </summary>
public class ConsumerExampleForEx
{
    public ConsumerExampleForEx()
    {
    }

    /// <summary>
    /// Entry point of the sample.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        // Configure your account according to the settings in the console.
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // The AccessKey ID you created in the Alibaba Cloud console for identity authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // The AccessKey secret you created in the Alibaba Cloud console for identity authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // The group ID you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // The topic you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // To select the clustering subscription mode, uncomment the following line.
        // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
        // To select the broadcasting subscription mode, uncomment the following line.
        // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);

        // Create the consumer instance.
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

        // Subscribe to the configured topic; "*" is passed as the subscription expression.
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());

        // Start the consumer instance.
        consumer.start();

        try
        {
            // This wait is only for the demo. In actual production environments, you must not exit the process.
            Thread.Sleep(300000);
        }
        finally
        {
            // Shut down the consumer instance when the process is about to exit,
            // even if the wait above is cut short by an exception.
            consumer.shutdown();
        }
    }
}