This topic describes how to use the SDK for Java to concurrently send and receive multiple messages.

During a concurrency test, you can perform the following operations:
  • Specify the ThreadNum and TotalSeconds parameters.
  • Retrieve the number of queries per second (QPS) by dividing the total number of queries by the seconds of the test period.

Step 1: Initialize parameters

Create a text file named perf_test_config.properties in the working directory, and specify the following parameters. Note that you must specify an existing queue.

Endpoint=
AccessId=
AccessKey=
QueueName=JavaSDKPerfTestQueue
ThreadNum=200
TotalSeconds=180

Parameters

  • ThreadNum: the number of threads that are used to send and receive messages. MNS provides high concurrency and scalability.
  • TotalSeconds: the number of seconds that are used for concurrency testing.

Step 2: Run the code

package com.aliyun.mns;

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.model.QueueMeta;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

public class JavaSDKPerfTest {
    private static MNSClient client = null;
    private static AtomicLong totalCount = new AtomicLong(0);

    private static String endpoint = null;
    private static String accessId = null;
    private static String accessKey = null;

    private static String queueName = "JavaSDKPerfTestQueue";
    private static int threadNum = 100;
    private static int totalSeconds = 180;

    protected static boolean parseConf() {
        String confFilePath = System.getProperty("user.dir") + System.getProperty("file.separator") + "perf_test_config.properties"; 

        BufferedInputStream bis = null;
        try {
            bis = new BufferedInputStream(new FileInputStream(confFilePath));
            if (bis == null) {
                System.out.println("ConfFile not opened: " + confFilePath);
                return false;
            }
        } catch (FileNotFoundException e) {
            System.out.println("ConfFile not found: " + confFilePath);
            return false;
        }

        // Load files.
        Properties properties = new Properties();
        try {
            properties.load(bis);
        } catch(IOException e) {
            System.out.println("Load ConfFile Failed: " + e.getMessage());
            return false;
        } finally {
            try {
                bis.close();
            } catch (Exception e) {
               
            }
        }

        // Initialize the member parameters.
        endpoint = properties.getProperty("Endpoint");
        System.out.println("Endpoint: " + endpoint);
        accessId = properties.getProperty("AccessId");
        System.out.println("AccessId: " + accessId);
        accessKey = properties.getProperty("AccessKey");

        queueName = properties.getProperty("QueueName", queueName);
        System.out.println("QueueName: " + queueName);
        threadNum = Integer.parseInt(properties.getProperty("ThreadNum", String.valueOf(threadNum)));
        System.out.println("ThreadNum: " + threadNum);
        totalSeconds = Integer.parseInt(properties.getProperty("TotalSeconds", String.valueOf(totalSeconds)));
        System.out.println("TotalSeconds: " + totalSeconds);

        return true;
    }

    public static void main(String[] args) {
        if (! parseConf()) {
            return;
        }

        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setMaxConnections(threadNum);
        clientConfiguration.setMaxConnectionsPerRoute(threadNum);

        CloudAccount cloudAccount = new CloudAccount(accessId, accessKey, endpoint, clientConfiguration);
        client = cloudAccount.getMNSClient();

        CloudQueue queue = client.getQueueRef(queueName);
        queue.delete();

        QueueMeta meta = new QueueMeta();
        meta.setQueueName(queueName);
        client.createQueue(meta);

        // 1. Send messages.
        ArrayList<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < threadNum; ++i){
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    try {
                        CloudQueue queue = client.getQueueRef(queueName);
                        Message message = new Message();
                        message.setMessageBody("Test");
                        long count = 0;
                        long startTime = System.currentTimeMillis();

                        System.out.println(startTime);
                        long endTime = startTime + totalSeconds * 1000;
                        while (true) {
                            for (int i = 0; i < 50; ++i) {
                                queue.putMessage(message);
                            }
                            count += 50;

                            if (System.currentTimeMillis() >= endTime) {
                                break;
                            }
                        }

                        System.out.println(System.currentTimeMillis());
                        System.out.println("Thread" + Thread.currentThread().getName() + ": " + String.valueOf(count));
                        totalCount.addAndGet(count);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, String.valueOf(i));
            thread.start();
            threads.add(thread);
        }

        for (int i = 0; i < threadNum; ++i) {
            try {
                threads.get(i).join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("SendMessage QPS: ");
        System.out.println(totalCount.get() / totalSeconds);

        // 2. Receive the messages.
        threads.clear();
        totalCount.set(0);

        totalSeconds = totalSeconds; 
        // 3. Make sure that the messages are received.
        for (int i = 0; i < threadNum; ++i){
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    try {
                        CloudQueue queue = client.getQueueRef(queueName);
                        long count = 0;
                        long endTime = System.currentTimeMillis() + totalSeconds * 1000;

                        while (true) {
                            for (int i = 0; i < 50; ++i) {
                                queue.popMessage();
                            }
                            count += 50;

                            if (System.currentTimeMillis() >= endTime) {
                                break;
                            }
                        }

                        System.out.println("Thread" + Thread.currentThread().getName() + ": " + String.valueOf(count));
                        totalCount.addAndGet(count);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, String.valueOf(i));
            thread.start();
            threads.add(thread);
        }

        for (int i = 0; i < threadNum; ++i) {
            try {
                threads.get(i).join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("ReceiveMessage QPS: ");
        System.out.println(totalCount.get() / totalSeconds);

        return;
    }
}