Topik ini menjelaskan catatan rilis untuk TCP Client SDK untuk C++ V3.x.x. Catatan mencakup penggunaan, informasi versi, persyaratan lingkungan, instruksi kompilasi, dan perubahan fitur.
Catatan Penggunaan
TCP Client SDK untuk C++ V3.x.x hanya dapat digunakan untuk mengakses instance yang memiliki namespace. Jika instance Anda tidak memiliki namespace, jangan tingkatkan ke TCP Client SDK untuk C++ V3.x.x.
Secara default, semua instance ApsaraMQ for RocketMQ 5.x memiliki namespace. Untuk instance ApsaraMQ for RocketMQ 4.x, Anda dapat memeriksa apakah namespace tersedia di bagian Basic Information pada halaman Instance Details di ApsaraMQ for RocketMQ console.
Informasi Versi
Tanggal Rilis | Versi | Tautan Unduhan |
2021-10-18 | v3.x.x |
Persyaratan Lingkungan
ONS-Client-CPP adalah SDK klien open source asli dari Apache RocketMQ 5.0. Apache RocketMQ 5.0 menggunakan kerangka kerja gRPC, yang dikembangkan berdasarkan HTTP 2.0 dan Protobuf. Oleh karena itu, gRPC diperlukan untuk SDK C++ V3.0.0. Tabel berikut menjelaskan persyaratan versi untuk dependensi dan toolchain.
Dependensi
Dependensi | Versi |
grpc/grpc | 1.39.0 |
fmt | 8.0.1 |
spdlog | 1.9.2 |
filesystem | 1.5.0 |
asio | 1.18.2 |
cpp_httplib | 0.9.4 |
protobuf | 3.17.2 |
Toolchain
Sistem Operasi | Versi Toolchain |
Linux atau macOS | GNU Compiler Collection (GCC) 4.9 atau lebih baru dan Clang 3.4 atau lebih baru |
Windows 7 atau lebih baru | Visual Studio 2015 atau lebih baru |
Standar C++
SDK menggunakan pustaka standar C++ 11. Diperlukan C++ 11 atau lebih baru.
Instruksi Kompilasi
Instruksi Kompilasi untuk Kode Sumber Terbuka
Instal Bazel. Untuk informasi lebih lanjut, lihat Menginstal Bazel.
CatatanPython 3.x.x diperlukan untuk Bazel 4.x.
Unduh dan ekstrak kode sumber terbuka. Anda dapat mengunduh kode dengan salah satu metode berikut:
Jalankan perintah
git clone https://github.com/aliyun-mq/ons-client-cpp.gituntuk mengkloning kode sumber.Unduh kode ke mesin lokal Anda dari tautan di Informasi Versi.
Jalankan perintah berikut di folder proyek. Bazel secara otomatis mengunduh semua dependensi pihak ketiga.
bazel -c opt //dist/...Contoh keluaran:
INFO: From Action dist/libons_library.pic.a: starting to run shell INFO: Elapsed time: 39.480s, Critical Path: 38.89s INFO: 2044 processes: 1796 remote cache hit, 241 internal, 7 processwrapper-sandbox. INFO: Build completed successfully, 2044 total actionsSetelah kompilasi selesai, pustaka statis yang berisi semua objek gabungan disimpan dalam file bazel-bin/dist/ons-dist.tar.gz.
root@a36849cf2f24:~/ons-client-cpp# ls -lah bazel-bin/dist/ons-dist.tar.gz -r-xr-xr-x 1 root root 15M Oct 14 08:03 bazel-bin/dist/ons-dist.tar.gz
Instruksi Kompilasi untuk CentOS 7
Secara default, GCC 4.8.5 diinstal di CentOS 7.x. Versi GCC ini tidak memenuhi persyaratan toolchain. Anda harus menginstal devtoolset-4 yang menyediakan GCC 5.3.1.
wget https://copr.fedorainfracloud.org/coprs/vbatts/bazel/repo/epel-7/vbatts-bazel-epel-7.repo
cp vbatts-bazel-epel-7.repo /etc/yum.repos.d/
yum install devtoolset-4-gcc devtoolset-4-gcc-c++ bazel4 python3 git -y
scl enable devtoolset-4 bash
unlink /usr/bin/python && ln -s /usr/bin/python3 /usr/bin/python
git clone git@github.com:aliyun-mq/ons-client-cpp.git
cd ons-client-cpp && bazel build //dist/...Perubahan Fitur
Pesan Terurut
Nilai default parameter MaxReconsumeTimes diubah dari Integer.MAX menjadi 16. Parameter ini menentukan jumlah maksimum percobaan ulang untuk pesan terurut. Jika konsumen gagal mengonsumsi pesan setelah jumlah maksimum percobaan ulang tercapai, pesan tersebut dikirim ke antrian pesan gagal. Anda dapat menentukan nilai kustom untuk parameter MaxReconsumeTimes.
Konsumsi Siaran
Dalam mode konsumsi siaran, operasi offsetStore didukung. Anda dapat memanggil operasi ini untuk menentukan offset konsumen dari mana konsumen mulai mengonsumsi pesan. Jika Anda tidak menentukan offset konsumen, konsumen akan memulai konsumsi pesan dari offset konsumen terbaru. Ini sesuai dengan versi sebelumnya.
Contoh kode:
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include "ons/MessageModel.h"
#include "ons/ONSFactory.h"
#include "rocketmq/Logger.h"
using namespace std;
using namespace ons;
std::mutex console_mtx;
class ExampleMessageListener : public MessageListener {
public:
Action consume(const Message& message, ConsumeContext& context) noexcept override {
std::lock_guard<std::mutex> lk(console_mtx);
auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp();
auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp();
std::cout << "Menerima pesan. Topik: " << message.getTopic() << ", MsgId: " << message.getMsgID()
<< ", Ukuran-Badan: " << message.getBody().size()
<< ", Saat Ini - Waktu Penyimpanan: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count()
<< "ms, Saat Ini - Waktu Pembuatan: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count()
<< "ms" << std::endl;
return Action::CommitMessage;
}
};
int main(int argc, char* argv[]) {
auto& logger = rocketmq::getLogger();
logger.setLevel(rocketmq::Level::Debug);
logger.init();
std::cout << "=======Sebelum mengonsumsi pesan=======" << std::endl;
ONSFactoryProperty factory_property;
// Dapatkan offset konsumen dengan memanggil operasi offsetStore. Operasi ini hanya didukung dalam mode konsumsi siaran.
factory_property.setMessageModel(ONS_NAMESPACE::MessageModel::BROADCASTING);
factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard");
PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property);
const char* topic = "cpp_sdk_standard";
const char* tag = "*";
// daftarkan pendengar Anda sendiri di sini untuk menangani pesan yang diterima.
auto* messageListener = new ExampleMessageListener();
consumer->subscribe(topic, tag);
consumer->registerMessageListener(messageListener);
// Mulai konsumen ini
consumer->start();
// Pertahankan utas utama tetap berjalan hingga proses selesai.
std::this_thread::sleep_for(std::chrono::minutes(15));
consumer->shutdown();
std::cout << "=======Setelah mengonsumsi pesan======" << std::endl;
return 0;
}Mode Dorong
Jika jumlah utas konsumsi yang ditentukan tidak berada dalam rentang valid 1 hingga 1000, pengecualian akan dilemparkan saat sistem mencoba membuat konsumen. Jenis pengecualian ini tidak dilemparkan saat sistem mencoba memulai konsumen.
Fitur pembatasan konsumsi didukung. Anda dapat mengonfigurasi fitur pembatasan konsumsi untuk membatasi laju konsumsi pesan. Ini membantu mencegah pengecualian aplikasi yang disebabkan oleh lonjakan pesan mendadak pada klien konsumen.
CatatanFitur pembatasan konsumsi tidak berlaku untuk percobaan ulang pesan terurut.
Contoh kode berikut menunjukkan cara mengonfigurasi fitur pembatasan konsumsi:
#include <chrono> #include <iostream> #include <mutex> #include <thread> #include "ons/MessageModel.h" #include "ons/ONSFactory.h" #include "rocketmq/Logger.h" using namespace std; using namespace ons; std::mutex console_mtx; class ExampleMessageListener : public MessageListener { public: Action consume(const Message& message, ConsumeContext& context) noexcept override { std::lock_guard<std::mutex> lk(console_mtx); auto latency = std::chrono::system_clock::now() - message.getStoreTimestamp(); auto latency2 = std::chrono::system_clock::now() - message.getBornTimestamp(); std::cout << "Menerima pesan. Topik: " << message.getTopic() << ", MsgId: " << message.getMsgID() << ", Ukuran-Badan: " << message.getBody().size() << ", Tag: " << message.getTag() << ", Saat Ini - Waktu Penyimpanan: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency).count() << "ms, Saat Ini - Waktu Pembuatan: " << std::chrono::duration_cast<std::chrono::milliseconds>(latency2).count() << "ms" << std::endl; return Action::CommitMessage; } }; int main(int argc, char* argv[]) { auto& logger = rocketmq::getLogger(); logger.setLevel(rocketmq::Level::Debug); logger.init(); const char* topic = "cpp_sdk_standard"; const char* tag = "*"; std::cout << "=======Sebelum mengonsumsi pesan=======" << std::endl; ONSFactoryProperty factory_property; factory_property.setFactoryProperty(ons::ONSFactoryProperty::GroupId, "GID_cpp_sdk_standard"); // Pembatasan sisi klien. factory_property.throttle(topic, 16); PushConsumer* consumer = ONSFactory::getInstance()->createPushConsumer(factory_property); // daftarkan pendengar Anda sendiri di sini untuk menangani pesan yang diterima. auto* messageListener = new ExampleMessageListener(); consumer->subscribe(topic, tag); consumer->registerMessageListener(messageListener); // Mulai konsumen ini. consumer->start(); // Pertahankan utas utama tetap berjalan hingga proses selesai. std::this_thread::sleep_for(std::chrono::minutes(15)); consumer->shutdown(); std::cout << "=======Setelah mengonsumsi pesan======" << std::endl; return 0; }
Jejak Pesan
Parameter | Deskripsi |
AccessKey | ID AccessKey akun Alibaba Cloud Anda atau Pengguna Resource Access Management (RAM). ID AccessKey digunakan untuk memverifikasi identitas pengguna. Saat Anda menggunakan SDK atau memanggil operasi API untuk mendapatkan sumber daya ApsaraMQ for RocketMQ, ID AccessKey diperlukan untuk autentikasi. |
ReachServer | Waktu saat pesan tiba di broker ApsaraMQ for RocketMQ. |
PresetDeliverAt | Titik waktu yang dijadwalkan saat pesan terjadwal akan dikirimkan. |
ActualAvailableAt | Waktu saat pesan terjadwal dikirimkan. Nilai parameter ini menunjukkan waktu saat pesan terjadwal siap untuk dikonsumsi. |
Available Time | Waktu saat pesan siap untuk dikonsumsi. |
Commit/RollbackTime | Waktu saat pesan transaksional dikomit atau dibatalkan. |
Arrive at Consumer At | Waktu saat pesan tiba di klien konsumen. |
Wait Duration before Processing | Durasi tunggu antara waktu saat pesan tiba di klien konsumen dan waktu saat kolam utas mengalokasikan utas dan sumber daya pemrosesan untuk pesan tersebut. |
Perubahan dalam Operasi API
Path log default diubah dari ~/logs/rocketmqlogs/ons.log menjadi ~/logs/rocketmqlogs/ons.log.
Kelas enum Action dipindahkan dari namespace global ke namespace ons.
File header disimpan di path /ons.
Nilai kembali dari Message#getStartDeliverTime diubah dari int64_t menjadi std::chrono::system_clock::timepoint atau std::chrono::milliseconds.
Deklarasi throws untuk fungsi dihapus karena deklarasi throws tidak lagi didukung di C++ 11.
Kelas
Producermenyediakan operasinoexcept. Anda dapat memanggil operasi ini untuk menonaktifkan pengecualian agar tidak dilemparkan.Tipe enumerasi diubah menjadi
namespace enum, yaitu,enum class Type.
FAQ
Bisakah saya menggunakan SDK versi terbaru bersama dengan SDK versi sebelumnya dalam proses yang sama? Apakah simbol-simbolnya bertentangan?
Simbol untuk pustaka statis pra-kompilasi SDK terbaru ada di namespace ons, yang merupakan namespace default. Simbol-simbol ini bertentangan dengan simbol yang digunakan dalam versi SDK sebelumnya. Anda dapat mengkompilasi SDK Anda dari kode sumber dan memastikan bahwa nilai makro
ONS_NAMESPACEbukan ons. Dengan cara ini, Anda dapat menggunakan SDK Anda bersama dengan versi sebelumnya dalam proses yang sama.Bazel menyediakan beberapa metode untuk mendefinisikan makro. Misalnya, Anda dapat menggunakan .bazelrc, properti defines dalam aturan cc_library, dan properti
cc_library#coptsuntuk mendefinisikan makro.Bagaimana cara mengkompilasi pustaka statis yang berisi tabel simbol saat saya men-debug kode?
Jalankan perintah berikut:
bazel -c dbg //dist/...Untuk informasi tentang opsi kompilasi, lihat Panduan Pengguna Bazel.
Bagaimana cara menyelesaikan konflik dependensi jika dependensi Protobuf saya tidak memenuhi persyaratan versi?
ONS-Client-CPP menggunakan kode sumber dependensi pihak ketiga. Pastikan versi dependensi ONS-Client-CPP konsisten dengan dependensi Anda.
ONS-Client-CPP bergantung pada RocketMQ-Client-CPP. Anda harus membuat fork repositori apache/rocketmq-client-cpp, dan mengubah URL dependensi di ons-cilent-cpp/bazel/deps.bzl menjadi URL repositori yang difork.
Mengapa sejumlah besar pesan keluar melewati batas waktu setelah saya mengaktifkan proxy HTTP di mesin lokal saya dan mendeklarasikan variabel lingkungan seperti
http_proxydangrpc_proxy?SDK berbasis gRPC mendukung proxy seperti http_proxy, https_proxy, dan grpc_proxy. Jika tidak diperlukan proxy, Anda dapat mengonfigurasi variabel lingkungan no_grpc_proxy atau no_proxy untuk mengabaikan situs proxy. Untuk informasi lebih lanjut, lihat Variabel Lingkungan gRPC.
Apakah SDK mendukung standar C++ 98 dan C++ 03?
Tidak, SDK tidak mendukung standar C++ 98 atau C++ 03. SDK menggunakan gRPC sebagai protokol inti dan Protobuf sebagai dependensi utama. Protokol gRPC dan Protobuf tidak mendukung standar C++ 98 atau C++ 03.