本文介紹TCP協議下C++ SDK v3.x.x的版本資訊,包括使用限制、版本的基本資料、環境要求、編譯說明以及和歷史版本相比各功能特性的變更內容。
使用限制
C++ SDK v3.x.x版本僅支援有命名空間的執行個體,若您使用的執行個體無命名空間,請勿將用戶端版本升級到C++ SDK v3.x.x。
5.x版本執行個體預設都有命名空間,4.x版本執行個體可在雲訊息佇列 RocketMQ 版控制台实例详情頁面的基础信息地區查看是否有命名空間。
版本資訊
發布時間 | 版本號碼 | 下載連結 |
2021-10-18 | v3.x.x |
環境要求
ONS-Client-CPP是基於Apache RocketMQ 5.0協議原生實現的開源用戶端開發軟體包。Apache RocketMQ 5.0新的通訊協定基於gRPC(HTTP 2.0/Protobuf)實現,因此v3.0.0版本的C++ SDK也需要依賴於grpc/grpc實現,滿足以下依賴和工具鏈的要求。
依賴
依賴 | 版本 |
grpc/grpc | 1.39.0 |
fmt | 8.0.1 |
spdlog | 1.9.2 |
filesystem | 1.5.0 |
asio | 1.18.2 |
cpp_httplib | 0.9.4 |
protobuf | 3.17.2 |
工具鏈
作業系統 | 工具鏈版本 |
Linux、macOS | GCC 4.9或以上版本、Clang 3.4或以上版本 |
Windows 7或以上版本 | Visual Studio 2015或以上版本 |
C++ Standard
SDK使用了C++ 11文法,因此需要啟用C++ 11或以上標準。
編譯說明
開原始碼編譯說明
參考Bazel安裝指南安裝Bazel工具。
說明使用Bazel 4.x版本,需要安裝Python 3.x.x。
下載開原始碼並解壓。可通過以下兩種方式下載:
通過GitHub複製原始碼:執行
git clone https://github.com/aliyun-mq/ons-client-cpp.git命令。本地下載:下載連結請參見版本資訊。
在專案檔夾內執行以下命令,Bazel將自動下載所有第三方依賴。
bazel -c opt //dist/...輸出樣本如下:
INFO: From Action dist/libons_library.pic.a: starting to run shell INFO: Elapsed time: 39.480s, Critical Path: 38.89s INFO: 2044 processes: 1796 remote cache hit, 241 internal, 7 processwrapper-sandbox. INFO: Build completed successfully, 2044 total actions編譯完成後,合并好的靜態庫在bazel-bin/dist/ons-dist.tar.gz檔案內。
root@a36849cf2f24:~/ons-client-cpp# ls -lah bazel-bin/dist/ons-dist.tar.gz -r-xr-xr-x 1 root root 15M Oct 14 08:03 bazel-bin/dist/ons-dist.tar.gz
CentOS 7編譯說明
CentOS 7.x預設安裝GCC 4.8.5,不滿足工具鏈的要求。因此您需要安裝devtoolset-4,devtoolset-4提供的工具鏈版本為GCC 5.3.1。
wget https://copr.fedorainfracloud.org/coprs/vbatts/bazel/repo/epel-7/vbatts-bazel-epel-7.repo
cp vbatts-bazel-epel-7.repo /etc/yum.repos.d/
yum install devtoolset-4-gcc devtoolset-4-gcc-c++ bazel4 python3 git -y
scl enable devtoolset-4 bash
unlink /usr/bin/python && ln -s /usr/bin/python3 /usr/bin/python
git clone git@github.com:aliyun-mq/ons-client-cpp.git
cd ons-client-cpp && bazel build //dist/...功能變更
順序訊息
順序訊息的最大重試次數MaxReconsumeTimes參數的預設值從Integer.MAX變更為16次。超過最大重試次數訊息還未被消費成功將直接被投遞至無效信件佇列。您可以通過自訂MaxReconsumeTimes參數值修改順序訊息的最大重試次數。
廣播消費
廣播消費模式下,支援使用offsetStore介面的方式定製消費者啟動時的消費位點。若未設定,預設和歷史版本一致直接從最新消費位點開始消費。
範例程式碼如下:
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include "ons/MessageModel.h"
#include "ons/ONSFactory.h"
#include "rocketmq/Logger.h"
using namespace std;
using namespace ons;
std::mutex console_mtx;
class ExampleMessageListener : public MessageListener {
public:
Action consume(const Message& message, ConsumeContext& context) noexcept override {
std::lock_guard<std::mutex> lk(console_mtx);
auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp();
auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp();
std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID()
<< ", Body-size: " << message.getBody().size()
<< ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count()
<< "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count()
<< "ms" << std::endl;
return Action::CommitMessage;
}
};
int main(int argc, char* argv[]) {
auto& logger = rocketmq::getLogger();
logger.setLevel(rocketmq::Level::Debug);
logger.init();
std::cout << "=======Before consuming messages=======" << std::endl;
ONSFactoryProperty factory_property;
//從OffsetStore讀取消費位點的功能僅支援廣播消費模式。
factory_property.setMessageModel(ONS_NAMESPACE::MessageModel::BROADCASTING);
factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard");
PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property);
const char* topic = "cpp_sdk_standard";
const char* tag = "*";
// register your own listener here to handle the messages received.
auto* messageListener = new ExampleMessageListener();
consumer->subscribe(topic, tag);
consumer->registerMessageListener(messageListener);
// Start this consumer
consumer->start();
// Keep main thread running until process finished.
std::this_thread::sleep_for(std::chrono::minutes(15));
consumer->shutdown();
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}Push消費
如果設定的消費線程數不在合法區間[1,1000]內,系統會在建立消費者時拋出異常,而不是在啟動消費者時拋出異常。
新增消費速度限流功能。為了避免訊息洪峰可能對消費端應用產生衝擊,您可通過該功能限制訊息的消費速度,保護消費端應用。
說明順序訊息的訊息重試不受限流量控制。
消費限流範例程式碼如下:
#include <chrono> #include <iostream> #include <mutex> #include <thread> #include "ons/MessageModel.h" #include "ons/ONSFactory.h" #include "rocketmq/Logger.h" using namespace std; using namespace ons; std::mutex console_mtx; class ExampleMessageListener : public MessageListener { public: Action consume(const Message& message, ConsumeContext& context) noexcept override { std::lock_guard<std::mutex> lk(console_mtx); auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp(); auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp(); std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID() << ", Body-size: " << message.getBody().size() << ", Tag: " << message.getTag() << ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count() << "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count() << "ms" << std::endl; return Action::CommitMessage; } }; int main(int argc, char* argv[]) { auto& logger = rocketmq::getLogger(); logger.setLevel(rocketmq::Level::Debug); logger.init(); const char* topic = "cpp_sdk_standard"; const char* tag = "*"; std::cout << "=======Before consuming messages=======" << std::endl; ONSFactoryProperty factory_property; factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard"); // Client-side throttling. factory_property.throttle(topic, 16); PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property); // register your own listener here to handle the messages received. auto* messageListener = new ExampleMessageListener(); consumer->subscribe(topic, tag); consumer->registerMessageListener(messageListener); // Start this consumer. consumer->start(); // Keep main thread running until process finished. std::this_thread::sleep_for(std::chrono::minutes(15)); consumer->shutdown(); std::cout << "=======After consuming messages======" << std::endl; return 0; }
訊息軌跡
參數 | 說明 |
AccessKey | 您的阿里雲帳號或RAM使用者的AccessKey ID,用於標識使用者。當您通過SDK或API調用雲訊息佇列 RocketMQ 版資源時,需要使用AccessKey ID進行身分識別驗證。 |
到達Server | 訊息到達訊息佇列RocketMQ版服務端的時間。 |
預設DeliverAt | 定時訊息的預計投遞時間。 |
實際AvailableAt | 定時訊息定時結束的時間。即訊息可被消費者消費的開始時間。 |
Available Time | 訊息可被消費者消費的開始時間。 |
提交/復原時間 | 事務訊息提交或復原的時間。 |
到达消费端 | 訊息到達消費者用戶端的時間。 |
等待处理耗时 | 訊息到達消費者用戶端,等待線程池分配線程和分配處理資源的耗時。 |
API介面變更
日誌預設路徑由~/logs/rocketmqlogs/ons.log變更為~/logs/rocketmqlogs/ons.log。
枚舉Action從全域命名空間移動到ons命名空間下。
標頭檔都存放到/ons路徑下。
Message#getStartDeliverTime參數傳回值,從int64_t修改為std::chrono::system_clock::timepoint或std::chrono::milliseconds。
刪除了函數的throws聲明,該聲明在C++ 11標準下已不支援。
Producer類提供noexcept介面,用于禁用異常情境使用。枚舉類型都轉變為
namespace enum,即enum class Type。
SDK常見問題
在同一個進程內,新舊版本的SDK是否可以共存?會不會有符號衝突?
新版本SDK的先行編譯靜態庫,符號在預設的命名空間ons下,會與歷史版本的符號衝突。若要實現同進程兩個版本相容,您可以自行從源碼編譯,只要保證在編譯過程中,定義
ONS_NAMESPACE這個宏為非ons的值即可。Bazel提供多種方式定義宏,可以通過.bazelrc、cc_library規則中的defines屬性定義或
cc_library#copts屬性定義。調試代碼時,怎麼編譯一個帶符號表的靜態庫?
我已經使用了Protobuf依賴,跟你們要求的依賴版本不一致,怎麼解決依賴衝突?
ONS-Client-CPP對第三方採用源碼依賴的形式,您只需要調整ONS-Client-CPP專案的依賴版本與您自身的依賴一致即可。
ONS-Client-CPP依賴了RocketMQ-Client-CPP,您需要fork apache/rocketmq-client-cpp倉庫,將依賴描述檔案ons-client-cpp/bazel/deps.bzl中的依賴指向地址修改為fork的地址。
本地使用了HTTP代理,並聲明了
http_proxy、grpc_proxy等環境變數,發現發送訊息有很多逾時,是什麼原因?SDK基於gRPC實現,支援http_proxy、https_proxy和grpc_proxy等方式配置代理。如果不需要proxy,可以配置no_grpc_proxy或no_proxy環境變數。忽略代理網站,請參見gRPC環境變數列表說明。
我們專案使用的是C++ 98、C++ 03標準,能支援嗎?
不能。由於gRPC和Protobuf是目前專案的核心協議和依賴,本專案也依據這兩個依賴的標準,不再支援C++ 98和C++ 03標準。