本文介绍TCP协议下C++ SDK v3.x.x的版本信息,包括使用限制、版本的基本信息、环境要求、编译说明以及和历史版本相比各功能特性的变更内容。

使用限制

C++ SDK v3.x.x版本仅支持有命名空间的实例,若您使用的实例无命名空间,请勿将客户端版本升级到C++ SDK v3.x.x。

5.x版本实例默认都有命名空间,4.x版本实例可在消息队列RocketMQ版控制台实例详情页面的基础信息区域查看是否有命名空间。

版本信息

发布时间版本号下载链接
2021-10-18v3.x.xons-client-cpp

环境要求

ONS-Client-CPP是基于Apache RocketMQ 5.0协议原生实现的开源客户端开发软件包。Apache RocketMQ 5.0新的通信协议基于gRPC(HTTP 2.0/Protobuf)实现,因此v3.0.0版本的C++ SDK也需要依赖于grpc/grpc实现,满足以下依赖和工具链的要求。

依赖

依赖版本
grpc/grpc1.39.0
fmt8.0.1
spdlog1.9.2
filesystem1.5.0
asio1.18.2
cpp_httplib0.9.4
protobuf3.17.2

工具链

操作系统工具链版本
Linux、macOSGCC 4.9或以上版本、Clang 3.4或以上版本
Windows 7或以上版本Visual Studio 2015或以上版本

C++ Standard

SDK使用了C++ 11语法,因此需要启用C++ 11或以上标准。

编译说明

开源代码编译说明

  1. 参考Bazel安装指南安装Bazel工具。
    说明 使用Bazel 4.x版本,需要安装Python 3.x.x
  2. 下载开源代码并解压。可通过以下两种方式下载:
    • 通过GitHub克隆源代码:执行git clone https://github.com/aliyun-mq/ons-client-cpp.git命令。
    • 本地下载:下载链接请参见版本信息
  3. 在项目文件夹内执行以下命令,Bazel将自动下载所有第三方依赖。
    bazel -c opt //dist/...
    输出示例如下:
    INFO: From Action dist/libons_library.pic.a:
    starting to run shell
    INFO: Elapsed time: 39.480s, Critical Path: 38.89s
    INFO: 2044 processes: 1796 remote cache hit, 241 internal, 7 processwrapper-sandbox.
    INFO: Build completed successfully, 2044 total actions
    编译完成后,合并好的静态库在bazel-bin/dist/ons-dist.tar.gz文件内。
    root@a36849cf2f24:~/ons-client-cpp# ls -lah bazel-bin/dist/ons-dist.tar.gz 
    -r-xr-xr-x 1 root root 15M Oct 14 08:03 bazel-bin/dist/ons-dist.tar.gz

CentOS 7编译说明

说明 CentOS 7.x默认安装GCC 4.8.5,不满足工具链的要求。因此您需要安装devtoolset-4,devtoolset-4提供的工具链版本为GCC 5.3.1。
wget https://copr.fedorainfracloud.org/coprs/vbatts/bazel/repo/epel-7/vbatts-bazel-epel-7.repo
cp vbatts-bazel-epel-7.repo /etc/yum.repos.d/
yum install devtoolset-4-gcc devtoolset-4-gcc-c++ bazel4 python3 git -y
scl enable devtoolset-4 bash
unlink /usr/bin/python && ln -s /usr/bin/python3 /usr/bin/python
git clone git@github.com:aliyun-mq/ons-client-cpp.git
cd ons-client-cpp && bazel build //dist/...

功能变更

顺序消息

顺序消息的最大重试次数MaxReconsumeTimes参数的默认值从Integer.MAX变更为16次。超过最大重试次数消息还未被消费成功将直接被投递至死信队列。您可以通过自定义MaxReconsumeTimes参数值修改顺序消息的最大重试次数。

广播消费

广播消费模式下,支持使用offsetStore接口的方式定制消费者启动时的消费位点。若未设置,默认和历史版本一致直接从最新消费位点开始消费。

示例代码如下:
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

#include "ons/MessageModel.h"
#include "ons/ONSFactory.h"

#include "rocketmq/Logger.h"

using namespace std;
using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
  Action consume(const Message& message, ConsumeContext& context) noexcept override {
    std::lock_guard<std::mutex> lk(console_mtx);
    auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp();
    auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp();
    std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID()
              << ", Body-size: " << message.getBody().size()
              << ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count()
              << "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count()
              << "ms" << std::endl;
    return Action::CommitMessage;
  }
};

int main(int argc, char* argv[]) {
  auto& logger = rocketmq::getLogger();
  logger.setLevel(rocketmq::Level::Debug);
  logger.init();

  std::cout << "=======Before consuming messages=======" << std::endl;
  ONSFactoryProperty factory_property;
  
  //从OffsetStore读取消费位点的功能仅支持广播消费模式。
  factory_property.setMessageModel(ONS_NAMESPACE::MessageModel::BROADCASTING);

  factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard");

  PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property);

  const char* topic = "cpp_sdk_standard";
  const char* tag = "*";

  // register your own listener here to handle the messages received.
  auto* messageListener = new ExampleMessageListener();
  consumer->subscribe(topic, tag);
  consumer->registerMessageListener(messageListener);

  // Start this consumer
  consumer->start();

  // Keep main thread running until process finished.
  std::this_thread::sleep_for(std::chrono::minutes(15));

  consumer->shutdown();
  std::cout << "=======After consuming messages======" << std::endl;
  return 0;
}

