全部產品
Search
文件中心

:Java SDK版本說明

更新時間:Feb 09, 2025

本文介紹TCP協議下Java SDK 2.x.x.Final的版本說明,包括使用限制、版本的基本資料、環境要求以及和歷史版本相比各功能特性的變更內容。

使用限制

  • 目前僅華東1(杭州)、華北1(青島)、華北2(北京)、華北3(張家口)、華北5(呼和浩特)、華南1(深圳)、西南1(成都)、中國香港、德國(法蘭克福)和印尼(雅加達)地區支援將用戶端升級為2.x.x.Final版本,其他地區請勿將SDK升級到Java SDK 2.x.x Final版本,否則將無法訪問雲訊息佇列 RocketMQ 版服務。

  • Java SDK 2.x.x.Final僅支援通過VPC網路訪問雲訊息佇列 RocketMQ 版,不支援傳統網路訪問。

    若您使用存量雲訊息佇列 RocketMQ 版執行個體並通過傳統網路訪問,請勿將Java SDK升級到V2.x.x.Final版本,否則將無法訪問雲訊息佇列 RocketMQ 版執行個體。

  • Java SDK 2.x.x.Final僅支援有命名空間的執行個體,若您使用的執行個體無命名空間,請勿將用戶端版本升級到Java SDK 2.x.x.Final。

    5.x版本執行個體預設都有命名空間,4.x版本執行個體可在雲訊息佇列 RocketMQ 版控制台实例详情頁面的基础信息地區查看是否有命名空間。

版本資訊

發布時間

版本號碼

下載連結

2023-02-23

2.0.5.Final

ons-client-2.0.5.Final

2022-08-17

2.0.3.Final

ons-client-2.0.3.Final

2022-06-16

2.0.2.Final

ons-client-2.0.2.Final

2021-11-29

2.0.1.Final

ons-client-2.0.1.Final

2021-10-18

2.0.0.Final

ons-client-2.0.0.Final

環境要求

使用V2.x.x版本Java SDK,請確保您使用的JDK版本為Java 8及以上。

V2.0.5功能變更

功能最佳化

日誌增加非同步支援。

問題修複

  • 修複批量消費等待時間無法指定的問題。

  • 修複部分安全性漏洞。

V2.0.3功能變更

問題修複

修複高版本JDK中,消費線程池無法調節至32個線程以上的問題。

V2.0.2功能變更

問題修複

修複訊息發送時,有小機率觸發死結的問題。

V2.0.1功能變更

訊息軌跡

補充訊息軌跡資料。

V2.0.0功能變更

順序訊息

順序訊息的最大重試次數MaxReconsumeTimes參數的預設值從Integer.MAX變更為16次。超過最大重試次數訊息還未被消費成功將直接被投遞至無效信件佇列。您可以通過自訂MaxReconsumeTimes參數值修改順序訊息的最大重試次數。

事務訊息

若生產者發送訊息時LocalTransactionExecuter類為null,則系統會拋出異常,歷史版本則不會拋出異常。

廣播消費

廣播消費模式下,支援使用offsetStore介面的方式定製消費者啟動時的消費位點。若未設定,預設和歷史版本一致直接從最新消費位點開始消費。

範例程式碼如下:

public class BroadcastingConsumerExample {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();

        properties.put(PropertyKeyConst.GROUP_ID, "MyGroupId");
        properties.put(PropertyKeyConst.AccessKey, "MyAccessKey");
        properties.put(PropertyKeyConst.SecretKey, "MySecretKey");
        // 設定TCP協議的接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
        // 從OffsetStore讀取消費位點的功能僅支援廣播消費模式。
        properties.put(PropertyKeyConst.MessageModel, MessageModel.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);
        // 參數1表示1秒鐘會調用一次AbstractOffsetStore方法來進行位點的持久化。
        OffsetStore offsetStore = new AbstractOffsetStore(1) {
            @Override
            public Map<TopicPartition, Long> loadOffset() {
            // 實現從外部儲存讀取位點的邏輯。
            }

            @Override
            public void persistOffset(Map<TopicPartition, Long> offsetTable) {
            // 持久化位點到外部儲存的邏輯。
            }
        };
        offsetStore.start();
        consumer.setOffsetStore(offsetStore);

