全部产品
Search
文档中心

Simple Message Queue (formerly MNS):Lakukan pengujian konkurensi

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menggunakan Simple Message Queue (sebelumnya MNS) SDK untuk Java untuk mengirim dan menerima beberapa pesan secara bersamaan.

Informasi latar belakang

Pengujian konkurensi adalah metode pengujian kinerja yang digunakan untuk memverifikasi kinerja dan stabilitas sistem perpesanan ketika memproses beberapa pesan atau permintaan secara bersamaan. Dalam pengujian ini, Anda dapat menentukan tingkat konkurensi dan durasi pengujian. Permintaan per detik (QPS) dihitung menggunakan rumus berikut: QPS = Jumlah total permintaan / Durasi pengujian.

Prasyarat

Konfigurasikan file properti

mns.accountendpoint=http://12xxxxxxxx.mns.cn-xxx.aliyuncs.com
mns.perf.queueName=Queue_Test # Nama antrian.
mns.perf.threadNum=2 # Jumlah thread konkuren.
mns.perf.durationTime=6 # Durasi pengujian. Satuan: detik.

Kode contoh

Untuk informasi lebih lanjut tentang cara mengunduh kode contoh, lihat JavaSDKPerfTest.java.

package com.aliyun.mns.sample.scenarios.perf;

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.common.utils.ThreadUtil;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;

/**
 Gunakan kode contoh untuk melakukan pengujian konkurensi.
 * Persiapan
 * 1. Konfigurasikan ID AccessKey dan Rahasia AccessKey di lingkungan berdasarkan spesifikasi Alibaba Cloud. 
 * 2. Konfigurasikan file ${"user.home"}/.aliyun-mns.properties berdasarkan konten berikut:
 *          mns.endpoint=http://xxxxxxx
 *          mns.perf.queueName=JavaSDKPerfTestQueue # Nama antrian.
 *          mns.perf.threadNum=200 # Jumlah thread konkuren.
 *          mns.perf.durationTime=180 # Durasi pengujian. Satuan: detik.
 */
public class JavaSDKPerfTest {
    private static MNSClient client = null;

    private static String endpoint = null;

    private static String queueName;
    private static int threadNum;

    /**
     * Durasi pengujian. Satuan: detik.
     */
    private static long durationTime;


    public static void main(String[] args) throws InterruptedException {
        if (!parseConf()) {
            return;
        }

        // 1. init client
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setMaxConnections(threadNum);
        clientConfiguration.setMaxConnectionsPerRoute(threadNum);
        CloudAccount cloudAccount = new CloudAccount(endpoint, clientConfiguration);
        client = cloudAccount.getMNSClient();

        // 2. reCreateQueue
        ReCreateUtil.reCreateQueue(client,queueName);
        // 3. SendMessage
        Function<CloudQueue,Message> sendFunction = new Function<CloudQueue, Message>() {
            @Override
            public Message apply(CloudQueue queue) {
                Message message = new Message();
                message.setMessageBody("BodyTest");
                return queue.putMessage(message);
            }
        };
        actionProcess("SendMessage", sendFunction , durationTime);
        // 4. Sekarang adalah ReceiveMessage
        Function<CloudQueue,Message> receiveFunction = new Function<CloudQueue, Message>() {
            @Override
            public Message apply(CloudQueue queue) {
                Message message = queue.popMessage();
                String handle = message == null?null:message.getReceiptHandle();
                if (StringUtils.isNotBlank(handle)) {
                    queue.deleteMessage(handle);
                }
                return message;
            }
        };
        actionProcess("ReceiveAndDelMessage", receiveFunction, durationTime);



        client.close();
        System.out.println("=======end=======");
    }

    private static void actionProcess(String actionName, final Function<CloudQueue, Message> function, final long durationSeconds) throws InterruptedException {
        System.out.println(actionName +" mulai!");

        final AtomicLong totalCount = new AtomicLong(0);

        ThreadPoolExecutor executor = ThreadUtil.initThreadPoolExecutorAbort();
        ThreadUtil.asyncWithReturn(executor, threadNum, new ThreadUtil.AsyncRunInterface() {
            @Override
            public void run() {
                try {
                    String threadName = Thread.currentThread().getName();

                    CloudQueue queue = client.getQueueRef(queueName);
                    Message message = new Message();
                    message.setMessageBody("BodyTest");
                    long count = 0;

                    Date startDate = new Date();
                    long startTime = startDate.getTime();

                    System.out.printf("[Thread%s]startTime:%s %n", threadName, getBjTime(startDate));
                    long endTime = startTime + durationSeconds * 1000L;
                    while (true) {
                        for (int i = 0; i < 50; ++i) {
                            function.apply(queue);
                        }
                        count += 50;

                        if (System.currentTimeMillis() >= endTime) {
                            break;
                        }
                    }

                    System.out.printf("[Thread%s]endTime:%s,count:%d %n", threadName, getBjTime(new Date()),count);

                    totalCount.addAndGet(count);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });


        executor.shutdown();
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }

        System.out.println(actionName +" QPS: "+(totalCount.get() / durationSeconds));
    }


    protected static boolean parseConf() {

        // inisialisasi parameter anggota
        endpoint = ServiceSettings.getMNSAccountEndpoint();
        System.out.println("Endpoint: " + endpoint);

        queueName = ServiceSettings.getMNSPropertyValue("perf.queueName","JavaSDKPerfTestQueue");
        System.out.println("QueueName: " + queueName);
        threadNum = Integer.parseInt(ServiceSettings.getMNSPropertyValue("perf.threadNum","2"));
        System.out.println("ThreadNum: " + threadNum);
        durationTime = Long.parseLong(ServiceSettings.getMNSPropertyValue("perf.totalSeconds","6"));
        System.out.println("DurationTime: " + durationTime);

        return true;
    }

    /**
     * Kueri waktu dalam UTC+8.
     */
    private static String getBjTime(Date date){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        return sdf.format(date);
    }


    public interface Function<T, R> {

        /**
         * Terapkan fungsi ini ke argumen yang diberikan.
         *
         * @param t argumen fungsi
         * @return hasil fungsi
         */
        R apply(T t);

    }
}