All Products
Search
Document Center

ApsaraMQ for RocketMQ:Subscribe to messages

Last Updated:Aug 21, 2023

This topic describes how to subscribe to messages by using the TCP client SDK for .NET provided by ApsaraMQ for RocketMQ.

Note

You must maintain consistent subscriptions for all consumer instances that are identified by the same group ID. For more information, see Subscription consistency.

ApsaraMQ for RocketMQ supports the following subscription modes:

  • Clustering subscription

    All consumers identified by the same group ID consume an equal number of messages. For example, a topic contains nine messages and a consumer group contains three consumers. In clustering consumption mode, each consumer consumes three messages.

        // Configure clustering subscription, which is the default mode.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);              
  • Broadcasting subscription

    Each of the consumers identified by the same group ID consumes all messages once. For example, a topic contains nine messages and a consumer group contains three consumers. In broadcasting consumption mode, each consumer consumes all nine messages.

        // Configure broadcasting subscription.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);                

Sample code:

using System;
using System.Threading;
using System.Text;
using ons;

// The callback function that is executed when a message is pulled from the ApsaraMQ for RocketMQ broker. 
public class MyMsgListener : MessageListener
{
    public MyMsgListener()
    {
    }

    ~MyMsgListener()
    {
    }

    public override ons.Action consume(Message value, ConsumeContext context)
    {
        Byte[] text = Encoding.Default.GetBytes(value.getBody());
        Console.WriteLine(Encoding.UTF8.GetString(text));
        return ons.Action.CommitMessage;
    }
}

public class ConsumerExampleForEx
{
    public ConsumerExampleForEx()
    {
    }

    static void Main(string[] args) {
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        // The AccessKey ID that is used for authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
	      // The AccessKey secret that is used for authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // The topic that you created in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // Clustering consumption. 
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
        // Broadcasting consumption. 
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);

        // Create the consumer instance. 
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

        // Subscribe to the topic. 
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());

        // Start the consumer instance. 
        consumer.start();

        // This setting is used only in this demo. In actual production environments, you cannot exit the process. 
        Thread.Sleep(300000);

        // Before you exit the process, terminate the consumer instance. 
        consumer.shutdown();
    }
}