This topic describes how to use Simple Message Queue (SMQ, formerly MNS) SDK for Java to receive messages from a queue.
Prerequisites
SMQ SDK for Java is installed. For more information, see Install SDK for Java.
An endpoint and an access credential are configured. For more information, see Configure endpoints and access credentials.
Authorization information
By default, you can call this operation only with an Alibaba Cloud account. You can call this operation as a Resource Access Management (RAM) user only after you grant the required permissions to the RAM user. The following table describes the authorization information of this operation.
Name | Value |
API | ReceiveMessage |
Action | mns:ReceiveMessage |
Resource | acs:mns:$region:$accountid:/queues/$queueName/messages |
Usage notes
A consumer can call this operation to receive messages from a queue. The ReceiveMessage operation changes the state of the received messages to Inactive. The period within which the messages remain in the Inactive state is specified by the VisibilityTimeout parameter of the queue.
After the consumer consumes the messages within the period specified by the VisibilityTimeout parameter, the consumer must call the DeleteMessage operation to delete the messages. Otherwise, the messages return to the Active state and can be consumed again.
Encoding methods for message bodies
If the message body does not contain special characters, we recommend that you do not use Base64 encoding.
To send a message, use the message.setMessageBodyAsRawString method to set the message body.
To receive a message, use the message.getMessageBodyAsRawString method to obtain the message body.
Sample code
For more information about sample code, see ReceiveMessageDemo.
package com.aliyun.mns.sample.queue;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ServiceHandlingRequiredException;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import java.util.List;
/**
 * Sample that receives messages from a queue, prints them, and deletes them
 * after they are consumed.
 *
 * <p>Before you run this sample:
 * <ol>
 *   <li>Configure the AccessKey ID and AccessKey secret in the environment based on
 *       Alibaba Cloud specifications.</li>
 *   <li>Configure the {@code ${user.home}/.aliyun-mns.properties} file based on the
 *       following content:
 *       <pre>
 *       mns.endpoint=http://xxxxxxx
 *       mns.msgBodyBase64Switch=true/false
 *       </pre>
 *   </li>
 * </ol>
 */
public class ReceiveMessageDemo {

    /** Service error code on which the receive loop gives up: the queue does not exist. */
    private static final String ERROR_QUEUE_NOT_EXIST = "QueueNotExist";

    /** Service error code on which the receive loop gives up: the request is time expired. */
    private static final String ERROR_TIME_EXPIRED = "TimeExpired";

    /**
     * Specify whether to encode the message body in Base64.
     * Read from the {@code msgBodyBase64Switch} property; defaults to {@code false}.
     */
    private static final boolean IS_BASE64 =
        Boolean.parseBoolean(ServiceSettings.getMNSPropertyValue("msgBodyBase64Switch", "false"));

    /**
     * Entry point: creates the client, receives messages in a loop, and closes the client.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        String queueName = "cloud-queue-demo";

        // Configure the AccessKey ID and AccessKey secret in the environment based on Alibaba Cloud specifications.
        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
        // The client needs to be initialized only once.
        MNSClient client = account.getMNSClient();
        try {
            CloudQueue queue = client.getQueueRef(queueName);
            // Obtain and process messages in a round-robin manner.
            loopReceive(queue);
        } finally {
            // Close the client exactly once, whether the loop ended normally or an
            // unexpected error escaped it.
            client.close();
        }
    }

    /**
     * Receives messages until an unrecoverable service error occurs.
     *
     * <p>The loop ends only when the service reports {@code QueueNotExist} or
     * {@code TimeExpired}. Every other exception is printed and the loop retries.
     * This method does not close the client; the caller owns it.
     *
     * @param queue the queue to receive messages from
     */
    private static void loopReceive(CloudQueue queue) {
        while (true) {
            try {
                // Receive a message from the queue. This is the basic method.
                singleReceive(queue);

                // Receive multiple messages at a time by using the long polling mechanism.
                // This is the recommended method.
                longPollingBatchReceive(queue);
            } catch (ClientException ce) {
                // Client exception: retry, because it may be caused by network jitter.
                System.out.println("Something wrong with the network connection between client and MNS service."
                    + "Please check your network and DNS availablity.");
            } catch (ServiceException se) {
                // Compare constant-first so that a missing error code cannot raise a
                // NullPointerException from inside this handler.
                String errorCode = se.getErrorCode();
                if (ERROR_QUEUE_NOT_EXIST.equals(errorCode)) {
                    System.out.println("Queue is not exist. Please create queue before use");
                    return;
                } else if (ERROR_TIME_EXPIRED.equals(errorCode)) {
                    System.out.println("The request is time expired. Please check your local machine timeclock");
                    return;
                }
                // Other server exception: report it instead of retrying silently.
                System.out.println("Service exception happened! errorCode:" + errorCode
                    + ", message:" + se.getMessage());
            } catch (Exception e) {
                // Other exception: print it and retry.
                System.out.println("Unknown exception happened!e:"+e.getMessage());
            }
        }
    }

    /**
     * Receives up to a batch of messages with long polling, then prints and deletes each one.
     *
     * @param queue the queue to receive messages from
     * @throws ServiceHandlingRequiredException propagated from the SDK calls
     */
    private static void longPollingBatchReceive(CloudQueue queue) throws ServiceHandlingRequiredException {
        System.out.println("=============start longPollingBatchReceive=============");

        // The maximum number of messages that can be received at a time.
        int batchSize = 15;
        // The long polling period. Unit: seconds.
        int waitSeconds = 15;

        List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
        // The result is null-checked because no message may be available.
        if (messages != null) {
            for (Message message : messages) {
                printMsgAndDelete(queue, message);
            }
        }

        System.out.println("=============end longPollingBatchReceive=============");
    }

    /**
     * Receives a single message, then prints and deletes it.
     *
     * @param queue the queue to receive the message from
     * @throws ServiceHandlingRequiredException propagated from the SDK calls
     */
    private static void singleReceive(CloudQueue queue) throws ServiceHandlingRequiredException {
        System.out.println("=============start singleReceive=============");

        Message popMsg = queue.popMessage();
        printMsgAndDelete(queue, popMsg);

        System.out.println("=============end singleReceive=============");
    }

    /**
     * Prints the details of a received message and deletes it from the queue.
     * Does nothing when {@code popMsg} is {@code null}.
     *
     * @param queue  the queue the message was received from
     * @param popMsg the received message; may be {@code null}
     * @throws ServiceHandlingRequiredException propagated from the SDK calls
     */
    private static void printMsgAndDelete(CloudQueue queue, Message popMsg) throws ServiceHandlingRequiredException {
        if (popMsg == null) {
            return;
        }

        System.out.println("message handle: " + popMsg.getReceiptHandle());
        System.out.println("message body: " + (IS_BASE64 ? popMsg.getMessageBody() : popMsg.getMessageBodyAsRawString()));
        System.out.println("message id: " + popMsg.getMessageId());
        System.out.println("message dequeue count:" + popMsg.getDequeueCount());
        //<<to add your special logic.>>

        // Remember to delete the message after it is consumed successfully; otherwise
        // it becomes visible again and can be consumed a second time.
        queue.deleteMessage(popMsg.getReceiptHandle());
        System.out.println("delete message successfully.\n");
    }
}