背景情報
一貫性のあるサブスクリプション関係とは
一貫性のあるサブスクリプションとは、同一のコンシューマーグループ内のすべてのコンシューマーが、全く同じトピックとタグのセットをサブスクライブしなければならないというルールです。個々のコンシューマーは複数のトピックをサブスクライブできますが、この完全なサブスクリプションセットは、そのグループ内の他のすべてのコンシューマーと同一でなければなりません。これにより、メッセージが予測どおりに分散され、メッセージの損失を防ぐことができます。
詳細については、「一貫性のあるサブスクリプション関係」をご参照ください。
コンシューマーが複数のトピックとタグをサブスクライブする方法
商用 Java SDK
Maven 座標の例:
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>x.x.x.Final</version>サンプルコード:
Properties properties = new Properties();
// ApsaraMQ for RocketMQ コンソールで作成した Group ID。
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
/*
* いくつかのプロパティ設定
*/
Consumer consumer = ONSFactory.createConsumer(properties);
// 複数のタグをサブスクライブします。
consumer.subscribe("TopicTestMQ1", "TagA||TagB", new MessageListener() {...});
// 複数のトピックをサブスクライブします。同じコンシューマーインスタンスで再度 subscribe を呼び出してサブスクリプションを追加します。
consumer.subscribe("TopicTestMQ2", "TagA||TagB", new MessageListener() {...});
コミュニティ Java SDK (artifactId: rocketmq-client)
Maven 座標の例:
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>x.x.x</version>サンプルコード:
// ApsaraMQ for RocketMQ コンソールで作成した Group ID。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_ID, getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
/*
* いくつかのプロパティ設定
*/
// 複数のタグをサブスクライブします。
consumer.subscribe("java-test", "TagA||TagB");
// 2 つ目のトピックをサブスクライブします。コンシューマーは 1 つの消費ロジックしか登録できないことに注意してください。これは、すべてのサブスクリプションに同じロジックが適用されることを意味します。
consumer.subscribe("cxtest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {...});コミュニティ Java SDK (artifactId: rocketmq-client-java)
Maven 座標の例:
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.8</version>サンプルコード:
final ClientServiceProvider provider = ClientServiceProvider.loadService();
/*
* いくつかの設定
*/
HashMap<String, FilterExpression> sub = new HashMap<>();
// 複数のタグをサブスクライブします。
sub.put("Topic1", new FilterExpression("TagA||TagB", FilterExpressionType.TAG));
// 2 つ目のトピックをサブスクライブします。コンシューマーは 1 つの消費ロジックしか登録できないことに注意してください。これは、すべてのサブスクリプションに同じロジックが適用されることを意味します。
sub.put("Topic2", new FilterExpression("TagA||TagB", FilterExpressionType.TAG));
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// コンシューマーグループを設定します。ApsaraMQ for RocketMQ コンソールで作成した Group ID を使用します。
.setConsumerGroup(consumerGroup)
// 事前にアタッチされたサブスクリプション関係を設定します。
.setSubscriptionExpressions(sub)
// メッセージリスナーを設定します。
.setMessageListener(messageView -> {...})
.build();SpringBoot フレームワーク (`RocketMQMessageListener` アノテーションを使用)
RocketMQMessageListener アノテーションを使用してコンシューマーを起動する場合、各アノテーションが個別のコンシューマーインスタンスを起動することに注意してください。コード内で複数のアノテーションを使用し、それぞれが異なるトピックをサブスクライブすると、サブスクリプション関係の不整合が発生します。
例えば、以下のコードは `RocketMQListener` インターフェイスを 2 回実装しており、同じグループで `TopicA` と `TopicB` をサブスクライブします。実行時に、クライアントは 2 つのコンシューマーインスタンスを作成します。これにより、同じコンシューマーグループ内の 2 つのコンシューマーがそれぞれ TopicA と TopicB をサブスクライブすることになり、サブスクリプション関係の不整合が発生します。
// ビジネスロジック 1: TestMessageListener1.java
@Component
@RocketMQMessageListener(consumerGroup="GID_A", topic = "TopicA")
public class TestMessageListener1 implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
/*
* いくつかの設定
*/
}
}
// ビジネスロジック 2: TestMessageListener2.java
@Component
@RocketMQMessageListener(consumerGroup="GID_A", topic = "TopicB")
public class TestMessageListener2 implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
/*
* いくつかの設定
*/
}
}複数のサブスクリプションを実装するには、RocketMQPushConsumerLifecycleListener インターフェイスの prepareStart メソッドを再書き込みします。以下のコードに例を示します。
@Component
@RocketMQMessageListener(consumerGroup = "GID_A", topic = "")
public class TestMessageListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(MessageExt messageExt) {}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
try{
// 複数のタグをサブスクライブします。
defaultMQPushConsumer.subscribe("TopicA", "TagA||TagB");
// 2 つ目のトピックをサブスクライブします。
defaultMQPushConsumer.subscribe("TopicB", "*");
// 消費ロジックを登録します。
defaultMQPushConsumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {......});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}SpringBoot フレームワーク (`Bean` アノテーションを使用)
このメソッドは ConsumerBean を手動でインジェクションします。複数のサブスクリプションを作成するには、ネイティブ SDK のメソッドを使用する必要があります。
@Configuration
public class ConsumerClient {
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
// ここで対応する ConsumerBean をインジェクションします。商用およびコミュニティ SDK の複数サブスクリプションメソッドをご参照ください。
}
}シナリオの説明
ApsaraMQ for RocketMQ コンソールで、サブスクリプション関係が不整合であることが示されます。
簡単なトラブルシューティング手順
コンソールで、対応する RocketMQ インスタンスを見つけます。
グループ詳細ページで、サブスクリプション関係が不整合なグループのクライアント分散を表示します。
使用している SDK またはフレームワークに基づいて、サブスクリプション関係を修正し、一貫性を確保します。
トラブルシューティングの詳細
まず、使用しているインスタンスのバージョンが 5.0 か 4.0 かを確認します。コンソールでは、バージョンごとにサブスクリプション関係の表示が異なります。
RocketMQ 5.0 インスタンス
グループ詳細ページに移動し、クライアントのリストとそれに対応する [サブスクリプション] を表示します。
サブスクリプション関係はトピックごとにレンダリングされます。[フィルター式] 列には、グループ内のコンシューマーが対応するトピックをサブスクライブするために使用するすべての式が表示されます。
サブスクリプション関係が不整合なコンシューマーを迅速に特定するには、[分散の表示] をクリックして、各コンシューマーの詳細なサブスクリプション情報を取得します。クリック後、[ホスト IP/パブリック IP] 列に注目します。通常、
<IP>:<Port>はコンシューマーインスタンスを一意に識別します。これは、`Consumer` クラスが初期化されるときに作成されるランタイムエンティティです。状況はいくつかの方法で分析できます。ページに単一のトピックのみが表示され、そのトピックに複数の [フィルター式] エントリがある場合、これは異なるコンシューマーがそのトピックに対して異なるタグを設定していることを意味します。この場合、右側の [分散の表示] をクリックして、そのトピックに対するすべてのコンシューマーのサブスクリプション状況を表示します。必要に応じてビジネスコードを修正し、すべてのコンシューマーが同じタグをサブスクライブするようにします。
ページに複数のトピックが表示され、それぞれに 1 つの [フィルター式] しかない場合、これは一部のコンシューマーが複数のトピックをサブスクライブしていることを示します。この場合、各トピックの [分散の表示] をクリックします。[ホスト IP/パブリック IP] ディメンションに基づいて、各コンシューマーのサブスクリプション情報を整理します。
ページに複数のトピックが表示され、各トピックに複数の [フィルター式] エントリがある場合、[分散の表示] をクリックして、[ホスト IP/パブリック IP] ディメンションでコンシューマーのサブスクリプション関係を再整理します。これにより、コンシューマーの視点からサブスクリプション関係を迅速に理解できます。
RocketMQ 4.0 インスタンス
グループ詳細ページに移動します。[サブスクリプション] エリアには、グループのサブスクリプション情報がクライアントごとに表示されます。