このトピックでは、Simple Message Queue (旧称 MNS) (SMQ) SDK for Java を使用してキューからメッセージを受信する方法について説明します。
前提条件
SMQ SDK for Java がインストールされていること。詳細については、「SDK for Java をインストールする」をご参照ください。
エンドポイントとアクセス認証情報が構成されていること。詳細については、「エンドポイントとアクセス認証情報を構成する」をご参照ください。
権限付与情報
デフォルトでは、Alibaba Cloud アカウントでのみこの操作を呼び出すことができます。必要な権限を RAM ユーザーに付与した後でのみ、RAM ユーザーとしてこの操作を呼び出すことができます。次の表に、この操作の権限付与情報を示します。
名前 | 値 |
API | ReceiveMessage |
アクション | mns:ReceiveMessage |
リソース | acs:mns:$region:$accountid:/queues/$queueName/messages |
使用上の注意
コンシューマーはこの操作を呼び出して、キューからメッセージを受信できます。ReceiveMessage 操作は、受信したメッセージの状態を「非アクティブ」に変更します。メッセージが非アクティブ状態のままである期間は、キューの
VisibilityTimeoutパラメーターによって指定されます。コンシューマーが
VisibilityTimeoutパラメーターで指定された期間内にメッセージを消費した後、コンシューマーは DeleteMessage 操作を呼び出してメッセージを削除する必要があります。そうしないと、メッセージはアクティブ状態になり、再度消費される可能性があります。
メッセージ本文のエンコード方式
メッセージ本文に特殊文字が含まれていない場合は、Base64 エンコードを使用しないことをお勧めします。
メッセージを送信するには、
message.setMessageBodyAsRawStringメソッドを使用してメッセージ本文を設定します。メッセージを受信するには、
message.getMessageBodyAsRawStringメソッドを使用してメッセージ本文を取得します。
サンプルコード
サンプルコードの詳細については、「ReceiveMessageDemo」をご参照ください。
package com.aliyun.mns.sample.queue;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ServiceHandlingRequiredException;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import java.util.List;
/**
* 1. Alibaba Cloud の仕様に基づいて、環境に AccessKey ID と AccessKey シークレットを構成します。
* 2. 次の内容に基づいて、${"user.home"}/.aliyun-mns.properties ファイルを構成します。
* mns.endpoint=http://xxxxxxx
* mns.msgBodyBase64Switch=true/false
*/
public class ReceiveMessageDemo {
/**
* メッセージ本文を Base64 でエンコードするかどうかを指定します。
*/
private static final Boolean IS_BASE64 = Boolean.valueOf(ServiceSettings.getMNSPropertyValue("msgBodyBase64Switch","false"));
public static void main(String[] args) {
String queueName = "cloud-queue-demo";
// Alibaba Cloud の仕様に基づいて、環境に AccessKey ID と AccessKey シークレットを構成します。
CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
// このクライアントは一度だけ初期化する必要があります
MNSClient client = account.getMNSClient();
CloudQueue queue = client.getQueueRef(queueName);
// ラウンドロビン方式でメッセージを取得して処理します。
loopReceive(queue, client);
// メッセージ処理後にクライアントを無効にします。
client.close();
}
private static void loopReceive(CloudQueue queue, MNSClient client) {
while (true) {
// ラウンドロビン方式でメッセージを受信します。
try {
// キューからメッセージを受信します。これは基本的な方法です。
singleReceive(queue);
// ロングポーリングメカニズムを使用して、一度に複数のメッセージを受信します。これは推奨される方法です。
longPollingBatchReceive(queue);
} catch (ClientException ce) {
System.out.println("クライアントと MNS サービス間のネットワーク接続に問題があります。"
+ "ネットワークと DNS の可用性をご確認ください。");
// クライアント例外: ネットワークジッターが原因でリトライがトリガーされます。
} catch (ServiceException se) {
if (se.getErrorCode().equals("QueueNotExist")) {
System.out.println("キューが存在しません。使用する前にキューを作成してください");
client.close();
return;
} else if (se.getErrorCode().equals("TimeExpired")) {
System.out.println("リクエストがタイムアウトしました。ローカルマシンの時計をご確認ください");
return;
}
// サーバー例外: ネットワークジッターが原因でリトライがトリガーされます。
} catch (Exception e) {
System.out.println("不明な例外が発生しました!e:"+e.getMessage());
// その他の例外: ネットワークジッターが原因でリトライがトリガーされます。
}
}
}
private static void longPollingBatchReceive(CloudQueue queue) throws ServiceHandlingRequiredException {
System.out.println("=============longPollingBatchReceive 開始=============");
// 一度に受信できるメッセージの最大数。
int batchSize = 15;
// ロングポーリング期間。単位: 秒。
int waitSeconds = 15;
List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
if (messages != null && messages.size() > 0) {
for (Message message : messages) {
printMsgAndDelete(queue,message);
}
}
System.out.println("=============longPollingBatchReceive 終了=============");
}
private static void singleReceive(CloudQueue queue) throws ServiceHandlingRequiredException {
System.out.println("=============singleReceive 開始=============");
Message popMsg = queue.popMessage();
printMsgAndDelete(queue, popMsg);
System.out.println("=============singleReceive 終了=============");
}
private static void printMsgAndDelete(CloudQueue queue, Message popMsg) throws ServiceHandlingRequiredException {
if (popMsg != null) {
System.out.println("メッセージハンドル: " + popMsg.getReceiptHandle());
System.out.println("メッセージ本文: " + (IS_BASE64 ? popMsg.getMessageBody() : popMsg.getMessageBodyAsRawString()));
System.out.println("メッセージ ID: " + popMsg.getMessageId());
System.out.println("メッセージデキューカウント:" + popMsg.getDequeueCount());
//<<特別なロジックを追加します。>>
// メッセージの消費に成功したら、メッセージを削除することを忘れないでください。
queue.deleteMessage(popMsg.getReceiptHandle());
System.out.println("メッセージの削除に成功しました。\n");
}
}
}