This topic describes how to use Simple Message Queue (SMQ, formerly MNS) SDK for Java to subscribe to a topic and receive messages.
Prerequisites
SMQ SDK for Java is installed. For more information, see Install SDK for Java.
An endpoint and an access credential are configured. For more information, see Configure endpoints and access credentials.
Encoding methods for message bodies
SIMPLIFIED: If the message body does not contain special characters, we recommend that you do not use Base64 encoding.
To send a message to a topic, use the
RawTopicMessage
method to initialize the message object. To consume a message from a queue, use the
message.getMessageBodyAsRawString()
method to obtain the message body.
JSON or XML: If the strings are transmitted in a text format such as JSON or XML, we recommend that you use Base64 encoding.
To send a message to a topic, use the TopicMessage method to initialize the message object. In this case, the message body is Base64-encoded and stored in the Message field for transmission.
To consume a message from a queue, use the
message.getMessageBodyAsRawString()
method to obtain the raw message body, extract the value of the Message field, and then perform Base64 decoding. For example:
JSONObject object = new JSONObject(message.getMessageBodyAsRawString());
String jsonMessageData = String.valueOf(object.get("Message"));
String messageBody = new String(Base64.decodeBase64(jsonMessageData));
Sample code
For more information about how to download the sample code, see ConsumerQueueForTopicDemo.java.
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.commons.codec.binary.Base64;
import org.json.JSONException;
import org.json.JSONObject;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
/**
* In topic-based messaging, queues are classified into the following types: XML, JSON, and simple, which differ from one another in how Base64 encoding is applied. For more information, see the following section.
* 1. Configure the AccessKey ID and AccessKey secret in the environment based on Alibaba Cloud specifications.
* 2. Configure the ${"user.home"}/.aliyun-mns.properties file based on the following content:
* mns.endpoint=http://xxxxxxx
* mns.msgBodyBase64Switch=true/false
*/
public class ConsumerQueueForTopicDemo {
    /**
     * Whether the message payload is treated as Base64-encoded. Read once from the
     * {@code msgBodyBase64Switch} MNS property; falls back to {@code false} when unset.
     */
    private static final boolean IS_BASE64 =
        Boolean.parseBoolean(ServiceSettings.getMNSPropertyValue("msgBodyBase64Switch","false"));

    /** Name of the queue that this demo consumes from. */
    private static final String QUEUE_NAME = "TestQueue";

    /**
     * Feature flag that makes the XML parser reject any DOCTYPE declaration, which blocks
     * XXE and entity-expansion attacks carried in message bodies.
     */
    private static final String DISALLOW_DOCTYPE_FEATURE =
        "http://apache.org/xml/features/disallow-doctype-decl";

    /**
     * Receives one batch of messages from {@link #QUEUE_NAME} by long polling, prints them,
     * and deletes each message after it has been handled.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Configure the AccessKey ID and AccessKey secret in the environment based on Alibaba Cloud specifications.
        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
        // The client only needs to be initialized once.
        MNSClient client = account.getMNSClient();
        try {
            // Resolved inside the try block so that the client is still closed if this call fails.
            CloudQueue queue = client.getQueueRef(QUEUE_NAME);
            longPollingBatchReceive(queue);
        } catch (ClientException ce) {
            System.out.println("Something wrong with the network connection between client and MNS service."
                + "Please check your network and DNS availablity.");
            ce.printStackTrace();
        } catch (ServiceException se) {
            // Constant-first comparison: safe even if the service returned no error code.
            String errorCode = se.getErrorCode();
            if ("QueueNotExist".equals(errorCode)) {
                System.out.println("Queue is not exist. Please create queue before use");
            } else if ("TimeExpired".equals(errorCode)) {
                System.out.println("The request is time expired. Please check your local machine timeclock");
            }
            se.printStackTrace();
        } catch (Exception e) {
            System.out.println("Unknown exception happened!");
            e.printStackTrace();
        } finally {
            // Always release the client, whatever happened above.
            client.close();
        }
    }

    /**
     * Pops up to one batch of messages from the queue with long polling, prints the details
     * of each message, and deletes it from the queue.
     *
     * @param queue the queue to receive messages from
     */
    private static void longPollingBatchReceive(CloudQueue queue) {
        System.out.println("=============start longPollingBatchReceive=============");
        // The maximum number of messages that can be received at a time.
        int batchSize = 15;
        // The long polling period. Unit: seconds.
        int waitSeconds = 15;
        List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
        if (messages != null && !messages.isEmpty()) {
            for (Message message : messages) {
                System.out.println("message handle: " + message.getReceiptHandle());
                System.out.println("message body: " + message.getOriginalMessageBody());
                System.out.println("message body real data: " + getMessageBodyData(message));
                System.out.println("message id: " + message.getMessageId());
                System.out.println("message dequeue count:" + message.getDequeueCount());
                // <<Add your own processing logic here.>>
                // Delete the message once it has been consumed successfully.
                queue.deleteMessage(message.getReceiptHandle());
                System.out.println("delete message successfully.\n");
            }
        }
        System.out.println("=============end longPollingBatchReceive=============");
    }

