全部產品
Search
文件中心

:如何排查RocketMQ訂閱關係不一致的問題?

更新時間:Dec 24, 2025

背景知識

什麼是訂閱關係一致

訂閱關係一致指的是消費者組(Group)內所有消費者訂閱關係(Topic+TAG)必須保持一致,消費者可以訂閱多個Topic或多個Tag,但其訂閱關係必須一致。

詳見訂閱關係一致

消費者如何訂閱多個Topic與Tag

商業版JAVA SDK

maven座標範例:

<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>x.x.x.Final</version>

範例程式碼:

Properties properties = new Properties();
// 您在訊息佇列RocketMQ版控制台建立的Group ID。
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
/*
* 一些屬性配置
*/
Consumer consumer = ONSFactory.createConsumer(properties);
//訂閱多個Tag。
consumer.subscribe("TopicTestMQ1", "TagA||TagB", new MessageListener() {...});
//訂閱多個Topic,同一個consumer執行個體繼續調用subscribe增加訂閱。
consumer.subscribe("TopicTestMQ2", "TagA||TagB", new MessageListener() {...});

社區版JAVA SDK(artifactId為rocketmq-client)

maven座標範例:

<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>x.x.x</version>

範例程式碼:

// 您在訊息佇列RocketMQ版控制台建立的Group ID。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_ID, getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
/*
* 一些屬性配置
*/
//訂閱多個Tag
consumer.subscribe("java-test", "TagA||TagB");
//訂閱第二個Topic,需要注意consumer只能註冊一個消費邏輯,即多個訂閱的消費邏輯都是一樣的
consumer.subscribe("cxtest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {...});

社區版JAVA SDK(artifactId為rocketmq-client-java)

maven座標範例:

<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.8</version>

範例程式碼:

final ClientServiceProvider provider = ClientServiceProvider.loadService();
/*
* 一些配置
*/
HashMap<String, FilterExpression> sub = new HashMap<>();
//訂閱多個Tag
sub.put("Topic1", new FilterExpression("TagA||TagB", FilterExpressionType.TAG));
//訂閱第二個topic,需要注意consumer只能註冊一個消費邏輯,即多個訂閱的消費邏輯都是一樣的
sub.put("Topic2", new FilterExpression("TagA||TagB", FilterExpressionType.TAG));
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                //設定消費者分組,訊息佇列RocketMQ版控制台建立的Group ID。
                .setConsumerGroup(consumerGroup)
                //設定預綁定的訂閱關係。
                .setSubscriptionExpressions(sub)
                //設定消費監聽器。
                .setMessageListener(messageView -> {...})
                .build();

SpringBoot架構(使用RocketMQMessageListener註解)

若使用RocketMQMessageListener註解來啟動消費者,需要留意每個註解都會啟動一個獨立的consumer執行個體,若代碼中使用了多次註解,且每處註解訂閱的Topic不同,就會導致訂閱關係不一致。

比如,下面這段代碼實現了兩次RocketMQListener介面,在同一個Group中分別訂閱了TopicA和TopicB,實際運行時用戶端會建立兩個consumer執行個體,最終導致在同一個消費者組內有兩個消費者分別訂閱TopicATopicB,導致訂閱關係不一致。

// 商務邏輯1:TestMessageListener1.java
@Component
@RocketMQMessageListener(consumerGroup="GID_A", topic = "TopicA")
public class TestMessageListener1 implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        /*
        * 一些配置
        */
    }
}
// 商務邏輯2:TestMessageListener2.java
@Component
@RocketMQMessageListener(consumerGroup="GID_A", topic = "TopicB")
public class TestMessageListener2 implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        /*
        * 一些配置
        */
    }
}

若要實現多訂閱,可通過重寫RocketMQPushConsumerLifecycleListener介面的prepareStart方法來實現,參考代碼如下:

@Component
@RocketMQMessageListener(consumerGroup = "GID_A", topic = "")
public class TestMessageListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(MessageExt messageExt) {}
    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        try{
            // 訂閱多個tag
            defaultMQPushConsumer.subscribe("TopicA", "TagA||TagB");
            // 訂閱第二個topic
            defaultMQPushConsumer.subscribe("TopicB", "*");
            // 註冊消費邏輯
            defaultMQPushConsumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {......});
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

SpringBoot架構(使用Bean註解)

此方式通過手動注入ConsumerBean,需要按照原生SDK的方式進行多訂閱。

@Configuration
public class ConsumerClient {
  
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        //參考商業版以及社區版SDK多訂閱的方式在此處注入對應的ConsumerBean
    }
}

情境描述

雲訊息佇列 RocketMQ 版控制台提示訂閱關係不一致。

簡要排查思路

  1. 在控制台中找到對應的RocketMQ執行個體。

  2. 在Group詳情頁查看訂閱關係不一致的用戶端分布情況。

  3. 根據使用的SDK、架構修正為一致的訂閱關係。

排查詳情

首先確認使用的執行個體版本是5.0還是4.0,兩者在控制台上顯示訂閱關係的樣式略有區別。

RocketMQ 5.0執行個體

  1. 進入Group詳情頁,查看用戶端以及對應的訂閱關係列表。

  2. 訂閱關係是以Topic為單位呈現的,過濾運算式一列顯示了該Group內所有消費者訂閱對應Topic的所有運算式。

  3. 為了快速排查具體是哪些消費者的訂閱關係不一致,可以通過點開查看分布來詳細擷取消費者的訂閱情況。點擊後重點關注宿主機 IP/公網 IP一列,一般情況下<IP>:<連接埠>可以唯一確定一個消費者執行個體(即初始化Consumer類之後的運行實體)。可以分為幾種情況來分析:

    1. 若頁面上只顯示了單個Topic,且該Topic記憶體在多個過濾運算式,說明不同的消費者對於該Topic設定的Tag不一致,此時只需點擊右側的查看分布,頁面上會顯示該Topic下所有消費者訂閱情況,根據實際情況修改業務代碼,確保所有消費者訂閱的Tag保持一致即可。

    2. 若頁面上顯示了多個Topic,但每個Topic內只有一個過濾運算式,則說明有消費者訂閱了多個Topic,此時只需點擊每個Topic的查看分布資訊,以宿主機 IP/公網 IP為維度梳理每個消費者的訂閱情況。

    3. 若頁面上顯示了多個Topic,且每個Topic記憶體在多個過濾運算式,可點擊查看分布,以宿主機 IP/公網 IP為維度重新整理消費者的訂閱關係,以便以消費者的視角快速理清訂閱關係。

RocketMQ 4.0執行個體

進入Group詳情頁,在訂閱關係地區會以用戶端為單位,顯示該Group內的訂閱情況。