Topik ini menjelaskan cara menggunakan Simple Message Queue (sebelumnya MNS) SDK untuk Java untuk mengirim dan menerima beberapa pesan secara bersamaan.
Informasi latar belakang
Pengujian konkurensi adalah metode pengujian kinerja yang digunakan untuk memverifikasi kinerja dan stabilitas sistem perpesanan ketika memproses beberapa pesan atau permintaan secara bersamaan. Dalam pengujian ini, Anda dapat menentukan tingkat konkurensi dan durasi pengujian. Permintaan per detik (QPS) dihitung menggunakan rumus berikut: QPS = Jumlah total permintaan / Durasi pengujian.
Prasyarat
SMQ SDK untuk Java telah diinstal. Untuk informasi lebih lanjut, lihat Instal SDK untuk Java.
Titik akhir dan kredensial akses telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Konfigurasikan titik akhir dan kredensial akses.
Konfigurasikan file properti
mns.accountendpoint=http://12xxxxxxxx.mns.cn-xxx.aliyuncs.com
mns.perf.queueName=Queue_Test # Nama antrian.
mns.perf.threadNum=2 # Jumlah thread konkuren.
mns.perf.durationTime=6 # Durasi pengujian. Satuan: detik.Kode contoh
Untuk informasi lebih lanjut tentang cara mengunduh kode contoh, lihat JavaSDKPerfTest.java.
package com.aliyun.mns.sample.scenarios.perf;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.common.utils.ThreadUtil;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
/**
Gunakan kode contoh untuk melakukan pengujian konkurensi.
* Persiapan
* 1. Konfigurasikan ID AccessKey dan Rahasia AccessKey di lingkungan berdasarkan spesifikasi Alibaba Cloud.
* 2. Konfigurasikan file ${"user.home"}/.aliyun-mns.properties berdasarkan konten berikut:
* mns.endpoint=http://xxxxxxx
* mns.perf.queueName=JavaSDKPerfTestQueue # Nama antrian.
* mns.perf.threadNum=200 # Jumlah thread konkuren.
* mns.perf.durationTime=180 # Durasi pengujian. Satuan: detik.
*/
public class JavaSDKPerfTest {
private static MNSClient client = null;
private static String endpoint = null;
private static String queueName;
private static int threadNum;
/**
* Durasi pengujian. Satuan: detik.
*/
private static long durationTime;
public static void main(String[] args) throws InterruptedException {
if (!parseConf()) {
return;
}
// 1. init client
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setMaxConnections(threadNum);
clientConfiguration.setMaxConnectionsPerRoute(threadNum);
CloudAccount cloudAccount = new CloudAccount(endpoint, clientConfiguration);
client = cloudAccount.getMNSClient();
// 2. reCreateQueue
ReCreateUtil.reCreateQueue(client,queueName);
// 3. SendMessage
Function<CloudQueue,Message> sendFunction = new Function<CloudQueue, Message>() {
@Override
public Message apply(CloudQueue queue) {
Message message = new Message();
message.setMessageBody("BodyTest");
return queue.putMessage(message);
}
};
actionProcess("SendMessage", sendFunction , durationTime);
// 4. Sekarang adalah ReceiveMessage
Function<CloudQueue,Message> receiveFunction = new Function<CloudQueue, Message>() {
@Override
public Message apply(CloudQueue queue) {
Message message = queue.popMessage();
String handle = message == null?null:message.getReceiptHandle();
if (StringUtils.isNotBlank(handle)) {
queue.deleteMessage(handle);
}
return message;
}
};
actionProcess("ReceiveAndDelMessage", receiveFunction, durationTime);
client.close();
System.out.println("=======end=======");
}
private static void actionProcess(String actionName, final Function<CloudQueue, Message> function, final long durationSeconds) throws InterruptedException {
System.out.println(actionName +" mulai!");
final AtomicLong totalCount = new AtomicLong(0);
ThreadPoolExecutor executor = ThreadUtil.initThreadPoolExecutorAbort();
ThreadUtil.asyncWithReturn(executor, threadNum, new ThreadUtil.AsyncRunInterface() {
@Override
public void run() {
try {
String threadName = Thread.currentThread().getName();
CloudQueue queue = client.getQueueRef(queueName);
Message message = new Message();
message.setMessageBody("BodyTest");
long count = 0;
Date startDate = new Date();
long startTime = startDate.getTime();
System.out.printf("[Thread%s]startTime:%s %n", threadName, getBjTime(startDate));
long endTime = startTime + durationSeconds * 1000L;
while (true) {
for (int i = 0; i < 50; ++i) {
function.apply(queue);
}
count += 50;
if (System.currentTimeMillis() >= endTime) {
break;
}
}
System.out.printf("[Thread%s]endTime:%s,count:%d %n", threadName, getBjTime(new Date()),count);
totalCount.addAndGet(count);
} catch (Exception e) {
e.printStackTrace();
}
}
});
executor.shutdown();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
System.out.println(actionName +" QPS: "+(totalCount.get() / durationSeconds));
}
protected static boolean parseConf() {
// inisialisasi parameter anggota
endpoint = ServiceSettings.getMNSAccountEndpoint();
System.out.println("Endpoint: " + endpoint);
queueName = ServiceSettings.getMNSPropertyValue("perf.queueName","JavaSDKPerfTestQueue");
System.out.println("QueueName: " + queueName);
threadNum = Integer.parseInt(ServiceSettings.getMNSPropertyValue("perf.threadNum","2"));
System.out.println("ThreadNum: " + threadNum);
durationTime = Long.parseLong(ServiceSettings.getMNSPropertyValue("perf.totalSeconds","6"));
System.out.println("DurationTime: " + durationTime);
return true;
}
/**
* Kueri waktu dalam UTC+8.
*/
private static String getBjTime(Date date){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
return sdf.format(date);
}
public interface Function<T, R> {
/**
* Terapkan fungsi ini ke argumen yang diberikan.
*
* @param t argumen fungsi
* @return hasil fungsi
*/
R apply(T t);
}
}