Installation
Prerequisites
DataHub SDK for C++ requires GCC 4.9.2 for compilation. Verify that your build environment meets this requirement before installation.
Download the SDK
Initialization
To access DataHub, provide your AccessKey ID, AccessKey secret, and an endpoint. The following example creates a DataHub client using an endpoint. Available endpoints are listed in Endpoints.
/* Configuration */
// Credentials used by the client. Replace the empty placeholders with your
// own AccessKey ID and AccessKey secret; do not commit real values.
Account account;
account.id = "";
account.key = "";
// Sample project name, topic name and comment.
std::string projectName = "test_project";
std::string topicName = "test_cpp";
std::string comment = "test";
// DataHub service endpoint (see the Endpoints list); placeholder left empty.
std::string endpoint = "";
Configuration conf(account, endpoint);
/* Datahub Client */
// Client object built from the configuration above.
DatahubClient client(conf);
Manage projects
A project is the basic unit for managing data in DataHub and contains multiple topics. DataHub projects are independent of MaxCompute projects and must be created separately.
Create a project
void CreateProject()
{
std::string projectName = "";
std::string comment = "";
try
{
client.CreateProject(projectName, comment);
}
catch(const DatahubException& e)
{
std::cerr << "Create project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Delete a project
void DeleteProject()
{
std::string projectName = "";
try
{
client.DeleteProject(projectName);
}
catch(const DatahubException& e)
{
std::cerr << "Delete project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Update a project
void UpdateProject()
{
std::string projectName = "";
std::string comment = "";
try
{
client.UpdateProject(projectName,comment);
}
catch(const DatahubException& e)
{
std::cerr << "Update project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Query projects
void ListProject()
{
try
{
const ListProjectResult& listProjectResult = client.ListProject();
std::cout<<listProjectResult.GetProjectNames().size()<<std::endl;
}
catch(const DatahubException& e)
{
std::cerr << "List project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Query a project
void GetProject()
{
std::string projectName = "";
try
{
const GetProjectResult& projectResult = client.GetProject(projectName);
std::cout<<projectResult.GetProject()<<std::endl;
std::cout<<projectResult.GetComment()<<std::endl;
std::cout<<projectResult.GetCreator()<<std::endl;
std::cout<<projectResult.GetCreateTime()<<std::endl;
std::cout<<projectResult.GetLastModifyTime()<<std::endl;
}
catch(const DatahubException& e)
{
std::cerr << "get project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Manage topics
A topic is the smallest unit for data subscription and publishing in DataHub. Topics distinguish different types of streaming data. Two topic types are supported: tuple and blob.
-
You can write a block of binary data as a record of a blob topic.
-
Tuple topics contain records similar to database rows, each with multiple columns. Record schemas are required because data is transmitted as strings and must be converted to the correct types. The following table lists the supported data types.
|
Type |
Description |
Value range |
|
BIGINT |
An eight-byte signed integer. |
-9223372036854775807 to 9223372036854775807 |
|
DOUBLE |
Eight-byte double-precision floating-point number. |
-1.0 × 10^308 to 1.0 × 10^308 |
|
BOOLEAN |
Boolean value. |
True and False, true and false, or 0 and 1. |
|
TIMESTAMP |
Timestamp value. |
Microsecond precision. |
|
STRING |
UTF-8 encoded string. |
Maximum 2 MB per column. |
|
TINYINT |
A single-byte integer. |
-128 to 127 |
|
SMALLINT |
A double-byte integer. |
-32768 to 32767 |
|
INTEGER |
A four-byte integer. |
-2147483648 to 2147483647 |
|
FLOAT |
A four-byte single-precision floating-point number. |
-3.40282347 × 10^38 to 3.40282347 × 10^38 |
Create a tuple topic
void CreateTupleTopic(){
RecordSchema schema;
std::string fieldName1 = "a";
std::string fieldName2 = "b";
std::string fieldName3 = "c";
std::string fieldComment1 = "field1 comment";
std::string fieldComment2 = "field2 comment";
std::string fieldComment3 = "field3 comment";
schema.AddField(Field(fieldName1/*Fieldname*/, BIGINT, true, fieldComment1));
schema.AddField(Field(fieldName2/*Fieldname*/, DOUBLE, true, fieldComment2));
schema.AddField(Field(fieldName3/*Fieldname*/, STRING, true, fieldComment3));
/* Create Topic */
int shardCount = 3;
int lifeCycle = 7;
RecordType type = TUPLE;
std::string projectName = "";
std::string topicName = "";
try
{
client.CreateTopic(projectName, topicName, shardCount, lifeCycle, type, schema, comment);
}
catch (const DatahubException& e)
{
std::cerr << "Create topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Create a blob topic
// Creates a blob topic through the `client` object declared outside this
// function. `comment` is also taken from the enclosing scope. Fill in the
// project and topic name placeholders before use.
// A DatahubException is caught and reported on std::cerr.
void CreateBlobTopic()
{
    /* Create Topic */
    int shardCount = 3;
    int lifeCycle = 7;
    RecordType type = BLOB;
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        // Blob topics carry no record schema, so none is passed.
        client.CreateTopic(projectName, topicName, shardCount, lifeCycle, type, comment);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Create topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
Delete a topic
void DeleteTopic()
{
std::string projectName = "";
std::string topicName = "";
try
{
client.DeleteTopic(projectName, topicName);
}
catch(const DatahubException& e)
{
std::cerr << "Delete topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Query topics
// Lists the topics of a project through the `client` object declared outside
// this function and prints how many topic names were returned.
// A DatahubException is caught and reported on std::cerr.
void ListTopic()
{
    std::string projectName = "";
    try
    {
        const ListTopicResult& listTopicResult = client.ListTopic(projectName);
        std::cout << listTopicResult.GetTopicNames().size() << std::endl;
    }
    catch (const DatahubException& e)
    {
        // Report this as a list failure so it is distinguishable from GetTopic errors.
        std::cerr << "List topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
Update a topic
// Updates the lifecycle and comment of a topic through the `client` object
// declared outside this function. Fill in the project and topic name
// placeholders before use.
// A DatahubException is caught and reported on std::cerr.
void UpdateTopic()
{
    const std::string updateComment = "test1";
    int updateLifecycle = 7;
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        // Pass the comment declared above; the sample previously named an undeclared variable.
        client.UpdateTopic(projectName, topicName, updateLifecycle, updateComment);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Update topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
Query a topic
// Fetches one topic through the `client` object declared outside this
// function and prints its comment.
// A DatahubException is caught and reported on std::cerr.
void GetTopic()
{
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        const GetTopicResult& getTopicResult = client.GetTopic(projectName, topicName);
        // Qualified with std:: like the other samples; no using-directive is assumed.
        std::cout << getTopicResult.GetComment() << std::endl;
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Get topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
Manage shards
Shards are concurrent tunnels for data transmission within a topic. Each shard has an ID and a state: Opening (starting up) or Active (ready to serve). Each active shard consumes server resources, so create only the shards you need.
List shards
void ListShard()
{
try
{
const ListShardResult& lsr = client.ListShard(projectName, topicName);
std::cout<<lsr.GetShards().size()<<std::endl;
std::vector<ShardEntry> shards = lsr.GetShards();
std::vector<std::string> shardIds;
for (size_t i = 0; i < shards.size(); ++i)
{
ShardEntry shardEntry = shards[i];
shardIds.push_back(shardEntry.GetShardId());
}
}
catch(const DatahubException& e)
{
std::cerr << "List shard fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Split a shard
void SplitShard()
{
std::string projectName = "";
std::string topicName = "";
try
{
const SplitShardResult& ssr = client.SplitShard(projectName, topicName, "0", "00000000000000000000000000AAAAAA");
std::cout<<ssr.GetChildShards().size()<<std::endl;
}
catch(const DatahubException& e)
{
std::cerr << "Split shardId fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Merge shards
// Merges shards "0" and "1" through the `client` object declared outside this
// function and prints the child shard's ID and hash key range.
// A DatahubException is caught and reported on std::cerr.
void MergeShard()
{
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        const MergeShardResult& msr = client.MergeShard(projectName, topicName, "0", "1");
        std::cout << msr.GetChildShard().GetShardId() << std::endl;
        std::cout << msr.GetChildShard().GetBeginHashKey() << std::endl;
        std::cout << msr.GetChildShard().GetEndHashKey() << std::endl;
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Merge shard fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
Read and write data
You can read data from both active and closed shards, but you can write data only to active shards.
Read data
-
Obtain a cursor
To read data from a topic, specify a shard and a cursor position. Obtain the cursor using one of the following methods:
-
OLDEST: points to the earliest valid record in the shard.
-
LATEST: points to the latest record in the shard.
-
SEQUENCE: points to the record with the specified sequence number.
-
SYSTEM_TIME: points to the first record with a timestamp greater than or equal to the specified value.
void GetCursor()
{
try
{
const GetCursorResult& r1 = client.GetCursor(projectName, topicName, "0", CURSOR_TYPE_OLDEST);
std::string cursor = r1.GetCursor();
}
catch(const DatahubException& e)
{
std::cerr << "Get cursor fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
-
Read data
void sub(DatahubClient& client)
{
std::string projectName = "";
std::string topicName = "";
std::string shardId = "0";
std::string cursor;
RecordSchema schema;
OpenSubscriptionOffsetSessionResult osor = client.InitSubscriptionOffsetSession(projectName, topicName, subId, {shardId});
SubscriptionOffset subscription = osor.GetOffsets().at(shardId);
if (subscription.GetSequence() < 0)
{
cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_OLDEST).GetCursor();
}
else
{
int64_t nextSequence = subscription.GetSequence() + 1;
try
{
cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_SEQUENCE, nextSequence).GetCursor();
}
catch (const DatahubException& e)
{
cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_OLDEST).GetCursor();
}
}
int64_t readTotal = 0;
int fetchNum = 10;
while (true)
{
try
{
GetRecordResult grr = client.GetRecord(projectName, topicName, shardId, cursor, fetchNum, subId);
if (grr.GetRecordCount() <= 0)
{
std::cout << "Read null, wait for 1s." << std::endl;
sleep(1);
continue;
}
for (auto recordResult : grr.GetRecords())
{
ProcessRecords(recordResult);
if (++readTotal % 1000 == 0)
{
subscription.SetSequence(recordResult.GetSequence());
subscription.SetTimestamp(recordResult.GetSystemTime());
std::map<std::string, SubscriptionOffset> offsets;
offsets[shardId] = subscription;
try
{
client.UpdateSubscriptionOffset(projectName, topicName, subId, offsets);
}
catch (const DatahubException& e)
{
std::cerr << "Update subscription offset fail. requestId: "<< e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
throw ;
}
}
}
cursor = grr.GetNextCursor();
}
catch (const DatahubException& e)
{
std::cerr << "Get record fail. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
inline void ProcessRecords(const RecordResult& record)
{
if (schema.GetFieldCount() == 0) // BLOB
{
int len = 0;
const char* data = record.GetData(len);
printf("%s\n", data);
}
else // TUPLE
{
for (int j = 0; j < schema.GetFieldCount(); ++j)
{
const Field &field = schema.GetField(j);
const FieldType fieldType = field.GetFieldType();
switch (fieldType)
{
case BIGINT:
printf("%ld\n", record.GetBigint(j));
break;
case DOUBLE:
printf("%.15lf\n", record.GetDouble(j));
break;
case STRING:
printf("%s\n", record.GetString(j).c_str());
break;
default:
break;
}
}
}
}
}
Collaborative consumption example
using namespace aliyun;
using namespace aliyun::datahub;

// Counter initialised to zero.
std::atomic_int gTotalNum{0};

// Connection settings filled in by parse_args().
Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;
std::string gSubId;

// Consumer tuning options filled in by parse_args(); zero/false by default.
bool gAutoAck = false;
int32_t gFetchNum = 0;
int64_t gSessionTimeoutMs = 0;
// Prints the program name (without any leading directory), the version and
// the supported command-line options to stdout.
void Usage(const char* prog)
{
    // Keep only the part after the last '/', if there is one.
    const char* name = prog;
    const char* slash = strrchr(prog, '/');
    if (slash != NULL)
    {
        name = slash + 1;
    }

    printf(
        "%s Ver %s\n"
        "Usage: <programme> [options...]\n"
        " -i --accessid <string> set access id [Mandatory]\n"
        " -k --accesskey <string> set access key [Mandatory]\n"
        " -e --endpoint <string> set endpoint [Mandatory]\n"
        " -p --project <string> set project [Mandatory]\n"
        " -t --topic <string> set topic [Mandatory]\n"
        " -s --subId <string> set subscription id [Mandatory]\n"
        " -a --autoAck <bool> set auto ack offset\n"
        " -f --fetchNum <int> set fetch limit num\n"
        " -S --sessionTimeoutMs <int> set session timeout (ms)\n"
        " -v --version show version\n"
        " -h --help show help message\n",
        name, PROG_VERSION);
}
// Parses the command line of the collaborative consumption example into the
// g* globals. -v and -h print the usage text and exit with status 0.
// Unrecognised options are ignored.
void parse_args(int argc, char* argv[])
{
    int opt_id;
    static struct option long_opts[] =
    {
        { "accessid", 1, 0, 'i' },
        { "accesskey", 1, 0, 'k' },
        { "endpoint", 1, 0, 'e' },
        { "project", 1, 0, 'p' },
        { "topic", 1, 0, 't' },
        { "subId", 1, 0, 's' },
        { "autoAck", 1, 0, 'a' },
        { "fetchNum", 1, 0, 'f' },
        { "sessionTimeoutMs", 1, 0, 'S' },
        { "version", 0, 0, 'v' },
        { "help", 0, 0, 'h' },
        { NULL, 0, 0, 0 }
    };
    while (1)
    {
        int ret = getopt_long(argc, argv, "i:k:e:p:t:s:a:f:S:vh", long_opts, &opt_id);
        if (ret == -1)
        {
            break;
        }
        switch (ret)
        {
            case 'i':
                gAccount.id = optarg;
                break;
            case 'k':
                gAccount.key = optarg;
                break;
            case 'e':
                gEndpoint = optarg;
                break;
            case 'p':
                gProjectName = optarg;
                break;
            case 't':
                gTopicName = optarg;
                break;
            case 's':
                gSubId = optarg;
                break;
            case 'a':
            {
                // The option is documented as <bool>: accept "true" spellings
                // as well as the non-zero numbers that were accepted before.
                const std::string value(optarg);
                gAutoAck = (value == "true" || value == "True" || value == "TRUE"
                            || std::atoi(optarg) != 0);
                break;
            }
            case 'f':
                gFetchNum = std::atoi(optarg);
                break;
            case 'S':
                // Parse as 64-bit so large timeouts are not limited to int range.
                gSessionTimeoutMs = static_cast<int64_t>(std::strtoll(optarg, NULL, 10));
                break;
            case 'v':
            case 'h':
                Usage(argv[0]);
                exit(0);
                break;
            default:
                break;
        }
    }
}
// Prints one record to stdout.
// When the topic's schema has no fields the record is treated as a BLOB and
// its payload is written byte-for-byte; otherwise every supported field is
// printed on one line, separated by spaces.
inline void ProcessRecords(const TopicMetaPtr& topicMeta, const RecordResultPtr& recordPtr)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    if (schema.GetFieldCount() == 0) // BLOB
    {
        int len = 0;
        const char* data = recordPtr->GetData(len);
        // Blob payloads are binary, so write exactly len bytes instead of
        // relying on a terminating NUL.
        if (data != NULL && len > 0)
        {
            fwrite(data, 1, static_cast<size_t>(len), stdout);
        }
        printf("\n");
    }
    else // TUPLE
    {
        for (int j = 0; j < schema.GetFieldCount(); ++j)
        {
            const Field& field = schema.GetField(j);
            const FieldType fieldType = field.GetFieldType();
            switch (fieldType)
            {
                case BIGINT:
                    // Cast so the format specifier is correct on every platform.
                    printf("%lld ", static_cast<long long>(recordPtr->GetBigint(j)));
                    break;
                case INTEGER:
                    printf("%d ", recordPtr->GetInteger(j));
                    break;
                case SMALLINT:
                    printf("%hd ", recordPtr->GetSmallint(j));
                    break;
                case TINYINT:
                    printf("%hhd ", recordPtr->GetTinyint(j));
                    break;
                case DOUBLE:
                    printf("%.5lf ", recordPtr->GetDouble(j));
                    break;
                case FLOAT:
                    printf("%.5f ", recordPtr->GetFloat(j));
                    break;
                case STRING:
                    printf("%s ", recordPtr->GetString(j).c_str());
                    break;
                case BOOLEAN:
                    printf("%s ", recordPtr->GetBoolean(j) ? "true" : "false");
                    break;
                case TIMESTAMP:
                    printf("%lld ", static_cast<long long>(recordPtr->GetTimestamp(j)));
                    break;
                case DECIMAL:
                    printf("%s ", recordPtr->GetDecimal(j).c_str());
                    break;
                default:
                    break;
            }
        }
        printf("\n");
    }
}
void CollaborativeConsume(const std::string& project, const std::string& topic, const std::string& subId, const ConsumerConfiguration& conf)
{
DatahubConsumer consumer(project, topic, subId, conf);
const TopicMetaPtr& topicMeta = consumer.GetTopicMeta();
uint64_t readRecordNum = 0;
try{
while (true)
{
auto recordPtr = consumer.Read(60000);
if (recordPtr == nullptr)
{
break;
}
// Process data.
ProcessRecords(topicMeta, recordPtr);
if (!gAutoAck)
{
recordPtr->GetMessageKey()->Ack(); // If the value of auto_ack is set to false, the Ack function is executed after the data is processed.
}
readRecordNum++;
}
}
catch (const std::exception& e)
{
std::cerr << "Read fail: " << e.what() << std::endl;
}
std::cout << "Read " << readRecordNum << " records total" << std::endl;
}
int main(int argc, char* argv[])
{
parse_args(argc, argv);
ConsumerConfiguration consumerConf(gAccount, gEndpoint);
consumerConf.SetLogFilePath("./DatahubCollaborativeConsumer.log");
consumerConf.SetAutoAckOffset(gAutoAck);
if (gFetchNum > 0)
{
consumerConf.SetFetchLimitNum(gFetchNum);
}
if (gSessionTimeoutMs > 0)
{
consumerConf.SetSessionTimeout(gSessionTimeoutMs);
}
CollaborativeConsume(gProjectName, gTopicName, gSubId, consumerConf);
return 0;
}
Collaborative consumption requires SDK version 2.25.0 or later.
Write data
// Generates a batch of records and writes it to the topic. Records that the
// service reports as failed are passed to pubRetry() with three attempts.
// `projectName` and `topicName` come from the enclosing scope.
// A DatahubException is caught and reported on std::cerr.
void pub(DatahubClient& client)
{
    std::vector<RecordEntry> batch;
    GenerateRecords(batch);
    try
    {
        PutRecordResult outcome = client.PutRecord(projectName, topicName, batch);
        if (outcome.GetFailedRecordCount() > 0)
        {
            pubRetry(client, outcome.GetFailedRecords(), 3);
        }
    }
    catch (const DatahubException& ex)
    {
        std::cerr << "Put records fail. requestId: " << ex.GetRequestId()
                  << ", ErrorCode: " << ex.GetErrorCode()
                  << ", ErrorMessage: " << ex.GetErrorMessage()
                  << std::endl;
    }
}
void pubRetry(DatahubClient& client, const std::vector<RecordEntry>& records, int retryTimes)
{
if (retryTimes <= 0)
{
std::cout << "Put record retry fail. records size: " << records.size() << std::endl;
return ;
}
try
{
PutRecordResult prr = client.PutRecord(projectName, topicName, records);
if (prr.GetFailedRecordCount() > 0)
{
pubRetry(client, prr.GetFailedRecords(), retryTimes-1);
}
else
{
std::cout << "Put records retry success." << std::endl;
}
}
catch (const DatahubException& e)
{
std::cerr << "Put records fail when put retry. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
inline void GenerateRecords(RecordEntryVec& records)
{
for (int i = 0; i < 10; i++)
{
if (schema.GetFieldCount() == 0)
{
// BLOB
RecordEntry record = RecordEntry(BLOB);
record.SetData("test_blob_data" + std::to_string(i));
records.push_back(record);
}
else
{
// TUPLE
// RecordEntry record = RecordEntry(5);
// record.SetString(0, "field_1_" + std::to_string(i));
// record.SetBigint(1, 123456789l);
// record.SetDouble(2, 123.456d);
// record.SetDouble(3, 654.32100d);
// record.SetString(4, "field_2_" + std::to_string(i));
// records.push_back(record);
}
}
}
}
Asynchronous write example
using namespace aliyun;
using namespace aliyun::datahub;

// Connection settings filled in by parse_args().
Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;

// Number of write rounds and records generated per round.
int gEpochNum = 10;
int gRecordNum = 10;

// Optional producer buffer limits; applied only when set to a positive value.
int64_t gMaxBufferRecords = 0;
int64_t gMaxBufferSize = 0;
int64_t gMaxBufferTimeMs = 0;
int64_t gMaxRecordPackQueueLimit = 0;

// Shard IDs given on the command line.
StringVec gShardIds;
// Splits `src` on every occurrence of the delimiter `pre` and appends the
// non-empty pieces to `tar` (existing elements of `tar` are kept).
// An empty delimiter yields `src` as a single piece.
inline void StringSplit(StringVec& tar, const std::string& src, const std::string& pre)
{
    if (pre.empty())
    {
        // Nothing to split on; searching for "" would never advance correctly.
        if (!src.empty())
        {
            tar.push_back(src);
        }
        return;
    }

    size_t start_pos = 0;
    size_t pos = src.find(pre, start_pos);
    while (pos != std::string::npos)
    {
        std::string addStr = src.substr(start_pos, pos - start_pos);
        if (!addStr.empty())
        {
            tar.push_back(addStr);
        }
        // Skip the whole delimiter, not just its first character.
        start_pos = pos + pre.size();
        pos = src.find(pre, start_pos);
    }

    // Remainder after the last delimiter.
    std::string addStr = src.substr(start_pos);
    if (!addStr.empty())
    {
        tar.push_back(addStr);
    }
}
// Prints the program name (without any leading directory), the version and
// the supported command-line options to stdout.
void Usage(const char* prog)
{
    const char *start = strrchr(prog, '/');
    start = (start != NULL) ? start + 1 : prog;
    // The -Q long name matches the one registered in parse_args().
    fprintf(stdout,
        "%s Ver %s\n"
        "Usage: <programme> [options...]\n"
        " -i --accessid <string> set access id [Mandatory]\n"
        " -k --accesskey <string> set access key [Mandatory]\n"
        " -e --endpoint <string> set endpoint [Mandatory]\n"
        " -p --project <string> set project [Mandatory]\n"
        " -t --topic <string> set topic [Mandatory]\n"
        " -E --epochNum <int> set epoch num for write [Mandatory]\n"
        " -N --recordNum <int> set record num for each epoch [Mandatory]\n"
        " -R --maxBufferRecords <int> set max buffer record count\n"
        " -S --maxBufferSize <int> set max buffer size\n"
        " -M --maxBufferTimeMs <int> set max buffer time (ms)\n"
        " -Q --maxRecordPackQueueLimit <int> set max record pack queue limit\n"
        " -H --shardIds <string> set shards (split by ',')\n"
        " -v --version show version\n"
        " -h --help show help message\n", start, PROG_VERSION);
}
// Parses the command line of the asynchronous write example into the g*
// globals. -v and -h print the usage text and exit with status 0.
// Exits with status 1 when the epoch or record count is not positive.
void parse_args(int argc, char* argv[])
{
    int opt_id;
    static struct option long_opts[] =
    {
        { "accessid", 1, 0, 'i' },
        { "accesskey", 1, 0, 'k' },
        { "endpoint", 1, 0, 'e' },
        { "project", 1, 0, 'p' },
        { "topic", 1, 0, 't' },
        { "epochNum", 1, 0, 'E' },
        { "recordNum", 1, 0, 'N' },
        { "maxBufferRecords", 1, 0, 'R' },
        // Spelled as shown in the usage text (was "mMaxBufferSize").
        { "maxBufferSize", 1, 0, 'S' },
        { "maxBufferTimeMs", 1, 0, 'M' },
        { "maxRecordPackQueueLimit", 1, 0, 'Q' },
        { "shardIds", 1, 0, 'H' },
        { "version", 0, 0, 'v' },
        { "help", 0, 0, 'h' },
        { NULL, 0, 0, 0 }
    };
    while (1)
    {
        int ret = getopt_long(argc, argv, "i:k:e:p:t:E:N:R:S:M:Q:H:vh", long_opts, &opt_id);
        if (ret == -1)
        {
            break;
        }
        switch (ret)
        {
            case 'i':
                gAccount.id = optarg;
                break;
            case 'k':
                gAccount.key = optarg;
                break;
            case 'e':
                gEndpoint = optarg;
                break;
            case 'p':
                gProjectName = optarg;
                break;
            case 't':
                gTopicName = optarg;
                break;
            case 'E':
                gEpochNum = std::atoi(optarg);
                break;
            case 'N':
                gRecordNum = std::atoi(optarg);
                break;
            // The buffer limits are 64-bit, so parse them as such rather than
            // through int.
            case 'R':
                gMaxBufferRecords = static_cast<int64_t>(std::strtoll(optarg, NULL, 10));
                break;
            case 'S':
                gMaxBufferSize = static_cast<int64_t>(std::strtoll(optarg, NULL, 10));
                break;
            case 'M':
                gMaxBufferTimeMs = static_cast<int64_t>(std::strtoll(optarg, NULL, 10));
                break;
            case 'Q':
                gMaxRecordPackQueueLimit = static_cast<int64_t>(std::strtoll(optarg, NULL, 10));
                break;
            case 'H':
                StringSplit(gShardIds, optarg, ",");
                break;
            case 'v':
            case 'h':
                Usage(argv[0]);
                exit(0);
                break;
            default:
                break;
        }
    }
    if (gEpochNum <= 0 || gRecordNum <= 0)
    {
        std::cerr << "Invalid parameter!" << std::endl;
        exit(1);
    }
}
// Fills every field of `record` with a fixed sample value chosen by the
// field's type in `schema`. Field types not listed below are left unset.
inline void GenerateTupleRecord(const RecordSchema& schema, RecordEntry& record)
{
    for (int i = 0; i < schema.GetFieldCount(); i++)
    {
        const Field& field = schema.GetField(i);
        const FieldType& type = field.GetFieldType();
        switch (type)
        {
            case BIGINT:
                record.SetBigint(i, 1234l);
                break;
            case DOUBLE:
                record.SetDouble(i, 1.234);
                break;
            case BOOLEAN:
                record.SetBoolean(i, true);
                break;
            case TIMESTAMP:
                record.SetTimestamp(i, 1234l);
                break;
            case STRING:
                record.SetString(i, "1234");
                break;
            case DECIMAL:
                record.SetDecimal(i, "1234");
                break;
            case INTEGER:
                record.SetInteger(i, (int32_t)1234);
                break;
            case FLOAT:
                record.SetFloat(i, 1.234f);
                break;
            case TINYINT:
                // TINYINT holds -128..127; 1234 would not fit in int8_t.
                record.SetTinyint(i, (int8_t)123);
                break;
            case SMALLINT:
                record.SetSmallint(i, (int16_t)1234);
                break;
            default:
                break;
        }
    }
}
// Appends gRecordNum sample records to `records`: tuple records filled by
// GenerateTupleRecord() when the topic's record type is "TUPLE", blob records
// carrying "test_blob_data<index>" otherwise.
inline void GenerateRecords(TopicMetaPtr topicMeta, RecordEntryVec& records)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    for (int index = 0; index < gRecordNum; ++index)
    {
        if (!(topicMeta->GetRecordType() == "TUPLE"))
        {
            // BLOB
            records.emplace_back(BLOB);
            records.back().SetData("test_blob_data" + std::to_string(index));
            continue;
        }

        // TUPLE
        records.emplace_back(schema.GetFieldCount());
        GenerateTupleRecord(schema, records.back());
    }
}
void AsyncProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
DatahubProducer producer(project, topic, conf, shardIds);
const TopicMetaPtr& topicMeta = producer.GetTopicMeta();
std::vector<WriteResultFuturePtr> resultFutureVecs;
std::map<std::string, uint64_t> writeRecordNum;
for (const std::string& shardId : shardIds)
{
writeRecordNum[shardId] = 0;
}
try
{
// Generate data.
RecordEntryVec records;
GenerateRecords(topicMeta, records);
for (int i = 0; i < gEpochNum; i++)
{
auto result = producer.WriteAsync(records);
resultFutureVecs.push_back(result);
}
}
catch(const std::exception& e)
{
std::cerr << "WriteAsync fail: " << e.what() << std::endl;
}
producer.Flush();
for (auto it = resultFutureVecs.begin(); it != resultFutureVecs.end(); it++)
{
(*it)->wait();
try
{
WriteResultPtr result = (*it)->get();
writeRecordNum[result->GetShardId()] += gRecordNum;
}
catch (const std::exception& e)
{
std::cerr << "Write records fail: " << e.what() << std::endl;
}
}
for (auto it = writeRecordNum.begin(); it != writeRecordNum.end(); it++)
{
std::cout << "Write " << it->second << " records to shard " << it->first << std::endl;
}
}
int main(int argc, char *argv[])
{
parse_args(argc, argv);
ProducerConfiguration producerConf(gAccount, gEndpoint);
producerConf.SetEnableProtobuf(true);
producerConf.SetLogFilePath("./DatahubAsyncProducer.log");
if (gMaxBufferRecords > 0)
{
producerConf.SetMaxAsyncBufferRecords(gMaxBufferRecords);
}
if (gMaxBufferSize > 0)
{
producerConf.SetMaxAsyncBufferSize(gMaxBufferSize);
}
if (gMaxBufferTimeMs > 0)
{
producerConf.SetMaxAsyncBufferTimeMs(gMaxBufferTimeMs);
}
if (gMaxRecordPackQueueLimit > 0)
{
producerConf.SetMaxRecordPackQueueLimit(gMaxRecordPackQueueLimit);
}
AsyncProduce(gProjectName, gTopicName, producerConf, gShardIds);
return 0;
}
Synchronous write example
using namespace aliyun;
using namespace aliyun::datahub;

// Connection settings filled in by parse_args().
Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;

// Number of write rounds and records generated per round.
int gEpochNum{10};
int gRecordNum{10};

// Shard IDs given on the command line.
StringVec gShardIds;
// Splits `src` on every occurrence of the delimiter `pre` and appends the
// non-empty pieces to `tar` (existing elements of `tar` are kept).
// An empty delimiter yields `src` as a single piece.
inline void StringSplit(StringVec& tar, const std::string& src, const std::string& pre)
{
    if (pre.empty())
    {
        // Nothing to split on; searching for "" would never advance correctly.
        if (!src.empty())
        {
            tar.push_back(src);
        }
        return;
    }

    size_t start_pos = 0;
    size_t pos = src.find(pre, start_pos);
    while (pos != std::string::npos)
    {
        std::string addStr = src.substr(start_pos, pos - start_pos);
        if (!addStr.empty())
        {
            tar.push_back(addStr);
        }
        // Skip the whole delimiter, not just its first character.
        start_pos = pos + pre.size();
        pos = src.find(pre, start_pos);
    }

    // Remainder after the last delimiter.
    std::string addStr = src.substr(start_pos);
    if (!addStr.empty())
    {
        tar.push_back(addStr);
    }
}
// Prints the program name (without any leading directory), the version and
// the supported command-line options to stdout.
void Usage(const char* prog)
{
    // Keep only the part after the last '/', if there is one.
    const char* name = prog;
    const char* slash = strrchr(prog, '/');
    if (slash != NULL)
    {
        name = slash + 1;
    }

    printf(
        "%s Ver %s\n"
        "Usage: <programme> [options...]\n"
        " -i --accessid <string> set access id [Mandatory]\n"
        " -k --accesskey <string> set access key [Mandatory]\n"
        " -e --endpoint <string> set endpoint [Mandatory]\n"
        " -p --project <string> set project [Mandatory]\n"
        " -t --topic <string> set topic [Mandatory]\n"
        " -E --epochNum <int> set epoch num for write [Mandatory]\n"
        " -N --recordNum <int> set record num for each epoch [Mandatory]\n"
        " -H --shardIds <string> set shards (split by ',')\n"
        " -v --version show version\n"
        " -h --help show help message\n",
        name, PROG_VERSION);
}
// Parses the command line of the synchronous write example into the g*
// globals. -v and -h print the usage text and exit with status 0.
// Exits with status 1 when the epoch or record count is not positive.
void parse_args(int argc, char* argv[])
{
    static struct option long_opts[] =
    {
        { "accessid", 1, 0, 'i' },
        { "accesskey", 1, 0, 'k' },
        { "endpoint", 1, 0, 'e' },
        { "project", 1, 0, 'p' },
        { "topic", 1, 0, 't' },
        { "epochNum", 1, 0, 'E' },
        { "recordNum", 1, 0, 'N' },
        { "shardIds", 1, 0, 'H' },
        { "version", 0, 0, 'v' },
        { "help", 0, 0, 'h' },
        { NULL, 0, 0, 0 }
    };

    int opt_id;
    int opt;
    while ((opt = getopt_long(argc, argv, "i:k:e:p:t:E:N:H:vh", long_opts, &opt_id)) != -1)
    {
        switch (opt)
        {
            case 'i': gAccount.id = optarg; break;
            case 'k': gAccount.key = optarg; break;
            case 'e': gEndpoint = optarg; break;
            case 'p': gProjectName = optarg; break;
            case 't': gTopicName = optarg; break;
            case 'E': gEpochNum = std::atoi(optarg); break;
            case 'N': gRecordNum = std::atoi(optarg); break;
            case 'H': StringSplit(gShardIds, optarg, ","); break;
            case 'v':
            case 'h':
                Usage(argv[0]);
                exit(0);
                break;
            default:
                // Anything else is ignored.
                break;
        }
    }

    const bool countsValid = (gEpochNum > 0) && (gRecordNum > 0);
    if (!countsValid)
    {
        std::cerr << "Invalid parameter!" << std::endl;
        exit(1);
    }
}
// Fills every field of `record` with a fixed sample value chosen by the
// field's type in `schema`. Field types not listed below are left unset.
inline void GenerateTupleRecord(const RecordSchema& schema, RecordEntry& record)
{
    for (int i = 0; i < schema.GetFieldCount(); i++)
    {
        const Field& field = schema.GetField(i);
        const FieldType& type = field.GetFieldType();
        switch (type)
        {
            case BIGINT:
                record.SetBigint(i, 1234l);
                break;
            case DOUBLE:
                record.SetDouble(i, 1.234);
                break;
            case BOOLEAN:
                record.SetBoolean(i, true);
                break;
            case TIMESTAMP:
                record.SetTimestamp(i, 1234l);
                break;
            case STRING:
                record.SetString(i, "1234");
                break;
            case DECIMAL:
                record.SetDecimal(i, "1234");
                break;
            case INTEGER:
                record.SetInteger(i, (int32_t)1234);
                break;
            case FLOAT:
                record.SetFloat(i, 1.234f);
                break;
            case TINYINT:
                // TINYINT holds -128..127; 1234 would not fit in int8_t.
                record.SetTinyint(i, (int8_t)123);
                break;
            case SMALLINT:
                record.SetSmallint(i, (int16_t)1234);
                break;
            default:
                break;
        }
    }
}
// Appends gRecordNum sample records to `records`: tuple records filled by
// GenerateTupleRecord() when the topic's record type is "TUPLE", blob records
// carrying "test_blob_data<index>" otherwise.
inline void GenerateRecords(const TopicMetaPtr& topicMeta, RecordEntryVec& records)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    for (int index = 0; index < gRecordNum; ++index)
    {
        if (!(topicMeta->GetRecordType() == "TUPLE"))
        {
            // BLOB
            records.emplace_back(BLOB);
            records.back().SetData("test_blob_data" + std::to_string(index));
            continue;
        }

        // TUPLE
        records.emplace_back(schema.GetFieldCount());
        GenerateTupleRecord(schema, records.back());
    }
}
void GeneralProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
DatahubProducer producer(project, topic, conf, shardIds);
const TopicMetaPtr& topicMeta = producer.GetTopicMeta();
std::map<std::string, uint64_t> writeRecordNum;
for (const std::string& shardId : shardIds)
{
writeRecordNum[shardId] = 0;
}
try
{
// Generate data.
RecordEntryVec records;
GenerateRecords(topicMeta, records);
for (int i = 0; i < gEpochNum; i++)
{
std::string shardId = producer.Write(records);
writeRecordNum[shardId] += records.size();
}
}
catch(const std::exception& e)
{
std::cerr << "Write fail: " << e.what() << std::endl;
}
for (auto it = writeRecordNum.begin(); it != writeRecordNum.end(); it++)
{
std::cout << "Write " << it->second << " records to shard " << it->first << std::endl;
}
}
int main(int argc, char *argv[])
{
parse_args(argc, argv);
ProducerConfiguration producerConf(gAccount, gEndpoint);
producerConf.SetEnableProtobuf(true);
producerConf.SetLogFilePath("./DatahubGeneralProducer.log");
GeneralProduce(gProjectName, gTopicName, producerConf, gShardIds);
return 0;
}
Query metering information
void GetMeter()
{
try
{
GetMeteringInfoResult GetMeteringInfo = Client.GetMeteringInfo(projectName, topicName, shardId);
}
catch(DatahubException e)
{
cerr << e << endl;
}
}
Asynchronous and synchronous write require SDK version 2.25.0 or later.
Manage subscriptions
DataHub provides highly available offset storage for subscriptions, allowing servers to persist consumption offsets with minimal configuration.
Create a subscription
void CreateSubscription()
{
std::string projectName = "";
std::string topicName = "";
try
{
const std::string subscriptionComment = "test_subscription";
const CreateSubscriptionResult& createSubscriptionResult = client.CreateSubscription(projectName, topicName, subscriptionComment);
}
catch(const DatahubException& e)
{
std::cerr << "Create sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Delete a subscription
void DeleteSubscription()
{
std::string projectName = "";
std::string topicName = "";
std::string subId = "";
try
{
client.DeleteSubscription(projectName, topicName, subId);
}
catch(const DatahubException& e)
{
std::cerr << "Delete sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Update a subscription
// Updates the comment of a subscription through the `client` object declared
// outside this function. Fill in the project, topic and subscription ID
// placeholders before use.
// A DatahubException is caught and reported on std::cerr.
void UpdateSubscription()
{
    try
    {
        const std::string updateSubscriptionComment = "test_subscription_1";
        const std::string projectName = "";
        const std::string topicName = "";
        // ID of the subscription to update.
        const std::string subId = "";
        client.UpdateSubscription(projectName, topicName, subId, updateSubscriptionComment);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Update sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
List subscriptions
// Lists subscriptions of a topic through the `client` object declared outside
// this function and prints the total count reported by the service.
// A DatahubException is caught and reported on std::cerr.
void ListSubscription()
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    try
    {
        // NOTE(review): 1 and 20 look like the page index and page size — confirm.
        const ListSubscriptionResult& subscriptionResult = client.ListSubscription(projectName, topicName, 1, 20, subId);
        std::cout << subscriptionResult.GetTotalCount() << std::endl;
    }
    catch (const DatahubException& e)
    {
        std::cerr << "list sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
Query a subscription
void GetSubscription()
{
try
{
const std::vector<SubscriptionEntry>& subscriptions = subscriptionResult.GetSubscriptions();
std::cout << subscriptions.size() << std::endl;
for (size_t i = 0; i < subscriptions.size(); ++i)
{
SubscriptionEntry entry = subscriptions[i];
std::cout << entry.GetSubId() << std::endl;
}
}
catch(const DatahubException& e)
{
std::cerr << "Get sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Update the status of a subscription
// Sets the state of a subscription to SubscriptionState::OFFLINE through the
// shared `client`. projectName, topicName and subId are empty placeholders.
// A DatahubException is reported on std::cerr.
void UpdateSubscriptionState()
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    try
    {
        client.UpdateSubscriptionState(projectName, topicName, subId, SubscriptionState::OFFLINE);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Update sub fail: " << e.GetRequestId()
                  << ", ErrorCode: " << e.GetErrorCode()
                  << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
Manage offsets
A newly created subscription starts with no offsets. Use the following operations to manage offsets.
Initialize an offset
void InitSubscriptionOffsetSession()
{
const std::string projectName = "";
const std::string topicName = "";
const std::string subId = "";
std::vector<std::string> shardIds;
try
{
const OpenSubscriptionOffsetSessionResult& offsetSessionResult =
client.InitSubscriptionOffsetSession(projectName, topicName, subId, shardIds);
std::cout << offsetSessionResult.GetOffsets().size() << std::endl;
}
catch(const DatahubException& e)
{
std::cerr << "Init offset fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Obtain an offset
// Obtain an offset.
// Fetches the offsets of a subscription for the given shard IDs through the
// shared `client` and prints how many offsets were returned.
// The identifiers are empty placeholders and the shard ID list starts empty.
// A DatahubException is reported on std::cerr.
void GetSubscriptionOffset()
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    std::vector<std::string> shardIds;
    try
    {
        const GetSubscriptionOffsetResult& getSubscriptionOffsetResult =
            client.GetSubscriptionOffset(projectName, topicName, subId, shardIds);
        std::cout << getSubscriptionOffsetResult.GetOffsets().size() << std::endl;
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Get offset fail: " << e.GetRequestId()
                  << ", ErrorCode: " << e.GetErrorCode()
                  << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
Reset an offset
// Resets the offset of every shard that currently has an offset stored for
// the subscription. The current offsets are fetched first so that their keys
// can be reused; each key is then mapped to one SubscriptionOffset built from
// resetTimestamp, resetSequence and resetBatchIndex, and the map is sent
// through the shared `client`.
// The identifiers are empty placeholders and the shard ID list starts empty.
// A DatahubException is reported on std::cerr.
void ResetSubscriptionOffset()
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    std::vector<std::string> shardIds;
    try
    {
        const int64_t resetTimestamp = 0;
        const int64_t resetSequence = 0;
        const uint32_t resetBatchIndex = 0u;

        // Keep the result object alive while its offsets are iterated.
        const auto& currentOffsetResult = client.GetSubscriptionOffset(projectName, topicName, subId, shardIds);
        const auto& offsets = currentOffsetResult.GetOffsets();

        std::map<std::string, SubscriptionOffset> resetSubscriptionOffsets;
        for (auto iter = offsets.begin(); iter != offsets.end(); ++iter)
        {
            SubscriptionOffset offset(resetTimestamp, resetSequence, resetBatchIndex);
            resetSubscriptionOffsets.insert(
                std::pair<std::string, SubscriptionOffset>(iter->first, offset));
        }
        client.ResetSubscriptionOffset(projectName, topicName, subId, resetSubscriptionOffsets);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Reset offset fail: " << e.GetRequestId()
                  << ", ErrorCode: " << e.GetErrorCode()
                  << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
Manage DataConnectors
DataConnectors synchronize streaming data from DataHub topics to other Alibaba Cloud services, including MaxCompute, Object Storage Service (OSS), ApsaraDB RDS for MySQL, Tablestore, Elasticsearch, and Function Compute, in real time or near real time.
Create a DataConnector
- Create a DataConnector that synchronizes data to MaxCompute:
// Creates a SINK_ODPS DataConnector for a topic through the shared `client`
// and prints the ID of the new connector.
// The ODPS_* strings, projectName, topicName and fieldName1 are placeholders
// to be replaced with real values.
// A DatahubException is reported on std::cerr.
void CreateConnector()
{
    const std::string projectName = "";
    const std::string topicName = "";
    // Name of the topic field to synchronize; previously used without being declared.
    const std::string fieldName1 = "";
    try
    {
        sdk::SinkOdpsConfig config;
        config.SetEndpoint("ODPS_ENDPOINT");
        config.SetProject("ODPS_PROJECT");
        config.SetTable("ODPS_TABLE");
        config.SetAccessId("ODPS_ACCESSID");
        config.SetAccessKey("ODPS_ACCESSKEY");
        config.SetPartitionMode(sdk::PartitionMode::SYSTEM_TIME);
        // NOTE(review): the unit of this time range is not visible here -- confirm against the SDK.
        config.SetTimeRange(15);

        // Each pair is a partition column name and the time format used for it.
        std::vector<std::pair<std::string, std::string> > partitionConfig;
        partitionConfig.push_back(std::pair<std::string, std::string>("ds", "%Y%m%d"));
        partitionConfig.push_back(std::pair<std::string, std::string>("hh", "%H"));
        partitionConfig.push_back(std::pair<std::string, std::string>("mm", "%M"));
        config.SetPartitionConfig(partitionConfig);

        std::vector<std::string> columnFields;
        columnFields.push_back(fieldName1);

        const CreateConnectorResult& connectorResult = client.CreateConnector(projectName, topicName, sdk::ConnectorType::SINK_ODPS, columnFields, config);
        std::cout << "Odps ConnectorId:" + connectorResult.GetConnectorId() << std::endl;
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Create odpsConnector fail: " << e.GetRequestId()
                  << ", ErrorCode: " << e.GetErrorCode()
                  << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
Delete a DataConnector
void DeleteConnector()
{
const std::string projectName = "";
const std::string topicName = "";
const std::string connectorId = "";
try {
client.DeleteConnector(projectName, topicName, connectorId);
}
catch(const DatahubException& e)
{
std::cerr << "Delete Connector fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}
Query a DataConnector
// Fetches one DataConnector through the shared `client` and, when its
// configuration is a SinkOdpsConfig, prints the number of partition
// configuration entries.
// projectName, topicName and connectorId are empty placeholders.
// A DatahubException is reported on std::cerr.
void GetConnector()
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string connectorId = "";
    try
    {
        GetConnectorResult getConnectorResult = client.GetConnector(projectName, topicName, connectorId);
        const sdk::SinkOdpsConfig* odpsConfig = dynamic_cast<const sdk::SinkOdpsConfig*>(getConnectorResult.GetConfig());
        // dynamic_cast yields a null pointer when the configuration is not a
        // SinkOdpsConfig, so it must be checked before it is dereferenced.
        if (odpsConfig == nullptr)
        {
            std::cerr << "Get connector fail: config is not a SinkOdpsConfig" << std::endl;
            return;
        }
        std::cout << odpsConfig->GetPartitionConfig().size() << std::endl;
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Get connector fail: " << e.GetRequestId()
                  << ", ErrorCode: " << e.GetErrorCode()
                  << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}
List DataConnectors
void ListConnector() {
try {
const std::string projectName = "";
const std::string topicName = "";
const ListConnectorResult& listConnectorResult = client.ListConnector(projectName, topicName);
std::cout << listConnectorResult.GetConnectorIds().size() << std::endl;
} catch(const DatahubException& e)
{
std::cerr << "list connector fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
}
}