After you create all the resources in the console, you can call Message Queue for Apache RocketMQ TCP SDK to send and subscribe to normal messages.

Prerequisites

  • Create resources
    Note In this topic, normal messages are used in the example. The topic that is created for normal messages cannot be used to send or subscribe to other types of messages, such as scheduled messages, delayed messages, ordered messages, and transactional messages. You must create topics based on message types.
  • Create an AccessKey pair

Download and install a TCP SDK

Note Commercial SDKs provide more features and higher stability than open source SDKs. We recommend that you use commercial SDKs provided by Message Queue for Apache RocketMQ. Open source SDKs are used only when you migrate open source Apache RocketMQ to Message Queue for Apache RocketMQ but do not want to modify the code.

Message Queue for Apache RocketMQ provides the following commercial TCP SDKs. Obtain the client SDK for a specific programming language as needed.

Call TCP SDKs to send normal messages

After you obtain the client SDK for a specific programming language, you can run the following sample code to send normal messages:

                           
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.OnExceptionContext;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.SendCallback;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;

    import java.util.Properties;

    public class ProducerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
            properties.put(PropertyKeyConst.AccessKey, "XXX");
            // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            // Set a timeout period in milliseconds. 
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // Set a TCP endpoint. You can query the TCP endpoint in the TCP Endpoint section of the Instance Details page in the Message Queue for Apache RocketMQ console. 
            properties.put(PropertyKeyConst.NAMESRV_ADDR,
              "XXX");

            Producer producer = ONSFactory.createProducer(properties);
            // Before you send a message, call the start() method once to start the producer. 
            producer.start();

            Message msg = new Message(
                    // The topic of the message. 
                    "TopicTestMQ",
                    // The message tag, which is similar to a Gmail tag. It is used to sort messages and facilitates the consumer to filter messages on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                    "TagA",
                    // The message body in the binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must agree on the serialization and deserialization methods. 
                    "Hello MQ".getBytes());

            // The message key. We recommend that you specify a globally unique key. A unique key allows you to query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
            // Note: Messages can be sent and subscribed to even if you do not set this parameter. 
            msg.setKey("ORDERID_100");

            // Send the message in asynchronous mode. The callback function returns the result to the client. 
            producer.sendAsync(msg, new SendCallback() {
                @Override
                public void onSuccess(final SendResult sendResult) {
                    // The message is sent. 
                    System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
                }

                @Override
                public void onException(OnExceptionContext context) {
                    // Specify the logic to resend or persist the message if an error occurs. 
                    System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
                }
            });

            // Obtain the msgId parameter before the callback function returns the result. 
            System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());

            // Before you exit the application, shut down the producer object. Note: You can choose not to shut down the producer object. 
            producer.shutdown();
        }
    }                
                           
using System;
using ons;

public class ProducerExampleForEx
{
    public ProducerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // Configure your account based on the settings in the Alibaba Cloud Management Console. 
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // The group ID that you created in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // The topic that you created in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // Set a TCP endpoint. You can query the TCP endpoint in the TCP Endpoint section of the Instance Details page in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // Specify the log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // Create a producer instance. 
        // Note: A producer instance is thread-secure and can be used to send messages to different topics. Each thread 
         //requires only one producer instance.
        Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);

        // Start the consumer instance. 
        producer.start();

        // Create a message object. 
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        msg.setKey(Guid.NewGuid().ToString());
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure{0}", ex.ToString());
            }
        }

        // Before you exit your thread, shut down the producer instance. 
        producer.shutdown();

    }
}    
                           
#include "ONSFactory.h"
#include "ONSClientException.h"

using namespace ons;

int main()
{

    // Create a producer instance and set the parameters that are required to send messages. 
    ONSFactoryProperty factoryInfo;   
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");// The group ID that you created in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // Set a TCP endpoint. You can query the TCP endpoint in the TCP Endpoint section of the Instance Details page in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// The topic that you created in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");// The message content. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX" );// The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 


    //create producer;
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    // Before you send a message, call the start() method once to start the producer. 
    pProducer->start();

    Message msg(
            //Message Topic
            factoryInfo.getPublishTopics(),
            // The message tag, which is similar to a Gmail tag. It is used to sort messages and facilitates the consumer to filter messages on the Message Queue for Apache RocketMQ broker based on specified conditions. 
            "TagA",
            // The message body, which cannot be empty. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must agree on the serialization and deserialization methods. 
            factoryInfo.getMessageContent()
    );

    // The message key. We recommend that you specify a globally unique key. 
    // A unique key allows you to query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
    // Note: Messages can be sent and subscribed to even if you do not set this parameter. 
    msg.setKey("ORDERID_100");

    // Send the message. The message is sent if no error occurs. 
    try
    {
        SendResultONS sendResult = pProducer->send(msg);
    }
    catch(ONSClientException & e)
    {
        // Specify the logic to process errors. 
    }
    // Before you exit your application, shut down the producer object. Otherwise, memory leaks may occur. 
    pProducer->shutdown();

    return 0;
}

You can also start your instance by performing the following steps: Log on to the Message Queue for Apache RocketMQ console. Find the created instance and click More in the Actions column. Select Quick Start from the drop-down list.

Check whether messages are sent

