Topik ini menjelaskan cara menggunakan Simple Message Queue (sebelumnya MNS) (SMQ) SDK untuk Java untuk menerima pesan dari antrian.
Prasyarat
SMQ SDK untuk Java telah diinstal. Untuk informasi lebih lanjut, lihat Instal SDK untuk Java.
Titik akhir dan kredensial akses telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Konfigurasikan Titik Akhir dan Kredensial Akses.
Informasi otorisasi
Secara default, Anda hanya dapat memanggil operasi ini dengan akun Alibaba Cloud. Anda dapat memanggil operasi ini sebagai Pengguna Resource Access Management (RAM) hanya setelah memberikan izin yang diperlukan kepada pengguna RAM tersebut. Tabel berikut menjelaskan informasi otorisasi untuk operasi ini.
Nama | Nilai |
API | ReceiveMessage |
Aksi | mns:ReceiveMessage |
Sumber daya | acs:mns:$region:$accountid:/queues/$queueName/messages |
Catatan penggunaan
Konsumen dapat memanggil operasi ini untuk menerima pesan dari antrian. Operasi ReceiveMessage mengubah status pesan yang diterima menjadi Inaktif. Periode selama pesan tetap dalam keadaan Inaktif ditentukan oleh parameter
VisibilityTimeoutdari antrian.Setelah konsumen mengonsumsi pesan dalam periode yang ditentukan oleh parameter
VisibilityTimeout, konsumen harus memanggil operasi DeleteMessage untuk menghapus pesan. Jika tidak, pesan akan masuk ke status Aktif dan dapat dikonsumsi lagi.
Metode pengkodean untuk badan pesan
Jika badan pesan tidak mengandung karakter khusus, disarankan untuk tidak menggunakan pengkodean Base64.
Untuk mengirim pesan, gunakan metode
message.setMessageBodyAsRawStringuntuk menetapkan badan pesan.Untuk menerima pesan, gunakan metode
message.getMessageBodyAsRawStringuntuk mendapatkan badan pesan.
Kode contoh
Untuk informasi lebih lanjut tentang kode contoh, lihat ReceiveMessageDemo.
package com.aliyun.mns.sample.queue;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ServiceHandlingRequiredException;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import java.util.List;
/**
* 1. Konfigurasikan ID AccessKey dan Rahasia AccessKey di lingkungan berdasarkan spesifikasi Alibaba Cloud.
* 2.Konfigurasikan file ${"user.home"}/.aliyun-mns.properties berdasarkan konten berikut:
* mns.endpoint=http://xxxxxxx
* mns.msgBodyBase64Switch=true/false
*/
public class ReceiveMessageDemo {
/**
* Tentukan apakah akan mengkodekan badan pesan dalam Base64.
*/
private static final Boolean IS_BASE64 = Boolean.valueOf(ServiceSettings.getMNSPropertyValue("msgBodyBase64Switch","false"));
public static void main(String[] args) {
String queueName = "cloud-queue-demo";
// Konfigurasikan ID AccessKey dan Rahasia AccessKey di lingkungan berdasarkan spesifikasi Alibaba Cloud.
CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
//client ini hanya perlu diinisialisasi sekali
MNSClient client = account.getMNSClient();
CloudQueue queue = client.getQueueRef(queueName);
// Dapatkan dan proses pesan secara bergiliran.
loopReceive(queue, client);
// Nonaktifkan client setelah pemrosesan pesan.
client.close();
}
private static void loopReceive(CloudQueue queue, MNSClient client) {
while (true) {
// Terima pesan secara bergiliran.
try {
// Terima pesan dari antrian. Ini adalah metode dasar.
singleReceive(queue);
// Terima beberapa pesan sekaligus menggunakan mekanisme polling panjang. Ini adalah metode yang direkomendasikan.
longPollingBatchReceive(queue);
} catch (ClientException ce) {
System.out.println("Ada masalah dengan koneksi jaringan antara client dan layanan MNS."
+ "Silakan periksa ketersediaan jaringan dan DNS Anda.");
// Pengecualian client: retry dipicu karena jitter jaringan.
} catch (ServiceException se) {
if (se.getErrorCode().equals("QueueNotExist")) {
System.out.println("Antrian tidak ada. Silakan buat antrian sebelum digunakan");
client.close();
return;
} else if (se.getErrorCode().equals("TimeExpired")) {
System.out.println("Permintaan telah kedaluwarsa. Silakan periksa waktu sistem lokal Anda");
return;
}
// Pengecualian server: retry dipicu karena jitter jaringan.
} catch (Exception e) {
System.out.println("Pengecualian tidak dikenal terjadi!e:"+e.getMessage());
// Pengecualian lainnya: retry dipicu karena jitter jaringan.
}
}
}
private static void longPollingBatchReceive(CloudQueue queue) throws ServiceHandlingRequiredException {
System.out.println("=============mulai longPollingBatchReceive=============");
// Jumlah maksimum pesan yang dapat diterima sekaligus.
int batchSize = 15;
// Periode polling panjang. Satuan: detik.
int waitSeconds = 15;
List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
if (messages != null && messages.size() > 0) {
for (Message message : messages) {
printMsgAndDelete(queue,message);
}
}
System.out.println("=============akhir longPollingBatchReceive=============");
}
private static void singleReceive(CloudQueue queue) throws ServiceHandlingRequiredException {
System.out.println("=============mulai singleReceive=============");
Message popMsg = queue.popMessage();
printMsgAndDelete(queue, popMsg);
System.out.println("=============akhir singleReceive=============");
}
private static void printMsgAndDelete(CloudQueue queue, Message popMsg) throws ServiceHandlingRequiredException {
if (popMsg != null) {
System.out.println("penanganan pesan: " + popMsg.getReceiptHandle());
System.out.println("badan pesan: " + (IS_BASE64 ? popMsg.getMessageBody() : popMsg.getMessageBodyAsRawString()));
System.out.println("id pesan: " + popMsg.getMessageId());
System.out.println("hitungan dequeue pesan:" + popMsg.getDequeueCount());
//<<tambahkan logika khusus Anda.>>
//ingat untuk menghapus pesan saat pesan berhasil dikonsumsi.
queue.deleteMessage(popMsg.getReceiptHandle());
System.out.println("pesan berhasil dihapus.\n");
}
}
}