このトピックでは、ApsaraMQ for RocketMQ における Lite Topic の定義、モデル関係、内部プロパティ、動作制約、バージョンの互換性、および推奨される使用方法について説明します。
前提条件
現在、Lite Topic は、非サーバーレスインスタンス (サブスクリプションおよび従量課金) と専用サーバーレスインスタンスでのみサポートされています。
Lite Topic モデルをサポートするインスタンスバージョンを購入するには:
新規インスタンスの場合、購入ページでプロダクト機能タグを追加します。タグキーを `version_capability` に、タグ値を `lite-topic` に設定します。次の図に例を示します。

既存のインスタンスの場合、チケットを送信して、Lite Topic モデルをサポートするバージョンにアップグレードできます。チケットを送信する際に、インスタンス ID とインスタンスが存在するリージョンを提供してください。
チケットを送信して、Lite Topic を含むシナリオのソリューションに関する無料相談をリクエストできます。
定義
Lite Topic は、ApsaraMQ for RocketMQ におけるメッセージの転送とストレージのためのセカンダリコンテナーです。同じビジネスロジックの下で、異なるセッションやタスクなど、異なる子クラスのメッセージを識別します。
Lite Topic の主な機能は次のとおりです:
排他的な消費とセカンダリデータの隔離
異なる子クラスのデータを別々の Lite Topic で管理することで、よりきめ細かいストレージとサブスクリプションの隔離を実現できます。
データ ID と権限
Topic ベースの ID および権限管理に加えて、Lite Topic を使用してユーザー ID と権限をさらに細かく定義できます。
モデルの関係
ApsaraMQ for RocketMQ のドメインモデルにおいて、Lite Topic はメッセージフローに次のように適合します:

Topic は、ApsaraMQ for RocketMQ におけるメッセージ転送とストレージの最上位コンテナーです。Topic が Lite タイプの場合、その中に Lite Topic を作成できます。Topic と Lite Topic の組み合わせが、メッセージストレージコンテナーを一意に識別します。
Topic タイプが Lite の場合、各ストレージコンテナーはデフォルトで 1 つのキューで構成されます。
内部プロパティ
LiteTopic 名
定義:Lite Topic の名前です。名前は親 Topic 内でグローバルに一意である必要があります。
値:Topic タイプが Lite で、メッセージに対して `setLiteTopic` を呼び出すと、対応する Lite Topic が存在しない場合、システムは自動的に作成します。
制約:詳細については、「パラメーターの制限」をご参照ください。
生存時間 (TTL)
定義:Lite Topic の生存時間 (TTL) です。Lite Topic がその TTL より長い期間メッセージを受信しない場合、自動的に削除されます。Lite Topic を削除すると、そのクォータ数が解放されます。
値:Lite タイプの Topic を作成する際に、`expiration` 値を設定できます。
制約:詳細については、「パラメーターの制限」をご参照ください。
バージョンの互換性
サーバー側バージョン:5.0-rmq-20251024-1 以降
クライアントバージョン: RocketMQ gRPC 5.1.0 以降
Lite Topic と通常 Topic の違い
シナリオ | 項目 | Lite Topic | 通常の Topic |
メッセージストレージ | プライマリ Topic | 同じです。どちらも事前に Topic リソースを作成する必要があります。 | |
セカンダリ Topic | 1 つの Topic の下に数百万のセカンダリ Topic リソース (Lite Topic) を作成できます。これらのセカンダリリソースには多くの新しい属性があります。 | セカンダリ Topic リソースはありません。 | |
自動化されたライフサイクル管理 | セカンダリ Topic (Lite Topic) のライフサイクルは自動的に管理できます:
| なし | |
注文 | 各 Lite Topic は 1 つのキューのみを作成します。同じキュー内のメッセージは順序通りに保存されます。
| 複数のキューが作成されます。パーティション順序 Topic のみが順序を保証します。 | |
送受信の最大同時 TPS | キューが 1 つしかないため、各 Lite Topic の最大 TPS は制限されます。 ただし、1 つの Topic の下に数百万の LiteTopic を作成できるため、LiteTopic を追加するにつれて合計の最大 TPS は増加します。 | Topic の TPS は、キューの数とクラスターノードの数に基づいて水平にスケールアウトできます。 | |
メッセージ消費 | サブスクリプション関係の一貫性 | 不整合になる可能性があります。 同じグループ内で、各コンシューマーは異なる Lite Topic のコレクションをサブスクライブできます。グループに対する制約は緩和されます。 | 一貫している必要があります。 同じグループ内で、各コンシューマーはターゲット Topic のメッセージを共有するために同じサブスクリプション関係を持つ必要があります。 |
注文 | 順序付き消費。Lite Topic 内のメッセージは、1 つのコンシューマースレッドでのみ処理できます。 | 同時消費または順序付き消費を選択できます。 | |
動的サブスクリプション | 各コンシューマーは、特定の Lite Topic へのサブスクリプションを動的に追加または削除できます。 | なし | |
単一のコンシューマーがサブスクライブできる LiteTopic の数 | 各コンシューマーは数千の LiteTopic をサブスクライブできます。 | なし | |
可観測性 | メトリックデータ | メッセージ蓄積メトリックを提供します。 メッセージ処理レイテンシーメトリックは提供しません。 | メッセージ蓄積メトリックを提供します。 メッセージ処理レイテンシーメトリックを提供します。 |
メッセージトレース | 同じ | ||
一般的な LiteTopic のシナリオ
シナリオ 1:マルチエージェントシステムの非同期通信による長時間実行呼び出しのブロッキング解決
AI シナリオが複雑化するにつれて、ほとんどの単一エージェントは複雑な状況で限界に直面します。専門的な分業が欠けており、複数のドメインを統合するのが難しく、動的な協調的意思決定ができません。単一エージェントのアプリケーションとワークフローは、徐々にマルチエージェントアプリケーションへと移行しています。しかし、AI タスクは長時間実行されるため、同期呼び出しは呼び出し元のスレッドをブロックし、大規模なコラボレーションにおいてスケーラビリティの問題を引き起こす可能性があります。

