Simple Message Queue (旧称 MNS) は、複数のコンシューマークライアントが 1 つのプロデューサークライアントからメッセージをプルできるようにするメッセージモデルを提供します。このトピックでは、このモデルを使用して、1 つのプロデューサークライアントから複数のコンシューマークライアントにメッセージを送信する方法について説明します。
この例では、Java 用 SDK を使用しています。他のプログラミング言語の SDK については、新バージョン SDK のリファレンス (推奨) をご参照ください。
前提条件
IntelliJ IDEA がインストールされていること。詳細については、IntelliJ IDEA をご参照ください。
IntelliJ IDEA または Eclipse を使用できます。この例では、IntelliJ IDEA を使用します。
JDK 1.8 以降がインストールされていること。詳細については、Java のダウンロード をご参照ください。
Maven 2.5 以降がインストールされていること。詳細については、Apache Maven のダウンロード をご参照ください。
背景情報
SMQ は、ほとんどのビジネスシナリオの要件を満たすことができる、キューベースおよびトピックベースのメッセージモデルを提供します。
キューベースのメッセージモデル: 1 つのクライアントが SMQ キューにメッセージを送信します。複数のクライアントが、キューからメッセージをプルすることでメッセージを消費します。
トピックベースのメッセージモデル: 1 つのクライアントが SMQ トピックにメッセージを送信します。メッセージは SMQ サーバーから複数のクライアントに自動的にプッシュされます。
SMQ では、SMQ サーバーからクライアントにメッセージをプッシュすることで、クライアントがリアルタイムでメッセージを消費できます。ただし、このシナリオでは、コンシューマークライアントのエンドポイントはメッセージを受信するために公開されます。企業のプライベートネットワークなど、特定のシナリオでは、コンシューマークライアントのエンドポイントを公開することはできません。このような場合、コンシューマークライアントは SMQ サーバーからメッセージをプルすることでメッセージを消費できます。SMQ のキューとトピックを組み合わせることで、コンシューマークライアントのエンドポイントを公開することなく、1 つのクライアントから複数のコンシューマークライアントにメッセージを送信できます。
ソリューション
特定のキューエンドポイントを使用して、トピックへのサブスクリプションを作成します。その後、メッセージはトピックからキューにプッシュされ、コンシューマークライアントはキューからメッセージをプルできます。この場合、コンシューマークライアントのエンドポイントを公開することなく、1 つのクライアントから複数のクライアントにメッセージを送信できます。次の図は、このプロセスを示しています。
Java 依存関係ライブラリをインストールする
IntelliJ IDEA で Java プロジェクトを作成します。
pom.xml ファイルに次の依存関係を追加して、Java 依存関係ライブラリをインポートします。
<dependency> <groupId>com.aliyun.mns</groupId> <artifactId>aliyun-sdk-mns</artifactId> <version>1.1.9.2</version> </dependency>
API 操作
Java 1.1.8 用 SDK は、前述のソリューションをサポートするために CloudPullTopic クラスを提供します。MNSClient によって提供される次の API 操作を呼び出して、CloudPullTopic オブジェクトを作成できます。
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)
次の表は、上記のステートメントで設定できるパラメーターについて説明しています。
TopicMeta: トピックのメタデータを指定します。
QueueMeta: キューのメタデータを指定します。
queueNameList: 特定のトピックのメッセージがプッシュされるキューのリストを指定します。
needCreateQueue: queueNameList パラメーターで指定されたキューを作成するかどうかを指定します。
queueMetaTemplate: キューのメタデータテンプレートを指定します。
サンプルコード
package doc;
// SMQ クラスを指定します。
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.TopicMeta;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ClientException;
import java.util.Vector;
public class DemoTopicMessageBroadcast {
public static void main(String[] args) {
// Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret、および MNS のエンドポイントを取得します。
CloudAccount account = new CloudAccount(
ServiceSettings.getMNSAccessKeyId(),
ServiceSettings.getMNSAccessKeySecret(),
ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();
// コンシューマーを作成します。
Vector<String> consumerNameList = new Vector<String>();
String consumerName1 = "consumer001";
String consumerName2 = "consumer002";
String consumerName3 = "consumer003";
consumerNameList.add(consumerName1);
consumerNameList.add(consumerName2);
consumerNameList.add(consumerName3);
QueueMeta queueMetaTemplate = new QueueMeta();
queueMetaTemplate.setPollingWaitSeconds(30);
try{
// トピックを作成します。
String topicName = "demo-topic-for-pull";
TopicMeta topicMeta = new TopicMeta();
topicMeta.setTopicName(topicName);
CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);
// トピックにメッセージをパブリッシュします。
String messageBody = "broadcast message to all the consumers:hello the world.";
// オリジナルメッセージが送信された場合は、getMessageBodyAsRawString を使用してメッセージ本文を解析します。
TopicMessage tMessage = new RawTopicMessage();
tMessage.setBaseMessageBody(messageBody);
pullTopic.publishMessage(tMessage);
// メッセージを受信します。
CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);
Message consumer1Msg = queueForConsumer1.popMessage(30);
if(consumer1Msg != null)
{
System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString());
} else{
System.out.println("the queue is empty");
}
Message consumer2Msg = queueForConsumer2.popMessage(30);
if(consumer2Msg != null)
{
System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString());
}else{
System.out.println("the queue is empty");
}
Message consumer3Msg = queueForConsumer3.popMessage(30);
if(consumer3Msg != null)
{
System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString());
}else{
System.out.println("the queue is empty");
}
// トピックを削除します。
pullTopic.delete();
} catch(ClientException ce) {
System.out.println("Something wrong with the network connection between client and SMQ service."
+ "Please check your network and DNS availability.");
ce.printStackTrace();
} catch(ServiceException se) {
se.printStackTrace();
}
client.close();
}
}