全部产品
Search
文档中心

Simple Message Queue (formerly MNS):Fan out messages from one producer to multiple consumers

更新时间:Mar 01, 2026

Simple Message Queue (formerly MNS) mendukung pola gabungan topik dan antrian yang mengirimkan setiap pesan yang dipublikasikan ke beberapa antrian konsumen. Konsumen menarik (pull) pesan dari antrian masing-masing, sehingga titik akhir mereka tetap pribadi. Tutorial ini menjelaskan arsitektur fan-out ini menggunakan SMQ Java SDK.

Catatan

Tutorial ini menggunakan SDK untuk Java. Untuk SDK dalam bahasa lain, lihat Referensi untuk SDK Versi Baru (Direkomendasikan).

Perbandingan model perpesanan

SMQ menyediakan dua model perpesanan dasar. Pilihan yang tepat bergantung pada cara konsumen menerima pesan dan apakah titik akhir mereka dapat diekspos.

ModelCara kerjaVisibilitas konsumenPaling cocok untuk
Berbasis AntrianSatu client mengirim pesan ke sebuah antrian. Beberapa konsumen menarik pesan, tetapi setiap pesan hanya dikirim ke satu konsumen.Titik akhir tetap pribadiDistribusi tugas point-to-point
Berbasis TopikSatu client mempublikasikan pesan ke sebuah topik. Server SMQ mendorong (push) pesan ke semua konsumen yang berlangganan.Titik akhir harus dieksposNotifikasi push Real-time
Gabungan (tutorial ini)Satu client mempublikasikan ke sebuah topik. Topik tersebut mendorong pesan ke beberapa antrian. Setiap konsumen menarik dari antriannya sendiri.Titik akhir tetap pribadiFan-out dengan konsumen pribadi

Pola gabungan ini cocok untuk skenario di mana sebuah pesan harus sampai ke beberapa konsumen yang titik akhirnya tidak dapat diekspos—misalnya, konsumen yang berjalan di dalam jaringan pribadi.

Contoh skenario: Sistem pemrosesan pesanan mempublikasikan setiap pesanan baru ke sebuah topik. Antrian terpisah memberi masukan ke layanan fulfillment, analitik, dan notifikasi. Setiap layanan menarik pesanan dari antriannya sendiri secara independen, tanpa mengekspos titik akhir inbound.

Cara kerja

Pola gabungan ini merangkai sebuah topik dengan beberapa antrian melalui langganan:

  1. Buat sebuah topik dan satu antrian untuk setiap konsumen.

  2. Berlangganan setiap antrian ke topik tersebut (titik akhir antrian menjadi target langganan).

  3. Produsen mempublikasikan pesan ke topik.

  4. SMQ menyalin pesan tersebut ke setiap antrian yang berlangganan.

  5. Setiap konsumen menarik salinannya dari antrian masing-masing.

消息流

Karena konsumen menarik dari antrian alih-alih menerima pesan yang didorong, titik akhir mereka tidak pernah diekspos ke server SMQ atau satu sama lain.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

Instal dependensi Java

Tambahkan dependensi berikut ke file pom.xml proyek Java Anda:

<dependency>
    <groupId>com.aliyun.mns</groupId>
    <artifactId>aliyun-sdk-mns</artifactId>
    <version>1.1.9.2</version>
</dependency>

Referensi API CloudPullTopic

SDK untuk Java 1.1.8 memperkenalkan kelas CloudPullTopic yang mengenkapsulasi pola gabungan topik dan antrian. Buat objek CloudPullTopic melalui MNSClient:

// Bentuk lengkap: buat antrian secara otomatis dengan templat metadata antrian
public CloudPullTopic createPullTopic(
    TopicMeta topicMeta,
    Vector<String> queueNameList,
    boolean needCreateQueue,
    QueueMeta queueMetaTemplate
)

// Bentuk singkat: gunakan antrian yang sudah ada (tanpa pembuatan otomatis)
public CloudPullTopic createPullTopic(
    TopicMeta topicMeta,
    Vector<String> queueNameList
)
ParameterDeskripsi
topicMetaMetadata topik (nama, properti).
queueNameListDaftar nama antrian. Setiap antrian menerima salinan dari setiap pesan yang dipublikasikan.
needCreateQueueApakah akan membuat antrian yang tercantum dalam queueNameList. Atur ke true untuk membuatnya secara otomatis.
queueMetaTemplateTemplat metadata antrian yang diterapkan ke semua antrian yang dibuat otomatis (misalnya, waktu tunggu polling).

Publikasikan dan konsumsi pesan

Contoh berikut menggunakan CloudPullTopic untuk menyiapkan fan-out, mempublikasikan pesan, dan mengonsumsinya dari tiga antrian terpisah.