After a message is sent, you can check its status in the Message Queue for Apache RocketMQ console by performing the following operations:

  1. On the Instance Details page, click Message Query in the left-side navigation pane.
  2. On the Message Query page, select a query method and specify the parameters as required, and then click Search.

    Stored At indicates the time when the Message Queue for Apache RocketMQ broker stores the message. If a message can be queried, the message has been sent to the Message Queue for Apache RocketMQ broker.

Notice This step demonstrates the scenario where Message Queue for Apache RocketMQ is used for the first time and the consumer has not been started. Therefore, no consumption data is displayed in the console. To start the consumer and subscribe to messages, see the next section. For more information about the message status, see Query messages and Query a message trace.

Call TCP SDKs to subscribe to normal messages

After a normal message is sent, you must start a consumer to subscribe to the message. You can use the following sample code for a specific programming language based on your business requirements to start the consumer and test the message subscription feature. Set the parameters based on the instructions.

                           
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ConsumerTest {
   public static void main(String[] args) {
       Properties properties = new Properties();
        // The group ID that you created in the Message Queue for Apache RocketMQ console. 
       properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
       properties.put(PropertyKeyConst.AccessKey, "XXX");
        // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
       properties.put(PropertyKeyConst.SecretKey, "XXX");
        // Set a TCP endpoint. You can query the TCP endpoint in the TCP Endpoint section of the Instance Details page in the Message Queue for Apache RocketMQ console. 
       properties.put(PropertyKeyConst.NAMESRV_ADDR,
         "XXX");
          // The clustering consumption mode. This mode is used by default. 
          // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
          // The broadcasting consumption mode. 
          // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

       Consumer consumer = ONSFactory.createConsumer(properties);
       consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { // Subscribe to multiple tags.
           public Action consume(Message message, ConsumeContext context) {
               System.out.println("Receive: " + message);
               return Action.CommitMessage;
           }
       });

        // Subscribe to another topic. To unsubscribe from this topic, delete the corresponding code for subscription and restart the consumer. 
        consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // Subscribe to all tags. 
           public Action consume(Message message, ConsumeContext context) {
               System.out.println("Receive: " + message);
               return Action.CommitMessage;
           }
       });

       consumer.start();
       System.out.println("Consumer Started");
   }
}            
                           
using System;
using System.Threading;
using System.Text;
using ons;

// The callback function that is executed when a message is pulled from the Message Queue for Apache RocketMQ broker. 
public class MyMsgListener : MessageListener
{
    public MyMsgListener()
    {
    }

    ~MyMsgListener()
    {
    }

    public override ons.Action consume(Message value, ConsumeContext context)
    {
        Byte[] text = Encoding.Default.GetBytes(value.getBody());
        Console.WriteLine(Encoding.UTF8.GetString(text));
        return ons.Action.CommitMessage;
    }
}

public class ConsumerExampleForEx
{
    public ConsumerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // Configure your account based on the settings in the console. 
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // The group ID that you created in the Message Queue for Apache RocketMQ console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // The topic that you created in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // Set a TCP endpoint. You can query the TCP endpoint in the TCP Endpoint section of the Instance Details page in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // Specify the log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // The clustering consumption mode. 
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
        // The broadcasting consumption mode. 
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);

        // Create a consumer instance. 
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

        // Subscribe to topics. 
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());

        // Start the consumer instance. 
        consumer.start();

        // This setting is only for the demo. In actual production environments, you cannot exit the process. 
        Thread.Sleep(300000);

        // Before you exit the process, shut down the consumer instance. 
        consumer.shutdown();
    }
}        
                           
#include "ONSFactory.h"
using namespace ons;

// Create a MyMsgListener instance to consume messages. 
// After the pushConsumer object pulls the message, the consumer function of the instance is called. 
class MyMsgListener : public MessageListener
{

    public:

        MyMsgListener()
        {
        }

        virtual ~MyMsgListener()
        {
        }

        virtual Action consume(Message &message, ConsumeContext &context)
        {
            // Specify the logic to process messages. 
            return CommitMessage; //CONSUME_SUCCESS;
        }
};


int main(int argc, char* argv[])
{

    // The parameters that are required to create the pushConsumer object and make it work. 
    ONSFactoryProperty factoryInfo;
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");// The group ID that you created in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // Set a TCP endpoint. You can query the TCP endpoint in the TCP Endpoint section of the Instance Details page in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// The topic that you created in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX");// The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
      // The clustering consumption mode. This mode is used by default. 
      // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
      // The broadcasting consumption mode. 
      // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);

    //create pushConsumer
    PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // Specify the topic to which the pushConsumer object subscribes, and the corresponding tag. Register a message callback function. 
    MyMsgListener  msglistener;
    pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    //start pushConsumer
    pushConsumer->start();

    // Note: The shutdown() method can be called only when no messages are received. After the shutdown() method is called, the consumer object exits and no longer receives messages. 

    // Shut down the pushConsumer object. Before you exit the application, shut down the consumer object. Otherwise, memory leaks may occur. 
    pushConsumer->shutdown();
    return 0;

}

Check whether the message subscription is successful

  1. On the Instance Details page of the instance, click Groups in the left-side navigation pane.
  2. On the Groups page, click the TCP tab.
  3. Find the Group ID whose subscription status that you want to view, and click Details in the Actions column.
    If the value of Consumer Status is Online and the value of the Is Subscription Consistent parameter is Yes, the message subscription is successful.