コンテキスト
Alibaba Cloud Message Service はキューモデルとトピックモデルを提供します。キューは、1対多の共有メッセージ使用モデルを提供し、クライアント側でプルベース方式を採用します。トピックは、1対多のブロードキャストメッセージ使用モデルを提供し、サーバー側でpushベース方式を採用します。上記の2つのモデルはほとんどの適用シナリオの要件を満たすことができます。
プッシュモードの利点はリアルタイム性です。ただし、サーバーからプッシュされたメッセージを受信するにはクライアントのアドレスを公開する必要があります。たとえば、企業のイントラネットなどの状況によってはプッシュするアドレスを公開することはできません。それに応じてプルモード方式を使用する必要があります。MNSサービスでは1対多のプルベースのメッセージ使用モデルを提供していませんが、このモデルはキューとトピックに基づいて活用することができます。解決策は次のとおりです。
解決策
トピックは、メッセージをキューにプッシュし、そこからコンシューマーがメッセージをプルします。次の図に示すように、このようにすれば 1対多のブロードキャストメッセージが有効になり、コンシューマーのアドレスは公開しなくても済みます。
インターフェイスの説明
最新の Java SDK のCloudPullTopic は上記のソリューションをデフォルトでサポートしています。ここで、MNSClientは CloudPullTopic をすばやく作成するため、次の2つのインターフェイスを提供します。
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)
ここで、TopicMeta はトピックを作成するためのメタ設定を指定します。 queueNameList はトピックによってプッシュされるキュー名リストを指定します。 needCreateQueue はqueueNameListを作成する必要があるかどうかを指定します。 queueMetaTemplate はキューの作成に必要なキュー・メタ・パラメーターを指定します。
デモコード
CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
MNSClient client = account.getMNSClient();
// build consumer name list.
Vector<String> consumerNameList = new Vector<String>();
String consumerName1 = "consumer001";
String consumerName2 = "consumer002";
String consumerName3 = "consumer003";
consumerNameList.add(consumerName1);
consumerNameList.add(consumerName2);
consumerNameList.add(consumerName3);
QueueMeta queueMetaTemplate = new QueueMeta();
queueMetaTemplate.setPollingWaitSeconds(30);
try{
//producer code:
// create pull topic which will send message to 3 queues for consumer.
String topicName = "demo-topic-for-pull";
TopicMeta topicMeta = new TopicMeta();
topicMeta.setTopicName(topicName);
CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);
//publish message and consume message.
String messageBody = "broadcast message to all the consumers:hello the world.";
// if we sent raw message,then should use getMessageBodyAsRawString to parse the message body correctly.
TopicMessage tMessage = new RawTopicMessage();
tMessage.setBaseMessageBody(messageBody);
pullTopic.publishMessage(tMessage);
// consumer code:
//3 consumers receive the message.
CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);
Message consumer1Msg = queueForConsumer1.popMessage(30);
if(consumer1Msg != null)
{
System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString());
}else{
System.out.println("the queue is empty");
}
Message consumer2Msg = queueForConsumer2.popMessage(30);
if(consumer2Msg != null)
{
System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString());
}else{
System.out.println("the queue is empty");
}
Message consumer3Msg = queueForConsumer3.popMessage(30);
if(consumer3Msg != null)
{
System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString());
}else{
System.out.println("the queue is empty");
}
// delete the pullTopic.
pullTopic.delete();
}catch(ClientException ce)
{
System.out.println("Something wrong with the network connection between client and MNS service."
+ "Please check your network and DNS availablity.");
ce.printStackTrace();
}
catch(ServiceException se)
{
se.printStackTrace();
}
client.close();
MNSサービスエラーコードの詳細は エラーコードを参照してください。