package doc;

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.TopicMeta;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ClientException;
import java.util.Vector;

public class DemoTopicMessageBroadcast {
    public static void main(String[] args) {

        // Inisialisasi client dengan ID AccessKey, Rahasia AccessKey, dan titik akhir Anda.
        CloudAccount account = new CloudAccount(
            ServiceSettings.getMNSAccessKeyId(),
            ServiceSettings.getMNSAccessKeySecret(),
            ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient();

        // Definisikan tiga antrian konsumen.
        Vector<String> consumerNameList = new Vector<String>();
        String consumerName1 = "consumer001";
        String consumerName2 = "consumer002";
        String consumerName3 = "consumer003";
        consumerNameList.add(consumerName1);
        consumerNameList.add(consumerName2);
        consumerNameList.add(consumerName3);

        // Atur polling panjang menjadi 30 detik untuk antrian yang dibuat otomatis.
        QueueMeta queueMetaTemplate = new QueueMeta();
        queueMetaTemplate.setPollingWaitSeconds(30);

        try {
            // --- Produsen: buat topik dan publikasikan pesan ---

            String topicName = "demo-topic-for-pull";
            TopicMeta topicMeta = new TopicMeta();
            topicMeta.setTopicName(topicName);

            // Buat topik dan buat otomatis tiga antrian konsumen.
            CloudPullTopic pullTopic = client.createPullTopic(
                topicMeta, consumerNameList, true, queueMetaTemplate);

            // Publikasikan pesan mentah ke topik.
            String messageBody = "broadcast message to all the consumers:hello the world.";
            TopicMessage tMessage = new RawTopicMessage();
            tMessage.setBaseMessageBody(messageBody);
            pullTopic.publishMessage(tMessage);

            // --- Konsumen: tarik pesan dari setiap antrian ---

            CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
            CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
            CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);

            // popMessage(30) menunggu hingga 30 detik untuk pesan (long polling).
            Message consumer1Msg = queueForConsumer1.popMessage(30);
            if (consumer1Msg != null) {
                System.out.println("consumer1 received: "
                    + consumer1Msg.getMessageBodyAsRawString());
            } else {
                System.out.println("consumer1 queue is empty");
            }

            Message consumer2Msg = queueForConsumer2.popMessage(30);
            if (consumer2Msg != null) {
                System.out.println("consumer2 received: "
                    + consumer2Msg.getMessageBodyAsRawString());
            } else {
                System.out.println("consumer2 queue is empty");
            }

            Message consumer3Msg = queueForConsumer3.popMessage(30);
            if (consumer3Msg != null) {
                System.out.println("consumer3 received: "
                    + consumer3Msg.getMessageBodyAsRawString());
            } else {
                System.out.println("consumer3 queue is empty");
            }

            // Pembersihan: hapus topik, langganan-nya, dan semua antrian terkait.
            pullTopic.delete();

        } catch (ClientException ce) {
            System.out.println("Kesalahan jaringan antara client dan SMQ. "
                + "Periksa pengaturan jaringan dan DNS Anda.");
            ce.printStackTrace();
        } catch (ServiceException se) {
            se.printStackTrace();
        }

        client.close();
    }
}

Verifikasi hasil

Setelah menjalankan kode contoh, pastikan setiap konsumen menerima pesan tersebut. Output yang diharapkan:

consumer1 received: broadcast message to all the consumers:hello the world.
consumer2 received: broadcast message to all the consumers:hello the world.
consumer3 received: broadcast message to all the consumers:hello the world.

Jika suatu antrian menampilkan "queue is empty", pesan mungkin belum terkirim. Tingkatkan timeout popMessage atau periksa apakah topik dan langganan telah dibuat dengan benar.

Penjelasan kode

LangkahApa yang terjadi
Inisialisasi CloudAccountMelakukan autentikasi dengan ID AccessKey, Rahasia AccessKey, dan titik akhir SMQ Anda.
Panggil createPullTopicMembuat topik, membuat tiga antrian konsumen, dan berlangganan setiap antrian ke topik tersebut.
Panggil publishMessageMempublikasikan pesan mentah (RawTopicMessage) ke topik. SMQ menyalinnya ke semua antrian yang berlangganan.
Panggil popMessage(30)Setiap konsumen menarik salinannya dengan timeout long-polling 30 detik.
Panggil pullTopic.delete()Menghapus topik, langganan-nya, dan semua antrian terkait.
Contoh ini mempublikasikan pesan mentah (RawTopicMessage). Urai pesan mentah dengan getMessageBodyAsRawString(). Jika Anda menggunakan Base64TopicMessage sebagai gantinya, urai dengan getMessageBodyAsString().

Langkah selanjutnya