Simple Message Queue (SMQ) は、公開された各メッセージを複数のコンシューマーキューに配信する、Topic とキューを組み合わせたパターンをサポートしています。コンシューマーは自身のキューからメッセージをプルするため、エンドポイントは非公開のままです。このチュートリアルでは、SMQ Java SDK を使用して、このファンアウトアーキテクチャについて説明します。
このチュートリアルでは、Java 用 SDK を使用します。他の言語の SDK については、「新しいバージョンの SDK のリファレンス (推奨)」をご参照ください。
メッセージングモデルの比較
SMQ は、2 つの基本的なメッセージングモデルを提供します。どちらを選択するかは、コンシューマーがメッセージをどのように受信するか、またそのエンドポイントを公開できるかどうかによって決まります。
| モデル | 仕組み | コンシューマーの可視性 | 最適な用途 |
|---|---|---|---|
| キューベース | 1 つのクライアントがキューにメッセージを送信します。複数のコンシューマーがメッセージをプルしますが、各メッセージは 1 つのコンシューマーにのみ送信されます。 | エンドポイントは非公開のまま | ポイントツーポイントのタスク分散 |
| Topic ベース | 1 つのクライアントが Topic にメッセージを公開します。SMQ サーバーは、サブスクライブしているすべてのコンシューマーにメッセージをプッシュします。 | エンドポイントを公開する必要がある | リアルタイムのプッシュ通知 |
| 組み合わせ (このチュートリアル) | 1 つのクライアントが Topic に公開します。Topic はメッセージを複数のキューにプッシュします。各コンシューマーは自身のキューからプルします。 | エンドポイントは非公開のまま | 非公開コンシューマーによるファンアウト |
組み合わせパターンは、メッセージを複数のコンシューマーに到達させる必要があるが、それらのエンドポイントを公開できないシナリオに適しています。たとえば、プライベートネットワーク内で実行されているコンシューマーなどです。
シナリオ例:注文処理システムが、新しい注文ごとに Topic に公開します。個別のキューが、フルフィルメント、分析、および通知サービスにデータを供給します。各サービスは、インバウンドエンドポイントを公開することなく、独立して自身のキューから注文をプルします。
仕組み
組み合わせパターンは、サブスクリプションを介して 1 つの Topic と複数のキューを連鎖させます。
Topic と、コンシューマーごとに 1 つのキューを作成します。
各キューを Topic にサブスクライブします (キューのエンドポイントがサブスクリプションのターゲットになります)。
プロデューサーが Topic にメッセージを公開します。
SMQ は、サブスクライブされているすべてのキューにメッセージをコピーします。
各コンシューマーは、自身のキューからそのコピーをプルします。

