Simple Message Queue (formerly MNS) mendukung pola gabungan topik dan antrian yang mengirimkan setiap pesan yang dipublikasikan ke beberapa antrian konsumen. Konsumen menarik (pull) pesan dari antrian masing-masing, sehingga titik akhir mereka tetap pribadi. Tutorial ini menjelaskan arsitektur fan-out ini menggunakan SMQ Java SDK.
Tutorial ini menggunakan SDK untuk Java. Untuk SDK dalam bahasa lain, lihat Referensi untuk SDK Versi Baru (Direkomendasikan).
Perbandingan model perpesanan
SMQ menyediakan dua model perpesanan dasar. Pilihan yang tepat bergantung pada cara konsumen menerima pesan dan apakah titik akhir mereka dapat diekspos.
| Model | Cara kerja | Visibilitas konsumen | Paling cocok untuk |
|---|---|---|---|
| Berbasis Antrian | Satu client mengirim pesan ke sebuah antrian. Beberapa konsumen menarik pesan, tetapi setiap pesan hanya dikirim ke satu konsumen. | Titik akhir tetap pribadi | Distribusi tugas point-to-point |
| Berbasis Topik | Satu client mempublikasikan pesan ke sebuah topik. Server SMQ mendorong (push) pesan ke semua konsumen yang berlangganan. | Titik akhir harus diekspos | Notifikasi push Real-time |
| Gabungan (tutorial ini) | Satu client mempublikasikan ke sebuah topik. Topik tersebut mendorong pesan ke beberapa antrian. Setiap konsumen menarik dari antriannya sendiri. | Titik akhir tetap pribadi | Fan-out dengan konsumen pribadi |
Pola gabungan ini cocok untuk skenario di mana sebuah pesan harus sampai ke beberapa konsumen yang titik akhirnya tidak dapat diekspos—misalnya, konsumen yang berjalan di dalam jaringan pribadi.
Contoh skenario: Sistem pemrosesan pesanan mempublikasikan setiap pesanan baru ke sebuah topik. Antrian terpisah memberi masukan ke layanan fulfillment, analitik, dan notifikasi. Setiap layanan menarik pesanan dari antriannya sendiri secara independen, tanpa mengekspos titik akhir inbound.
Cara kerja
Pola gabungan ini merangkai sebuah topik dengan beberapa antrian melalui langganan:
Buat sebuah topik dan satu antrian untuk setiap konsumen.
Berlangganan setiap antrian ke topik tersebut (titik akhir antrian menjadi target langganan).
Produsen mempublikasikan pesan ke topik.
SMQ menyalin pesan tersebut ke setiap antrian yang berlangganan.
Setiap konsumen menarik salinannya dari antrian masing-masing.