        consumer.subscribe("testBroadcastingTopic", "TagA", new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                // 實現訊息消費相關邏輯。
                return Action.CommitMessage;
            }
        });

        consumer.start();
        Thread.sleep(100000);
        consumer.shutdown();
        offsetStore.shutdown();
    }
}

Push消費

  • 暫不支援普通訊息的批量消費功能。

  • 如果設定的消費線程數不在合法區間[1,1000]內,系統會在建立消費者時拋出異常,而不是在啟動消費者時拋出異常。

  • 新增消費速度限流功能。為了避免訊息洪峰可能對消費端應用產生應衝擊,您可通過該功能限制訊息的消費速度,保護消費端應用。自訂消費速度的範例程式碼如下:

    說明

    順序訊息的訊息重試不受限流量控制。

    public class RateLimitConsumerExample {
        public static void main(String[] args) throws InterruptedException {
            Properties properties = new Properties();
    
            properties.put(PropertyKeyConst.GROUP_ID, "MyGroupId");
            properties.put(PropertyKeyConst.AccessKey, "MyAccessKey");
            properties.put(PropertyKeyConst.SecretKey, "MySecretKey");
            // 設定TCP協議的接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
    
            Consumer consumer = ONSFactory.createConsumer(properties);
    
            // 對testTopicA進行消費限流,限制消費端每秒鐘消費10條訊息。
            consumer.rateLimit("testTopicA", 10);
            consumer.subscribe("testTopic", "TagA", new MessageListener() {
                @Override
                public Action consume(Message message, ConsumeContext context) {
                    // 實現訊息消費相關邏輯。
                    return Action.CommitMessage;
                }
            });
    
            consumer.start();
            Thread.sleep(100000);
            consumer.shutdown();
        }
    }

Pull消費

暫不支援Pull消費方式。

日誌配置

  • 日誌預設路徑由~/logs/ons.log變更為~/logs/ons/ons-client.log

  • 記錄層級與logback完全對齊。除已有的ERRORWARNINFODEBUG層級之外,增加對OFFTRACEALL層級的支援。

  • 所有日誌相關參數的添加方式除了-D方式以外,增加對環境變數的支援。

用戶端建立

設定非法的存取點時,系統會在建立寄件者或消費者時拋出異常,而不是在啟動寄件者或消費者時拋出異常。

訊息軌跡

使用最新版本SDK收發訊息,查詢訊息軌跡時,新增返回如下軌跡參數。

參數

說明

AccessKey

您的阿里雲帳號或RAM使用者的AccessKey ID,用於標識使用者。當您通過SDK或API調用雲訊息佇列 RocketMQ 版資源時,需要使用AccessKey ID進行身分識別驗證。

到達Server

訊息到達訊息佇列RocketMQ版服務端的時間。

預設DeliverAt

定時訊息的預計投遞時間。

實際AvailableAt

定時訊息定時結束的時間。即訊息可被消費者消費的開始時間。

Available Time

訊息可被消費者消費的開始時間。

提交/復原時間

事務訊息提交或復原的時間。

到达消费端

訊息到達消費者用戶端的時間。

等待处理耗时

訊息到達消費者用戶端,等待線程池分配線程和分配處理資源的耗時。

V2.0.0問題修複

修複RAM角色實現跨雲帳號STS授權情境下,updateCredential方法調用頻率較高時,三元組(AccessKey IDAccessKey SecretSTS Token)更新不具備原子性而導致的鑒權失敗問題。

STS授權的SDK範例程式碼,請參見STS Token配置樣本