コンシューマーはプッシュされたメッセージを受信するのではなく、キューからプルするため、そのエンドポイントが SMQ サーバーや他のコンシューマーに公開されることはありません。
前提条件
開始する前に、以下を確認してください:
IntelliJ IDEA または Eclipse (このチュートリアルでは IntelliJ IDEA を使用)
JDK 1.8 以降
Java 依存関係のインストール
次の依存関係を Java プロジェクトの pom.xml ファイルに追加します。
<dependency>
<groupId>com.aliyun.mns</groupId>
<artifactId>aliyun-sdk-mns</artifactId>
<version>1.1.9.2</version>
</dependency>CloudPullTopic API リファレンス
SDK for Java 1.1.8 では、Topic とキューを組み合わせたパターンをカプセル化する CloudPullTopic クラスが導入されました。MNSClient を介して CloudPullTopic オブジェクトを作成します。
// 完全な形式:キューメタデータテンプレートを使用してキューを自動的に作成
public CloudPullTopic createPullTopic(
TopicMeta topicMeta,
Vector<String> queueNameList,
boolean needCreateQueue,
QueueMeta queueMetaTemplate
)
// 短い形式:既存のキューを使用 (自動作成なし)
public CloudPullTopic createPullTopic(
TopicMeta topicMeta,
Vector<String> queueNameList
)| パラメーター | 説明 |
|---|---|
topicMeta | Topic のメタデータ (名前、プロパティ)。 |
queueNameList | キュー名のリスト。各キューは、公開されたすべてのメッセージのコピーを受信します。 |
needCreateQueue | queueNameList にリストされているキューを作成するかどうか。自動的に作成するには true に設定します。 |
queueMetaTemplate | 自動作成されるすべてのキューに適用されるキューメタデータテンプレート (例:ポーリング待機時間)。 |
メッセージの公開と消費
次のサンプルでは、CloudPullTopic を使用してファンアウトを設定し、メッセージを公開し、3 つの個別のキューからそれを消費します。
package doc;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.TopicMeta;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ClientException;
import java.util.Vector;
public class DemoTopicMessageBroadcast {
public static void main(String[] args) {
// AccessKey ID、AccessKey Secret、およびエンドポイントでクライアントを初期化します。
CloudAccount account = new CloudAccount(
ServiceSettings.getMNSAccessKeyId(),
ServiceSettings.getMNSAccessKeySecret(),
ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();
// 3 つのコンシューマーキューを定義します。
Vector<String> consumerNameList = new Vector<String>();
String consumerName1 = "consumer001";
String consumerName2 = "consumer002";
String consumerName3 = "consumer003";
consumerNameList.add(consumerName1);
consumerNameList.add(consumerName2);
consumerNameList.add(consumerName3);
// 自動作成されたキューのロングポーリングを 30 秒に設定します。
QueueMeta queueMetaTemplate = new QueueMeta();
queueMetaTemplate.setPollingWaitSeconds(30);
try {
// --- プロデューサー:Topic を作成し、メッセージを公開 ---
String topicName = "demo-topic-for-pull";
TopicMeta topicMeta = new TopicMeta();
topicMeta.setTopicName(topicName);
// Topic を作成し、3 つのコンシューマーキューを自動作成します。
CloudPullTopic pullTopic = client.createPullTopic(
topicMeta, consumerNameList, true, queueMetaTemplate);
// Topic に未加工のメッセージを公開します。
String messageBody = "broadcast message to all the consumers:hello the world.";
TopicMessage tMessage = new RawTopicMessage();
tMessage.setBaseMessageBody(messageBody);
pullTopic.publishMessage(tMessage);
// --- コンシューマー:各キューからメッセージをプル ---
CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);
// popMessage(30) はメッセージを最大 30 秒間待機します (ロングポーリング)。
Message consumer1Msg = queueForConsumer1.popMessage(30);
if (consumer1Msg != null) {
System.out.println("consumer1 received: "
+ consumer1Msg.getMessageBodyAsRawString());
} else {
System.out.println("consumer1 queue is empty");
}
Message consumer2Msg = queueForConsumer2.popMessage(30);
if (consumer2Msg != null) {
System.out.println("consumer2 received: "
+ consumer2Msg.getMessageBodyAsRawString());
} else {
System.out.println("consumer2 queue is empty");
}
Message consumer3Msg = queueForConsumer3.popMessage(30);
if (consumer3Msg != null) {
System.out.println("consumer3 received: "
+ consumer3Msg.getMessageBodyAsRawString());
} else {
System.out.println("consumer3 queue is empty");
}
// クリーンアップ:Topic、そのサブスクリプション、および関連するすべてのキューを削除します。
pullTopic.delete();
} catch (ClientException ce) {
System.out.println("クライアントと SMQ 間のネットワークエラー。ネットワークと DNS の設定を確認してください。");
ce.printStackTrace();
} catch (ServiceException se) {
se.printStackTrace();
}
client.close();
}
}結果の検証
サンプルコードを実行した後、各コンシューマーがメッセージを受信したことを確認します。期待される出力は次のとおりです。
consumer1 received: broadcast message to all the consumers:hello the world.
consumer2 received: broadcast message to all the consumers:hello the world.
consumer3 received: broadcast message to all the consumers:hello the world.キューに "queue is empty" と表示される場合、メッセージがまだ配信されていない可能性があります。popMessage のタイムアウトを増やすか、Topic とサブスクリプションが正しく作成されたかを確認してください。
コードのウォークスルー
| ステップ | 内容 |
|---|---|
CloudAccount | AccessKey ID、AccessKey Secret、および SMQ エンドポイントで認証します。 |
createPullTopic | Topic を作成し、3 つのコンシューマーキューを作成し、各キューを Topic にサブスクライブします。 |
publishMessage | 未加工のメッセージ (RawTopicMessage) を Topic に公開します。SMQ はそれをサブスクライブされているすべてのキューにコピーします。 |
popMessage(30) | 各コンシューマーは、30 秒のロングポーリングタイムアウトでそのコピーをプルします。 |
pullTopic.delete() | Topic、そのサブスクリプション、および関連するすべてのキューを削除します。 |
このサンプルでは、未加工のメッセージ (RawTopicMessage) を公開します。未加工のメッセージはgetMessageBodyAsRawString()で解析します。代わりにBase64TopicMessageを使用する場合は、getMessageBodyAsString()で解析します。
次のステップ
その他の SMQ メッセージングパターンについては、「新しいバージョンの SDK のリファレンス (推奨)」をご参照ください。