After you create the required resources in the Message Queue for Apache RocketMQ console, you can use a Message Queue for Apache RocketMQ TCP client SDK to send and subscribe to normal messages.

Prerequisites

  • Create resources
    Note: Normal messages are used in the provided examples. A topic that is created for normal messages cannot be used to send or subscribe to other types of messages, such as scheduled messages, delayed messages, ordered messages, and transactional messages. Create a separate topic for each message type that you use.
  • An AccessKey pair is obtained.
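
The samples in this topic hard-code placeholder values such as "XXX" for the AccessKey pair and the endpoint. As a minimal sketch, the same settings could be read from environment variables so that credentials stay out of source code. The environment variable names and the ClientConfigSketch class are hypothetical. The property key strings are assumed to match the PropertyKeyConst names that the Java samples use. In real code, use the PropertyKeyConst constants instead of string literals.

```java
import java.util.Map;
import java.util.Properties;

final class ClientConfigSketch {
    // Assumption: these strings stand in for PropertyKeyConst.AccessKey,
    // PropertyKeyConst.SecretKey, and PropertyKeyConst.NAMESRV_ADDR.
    static final String ACCESS_KEY = "AccessKey";
    static final String SECRET_KEY = "SecretKey";
    static final String NAMESRV_ADDR = "NAMESRV_ADDR";

    /** Builds client properties from a map of environment variables. */
    static Properties fromEnvironment(Map<String, String> env) {
        Properties properties = new Properties();
        // Hypothetical variable names; choose names that fit your deployment.
        properties.setProperty(ACCESS_KEY, require(env, "ROCKETMQ_ACCESS_KEY"));
        properties.setProperty(SECRET_KEY, require(env, "ROCKETMQ_SECRET_KEY"));
        properties.setProperty(NAMESRV_ADDR, require(env, "ROCKETMQ_TCP_ENDPOINT"));
        return properties;
    }

    // Fails fast with a clear message when a required variable is missing or blank.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            Properties properties = fromEnvironment(System.getenv());
            System.out.println("Loaded " + properties.size() + " settings");
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

The resulting Properties object could then be passed to a factory call such as ONSFactory.createProducer(properties), as in the Java producer sample.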

Download and install a TCP client SDK

Note: Commercial SDKs provide more features and higher stability than open source SDKs. We recommend that you use the commercial SDKs provided by Message Queue for Apache RocketMQ. Use open source SDKs only when you migrate from open source Apache RocketMQ to Message Queue for Apache RocketMQ and do not want to modify your code.

Message Queue for Apache RocketMQ provides commercial TCP client SDKs for multiple programming languages, including the Java, .NET, and C++ SDKs that are used in this topic. Obtain the client SDK for your programming language based on your business requirements.

Use TCP client SDKs to send normal messages

After you obtain the client SDK for your programming language, run the corresponding sample code to send normal messages.

Java:

    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.OnExceptionContext;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.SendCallback;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;

    import java.util.Properties;

    public class ProducerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // Specify the AccessKey ID that you created in the Alibaba Cloud Management Console. The AccessKey ID is used for identity authentication. 
            properties.put(PropertyKeyConst.AccessKey, "XXX");
            // Specify the AccessKey secret that you created in the Alibaba Cloud Management Console. The AccessKey secret is used for identity authentication. 
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            // Specify the timeout period for sending a message. Unit: milliseconds. 
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section on the Instance Details page in the Message Queue for Apache RocketMQ console. 
            properties.put(PropertyKeyConst.NAMESRV_ADDR,
              "XXX");

            Producer producer = ONSFactory.createProducer(properties);
            // Before you send a message, call the start() method to start your producer. You can call this method only once. 
            producer.start();

            Message msg = new Message(
                    // Specify the topic to which the messages belong. 
                    "TopicTestMQ",
                    // Specify the message tag. A message tag is similar to a Gmail label: it classifies messages in a topic and lets consumers filter messages on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                    "TagA",
                    // Specify the message body in binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and the consumer must agree on the serialization and deserialization methods, so the charset is set explicitly here. 
                    "Hello MQ".getBytes(java.nio.charset.StandardCharsets.UTF_8));

            // Specify the message key. We recommend that you specify a globally unique key. A globally unique key helps you query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
            // Take note that you can send and subscribe to messages without the need to specify a message key. 
            msg.setKey("ORDERID_100");

            // Send the messages in asynchronous mode. The callback function returns the result to the client. 
            producer.sendAsync(msg, new SendCallback() {
                @Override
                public void onSuccess(final SendResult sendResult) {
                    // A message is sent. 
                    System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
                }

                @Override
                public void onException(OnExceptionContext context) {
                    // A message failed to be sent and needs to be sent again. You can resend the message or persist the message. 
                    System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
                }
            });

            // The message ID is available on the message object before the callback returns the result. 
            System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());

            // Shut down the producer before the application exits. This step is optional. In this sample, the callback may not have been invoked yet when shutdown() is called. 
            producer.shutdown();
        }
    }

.NET (C#):

using System;
using ons;

public class ProducerExampleForEx
{
    public ProducerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // Configure your account based on the information in the Alibaba Cloud Management Console. 
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // Specify the AccessKey ID that you created in the Alibaba Cloud Management Console. The AccessKey ID is used for identity authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // Specify the AccessKey secret that you created in the Alibaba Cloud Management Console. The AccessKey secret is used for identity authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // Specify the ID of the group that you created in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // Specify the topic that you created in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section on the Instance Details page in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // Specify the log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // Create a producer instance. 
        // Take note that a producer instance is thread-safe and can be used to send messages to different topics. In most cases, each thread requires only one producer instance. 
        Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);

        // Start the producer instance. 
        producer.start();

        // Create a message object. 
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        msg.setKey(Guid.NewGuid().ToString());
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure {0}", ex.ToString());
            }
        }

        // Before you exit your thread, shut down the producer instance. 
        producer.shutdown();

    }
}

C++:

#include "ONSFactory.h"
#include "ONSClientException.h"

using namespace ons;

int main()
{

    // Create a producer and set the parameters that are required to send messages. 
    ONSFactoryProperty factoryInfo;   
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX"); // Specify the ID of the group that you created in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section on the Instance Details page in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// Specify the topic that you created in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");// Specify the message content. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// Specify the AccessKey ID that you created in the Alibaba Cloud Management Console. The AccessKey ID is used for identity authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX" );// Specify the AccessKey secret that you created in the Alibaba Cloud Management Console. The AccessKey secret is used for identity authentication. 


    // Create a producer. 
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    // Before you send a message, call the start() method to start the producer. You can call this method only once. 
    pProducer->start();

    Message msg(
            // Specify the topic to which the message belongs. 
            factoryInfo.getPublishTopics(),
            // Specify the message tag. A message tag is similar to a Gmail label. A message tag is used to classify messages in a topic and facilitates a consumer to filter messages in a Message Queue for Apache RocketMQ broker based on specified conditions. 
            "TagA",
            // Specify the message body. You must specify the message body. Message Queue for Apache RocketMQ does not process the message body. The serialization method used by the producer and the deserialization method used by the consumer must be consistent. 
            factoryInfo.getMessageContent()
    );

    // Specify the message key. We recommend that you specify a globally unique key. 
    // A globally unique key helps you query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
    // Take note that you can send and subscribe to messages without the need to specify a message key. 
    msg.setKey("ORDERID_100");

    // Send the messages. If no exception occurs, the messages are sent. 
    try
    {
        SendResultONS sendResult = pProducer->send(msg);
    }
    catch(ONSClientException & e)
    {
        // Specify how to process exceptions. 
    }
    // Before you exit your application, shut down your producer object. Otherwise, issues such as memory leaks may occur. 
    pProducer->shutdown();

    return 0;
}
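
The comments in the samples above state that the serialization method used by the producer and the deserialization method used by the consumer must be consistent. The following minimal sketch illustrates that point with plain Java and no SDK calls: both sides use UTF-8 explicitly, and decoding with a different charset no longer reproduces the original text. BodyCodecSketch and its methods are hypothetical helper names.

```java
import java.nio.charset.StandardCharsets;

final class BodyCodecSketch {
    /** Producer side: turn the payload into the bytes that become the message body. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: decode the message body with the same charset that the producer used. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The payload contains non-ASCII characters so that a charset mismatch is visible.
        String original = "Gr\u00fc\u00dfe MQ";
        byte[] body = encode(original);

        // Matching charsets reproduce the original text.
        System.out.println("same charset matches: " + original.equals(decode(body)));

        // A consumer that decodes with a different charset gets different text.
        String wrong = new String(body, StandardCharsets.ISO_8859_1);
        System.out.println("different charset matches: " + original.equals(wrong));
    }
}
```

The same idea applies to any format, such as JSON or a binary encoding: the producer and the consumer should share one codec definition.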

Alternatively, you can use the quick start in the console: log on to the Message Queue for Apache RocketMQ console, find the created instance, click More in the Actions column, and select Quick Start from the drop-down list.

Check whether messages are sent

After a message is sent, you can check its status in the Message Queue for Apache RocketMQ console by performing the following operations:

  1. On the Instance Details page, click Message Query in the left-side navigation pane.
  2. On the Message Query page, select a query method and specify the parameters as required, and then click Search.

    Stored At indicates the time when the Message Queue for Apache RocketMQ broker stores the message. If a message can be queried, the message has been sent to the Message Queue for Apache RocketMQ broker.

Notice: This step demonstrates the scenario in which Message Queue for Apache RocketMQ is used for the first time and no consumer has been started. Therefore, no consumption data is displayed in the console. To start a consumer and subscribe to messages, see the next section. For more information about the message status, see Query a message and Query a message trace.
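
If a message does not appear in the query results, the send may have failed on the client. The onException callback in the Java producer sample notes that a failed message can be resent or persisted. The following is a minimal retry sketch under stated assumptions: RetrySketch and sendWithRetry are hypothetical names, and the send operation is passed in as a Callable so that the example runs without the SDK. In a real producer, the Callable would wrap a call such as producer.send(msg).

```java
import java.util.concurrent.Callable;

final class RetrySketch {
    /**
     * Calls the send operation up to maxAttempts times.
     * Waits backoffMillis * attempt between failed attempts and rethrows the last failure.
     */
    static <T> T sendWithRetry(Callable<T> send, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.call();
            } catch (InterruptedException e) {
                // Do not retry when the thread is interrupted.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        // All attempts failed. The caller can persist the message for a later resend.
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Hypothetical sender that fails twice before it succeeds.
        String msgId = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker unavailable");
            }
            return "MSG-" + calls[0];
        }, 5, 10L);
        System.out.println("sent " + msgId + " after " + calls[0] + " attempts");
        // prints "sent MSG-3 after 3 attempts"
    }
}
```

Because a resend can deliver the same message more than once, consumers should tolerate duplicates, for example by checking the message key.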

Use TCP client SDKs to subscribe to normal messages

After normal messages are sent, you must start consumers to subscribe to the messages. Use the following sample code for your programming language to start a consumer and test message subscription. Specify the parameters as described in the code comments.

Java:

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ConsumerTest {
   public static void main(String[] args) {
       Properties properties = new Properties();
        // Specify the ID of the group that you created in the Message Queue for Apache RocketMQ console. 
       properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
       properties.put(PropertyKeyConst.AccessKey, "XXX");
        // Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
       properties.put(PropertyKeyConst.SecretKey, "XXX");
        // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section on the Instance Details page in the Message Queue for Apache RocketMQ console. 
       properties.put(PropertyKeyConst.NAMESRV_ADDR,
         "XXX");
          // Specify the clustering consumption mode. This mode is used by default. 
          // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
          // Specify the broadcasting consumption mode. 
          // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

       Consumer consumer = ONSFactory.createConsumer(properties);
       consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { // Subscribe to multiple tags.
           public Action consume(Message message, ConsumeContext context) {
               System.out.println("Receive: " + message);
               return Action.CommitMessage;
           }
       });

        // Subscribe to another topic. To unsubscribe from this topic, delete the corresponding code for subscription and restart your consumer. 
        consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // Subscribe to all tags. 
           public Action consume(Message message, ConsumeContext context) {
               System.out.println("Receive: " + message);
               return Action.CommitMessage;
           }
       });

       consumer.start();
       System.out.println("Consumer Started");
   }
}

.NET (C#):

using System;
using System.Threading;
using System.Text;
using ons;

// Specify the callback function that is executed when a message is pulled from a Message Queue for Apache RocketMQ broker. 
public class MyMsgListener : MessageListener
{
    public MyMsgListener()
    {
    }

    ~MyMsgListener()
    {
    }

    public override ons.Action consume(Message value, ConsumeContext context)
    {
        Byte[] text = Encoding.Default.GetBytes(value.getBody());
        Console.WriteLine(Encoding.UTF8.GetString(text));
        return ons.Action.CommitMessage;
    }
}

public class ConsumerExampleForEx
{
    public ConsumerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // Configure your account based on the information in the Alibaba Cloud Management Console. 
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // Specify the ID of the group that you created in the Message Queue for Apache RocketMQ console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // Specify the topic that you created in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section on the Instance Details page in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // Specify the log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // Specify the clustering consumption mode. 
        // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
        // Specify the broadcasting consumption mode. 
        // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);

        // Create a consumer instance. 
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

        // Subscribe to topics. 
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());

        // Start the consumer instance. 
        consumer.start();

        // This configuration can be used only in this sample code. In actual production environments, make sure that your process does not exit. 
        Thread.Sleep(300000);

        // Before you exit your process, shut down the consumer instance. 
        consumer.shutdown();
    }
}

C++:

#include "ONSFactory.h"
using namespace ons;

// Define a MyMsgListener class to consume the messages. 
// After pushConsumer pulls the messages, the consume function of the MyMsgListener instance is called. 
class MyMsgListener : public MessageListener
{

    public:

        MyMsgListener()
        {
        }

        virtual ~MyMsgListener()
        {
        }

        virtual Action consume(Message &message, ConsumeContext &context)
        {
            // Specify how to process the messages. 
            return CommitMessage; //CONSUME_SUCCESS;
        }
};


int main(int argc, char* argv[])
{

    // Specify the parameters that are required to create pushConsumer and make pushConsumer work. You must specify the parameters. 
    ONSFactoryProperty factoryInfo;
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");// Specify the ID of the group that you created in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section on the Instance Details page in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// Specify the topic that you created in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey,  "XXX");// Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
    // Specify the clustering consumption mode. This mode is used by default. 
    // factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::CLUSTERING);
    // Specify the broadcasting consumption mode. 
    // factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::BROADCASTING);

    // Create pushConsumer. 
    PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // Specify the topic to which pushConsumer subscribes and the tag, and register a message callback function. 
    MyMsgListener  msglistener;
    pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    // Start pushConsumer. 
    pushConsumer->start();

    // Call the shutdown() method only when your consumer no longer needs to subscribe to messages. After the shutdown() method is called, the consumer exits and can no longer subscribe to messages. 
    // In a real application, keep the process running between start() and shutdown() so that messages can be consumed. 

    // Shut down pushConsumer before you exit your application. Otherwise, issues such as memory leaks may occur. 
    pushConsumer->shutdown();
    return 0;

}
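
The Java consumer sample subscribes with the tag expressions "TagA||TagB" (multiple tags) and "*" (all tags). As a rough illustration of what these expressions select, the following sketch checks a message tag against an expression in plain Java. It models only the two forms shown in the sample. The actual filtering is performed by the Message Queue for Apache RocketMQ broker, and TagFilterSketch is a hypothetical helper, not part of the SDK.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

final class TagFilterSketch {
    /** Returns true if the tag is selected by an expression such as "TagA||TagB" or "*". */
    static boolean matches(String expression, String tag) {
        String trimmed = expression.trim();
        if ("*".equals(trimmed)) {
            // "*" selects every tag in the topic.
            return true;
        }
        // "TagA||TagB" selects a message if its tag equals any listed tag.
        Set<String> wanted = Arrays.stream(trimmed.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(tag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA||TagB", "TagA")); // true
        System.out.println(matches("TagA||TagB", "TagC")); // false
        System.out.println(matches("*", "TagC"));          // true
    }
}
```

With the producer sample above, which sends a message tagged "TagA" to "TopicTestMQ", the first subscription in the Java consumer sample would select that message.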

Check whether the message subscription is successful

  1. On the Instance Details page, click Groups in the left-side navigation pane.
  2. On the Groups page, click the TCP tab.
  3. Find the group ID whose subscription status you want to view, and click Details in the Actions column.
    If the value of Consumer Status is Online and the value of the Is Subscription Consistent parameter is Yes, the message subscription is successful.