上の図に示すように、マルチエージェントシステムのワークフローは次のとおりです:スーパーバイザーエージェントがリクエストを分割し、2 つのサブエージェントに割り当てます。2 つのサブエージェントはそれぞれのドメイン固有の問題を処理し、結果をスーパーバイザーエージェントに返します。スーパーバイザーエージェントは結果を集約し、Web クライアントに返します。ApsaraMQ for RocketMQ の非同期通信ソリューションは次のように機能します:
リクエスト受信フロー:
各サブエージェントに対してリクエスト Topic を作成し、タスクのバッファーキューとして機能させることができます。これは、優先度の高いタスクを先に処理するための優先度 Topic にすることもできます。
スーパーバイザーエージェントは、分割されたタスク情報を対応するリクエスト Topic に送信します。
応答フローでは:
スーパーバイザーエージェントは Lite タイプの応答 Topic を作成し、それをサブスクライブします。
サブエージェントはタスクを処理し、応答結果を応答 Topic 内の Lite Topic に送信します。Lite Topic はタスク ID にちなんで命名でき、各タスク専用の Lite Topic を作成できます。
スーパーバイザーエージェントは、リアルタイムで結果を取得するためにサブスクライブし、HTTP Server-Sent Events (SSE) プロトコルを使用して Web クライアントにプッシュします。
シナリオ 2: 分散セッション状態管理による AI アプリケーションにおけるセッション管理の課題解決
AI アプリケーションの対話パターンは独特です。高コストの計算能力に大きく依存する、長時間実行されるマルチターンのセッションを伴います。アプリケーションが SSE のような持続的接続に依存している場合、ゲートウェイの再起動、接続タイムアウト、ネットワークの不安定性などのいかなる中断も、現在のセッションコンテキストの喪失を引き起こす可能性があります。これにより、進行中の AI タスクも無効になり、貴重な計算能力が無駄になります。

