すべてのプロダクト
Search
ドキュメントセンター

Simple Message Queue (formerly MNS):1 つのプロデューサークライアントから複数のコンシューマークライアントにメッセージを送信する

最終更新日:Jan 13, 2025

Simple Message Queue (旧称 MNS) は、複数のコンシューマークライアントが 1 つのプロデューサークライアントからメッセージをプルできるようにするメッセージモデルを提供します。このトピックでは、このモデルを使用して、1 つのプロデューサークライアントから複数のコンシューマークライアントにメッセージを送信する方法について説明します。

説明

この例では、Java 用 SDK を使用しています。他のプログラミング言語の SDK については、新バージョン SDK のリファレンス (推奨) をご参照ください。

前提条件

  • IntelliJ IDEA がインストールされていること。詳細については、IntelliJ IDEA をご参照ください。

    IntelliJ IDEA または Eclipse を使用できます。この例では、IntelliJ IDEA を使用します。

  • JDK 1.8 以降がインストールされていること。詳細については、Java のダウンロード をご参照ください。

  • Maven 2.5 以降がインストールされていること。詳細については、Apache Maven のダウンロード をご参照ください。

背景情報

SMQ は、ほとんどのビジネスシナリオの要件を満たすことができる、キューベースおよびトピックベースのメッセージモデルを提供します。

  • キューベースのメッセージモデル: 1 つのクライアントが SMQ キューにメッセージを送信します。複数のクライアントが、キューからメッセージをプルすることでメッセージを消費します。

  • トピックベースのメッセージモデル: 1 つのクライアントが SMQ トピックにメッセージを送信します。メッセージは SMQ サーバーから複数のクライアントに自動的にプッシュされます。

SMQ では、SMQ サーバーからクライアントにメッセージをプッシュすることで、クライアントがリアルタイムでメッセージを消費できます。ただし、このシナリオでは、コンシューマークライアントのエンドポイントはメッセージを受信するために公開されます。企業のプライベートネットワークなど、特定のシナリオでは、コンシューマークライアントのエンドポイントを公開することはできません。このような場合、コンシューマークライアントは SMQ サーバーからメッセージをプルすることでメッセージを消費できます。SMQ のキューとトピックを組み合わせることで、コンシューマークライアントのエンドポイントを公開することなく、1 つのクライアントから複数のコンシューマークライアントにメッセージを送信できます。

ソリューション

特定のキューエンドポイントを使用して、トピックへのサブスクリプションを作成します。その後、メッセージはトピックからキューにプッシュされ、コンシューマークライアントはキューからメッセージをプルできます。この場合、コンシューマークライアントのエンドポイントを公開することなく、1 つのクライアントから複数のクライアントにメッセージを送信できます。次の図は、このプロセスを示しています。消息流

Java 依存関係ライブラリをインストールする

  1. IntelliJ IDEA で Java プロジェクトを作成します。

  2. pom.xml ファイルに次の依存関係を追加して、Java 依存関係ライブラリをインポートします。

    <dependency>
        <groupId>com.aliyun.mns</groupId>
        <artifactId>aliyun-sdk-mns</artifactId>
        <version>1.1.9.2</version>
    </dependency>

API 操作

Java 1.1.8 用 SDK は、前述のソリューションをサポートするために CloudPullTopic クラスを提供します。MNSClient によって提供される次の API 操作を呼び出して、CloudPullTopic オブジェクトを作成できます。

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)

次の表は、上記のステートメントで設定できるパラメーターについて説明しています。

  • TopicMeta: トピックのメタデータを指定します。

  • QueueMeta: キューのメタデータを指定します。

  • queueNameList: 特定のトピックのメッセージがプッシュされるキューのリストを指定します。

  • needCreateQueue: queueNameList パラメーターで指定されたキューを作成するかどうかを指定します。

  • queueMetaTemplate: キューのメタデータテンプレートを指定します。

サンプルコード

package doc;

// SMQ クラスを指定します。
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.TopicMeta;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ClientException;
import java.util.Vector;

public class DemoTopicMessageBroadcast {
    public static void main(String[] args) {
      
// Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret、および MNS のエンドポイントを取得します。
    CloudAccount account = new CloudAccount(
    ServiceSettings.getMNSAccessKeyId(),
    ServiceSettings.getMNSAccessKeySecret(),
    ServiceSettings.getMNSAccountEndpoint());
    MNSClient client = account.getMNSClient();

// コンシューマーを作成します。
Vector<String> consumerNameList = new Vector<String>();
String consumerName1 = "consumer001";
String consumerName2 = "consumer002";
String consumerName3 = "consumer003";
consumerNameList.add(consumerName1);
consumerNameList.add(consumerName2);
consumerNameList.add(consumerName3);
QueueMeta queueMetaTemplate = new QueueMeta();
queueMetaTemplate.setPollingWaitSeconds(30);

try{

    // トピックを作成します。
    String topicName = "demo-topic-for-pull";
    TopicMeta topicMeta = new TopicMeta();
    topicMeta.setTopicName(topicName);
    CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);

    // トピックにメッセージをパブリッシュします。
    String messageBody = "broadcast message to all the consumers:hello the world.";
    // オリジナルメッセージが送信された場合は、getMessageBodyAsRawString を使用してメッセージ本文を解析します。
    TopicMessage tMessage = new RawTopicMessage();
    tMessage.setBaseMessageBody(messageBody);
    pullTopic.publishMessage(tMessage);


    // メッセージを受信します。
    CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
    CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
    CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);

    Message consumer1Msg = queueForConsumer1.popMessage(30);
    if(consumer1Msg != null)
    {
        System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString());
    } else{
        System.out.println("the queue is empty");
    }

    Message consumer2Msg = queueForConsumer2.popMessage(30);
    if(consumer2Msg != null)
    {
        System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString());
    }else{
        System.out.println("the queue is empty");
    }

    Message consumer3Msg = queueForConsumer3.popMessage(30);
    if(consumer3Msg != null)
    {
        System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString());
    }else{
        System.out.println("the queue is empty");
    }

    // トピックを削除します。
    pullTopic.delete();
} catch(ClientException ce) {
    System.out.println("Something wrong with the network connection between client and SMQ service."
            + "Please check your network and DNS availability.");
    ce.printStackTrace();
} catch(ServiceException se) {

    se.printStackTrace();
}

    client.close();
    }
}