After you create all the resources in the console, you can call a TCP SDK to send and subscribe to messages in Message Queue for Apache RocketMQ.

Download and install a TCP SDK

We recommend that you use the following multi-language TCP SDKs provided by Message Queue for Apache RocketMQ. Obtain the client SDK in a specific language as needed.

Call TCP SDKs to send messages

After you obtain the client SDK in a specific language, you can run the following sample code to send messages.

                           
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.OnExceptionContext;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.SendCallback;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;

    import java.util.Properties;

    public class ProducerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
            properties.put(PropertyKeyConst.AccessKey, "XXX");
            // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            // The transmission timeout period, in milliseconds.
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
            properties.put(PropertyKeyConst.NAMESRV_ADDR,
              "XXX");

            Producer producer = ONSFactory.createProducer(properties);
            // Before you send a message, call the start method once to start the producer.
            producer.start();

            Message msg = new Message(
                    // The topic of the message.
                    "TopicTestMQ",
                    // The message tag, which is similar to a Gmail tag. It is used to sort messages, enabling the consumer to filter messages on the Message Queue for Apache RocketMQ broker based on the specified criteria.
                    "TagA",
                    // The message body in any binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must agree on the serialization and deserialization methods.
                    "Hello MQ".getBytes());

            // The message key. We recommend that you keep it globally unique. A unique identifier enables you to query a message and resend it in the console if you fail to receive the message.
            // Note: Messages can still be sent and received even if this attribute is not set.
            msg.setKey("ORDERID_100");

            // Send the message in asynchronous mode. The callback function returns the sending result to the client. 
            producer.sendAsync(msg, new SendCallback() {
                @Override
                public void onSuccess(final SendResult sendResult) {
                    // The message is sent.
                    System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
                }

                @Override
                public void onException(OnExceptionContext context) {
                    // The message failed to be sent and must be resent. The system can resend the message or store message data persistently.
                    System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
                }
            });

            // The message ID can be obtained before the callback function returns the result.
            System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());

            // Before you exit the application, shut down the producer object. Note: You can choose not to shut down the producer object.
            producer.shutdown();
        }
    }                
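
The Java sample above calls sendAsync and then shutdown() right away, so the callback may not have run yet when the producer is shut down. If the application needs the send result first, one option is to wait for the callback with a latch. The following is a minimal sketch that uses only the JDK: FakeAsyncSender and Callback are hypothetical stand-ins for the SDK's producer and SendCallback, not part of the SDK, and the timeout value is arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AsyncSendWaitSketch {

    /** Hypothetical stand-in for the SDK's SendCallback. */
    interface Callback {
        void onSuccess(String messageId);
        void onException(Throwable cause);
    }

    /** Hypothetical stand-in for a producer that sends on a background thread. */
    static final class FakeAsyncSender {
        private final ExecutorService pool = Executors.newSingleThreadExecutor();

        void sendAsync(String body, Callback callback) {
            pool.submit(() -> {
                if (body == null || body.isEmpty()) {
                    callback.onException(new IllegalArgumentException("empty body"));
                } else {
                    callback.onSuccess("MSG-" + body.length());
                }
            });
        }

        void shutdown() {
            // Already submitted tasks still run; no new tasks are accepted.
            pool.shutdown();
        }
    }

    /** Sends one message and blocks until the callback fires or the timeout elapses. */
    static String sendAndWait(String body, long timeoutMillis) {
        FakeAsyncSender sender = new FakeAsyncSender();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("timeout");
        try {
            sender.sendAsync(body, new Callback() {
                @Override
                public void onSuccess(String messageId) {
                    outcome.set("ok:" + messageId);
                    done.countDown();
                }

                @Override
                public void onException(Throwable cause) {
                    outcome.set("failed:" + cause.getMessage());
                    done.countDown();
                }
            });
            // Wait for the callback before shutting down the sender.
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            outcome.set("interrupted");
        } finally {
            sender.shutdown();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("Hello MQ", 3000)); // success path
        System.out.println(sendAndWait("", 3000));         // failure path
    }
}
```

With the real SDK, the same latch would be counted down inside onSuccess and onException, and producer.shutdown() would follow the await call.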
                           
using System;
using ons;

public class ProducerExampleForEx
{
    public ProducerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // Configure your account according to the settings in the console.
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // The group ID that you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // The topic that you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // Create a producer instance.
        // Note: A producer instance is thread-safe and can be used to send messages of different topics. Each thread
        // needs only one producer instance.
        Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);

        // Start the producer instance.
        producer.start();

        // Create a message object.
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        msg.setKey(Guid.NewGuid().ToString());
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure {0}", ex.ToString());
            }
        }

        // Before you exit your thread, shut down the producer instance.
        producer.shutdown();

    }
}    
                           
#include "ONSFactory.h"
#include "ONSClientException.h"

using namespace ons;

int main()
{

    // Create a producer instance and configure the information required for sending messages.
    ONSFactoryProperty factoryInfo;   
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");// The group ID that you created in the console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// The topic that you created in the console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");// The message content.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX" );// The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.


    //create producer;
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    // Before you send a message, call the start method once to start the producer.
    pProducer->start();

    Message msg(
            //Message Topic
            factoryInfo.getPublishTopics(),
            // The message tag, which is similar to a Gmail tag. It is used to sort messages, enabling the consumer to filter messages on the Message Queue for Apache RocketMQ broker based on the specified criteria.
            "TagA",
            // The message body, which cannot be empty. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must agree on the serialization and deserialization methods.
            factoryInfo.getMessageContent()
    );

    // The message key. We recommend that you keep it globally unique.
    // A unique identifier enables you to query a message and resend it in the console if you fail to receive the message.
    // Note: Messages can still be sent and received even if this attribute is not set.
    msg.setKey("ORDERID_100");

    // Send the message. The message is sent if no exception occurs.
    try
    {
        SendResultONS sendResult = pProducer->send(msg);
    }
    catch(ONSClientException & e)
    {
        // Customize exception processing details.
    }
    // Before you exit your application, shut down the producer object. Otherwise, memory leaks may occur.
    pProducer->shutdown();

    return 0;
}
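
The samples above leave failure handling to the application: the Java callback notes that a failed message must be resent, and the C++ sample leaves the catch block empty. One common approach is a bounded retry with a growing delay. The following is a minimal sketch under stated assumptions: Sender and flakySender are hypothetical helpers that stand in for a synchronous SDK send call, and the attempt count and delays are arbitrary.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SendRetrySketch {

    /** Hypothetical stand-in for a synchronous send call that may throw. */
    interface Sender {
        String send(String body) throws Exception;
    }

    /**
     * Tries to send up to maxAttempts times, doubling the delay after each failure.
     * Returns the message ID on success, or null if every attempt failed.
     */
    static String sendWithRetry(Sender sender, String body, int maxAttempts, long initialDelayMillis) {
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return sender.send(body);
            } catch (Exception e) {
                System.out.println("attempt " + attempt + " failed: " + e.getMessage());
                if (attempt == maxAttempts) {
                    break; // Give up; the caller can persist the message for later.
                }
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
                delay *= 2;
            }
        }
        return null;
    }

    /** A sender that fails a fixed number of times before succeeding (demonstration only). */
    static Sender flakySender(int failuresBeforeSuccess) {
        AtomicInteger calls = new AtomicInteger();
        return body -> {
            if (calls.incrementAndGet() <= failuresBeforeSuccess) {
                throw new IllegalStateException("broker unavailable");
            }
            return "MSG-" + calls.get();
        };
    }

    public static void main(String[] args) {
        // Succeeds on the third attempt.
        System.out.println(sendWithRetry(flakySender(2), "Hello MQ", 5, 10));
        // Gives up after three attempts and returns null.
        System.out.println(sendWithRetry(flakySender(9), "Hello MQ", 3, 10));
    }
}
```

A retried send may reach the broker more than once, so a consumer that cannot tolerate duplicates would typically deduplicate, for example by the message key.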

You can also send a test message in the console: on the Topics page, find the target topic and click Send Message in the Actions column.

Check whether messages are sent

After a message is sent, you can check its sending status in the console by performing the following operations:

  1. On the details page of the target instance, choose Message Query > By Topic from the left-side navigation pane.
  2. In the search box, enter the topic to which the message is sent, and click Search to check its sending status.

    Stored At indicates the time when the Message Queue for Apache RocketMQ broker stored the message. If the message can be found, it has been sent to the Message Queue for Apache RocketMQ broker.

Notice: This step assumes that you are using Message Queue for Apache RocketMQ for the first time and that the consumer has not been started yet. Therefore, the message status shows no consumption data. To start the consumer and subscribe to messages, go to the next step. For more information about the message status, see Query messages and Query a message trace.

Call TCP SDKs to subscribe to messages

After a message is sent, start a consumer to subscribe to it. Run the following sample code as needed to start the consumer and test message subscription. Set the parameters according to the comments in the code.

                           
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ConsumerTest {
   public static void main(String[] args) {
       Properties properties = new Properties();
        // The group ID that you created in the console.
       properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
       properties.put(PropertyKeyConst.AccessKey, "XXX");
        // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
       properties.put(PropertyKeyConst.SecretKey, "XXX");
        // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
       properties.put(PropertyKeyConst.NAMESRV_ADDR,
         "XXX");
          // Clustering subscription (default).
          // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
          // Broadcasting subscription.
          // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

       Consumer consumer = ONSFactory.createConsumer(properties);
       consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { // Subscribe to multiple tags.
           public Action consume(Message message, ConsumeContext context) {
               System.out.println("Receive: " + message);
               return Action.CommitMessage;
           }
       });

        //Subscribe to another topic. To unsubscribe from this topic, delete the subscription code and restart the consumer.
        consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // Subscribe to all tags.
           public Action consume(Message message, ConsumeContext context) {
               System.out.println("Receive: " + message);
               return Action.CommitMessage;
           }
       });

       consumer.start();
       System.out.println("Consumer Started");
   }
}            
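
The Java consumer above subscribes with the expressions "TagA||TagB" (either tag) and "*" (all tags). The broker applies these expressions, not the client. The sketch below only mirrors the matching rule locally to make it concrete. TagExpressionSketch and matches are hypothetical names and are not part of the SDK.

```java
import java.util.Arrays;

class TagExpressionSketch {

    /**
     * Returns true if the tag is selected by the subscription expression.
     * "*" selects every tag; otherwise the expression is a list of tags separated by "||".
     */
    static boolean matches(String expression, String tag) {
        String trimmed = expression.trim();
        if (trimmed.equals("*")) {
            return true;
        }
        if (tag == null) {
            return false;
        }
        return Arrays.stream(trimmed.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(tag::equals);
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA||TagB", "TagA")); // true
        System.out.println(matches("TagA||TagB", "TagC")); // false
        System.out.println(matches("*", "TagC"));          // true
    }
}
```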
                           
using System;
using System.Threading;
using System.Text;
using ons;

// The callback function to be executed when a message is pulled from the broker.
public class MyMsgListener : MessageListener
{
    public MyMsgListener()
    {
    }

    ~MyMsgListener()
    {
    }

    public override ons.Action consume(Message value, ConsumeContext context)
    {
        Byte[] text = Encoding.Default.GetBytes(value.getBody());
        Console.WriteLine(Encoding.UTF8.GetString(text));
        return ons.Action.CommitMessage;
    }
}

public class ConsumerExampleForEx
{
    public ConsumerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // Configure your account according to the settings in the console.
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // The group ID that you created in the console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // The topic that you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // Clustering consumption.
        // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
        // Broadcasting consumption.
        // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);

        // Create a consumer instance.
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

        // Subscribe to topics.
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());

        // Start the consumer instance.
        consumer.start();

        // The sleep is only for the demo. In production environments, the process must keep running to receive messages.
        Thread.Sleep(300000);

        // Before you exit the process, shut down the consumer instance.
        consumer.shutdown();
    }
}        
                           
#include "ONSFactory.h"
using namespace ons;

// MyMsgListener: Create an instance for message consumption.
// After pushConsumer pulls a message, it calls the consume function of this instance.
class MyMsgListener : public MessageListener
{

    public:

        MyMsgListener()
        {
        }

        virtual ~MyMsgListener()
        {
        }

        virtual Action consume(Message &message, ConsumeContext &context)
        {
            // Customize message processing details.
            return CommitMessage; //CONSUME_SUCCESS;
        }
};


int main(int argc, char* argv[])
{

    // Required parameters for creating pushConsumer and for its normal operation.
    ONSFactoryProperty factoryInfo;
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");// The group ID that you created in the console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// The topic that you created in the console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey,  "XXX");// The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
      // Clustering subscription (default).
      // factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::CLUSTERING);
      // Broadcasting subscription.
      // factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::BROADCASTING);

    //create pushConsumer
    PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // Specify the message topic and tag that pushConsumer subscribes to and register the message callback function.
    MyMsgListener  msglistener;
    pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    //start pushConsumer
    pushConsumer->start();

    // Note: Call the shutdown() method only when you no longer need to receive messages. After the shutdown() method is called, the consumer instance exits and no longer receives messages.

    // Before you exit the application, shut down pushConsumer. Otherwise, memory leaks may occur.
    pushConsumer->shutdown();
    return 0;

}
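
The consumer samples above mention clustering and broadcasting subscription only in commented-out settings. In clustering mode, each message is consumed by one consumer in the group. In broadcasting mode, every consumer in the group receives each message. The following sketch simulates that difference with plain collections. MessageModelSketch and deliver are hypothetical names, and the round-robin distribution in clustering mode is an illustrative assumption, not the broker's actual allocation strategy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MessageModelSketch {

    enum Model { CLUSTERING, BROADCASTING }

    /** Returns, for each consumer in the group, the list of messages it would receive. */
    static List<List<String>> deliver(Model model, List<String> messages, int consumerCount) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            inboxes.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String message = messages.get(i);
            if (model == Model.BROADCASTING) {
                // Every consumer in the group gets its own copy.
                for (List<String> inbox : inboxes) {
                    inbox.add(message);
                }
            } else {
                // Exactly one consumer gets the message (round-robin is an assumption).
                inboxes.get(i % consumerCount).add(message);
            }
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        // prints [[m1, m3], [m2, m4]]
        System.out.println(deliver(Model.CLUSTERING, messages, 2));
        // prints [[m1, m2, m3, m4], [m1, m2, m3, m4]]
        System.out.println(deliver(Model.BROADCASTING, messages, 2));
    }
}
```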

Check whether the message subscription is successful

After the preceding steps are completed, you can check whether the consumer has been started in the console, that is, whether the message subscription is successful.

  1. In the left-side navigation pane, choose Groups > TCP.
  2. Find the target group ID and click Subscription in the Actions column.
    If the value of Online is Yes and the value of Subscription Consistency is Yes, the subscription is successful.