Karena konsumen menarik dari antrian alih-alih menerima pesan yang didorong, titik akhir mereka tidak pernah diekspos ke server SMQ atau satu sama lain.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
IntelliJ IDEA atau Eclipse yang terinstal (tutorial ini menggunakan IntelliJ IDEA)
JDK 1.8 atau versi lebih baru
Apache Maven 2.5 atau versi lebih baru
Instal dependensi Java
Tambahkan dependensi berikut ke file pom.xml proyek Java Anda:
<dependency>
<groupId>com.aliyun.mns</groupId>
<artifactId>aliyun-sdk-mns</artifactId>
<version>1.1.9.2</version>
</dependency>Referensi API CloudPullTopic
SDK untuk Java 1.1.8 memperkenalkan kelas CloudPullTopic yang mengenkapsulasi pola gabungan topik dan antrian. Buat objek CloudPullTopic melalui MNSClient:
// Bentuk lengkap: buat antrian secara otomatis dengan templat metadata antrian
public CloudPullTopic createPullTopic(
TopicMeta topicMeta,
Vector<String> queueNameList,
boolean needCreateQueue,
QueueMeta queueMetaTemplate
)
// Bentuk singkat: gunakan antrian yang sudah ada (tanpa pembuatan otomatis)
public CloudPullTopic createPullTopic(
TopicMeta topicMeta,
Vector<String> queueNameList
)| Parameter | Deskripsi |
|---|---|
topicMeta | Metadata topik (nama, properti). |
queueNameList | Daftar nama antrian. Setiap antrian menerima salinan dari setiap pesan yang dipublikasikan. |
needCreateQueue | Apakah akan membuat antrian yang tercantum dalam queueNameList. Atur ke true untuk membuatnya secara otomatis. |
queueMetaTemplate | Templat metadata antrian yang diterapkan ke semua antrian yang dibuat otomatis (misalnya, waktu tunggu polling). |
Publikasikan dan konsumsi pesan
Contoh berikut menggunakan CloudPullTopic untuk menyiapkan fan-out, mempublikasikan pesan, dan mengonsumsinya dari tiga antrian terpisah.
package doc;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.TopicMeta;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ClientException;
import java.util.Vector;
public class DemoTopicMessageBroadcast {
public static void main(String[] args) {
// Inisialisasi client dengan ID AccessKey, Rahasia AccessKey, dan titik akhir Anda.
CloudAccount account = new CloudAccount(
ServiceSettings.getMNSAccessKeyId(),
ServiceSettings.getMNSAccessKeySecret(),
ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();
// Definisikan tiga antrian konsumen.
Vector<String> consumerNameList = new Vector<String>();
String consumerName1 = "consumer001";
String consumerName2 = "consumer002";
String consumerName3 = "consumer003";
consumerNameList.add(consumerName1);
consumerNameList.add(consumerName2);
consumerNameList.add(consumerName3);
// Atur polling panjang menjadi 30 detik untuk antrian yang dibuat otomatis.
QueueMeta queueMetaTemplate = new QueueMeta();
queueMetaTemplate.setPollingWaitSeconds(30);
try {
// --- Produsen: buat topik dan publikasikan pesan ---
String topicName = "demo-topic-for-pull";
TopicMeta topicMeta = new TopicMeta();
topicMeta.setTopicName(topicName);
// Buat topik dan buat otomatis tiga antrian konsumen.
CloudPullTopic pullTopic = client.createPullTopic(
topicMeta, consumerNameList, true, queueMetaTemplate);
// Publikasikan pesan mentah ke topik.
String messageBody = "broadcast message to all the consumers:hello the world.";
TopicMessage tMessage = new RawTopicMessage();
tMessage.setBaseMessageBody(messageBody);
pullTopic.publishMessage(tMessage);
// --- Konsumen: tarik pesan dari setiap antrian ---
CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);
// popMessage(30) menunggu hingga 30 detik untuk pesan (long polling).
Message consumer1Msg = queueForConsumer1.popMessage(30);
if (consumer1Msg != null) {
System.out.println("consumer1 received: "
+ consumer1Msg.getMessageBodyAsRawString());
} else {
System.out.println("consumer1 queue is empty");
}
Message consumer2Msg = queueForConsumer2.popMessage(30);
if (consumer2Msg != null) {
System.out.println("consumer2 received: "
+ consumer2Msg.getMessageBodyAsRawString());
} else {
System.out.println("consumer2 queue is empty");
}
Message consumer3Msg = queueForConsumer3.popMessage(30);
if (consumer3Msg != null) {
System.out.println("consumer3 received: "
+ consumer3Msg.getMessageBodyAsRawString());
} else {
System.out.println("consumer3 queue is empty");
}
// Pembersihan: hapus topik, langganan-nya, dan semua antrian terkait.
pullTopic.delete();
} catch (ClientException ce) {
System.out.println("Kesalahan jaringan antara client dan SMQ. "
+ "Periksa pengaturan jaringan dan DNS Anda.");
ce.printStackTrace();
} catch (ServiceException se) {
se.printStackTrace();
}
client.close();
}
}Verifikasi hasil
Setelah menjalankan kode contoh, pastikan setiap konsumen menerima pesan tersebut. Output yang diharapkan:
consumer1 received: broadcast message to all the consumers:hello the world.
consumer2 received: broadcast message to all the consumers:hello the world.
consumer3 received: broadcast message to all the consumers:hello the world.Jika suatu antrian menampilkan "queue is empty", pesan mungkin belum terkirim. Tingkatkan timeout popMessage atau periksa apakah topik dan langganan telah dibuat dengan benar.
Penjelasan kode
| Langkah | Apa yang terjadi |
|---|---|
Inisialisasi CloudAccount | Melakukan autentikasi dengan ID AccessKey, Rahasia AccessKey, dan titik akhir SMQ Anda. |
Panggil createPullTopic | Membuat topik, membuat tiga antrian konsumen, dan berlangganan setiap antrian ke topik tersebut. |
Panggil publishMessage | Mempublikasikan pesan mentah (RawTopicMessage) ke topik. SMQ menyalinnya ke semua antrian yang berlangganan. |
Panggil popMessage(30) | Setiap konsumen menarik salinannya dengan timeout long-polling 30 detik. |
Panggil pullTopic.delete() | Menghapus topik, langganan-nya, dan semua antrian terkait. |
Contoh ini mempublikasikan pesan mentah (RawTopicMessage). Urai pesan mentah dengangetMessageBodyAsRawString(). Jika Anda menggunakanBase64TopicMessagesebagai gantinya, urai dengangetMessageBodyAsString().
Langkah selanjutnya
Jelajahi pola perpesanan SMQ lainnya di Referensi untuk SDK Versi Baru (Direkomendasikan).