上の図に示すように、応答結果フローはシナリオ 1 と同じです。Lite タイプの Topic が応答結果のリアルタイム通知に使用されます。ここでは、各 Lite Topic は SessionID を使用して命名できます (例:chatbot/{sessionID})。これにより、セッションの結果はこの Topic 内のメッセージとして順序通りに転送されます。以下のソリューションは、持続的接続が再確立された後にセッションの継続性を維持する方法を説明しています:
Web クライアント 2 は、アプリケーションサービスノード 1 との持続的接続を確立し、Session2 を作成します。
アプリケーションサービスノード 1 は、LiteTopic [chat/SessionID2] をサブスクライブします。
大規模モデルタスクスケジューリングコンポーネントは、リクエスト内の SessionID 情報に基づいて、結果を Lite Topic [chat/SessionID2] に送信します。
ネットワークの問題やその他の例外により、WebSocket 接続は自動的にアプリケーションサービスノード 2 に再接続します。
アプリケーションサービスノード 1 は LiteTopic [chat/SessionID2] のサブスクライブを解除し、アプリケーションサービスノード 2 は LiteTopic [chat/SessionID2] をサブスクライブします。
Lite Topic [chat/SessionID2] は、以前の消費の進捗に基づいて、後続の未消費メッセージをアプリケーションサービスノード 2 にプッシュし続けます。これにより、セッションの状態とデータの継続性が保証されます。
サンプルコード
完全な例については、「RocketMQ 5.x gRPC SDK」のサンプルコードをご参照ください。
メッセージの送信
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(clientConfiguration)
.build();
final Message message = provider.newMessageBuilder()
.setTopic(topic)
// メッセージキーを設定します。キーを使用して特定のメッセージを正確に見つけることができます。
.setKeys("messageKey")
// LiteTopic を設定します。
.setLiteTopic("lite-topic-1")
// メッセージ本文。
.setBody("messageBody".getBytes())
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (LiteTopicQuotaExceededException e) {
// LiteTopic クォータを超えました。クォータを評価して増やしてください。
log.error("Lite topic quota exceeded", e);
} catch (Throwable t) {
log.error("Failed to send message", t);
}
メッセージの消費
LitePushConsumer クラスを使用する必要があります:
// LitePushConsumer を初期化します。コンシューマーグループ、ターゲット Topic、通信パラメーターなどをバインドする必要があります。
LitePushConsumer litePushConsumer = provider.newLitePushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// コンソールでコンシューマーグループを作成したときにバインドされた Topic。
.bindTopic(topicName)
// コンシューマーグループを設定します。
.setConsumerGroup(consumerGroup)
.setMessageListener(messageView -> {
// メッセージを処理し、消費結果を返します。
LOGGER.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
try {
// 関心のある Lite Topic のコレクションをサブスクライブします。
litePushConsumer.subscribeLite("lite-topic-1");
litePushConsumer.subscribeLite("lite-topic-2");
litePushConsumer.subscribeLite("lite-topic-3");
} catch (LiteSubscriptionQuotaExceededException e) {
// Lite Topic のサブスクリプションクォータを超えました。クォータを評価し、引き上げてください。
log.error("Lite subscription quota exceeded", e);
} catch (Throwable t) {
log.error("Failed to subscribe lite topic", t);
}
// ビジネス処理が完了したら、使用しなくなった Lite Topic のサブスクリプションを速やかに解除します。
litePushConsumer.unsubscribeLite("lite-topic-3");
// 現在サブスクライブしている Lite Topic のセットを取得します。
Set<String> liteTopicSet = litePushConsumer.getLiteTopicSet();
サブスクリプション関係の動的変更
/**
* サブスクリプション関係を動的に追加します。
* subscribeLite() メソッドはネットワークリクエストを開始し、クォータ検証を実行するため、呼び出しが失敗する可能性があります。
* サブスクリプションが正常に追加されたことを確認するために、この呼び出しの結果を常に確認してください。
* 考えられる失敗シナリオは次のとおりです:
* 1. ネットワークリクエストエラーが発生します。操作をリトライできます。
* 2. クォータ検証が失敗し、LiteSubscriptionQuotaExceededException がスローされます。
* クォータがニーズを満たしているか評価し、使用しなくなった Topic のサブスクリプションを unsubscribeLite() を使用して速やかに解除し、リソースを解放してください。
*/
litePushConsumer.subscribeLite("lite-topic-1");
// サブスクリプション関係を動的に削除します。
litePushConsumer.unsubscribeLite("lite-topic-1");
制限
単一のコンシューマーは最大 2,000 の LiteTopic をサブスクライブできます。この制限を調整するには、チケットを送信できます。
単一の Lite Topic の最大消費 TPS は 200 です。
サービスの安定性を確保するため、単一のインスタンス内で作成またはサブスクライブできる Lite Topic の数には制限が適用されます。具体的なクォータは次の表に記載されています。これらのクォータは、チケットを送信して調整できます。
作成された Lite Topic の数
定義:単一のインスタンス内で作成され、現在アクティブな Lite Topic の総数。
トリガーと影響:この制限に達し、クライアントが存在しない Lite Topic にメッセージを送信しようとして自動作成がトリガーされた場合、システムは Lite Topic を作成できず、メッセージ送信失敗を返します。
LiteTopic サブスクリプションの数
定義:インスタンス内のすべてのオンラインコンシューマークライアントと Lite Topic との間に確立されたすべての有効なサブスクリプション関係の合計。これは動的に変化する値です。
影響:この制限に達すると、コンシューマークライアントが新しい Lite Topic をサブスクライブしようとしても失敗し、新しいサブスクリプション関係を確立できません。
特別ルール:Lite Topic がシステムから削除された場合でも、コンシューマークライアントがまだそのサブスクリプションを維持している場合、そのサブスクリプション関係は、コンシューマーがサブスクリプションを解除するまで、サブスクリプションの総数にカウントされます。
サーバーレスインスタンス
デプロイメントアーキテクチャ | 容量モード | 仕様 | 作成またはサブスクライブできる LiteTopic の最大数 |
専用 | 予約済み + 弾性 | 5,000 | 300,000 |
10,000 | 600,000 | ||
15,000 | 720,000 | ||
[20,000, 50,000] | 1,000,000 | ||
(50,000, 100,000] | 1,500,000 | ||
(100,000, 200,000] | 2,400,000 | ||
(200,000, 300,000] | 4,700,000 | ||
(300,000, 500,000] | 6,300,000 | ||
(500,000, 1,000,000] | 11,600,000 |
非サーバーレス (サブスクリプションおよび従量課金) インスタンス
Standard Edition
インスタンスタイプ | メッセージ送受信の基本 TPS 制限 (ops/s) | 作成またはサブスクライブできる LiteTopic の最大数 |
rmq.s2.2xlarge | 2,000 | 150,000 |
rmq.s2.4xlarge | 4,000 | 250,000 |
rmq.s2.6xlarge | 6,000 | 300,000 |
Professional Edition
インスタンスタイプ | メッセージ送受信の基本 TPS 制限 (ops/s) | 作成またはサブスクライブできる LiteTopic の最大数 |
rmq.p2.2xlarge | 2,000 | 150,000 |
rmq.p2.4xlarge | 4,000 | 250,000 |
rmq.p2.6xlarge | 6,000 | 300,000 |
rmq.p2.10xlarge | 10,000 | 600,000 |
rmq.p2.20xlarge | 20,000 | 800,000 |
rmq.p2.30xlarge | 30,000 | 1,000,000 |
rmq.p2.40xlarge | 40,000 | 1,200,000 |
rmq.p2.50xlarge | 50,000 | 1,400,000 |
rmq.p2.100xlarge | 100,000 | 2,200,000 |
rmq.p2.120xlarge | 120,000 | 2,700,000 |
rmq.p2.150xlarge | 150,000 | 3,300,000 |
rmq.p2.200xlarge | 200,000 | 4,500,000 |
Platinum Edition
インスタンスタイプ | メッセージ送受信の基本 TPS 制限 (ops/s) | 作成またはサブスクライブできる LiteTopic の最大数 |
rmq.u2.10xlarge | 10,000 | 600,000 |
rmq.u2.20xlarge | 20,000 | 800,000 |
rmq.u2.30xlarge | 30,000 | 1,000,000 |
rmq.u2.40xlarge | 40,000 | 1,200,000 |
rmq.u2.50xlarge | 50,000 | 1,400,000 |
rmq.u2.60xlarge | 60,000 | 1,600,000 |
rmq.u2.70xlarge | 70,000 | 1,700,000 |
rmq.u2.80xlarge | 80,000 | 1,800,000 |
rmq.u2.90xlarge | 90,000 | 2,000,000 |
rmq.u2.100xlarge | 100,000 | 2,200,000 |
rmq.u2.120xlarge | 120,000 | 2,700,000 |
rmq.u2.150xlarge | 150,000 | 3,300,000 |
rmq.u2.200xlarge | 200,000 | 4,500,000 |
rmq.u2.250xlarge | 250,000 | 5,600,000 |
rmq.u2.300xlarge | 300,000 | 6,300,000 |
rmq.u2.350xlarge | 350,000 | 7,500,000 |
rmq.u2.400xlarge | 400,000 | 9,300,000 |
rmq.u2.450xlarge | 450,000 | 10,400,000 |
rmq.u2.500xlarge | 500,000 | 11,600,000 |
rmq.u2.550xlarge | 550,000 | 12,800,000 |
rmq.u2.600xlarge | 600,000 | 14,000,000 |
rmq.u2.1000xlarge | 1,000,000 | 23,200,000 |