This topic describes how to subscribe to messages by using the .NET SDK of Message Queue for Apache RocketMQ.
Message Queue for Apache RocketMQ supports the following modes:
- Clustering subscription
All consumers identified by the same group ID share the messages evenly. For example, if a topic contains nine messages and a group contains three consumer instances, each instance consumes three messages.
// The configuration of clustering subscription (default mode).
factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
- Broadcasting subscription
Each of the consumers identified by the same group ID consumes all messages once. For example, a topic contains nine messages and a group contains three consumer instances. In this case, each instance consumes nine messages.
// The configuration of broadcasting subscription.
factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
Sample code
using System;
using System.Threading;
using System.Text;
using ons;
/// <summary>
/// Message listener whose <c>consume</c> callback prints the body of each
/// received message to the console and then commits the message.
/// </summary>
public class MyMsgListener : MessageListener
{
    public MyMsgListener()
    {
    }

    ~MyMsgListener()
    {
    }

    /// <summary>
    /// Writes the message body to standard output and acknowledges the message.
    /// </summary>
    /// <param name="value">The message that was delivered to this listener.</param>
    /// <param name="context">The consume context supplied by the SDK; not used here.</param>
    /// <returns>Always <c>ons.Action.CommitMessage</c>.</returns>
    public override ons.Action consume(Message value, ConsumeContext context)
    {
        // The body is re-encoded with the system default encoding and then decoded as UTF-8.
        // NOTE(review): presumably this recovers UTF-8 text that the native wrapper marshalled
        // with the default code page; confirm against the SDK before changing it.
        var marshalledBody = value.getBody();
        byte[] rawBytes = Encoding.Default.GetBytes(marshalledBody);
        string decodedBody = Encoding.UTF8.GetString(rawBytes);

        Console.WriteLine(decodedBody);

        return ons.Action.CommitMessage;
    }
}
/// <summary>
/// Sample console program that subscribes to a topic with a push consumer and
/// hands every received message to a <c>MyMsgListener</c>.
/// </summary>
public class ConsumerExampleForEx
{
    public ConsumerExampleForEx()
    {
    }

    /// <summary>
    /// Configures the factory properties, creates and starts a push consumer,
    /// keeps the process alive for 300,000 ms, and then shuts the consumer down.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        // Configure your account according to the settings in the console.
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // The AccessKey ID you created in the Alibaba Cloud console for identity authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // The AccessKey secret you created in the Alibaba Cloud console for identity authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // The group ID you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // The topic you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console,
        // and view the endpoint in the Endpoint Information section.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // Optional: select the clustering consumption mode (uncomment to set it explicitly).
        // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
        // Optional: select the broadcasting consumption mode instead.
        // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);

        // Create the consumer instance.
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
        // Subscribe to the configured topic; received messages are passed to MyMsgListener.
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
        // Start the consumer instance.
        consumer.start();
        try
        {
            // This wait is only for the demo. In actual production environments, you must not exit the process.
            Thread.Sleep(300000);
        }
        finally
        {
            // Shut down the consumer instance when the process is about to exit.
            // The finally block guarantees this also happens if the wait is interrupted.
            consumer.shutdown();
        }
    }
}