    /**
     * Extracts the payload from a message delivered by a topic subscription.
     *
     * <p>The original body is probed in order: as a JSON object with a {@code Message} field,
     * then as an XML document with a {@code Message} element, and finally as simple text.
     * When {@link #IS_BASE64} is set, the extracted value is Base64-decoded.
     *
     * @param message the received message; may be {@code null}
     * @return the payload, or {@code null} if the message or its original body is {@code null}
     */
    private static String getMessageBodyData(Message message) {
        if (message == null) {
            return null;
        }
        String originalMessageBody = message.getOriginalMessageBody();
        if (originalMessageBody == null) {
            // Nothing to parse; avoids a NullPointerException inside the JSON and XML parsers.
            return null;
        }

        // 1. Try to parse the message body as a JSON object.
        try {
            JSONObject object = new JSONObject(originalMessageBody);
            String jsonMessageData = String.valueOf(object.get("Message"));
            System.out.println("message body type: JSON,value:" + jsonMessageData);
            return decodeIfBase64(jsonMessageData);
        } catch (JSONException ignored) {
            // Not a JSON object with a "Message" field; fall through and try XML.
        }

        // 2. Try to parse the message body as an XML document.
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // The body comes from an external service: refuse DTDs and entity expansion (XXE hardening).
            factory.setFeature(DISALLOW_DOCTYPE_FEATURE, true);
            factory.setExpandEntityReferences(false);
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document doc = builder.parse(new InputSource(new StringReader(originalMessageBody)));
            Element root = doc.getDocumentElement();
            NodeList nodeList = root.getElementsByTagName("Message");
            // Only treat the body as XML when a Message element is actually present.
            if (nodeList.getLength() > 0) {
                String content = nodeList.item(0).getTextContent();
                System.out.println("message body type: XML,value:" + content);
                return decodeIfBase64(content);
            }
        } catch (Exception ignored) {
            // Deliberate best effort: any failure here means the body is not usable XML,
            // so fall through and treat it as simple text.
        }

        // 3. Neither JSON nor XML: treat the body as simple text.
        System.out.println("message body type: SIMPLE");
        // NOTE(review): presumably getMessageBody() returns the Base64-decoded body and
        // getMessageBodyAsRawString() the body as received - confirm against the SDK.
        return IS_BASE64 ? message.getMessageBody() : message.getMessageBodyAsRawString();
    }

    /**
     * Returns {@code value} Base64-decoded when {@link #IS_BASE64} is set, otherwise unchanged.
     *
     * @param value the extracted payload text
     * @return the decoded text, or {@code value} itself when decoding is disabled
     */
    private static String decodeIfBase64(String value) {
        if (!IS_BASE64) {
            return value;
        }
        // Explicit charset so the result does not depend on the platform default.
        // Assumes the publisher encoded the text as UTF-8 - TODO confirm.
        return new String(Base64.decodeBase64(value), StandardCharsets.UTF_8);
    }
}