Push消费

  • 如果设置的消费线程数不在合法区间[1,1000]内,系统会在创建消费者时抛出异常,而不是在启动消费者时抛出异常。
  • 新增消费速度限流功能。为了避免消息洪峰可能对消费端应用产生应冲击,您可通过该功能限制消息的消费速度,保护消费端应用。
    说明 顺序消息的消息重试不受限流控制。
    消费限流示例代码如下:
    #include <chrono>
    #include <iostream>
    #include <mutex>
    #include <thread>
    
    #include "ons/MessageModel.h"
    #include "ons/ONSFactory.h"
    
    #include "rocketmq/Logger.h"
    
    using namespace std;
    using namespace ons;
    
    std::mutex console_mtx;
    
    class ExampleMessageListener : public MessageListener {
    public:
      Action consume(const Message& message, ConsumeContext& context) noexcept override {
        std::lock_guard<std::mutex> lk(console_mtx);
        auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp();
        auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp();
        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: " << message.getMsgID()
                  << ", Body-size: " << message.getBody().size() << ", Tag: " << message.getTag()
                  << ", Current - Store-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count()
                  << "ms, Current - Born-Time: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count()
                  << "ms" << std::endl;
        return Action::CommitMessage;
      }
    };
    
    int main(int argc, char* argv[]) {
      auto& logger = rocketmq::getLogger();
      logger.setLevel(rocketmq::Level::Debug);
      logger.init();
    
      const char* topic = "cpp_sdk_standard";
      const char* tag = "*";
    
      std::cout << "=======Before consuming messages=======" << std::endl;
      ONSFactoryProperty factory_property;
      factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard");
    
      // Client-side throttling.
      factory_property.throttle(topic, 16);
    
      PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property);
    
      // register your own listener here to handle the messages received.
      auto* messageListener = new ExampleMessageListener();
      consumer->subscribe(topic, tag);
      consumer->registerMessageListener(messageListener);
    
      // Start this consumer.
      consumer->start();
    
      // Keep main thread running until process finished.
      std::this_thread::sleep_for(std::chrono::minutes(15));
    
      consumer->shutdown();
      std::cout << "=======After consuming messages======" << std::endl;
      return 0;
    }

消息轨迹

参数说明
AccessKey您的阿里云账号或RAM用户的AccessKey ID,用于标识用户。当您通过SDK或API调用消息队列RocketMQ版资源时,需要使用AccessKey ID进行身份验证。
到达Server消息到达消息队列RocketMQ版服务端的时间。
预设DeliverAt定时消息的预计投递时间。
实际AvailableAt定时消息定时结束的时间。即消息可被消费者消费的开始时间。
Available Time消息可被消费者消费的开始时间。
提交/回滚时间事务消息提交或回滚的时间。
到达消费端消息到达消费者客户端的时间。
等待处理耗时消息到达消费者客户端,等待线程池分配线程和分配处理资源的耗时。

API接口变更

  • 日志默认路径由~/logs/rocketmqlogs/ons.log变更为~/logs/rocketmqlogs/ons.log
  • 枚举Action从全局命名空间移动到ons命令空间下。
  • 头文件都存放到/ons路径下。
  • Message#getStartDeliverTime参数返回值,从int64_t修改为std::chrono::system_clock::timepointstd::chrono::milliseconds
  • 删除了函数的throws声明,该声明在C++ 11标准下已不支持。
  • Producer类提供noexcept接口,用于禁用异常场景使用。
  • 枚举类型都转变为namespace enum,即enum class Type

SDK常见问题

  • 在同一个进程内,新旧版本的SDK是否可以共存?会不会有符号冲突?

    新版本SDK的预编译静态库,符号在默认的命名空间ons下,会与历史版本的符号冲突。若要实现同进程两个版本兼容,您可以自行从源码编译,只要保证在编译过程中,定义ONS_NAMESPACE这个宏为非ons的值即可。

    Bazel提供多种方式定义宏,可以通过.bazelrc、cc_library规则中的defines属性定义或cc_library#copts属性定义。

  • 调试代码时,怎么编译一个带符号表的静态库?
    使用以下命令编译:
    bazel -c dbg //dist/...
    更多个性化编译选项,请参见Bazel使用说明
  • 我已经使用了Protobuf依赖,跟你们要求的依赖版本不一致,怎么解决依赖冲突?

    ONS-Client-CPP对第三方采用源码依赖的形式,您只需要调整ONS-Client-CPP项目的依赖版本与您自身的依赖一致即可。

    ONS-Client-CPP依赖了RocketMQ-Client-CPP,您需要fork apache/rocketmq-client-cpp仓库,将依赖描述文件ons-cilent-cpp/bazel/deps.bzl中的依赖指向地址修改为fork的地址。

  • 本地使用了HTTP代理,并声明了http_proxygrpc_proxy等环境变量,发现发送消息有很多超时,是什么原因?

    SDK基于gRPC实现,支持http_proxy、https_proxy和grpc_proxy等方式配置代理。如果不需要proxy,可以配置no_grpc_proxy或no_proxy环境变量。忽略代理站点,请参见gRPC环境变量列表说明

  • 我们项目使用的是C++ 98、C++ 03标准,能支持么?

    不能。由于gRPC和Protobuf是目前项目的核心协议和依赖,本项目也依据这两个依赖的标准,不再支持C++ 98和C++ 03标准。

示例工程

ons-client-cpp-demo