全部產品
Search
文件中心

Simple Message Queue (formerly MNS):消費訊息樣本

更新時間:Feb 19, 2025

本文介紹如何使用Java SDK消費隊列中的訊息。

前提條件

授權資訊

預設僅限阿里雲帳號使用本介面,RAM使用者只有在被授予了相關API操作許可權後方可使用。本介面的授權資訊如下表所示。

Name

Value

API

ReceiveMessage

RAM授權操作

mns:ReceiveMessage

資源

acs:mns:$region:$accountid:/queues/$queueName/messages

使用說明

  • 該介面用於消費者消費隊列中的訊息,ReceiveMessage操作會將取得的訊息狀態變成Inactive,Inactive狀態的時間長度由Queue屬性VisibilityTimeout指定。

  • 消費者在VisibilityTimeout時間內消費成功後需要調用DeleteMessage介面刪除該訊息,否則該訊息將會重新變成Active狀態,又可被消費者重新消費。

訊息體編碼選擇

當訊息體無特殊字元時,建議您不使用Base64編碼。

  • 發送訊息時使用message.setMessageBodyAsRawString方法設定訊息體。

  • 接收訊息時使用message.getMessageBodyAsRawString方法擷取訊息體。

範例程式碼

範例程式碼下載,請參見ReceiveMessageDemo

package com.aliyun.mns.sample.queue;

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ServiceHandlingRequiredException;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import java.util.List;

/**
 * 1. 遵循阿里雲規範,env 設定 ak、sk。
 * 2. ${"user.home"}/.aliyun-mns.properties 檔案配置如下:
 *           mns.endpoint=http://xxxxxxx
 *           mns.msgBodyBase64Switch=true/false
 */
public class ReceiveMessageDemo {
    /**
     * 是否做 base64 編碼
     */
    private static final Boolean IS_BASE64 = Boolean.valueOf(ServiceSettings.getMNSPropertyValue("msgBodyBase64Switch","false"));

    public static void main(String[] args) {
        String queueName = "cloud-queue-demo";

        // 遵循阿里雲規範,env 設定 ak、sk。
        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
        //this client need only initialize once
        MNSClient client = account.getMNSClient();
        CloudQueue queue = client.getQueueRef(queueName);

        // 輪詢調用 訊息擷取和處理
        loopReceive(queue, client);

        // 處理完成後關閉client
        client.close();
    }

    private static void loopReceive(CloudQueue queue, MNSClient client) {
        while (true) {
            // 迴圈執行
            try {
                // 基礎: 單次拉取
                singleReceive(queue);

                // 推薦: 使用的 長輪詢批量拉模數型
                longPollingBatchReceive(queue);
            } catch (ClientException ce) {
                System.out.println("Something wrong with the network connection between client and MNS service."
                    + "Please check your network and DNS availablity.");
                // 用戶端異常,預設為抖動,觸發下次重試
            } catch (ServiceException se) {
                if (se.getErrorCode().equals("QueueNotExist")) {
                    System.out.println("Queue is not exist.Please create queue before use");
                    client.close();
                    return;
                } else if (se.getErrorCode().equals("TimeExpired")) {
                    System.out.println("The request is time expired. Please check your local machine timeclock");
                    return;
                }
                // 其他的服務端異常,預設為抖動,觸發下次重試
            } catch (Exception e) {
                System.out.println("Unknown exception happened!e:"+e.getMessage());
                // 其他異常,預設為抖動,觸發下次重試
            }

        }
    }

    private static void longPollingBatchReceive(CloudQueue queue) throws ServiceHandlingRequiredException {

        System.out.println("=============start longPollingBatchReceive=============");

        // 一次性拉取 最多 xx 條訊息
        int batchSize = 15;
        // 長輪詢時間為 xx s
        int waitSeconds = 15;

        List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
        if (messages != null && messages.size() > 0) {

            for (Message message : messages) {
                printMsgAndDelete(queue,message);
            }
        }

        System.out.println("=============end longPollingBatchReceive=============");

    }

    private static void singleReceive(CloudQueue queue) throws ServiceHandlingRequiredException {
        System.out.println("=============start singleReceive=============");

        Message popMsg = queue.popMessage();
        printMsgAndDelete(queue, popMsg);

        System.out.println("=============end singleReceive=============");
    }

    private static void printMsgAndDelete(CloudQueue queue, Message popMsg) throws ServiceHandlingRequiredException {
        if (popMsg != null) {
            System.out.println("message handle: " + popMsg.getReceiptHandle());
            System.out.println("message body: " + (IS_BASE64 ? popMsg.getMessageBody() : popMsg.getMessageBodyAsRawString()));
            System.out.println("message id: " + popMsg.getMessageId());
            System.out.println("message dequeue count:" + popMsg.getDequeueCount());
            //<<to add your special logic.>>

            //remember to  delete message when consume message successfully.
            queue.deleteMessage(popMsg.getReceiptHandle());
            System.out.println("delete message successfully.\n");
        }
    }

}