すべてのプロダクト
Search
ドキュメントセンター

Simple Message Queue (formerly MNS):サンプルコードを使用してメッセージを受信する

最終更新日:Mar 13, 2025

このトピックでは、Simple Message Queue (旧称 MNS) (SMQ) SDK for Java を使用してキューからメッセージを受信する方法について説明します。

前提条件

権限付与情報

デフォルトでは、Alibaba Cloud アカウントでのみこの操作を呼び出すことができます。必要な権限を RAM ユーザーに付与した後でのみ、RAM ユーザーとしてこの操作を呼び出すことができます。次の表に、この操作の権限付与情報を示します。

名前

API

ReceiveMessage

アクション

mns:ReceiveMessage

リソース

acs:mns:$region:$accountid:/queues/$queueName/messages

使用上の注意

  • コンシューマーはこの操作を呼び出して、キューからメッセージを受信できます。ReceiveMessage 操作は、受信したメッセージの状態を「非アクティブ」に変更します。メッセージが非アクティブ状態のままである期間は、キューの VisibilityTimeout パラメーターによって指定されます。

  • コンシューマーが VisibilityTimeout パラメーターで指定された期間内にメッセージを消費した後、コンシューマーは DeleteMessage 操作を呼び出してメッセージを削除する必要があります。そうしないと、メッセージはアクティブ状態になり、再度消費される可能性があります。

メッセージ本文のエンコード方式

メッセージ本文に特殊文字が含まれていない場合は、Base64 エンコードを使用しないことをお勧めします。

  • メッセージを送信するには、message.setMessageBodyAsRawString メソッドを使用してメッセージ本文を設定します。

  • メッセージを受信するには、message.getMessageBodyAsRawString メソッドを使用してメッセージ本文を取得します。

サンプルコード

サンプルコードの詳細については、「ReceiveMessageDemo」をご参照ください。

package com.aliyun.mns.sample.queue;

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ServiceHandlingRequiredException;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import java.util.List;

/**
 * 1. Alibaba Cloud の仕様に基づいて、環境に AccessKey ID と AccessKey シークレットを構成します。
 * 2. 次の内容に基づいて、${"user.home"}/.aliyun-mns.properties ファイルを構成します。
 *           mns.endpoint=http://xxxxxxx
 *           mns.msgBodyBase64Switch=true/false
 */
public class ReceiveMessageDemo {
    /**
     * メッセージ本文を Base64 でエンコードするかどうかを指定します。
     */
    private static final Boolean IS_BASE64 = Boolean.valueOf(ServiceSettings.getMNSPropertyValue("msgBodyBase64Switch","false"));

    public static void main(String[] args) {
        String queueName = "cloud-queue-demo";

        // Alibaba Cloud の仕様に基づいて、環境に AccessKey ID と AccessKey シークレットを構成します。
        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
        // このクライアントは一度だけ初期化する必要があります
        MNSClient client = account.getMNSClient();
        CloudQueue queue = client.getQueueRef(queueName);

        // ラウンドロビン方式でメッセージを取得して処理します。
        loopReceive(queue, client);

        // メッセージ処理後にクライアントを無効にします。
        client.close();
    }

    private static void loopReceive(CloudQueue queue, MNSClient client) {
        while (true) {
            // ラウンドロビン方式でメッセージを受信します。
            try {
                // キューからメッセージを受信します。これは基本的な方法です。
                singleReceive(queue);

                // ロングポーリングメカニズムを使用して、一度に複数のメッセージを受信します。これは推奨される方法です。
                longPollingBatchReceive(queue);
            } catch (ClientException ce) {
                System.out.println("クライアントと MNS サービス間のネットワーク接続に問題があります。"
                    + "ネットワークと DNS の可用性をご確認ください。");
                // クライアント例外: ネットワークジッターが原因でリトライがトリガーされます。
            } catch (ServiceException se) {
                if (se.getErrorCode().equals("QueueNotExist")) {
                    System.out.println("キューが存在しません。使用する前にキューを作成してください");
                    client.close();
                    return;
                } else if (se.getErrorCode().equals("TimeExpired")) {
                    System.out.println("リクエストがタイムアウトしました。ローカルマシンの時計をご確認ください");
                    return;
                }
                // サーバー例外: ネットワークジッターが原因でリトライがトリガーされます。
            } catch (Exception e) {
                System.out.println("不明な例外が発生しました!e:"+e.getMessage());
                // その他の例外: ネットワークジッターが原因でリトライがトリガーされます。
            }

        }
    }

    private static void longPollingBatchReceive(CloudQueue queue) throws ServiceHandlingRequiredException {

        System.out.println("=============longPollingBatchReceive 開始=============");

        // 一度に受信できるメッセージの最大数。
        int batchSize = 15;
        // ロングポーリング期間。単位: 秒。
        int waitSeconds = 15;

        List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
        if (messages != null && messages.size() > 0) {

            for (Message message : messages) {
                printMsgAndDelete(queue,message);
            }
        }

        System.out.println("=============longPollingBatchReceive 終了=============");

    }

    private static void singleReceive(CloudQueue queue) throws ServiceHandlingRequiredException {
        System.out.println("=============singleReceive 開始=============");

        Message popMsg = queue.popMessage();
        printMsgAndDelete(queue, popMsg);

        System.out.println("=============singleReceive 終了=============");
    }

    private static void printMsgAndDelete(CloudQueue queue, Message popMsg) throws ServiceHandlingRequiredException {
        if (popMsg != null) {
            System.out.println("メッセージハンドル: " + popMsg.getReceiptHandle());
            System.out.println("メッセージ本文: " + (IS_BASE64 ? popMsg.getMessageBody() : popMsg.getMessageBodyAsRawString()));
            System.out.println("メッセージ ID: " + popMsg.getMessageId());
            System.out.println("メッセージデキューカウント:" + popMsg.getDequeueCount());
            //<<特別なロジックを追加します。>>

            // メッセージの消費に成功したら、メッセージを削除することを忘れないでください。
            queue.deleteMessage(popMsg.getReceiptHandle());
            System.out.println("メッセージの削除に成功しました。\n");
        }
    }

}