All Products
Search
Document Center

Simple Message Queue (formerly MNS):Use sample code to subscribe to a topic

Last Updated:Oct 28, 2024

This topic describes how to use Message Service (MNS) (SMQ) SDK for Java to subscribe to a topic to receive messages.

Prerequisites

Encoding methods for message bodies

  • SIMPLIFIED: If the message body does not contain special characters, we recommend that you do not use Base64 encoding.

    • To send a message to a topic, use the RawTopicMessage method to initialize the message object.

    • To consume a message from a queue, use the message.getMessageBodyAsRawString() method to obtain the message body.

  • JSON or XML: If the strings are transmitted in a text format such as JSON or XML, we recommend that you use Base64 encoding.

    • To send a message to a topic, use the TopicMessage method to initialize the message object. In this case, the message body is Base64-encoded and stored in the Message field for transmission.

    • To consume a message from a queue, use the message.getMessageBodyAsRawString(); method to obtain the value of the Message field, and then perform Base64 decoding.

      JSONObject object = new JSONObject(message.getMessageBodyAsRawString());
      String jsonMessageData = String.valueOf(object.get("Message"));
      String messageBody = new String(Base64.decodeBase64(jsonMessageData));

Sample code

For more information about how to download the sample code, see ConsumerQueueForTopicDemo.java.

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import java.io.StringReader;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.commons.codec.binary.Base64;
import org.json.JSONException;
import org.json.JSONObject;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/**
 * In topic-based messaging, queues are classified into the following types: XML, JSON, and simple, which differ from one another in Base64 encryption. For more information, see the following section.
 * 1. Configure the AccessKey ID and AccessKey secret in the environment based on Alibaba Cloud specifications. 
 * 2. Configure the ${"user.home"}/.aliyun-mns.properties file based on the following content:
 *           mns.endpoint=http://xxxxxxx
 *           mns.msgBodyBase64Switch=true/false
 */
public class ConsumerQueueForTopicDemo {
    /**
     * Specify whether to encode the message body in Base64.
     */
    private static final Boolean IS_BASE64 = Boolean.valueOf(ServiceSettings.getMNSPropertyValue("msgBodyBase64Switch","false"));

    public static void main(String[] args) {
        String QUEUE_NAME = "TestQueue";

        // Configure the AccessKey ID and AccessKey secret in the environment based on Alibaba Cloud specifications. 
        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
        //this client need only initialize once
        MNSClient client = account.getMNSClient();
        CloudQueue queue = client.getQueueRef(QUEUE_NAME);

        try {
            longPollingBatchReceive(queue);
        } catch (ClientException ce) {
            System.out.println("Something wrong with the network connection between client and MNS service."
                + "Please check your network and DNS availablity.");
            ce.printStackTrace();
        } catch (ServiceException se) {
            if (se.getErrorCode().equals("QueueNotExist")) {
                System.out.println("Queue is not exist. Please create queue before use");
            } else if (se.getErrorCode().equals("TimeExpired")) {
                System.out.println("The request is time expired. Please check your local machine timeclock");
            }
            se.printStackTrace();
        } catch (Exception e) {
            System.out.println("Unknown exception happened!");
            e.printStackTrace();
        }

        client.close();
    }

    private static void longPollingBatchReceive(CloudQueue queue) {
        System.out.println("=============start longPollingBatchReceive=============");

        // The maximum number of messages that can be received at a time.
        int batchSize = 15;
        // The long polling period. Unit: seconds.
        int waitSeconds = 15;

        List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
        if (messages != null && messages.size() > 0) {

            for (Message message : messages) {
                System.out.println("message handle: " + message.getReceiptHandle());
                System.out.println("message body: " + message.getOriginalMessageBody());
                System.out.println("message body real data: " + getMessageBodyData(message));
                System.out.println("message id: " + message.getMessageId());
                System.out.println("message dequeue count:" + message.getDequeueCount());
                //<<to add your special logic.>>

                //remember to  delete message when consume message successfully.
                queue.deleteMessage(message.getReceiptHandle());
                System.out.println("delete message successfully.\n");
            }
        }

        System.out.println("=============end longPollingBatchReceive=============");

    }

    private static String getMessageBodyData(Message message){
        if (message == null){
            return null;
        }
        String originalMessageBody = message.getOriginalMessageBody();

        // 1. Parse the message body into a JSON object.
        try {
            JSONObject object = new JSONObject(originalMessageBody);
            String jsonMessageData = String.valueOf(object.get("Message"));
            System.out.println("message body type: JSON,value:"+jsonMessageData );
            return IS_BASE64?  new String(Base64.decodeBase64(jsonMessageData)): jsonMessageData;
        } catch (JSONException ex1) {
            // If the message body is not parsed into a JSON object, check whether the message body can be parsed into an XML file.
        }

        // 2. Parse the message body into an XML file.
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document doc = builder.parse(new InputSource(new StringReader(originalMessageBody)));
            Element root = doc.getDocumentElement();
            NodeList nodeList = root.getElementsByTagName("Message");
            String content = nodeList.item(0).getTextContent();
            System.out.println("message body type: XML,value:"+content );

            return IS_BASE64?  new String(Base64.decodeBase64(content)): content;
        } catch (Exception ex) {
            // The XML file is invalid.
        }

        // If the message body is not parsed into a JSON object or an XML file, the parsed message body is considered a simple text.
        System.out.println("message body type: SIMPLE" );
        return IS_BASE64 ?  message.getMessageBody() : message.getMessageBodyAsRawString();

    }
}