All Products
Search
Document Center

DataHub:DataHub SDK for C++

Last Updated:Apr 17, 2024

Installation

Prerequisites

DataHub SDK for C++ supports compilation only with GCC 4.9.2. Before you install and use DataHub SDK for C++, make sure that your compilation environment meets this requirement.

Download the SDK.

Initialization

You can use an Alibaba Cloud account to access DataHub. To access DataHub, you must provide your AccessKey ID and AccessKey secret, and the endpoint that is used to access DataHub. The following sample code provides an example on how to create a DataHub client by using endpoints. For more information about the endpoints, see Endpoints.

     /* Configuration */
    // Fill in the AccessKey ID and AccessKey secret of your Alibaba Cloud
    // account; the values below are placeholders to be replaced.
    Account account;
    account.id = "";
    account.key = "=";

    std::string projectName = "test_project";
    std::string topicName = "test_cpp";
    std::string comment = "test";

    // Endpoint used to reach the DataHub service (see the Endpoints topic).
    std::string endpoint = "";
    Configuration conf(account, endpoint);

    /* Datahub Client */
    // All subsequent samples issue their requests through this client object.
    DatahubClient client(conf);

Manage projects

A project is a basic unit for managing data in DataHub. A project contains multiple topics. Projects in DataHub are independent of those in MaxCompute. You cannot reuse MaxCompute projects in DataHub. You must create projects in DataHub.

Create a project

void CreateProject()
{
    std::string projectName = "";
    std::string comment = "";
    try
    {
        client.CreateProject(projectName, comment);
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Create project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

Delete a project

void DeleteProject()
{
    std::string projectName = "";
    try
    {
        client.DeleteProject(projectName);
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Delete project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

Update a project

void UpdateProject()
{
  std::string projectName = "";
  std::string comment = "";
  try
  {
      client.UpdateProject(projectName,comment);
  }
  catch(const DatahubException& e)
  {
      std::cerr << "Update project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
  }
}

Query projects

void ListProject()
{
  try
  {
      const ListProjectResult& listProjectResult = client.ListProject();
      std::cout<<listProjectResult.GetProjectNames().size()<<std::endl;
  }
  catch(const DatahubException& e)
  {
      std::cerr << "List project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
  }
}

Query a project

void GetProject()
{
  std::string projectName = "";
  try
  {
     const GetProjectResult& projectResult = client.GetProject(projectName);
     std::cout<<projectResult.GetProject()<<std::endl;
     std::cout<<projectResult.GetComment()<<std::endl;
     std::cout<<projectResult.GetCreator()<<std::endl;
     std::cout<<projectResult.GetCreateTime()<<std::endl;
     std::cout<<projectResult.GetLastModifyTime()<<std::endl;    
  }
  catch(const DatahubException& e)
  {
      std::cerr << "get project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
  }
}

Manage topics

A topic is the smallest unit for data subscription and publishing in DataHub. You can use topics to distinguish different types of streaming data. Two types of topics are supported: tuple and blob.

  1. You can write a block of binary data as a record of a blob topic.

  2. Tuple topics contain records that are similar to data records in databases. Each record contains multiple columns. You must specify record schemas for tuple topics because the data in tuple topics is transmitted as strings over the network. Therefore, schemas are required for data type conversion. The following table describes the data types that are supported.

Type

Description

Value range

BIGINT

An eight-byte signed integer.

-9223372036854775807 to 9223372036854775807

DOUBLE

A double-precision floating-point number. It is eight bytes in length.

-1.0 × 10^308 to 1.0 × 10^308

BOOLEAN

The Boolean type.

True and False, true and false, or 0 and 1.

TIMESTAMP

The type of timestamp.

A timestamp that is accurate to microseconds.

STRING

A string. Data of the STRING type can be encoded by using only UTF-8.

The size of all values in a column of the STRING type cannot exceed 2 MB.

TINYINT

A single-byte integer.

-128 to 127

SMALLINT

A double-byte integer.

-32768 to 32767

INTEGER

A four-byte integer.

-2147483648 to 2147483647

FLOAT

A four-byte single-precision floating-point number.

-3.40292347 × 10^38 to 3.40292347 × 10^38

Create a tuple topic

/* Create a TUPLE topic with a three-field schema (BIGINT, DOUBLE, STRING). */
void CreateTupleTopic(){
    /* Build the record schema: every field is nullable (allowNull = true)
     * and carries a human-readable comment. */
    RecordSchema schema;
    std::string fieldName1 = "a";
    std::string fieldName2 = "b";
    std::string fieldName3 = "c";
    std::string fieldComment1 = "field1 comment";
    std::string fieldComment2 = "field2 comment";
    std::string fieldComment3 = "field3 comment";
    schema.AddField(Field(fieldName1/*Fieldname*/, BIGINT, true, fieldComment1));
    schema.AddField(Field(fieldName2/*Fieldname*/, DOUBLE, true, fieldComment2));
    schema.AddField(Field(fieldName3/*Fieldname*/, STRING, true, fieldComment3));

    /* Create Topic */
    int shardCount = 3;          // number of concurrent read/write tunnels
    int lifeCycle = 7;           // data retention in days
    RecordType type = TUPLE;
    std::string projectName = "";
    std::string topicName = "";
    std::string comment = "";    // FIX: comment was passed below but never declared

    try
    {
        client.CreateTopic(projectName, topicName, shardCount, lifeCycle, type, schema, comment);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Create topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

Create a blob topic

 /* Create a BLOB topic; records are opaque binary payloads, so no schema is needed. */
void CreateBlobTopic()
{
    /* Create Topic */
    int shardCount = 3;          // number of concurrent read/write tunnels
    int lifeCycle = 7;           // data retention in days
    RecordType type = BLOB;
    std::string projectName = "";
    std::string topicName = "";
    std::string comment = "";    // FIX: comment was passed below but never declared
    try
    {
        client.CreateTopic(projectName, topicName, shardCount, lifeCycle, type, comment);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Create topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

Delete a topic

void DeleteTopic()
{
    std::string projectName = "";
    std::string topicName = "";
    try
    {
       client.DeleteTopic(projectName, topicName);  
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Delete topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }

}

Query topics

/* List every topic in the project and print how many there are. */
void ListTopic(){
    std::string projectName = "";
    try
    {
        const ListTopicResult& listTopicResult = client.ListTopic(projectName);
        std::cout<<listTopicResult.GetTopicNames().size()<<std::endl;
    }
    catch(const DatahubException& e)
    {
        // FIX: the message previously said "Get topic fail" in the list function.
        std::cerr << "List topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }

}

Update a topic

/* Update a topic's lifecycle (retention days) and comment. */
void UpdateTopic(){
    const std::string updateComment = "test1";
    int updateLifecycle = 7;        // new data retention in days
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        // FIX: the sample previously passed the undeclared name updateProjectComment.
        client.UpdateTopic(projectName, topicName, updateLifecycle, updateComment);
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Update topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }

}

Query a topic

/* Fetch a topic's metadata and print its comment. */
void GetTopic(){
    std::string projectName = "";
    std::string topicName = "";

    try
    {
        const GetTopicResult& getTopicResult = client.GetTopic(projectName, topicName);
        // FIX: cout/endl were unqualified, which compiles only with
        // `using namespace std;`; every other sample qualifies them.
        std::cout<<getTopicResult.GetComment()<<std::endl;
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Get topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }

}

Manage shards

Shards are concurrent tunnels used for data transmission in a topic. Each shard has an ID. A shard can be in different states. Opening: The shard is being started. Active: The shard is started and can be used to provide services. Each active shard consumes server resources. We recommend that you create shards based on your business requirements.

List shards

/* List the shards of a topic and collect their IDs. */
void ListShard()
{
    std::string projectName = "";   // FIX: previously used without being declared
    std::string topicName = "";     // FIX: previously used without being declared
    try
    {
       const ListShardResult& lsr = client.ListShard(projectName, topicName);
       std::cout<<lsr.GetShards().size()<<std::endl;
       std::vector<ShardEntry> shards = lsr.GetShards();

       // Gather the shard IDs, e.g. to pick a shard to read from or write to.
       std::vector<std::string> shardIds;
       for (size_t i = 0; i < shards.size(); ++i)
       {
            ShardEntry shardEntry = shards[i];
            shardIds.push_back(shardEntry.GetShardId());
       }
    }
    catch(const DatahubException& e)
    {
        std::cerr << "List shard fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

Split a shard

void SplitShard()
{
  std::string projectName = "";
  std::string topicName = "";
  try
  {
    const SplitShardResult& ssr = client.SplitShard(projectName, topicName, "0", "00000000000000000000000000AAAAAA");
    std::cout<<ssr.GetChildShards().size()<<std::endl;
  }
    catch(const DatahubException& e)
    {
        std::cerr << "Split shardId fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

Merge shards

/* Merge two adjacent shards ("0" and "1") and print the resulting child shard. */
void MergeShard()
{
  std::string projectName = "";
  std::string topicName = "";

  try
  {
    const MergeShardResult& msr = client.MergeShard(projectName, topicName, "0", "1");
    std::cout<<msr.GetChildShard().GetShardId()<<std::endl;
    std::cout<<msr.GetChildShard().GetBeginHashKey()<<std::endl;
    std::cout<<msr.GetChildShard().GetEndHashKey()<<std::endl;
  }
   catch(const DatahubException& e)
   {
        // FIX: error message typo corrected ("sahrd" -> "shard").
        std::cerr << "Merge shard fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }

}

Read and write data

You can read data from active and closed shards. However, you can write data only to active shards.

Read data

  1. Obtain a cursor

To read data from a topic, specify a shard and the cursor from which data starts to be read. You can obtain the cursor by using the following methods: OLDEST, LATEST, SEQUENCE, and SYSTEM_TIME.

  • OLDEST: the cursor that points to the earliest valid record in the specified shard.

  • LATEST: the cursor that points to the latest record in the specified shard.

  • SEQUENCE: the cursor that points to the record of the specified sequence number.

  • SYSTEM_TIME: the cursor that points to the first record whose timestamp value is greater than or equal to the specified timestamp value.

/* Obtain a cursor pointing at the oldest record of shard "0". */
void GetCursor()
{
    std::string projectName = "";   // FIX: previously used without being declared
    std::string topicName = "";     // FIX: previously used without being declared
    try
    {
        const GetCursorResult& r1 = client.GetCursor(projectName, topicName, "0", CURSOR_TYPE_OLDEST);
        std::string cursor = r1.GetCursor();
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Get cursor fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }

}
  1. Read data

void sub(DatahubClient& client)
{
    std::string projectName = "";
    std::string topicName = "";
    std::string shardId = "0";
    std::string cursor;
    RecordSchema schema;
    OpenSubscriptionOffsetSessionResult osor = client.InitSubscriptionOffsetSession(projectName, topicName, subId, {shardId});
    SubscriptionOffset subscription = osor.GetOffsets().at(shardId);
    if (subscription.GetSequence() < 0)
    {
        cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_OLDEST).GetCursor();
    }
    else
    {
        int64_t nextSequence = subscription.GetSequence() + 1;
        try
        {
            cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_SEQUENCE, nextSequence).GetCursor();
        }
        catch (const DatahubException& e)
        {
            cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_OLDEST).GetCursor();
        }
    }

    int64_t readTotal = 0;
    int fetchNum = 10;
    while (true)
    {
        try
        {
            GetRecordResult grr = client.GetRecord(projectName, topicName, shardId, cursor, fetchNum, subId);
            if (grr.GetRecordCount() <= 0)
            {
                std::cout << "Read null, wait for 1s." << std::endl;
                sleep(1);
                continue;
            }
            
            for (auto recordResult : grr.GetRecords())
            {
                ProcessRecords(recordResult);
                if (++readTotal % 1000 == 0)
                {
                    subscription.SetSequence(recordResult.GetSequence());
                    subscription.SetTimestamp(recordResult.GetSystemTime());
                    std::map<std::string, SubscriptionOffset> offsets;
                    offsets[shardId] = subscription;
                    try
                    {
                        client.UpdateSubscriptionOffset(projectName, topicName, subId, offsets);
                    }
                    catch (const DatahubException& e)
                    {
                        std::cerr << "Update subscription offset fail. requestId: "<< e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
                        throw ;
                    }
                }
            }
            cursor = grr.GetNextCursor();
        }
        catch (const DatahubException& e)
        {
            std::cerr << "Get record fail. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
        }
    }

inline void ProcessRecords(const RecordResult& record)
{
    if (schema.GetFieldCount() == 0)       // BLOB
    {
        int len = 0;
        const char* data = record.GetData(len);
        printf("%s\n", data);
    }
    else                                   // TUPLE
    {
        for (int j = 0; j < schema.GetFieldCount(); ++j)
        {
            const Field &field = schema.GetField(j);
            const FieldType fieldType = field.GetFieldType();
            switch (fieldType)
            {
                case BIGINT:
                    printf("%ld\n", record.GetBigint(j));
                    break;
                case DOUBLE:
                    printf("%.15lf\n", record.GetDouble(j));
                    break;
                case STRING:
                    printf("%s\n", record.GetString(j).c_str());
                    break;
                default:
                    break;
            }
        }
    }
} 
}

Example of reading collaborative consumption data

using namespace aliyun;
using namespace aliyun::datahub;

// Running count of consumed records; atomic so several threads could share it.
std::atomic_int gTotalNum(0);

// Connection and consumer settings, populated by parse_args().
// Namespace-scope variables are zero-initialized, so the flags below are
// false/0 until the corresponding option is given.
Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;
std::string gSubId;
bool gAutoAck;              // -a: acknowledge offsets automatically
int32_t gFetchNum;          // -f: records per fetch (0 = SDK default)
int64_t gSessionTimeoutMs;  // -S: session timeout in ms (0 = SDK default)

/* Print the command-line help for the collaborative-consumer sample. */
void Usage(const char* prog)
{
    // Show only the basename of the program path.
    const char* base = strrchr(prog, '/');
    if (base != NULL)
    {
        ++base;
    }
    else
    {
        base = prog;
    }
    fprintf(stdout,
        "%s Ver %s\n"
        "Usage: <programme> [options...]\n"
        "   -i --accessid           <string>          set access id [Mandatory]\n"
        "   -k --accesskey          <string>          set access key [Mandatory]\n"
        "   -e --endpoint           <string>          set endpoint [Mandatory]\n"
        "   -p --project            <string>          set project [Mandatory]\n"
        "   -t --topic              <string>          set topic [Mandatory]\n"
        "   -s --subId              <string>          set subscription id [Mandatory]\n"
        "   -a --autoAck            <bool>            set auto ack offset\n"
        "   -f --fetchNum           <int>             set fetch limit num\n"
        "   -S --sessionTimeoutMs   <int>             set session timeout (ms)\n"
        "   -v --version                              show version\n"
        "   -h --help                                 show help message\n", base, PROG_VERSION);
}

/* Parse command-line options into the file-scope g* variables.
 * -v and -h print the usage text and exit(0); unrecognized options are ignored. */
void parse_args(int argc, char* argv[])
{
    int opt_id;
    // Long-option table; each entry maps to the matching short option letter
    // in the option string passed to getopt_long below.
    static struct option long_opts[] =
    {
        { "accessid", 1, 0, 'i'},
        { "accesskey", 1, 0, 'k'},
        { "endpoint", 1, 0, 'e' },
        { "project", 1, 0, 'p' },
        { "topic", 1, 0, 't' },
        { "subId", 1, 0, 's' },
        { "autoAck", 1, 0, 'a'},
        { "fetchNum", 1, 0, 'f'},
        { "sessionTimeoutMs", 1, 0, 'S'},
        { "version", 0, 0, 'v' },
        { "help", 0, 0, 'h' },
        { NULL, 0, 0, 0 }
    };

    while (1)
    {
        // A ':' after a letter in the option string means that option takes a value.
        int ret = getopt_long(argc, argv, "i:k:e:p:t:s:a:f:S:vh", long_opts, &opt_id);
        if (ret == -1)
        {
            break;  // no more options
        }
        switch (ret)
        {
        case 'i':
            gAccount.id = optarg;
            break;
        case 'k':
            gAccount.key = optarg;
            break;
        case 'e':
            gEndpoint = optarg;
            break;
        case 'p':
            gProjectName = optarg;
            break;
        case 't':
            gTopicName = optarg;
            break;
        case 's':
            gSubId = optarg;
            break;
        case 'a':
            // Any non-zero integer argument enables auto-ack.
            gAutoAck = (bool)std::atoi(optarg);
            break;
        case 'f':
            gFetchNum = std::atoi(optarg);
            break;
        case 'S':
            gSessionTimeoutMs = (int64_t)std::atoi(optarg);
            break;
        case 'v':
        case 'h':
            // Both -v and -h show the usage text and terminate the program.
            Usage(argv[0]);
            exit(0);
            break;
        default:
            break;
        }
    }
}

/* Print one record to stdout.
 * For a BLOB topic (a schema with no fields) the payload is printed as a raw
 * byte string; for a TUPLE topic each field is printed with a type-appropriate
 * format, space-separated on one line.
 * NOTE(review): the "%ld" conversions assume long is 64 bits (LP64) --
 * confirm for your target platform. */
inline void ProcessRecords(const TopicMetaPtr& topicMeta, const RecordResultPtr& recordPtr)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    if (schema.GetFieldCount() == 0)        // BLOB
    {
        int len = 0;
        const char* data = recordPtr->GetData(len);
        printf("%s\n", data);
    }
    else                                    // TUPLE
    {
        // Walk the schema and print every field of the record in order.
        for (int j = 0; j < schema.GetFieldCount(); ++j)
        {
            const Field& field = schema.GetField(j);
            const FieldType fieldType = field.GetFieldType();
            switch (fieldType)
            {
                case BIGINT:
                    printf("%ld ", recordPtr->GetBigint(j));
                    break;
                case INTEGER:
                    printf("%d ", recordPtr->GetInteger(j));
                    break;
                case SMALLINT:
                    printf("%hd ", recordPtr->GetSmallint(j));
                    break;
                case TINYINT:
                    printf("%hhd ", recordPtr->GetTinyint(j));
                    break;
                case DOUBLE:
                    printf("%.5lf ", recordPtr->GetDouble(j));
                    break;
                case FLOAT:
                    printf("%.5f ", recordPtr->GetFloat(j));
                    break;
                case STRING:
                    printf("%s ", recordPtr->GetString(j).c_str());
                    break;
                case BOOLEAN:
                    printf("%s ", recordPtr->GetBoolean(j) ? "true" : "false");
                    break;
                case TIMESTAMP:
                    printf("%ld ", recordPtr->GetTimestamp(j));
                    break;
                case DECIMAL:
                    printf("%s ", recordPtr->GetDecimal(j).c_str());
                    break;
                default:
                    break;
            }
        }
        printf("\n");
    }
}

/* Read records through collaborative consumption until no record arrives
 * within 60 seconds, acknowledging manually when auto-ack is disabled. */
void CollaborativeConsume(const std::string& project, const std::string& topic, const std::string& subId, const ConsumerConfiguration& conf)
{
    DatahubConsumer consumer(project, topic, subId, conf);
    const TopicMetaPtr& topicMeta = consumer.GetTopicMeta();

    uint64_t total = 0;
    try
    {
        for (;;)
        {
            auto record = consumer.Read(60000);   // wait up to 60s for one record
            if (record == nullptr)
            {
                break;   // timed out: treat the stream as drained
            }
            // Process data.
            ProcessRecords(topicMeta, record);
            if (!gAutoAck)
            {
                // With auto_ack disabled, acknowledge only after processing succeeds.
                record->GetMessageKey()->Ack();
            }
            ++total;
        }
    }
    catch (const std::exception& ex)
    {
        std::cerr << "Read fail: " << ex.what() << std::endl;
    }
    std::cout << "Read " << total << " records total" << std::endl;
}

/* Entry point of the collaborative-consumption sample: parse the command
 * line, build the consumer configuration and start reading records. */
int main(int argc, char* argv[])
{
    parse_args(argc, argv);

    ConsumerConfiguration consumerConf(gAccount, gEndpoint);
    consumerConf.SetLogFilePath("./DatahubCollaborativeConsumer.log");
    consumerConf.SetAutoAckOffset(gAutoAck);
    // Only override the SDK defaults when the user supplied a positive value.
    if (gFetchNum > 0)
    {
        consumerConf.SetFetchLimitNum(gFetchNum);
    }
    if (gSessionTimeoutMs > 0)
    {
        consumerConf.SetSessionTimeout(gSessionTimeoutMs);
    }

    CollaborativeConsume(gProjectName, gTopicName, gSubId, consumerConf);

    return 0;
}
Note

Collaborative consumption is a new feature supported since version 2.25.0 of the SDK. It is not supported in earlier versions of the SDK.

Write data

/* Publish one batch of generated records and retry any that failed.
 * NOTE(review): projectName and topicName are not declared in this sample --
 * presumably file-scope variables; confirm against your own code. */
void pub(DatahubClient& client)
{
    std::vector<RecordEntry> records;
    GenerateRecords(records);

    try
    {
        PutRecordResult prr = client.PutRecord(projectName, topicName, records);
        // PutRecord can partially succeed: the records that could not be
        // written are returned so the caller can retry just those.
        if (prr.GetFailedRecordCount() > 0)
        {
            pubRetry(client, prr.GetFailedRecords(), 3);
        }
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Put records fail. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

void pubRetry(DatahubClient& client, const std::vector<RecordEntry>& records, int retryTimes)
{
    if (retryTimes <= 0)
    {
        std::cout << "Put record retry fail. records size: " << records.size() << std::endl;
        return ;
    }

    try
    {
        PutRecordResult prr = client.PutRecord(projectName, topicName, records);
        if (prr.GetFailedRecordCount() > 0)
        {
            pubRetry(client, prr.GetFailedRecords(), retryTimes-1);
        }
        else
        {
            std::cout << "Put records retry success." << std::endl;
        }
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Put records fail when put retry. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
inline void GenerateRecords(RecordEntryVec& records)
{
    for (int i = 0; i < 10; i++)
    {   
        if (schema.GetFieldCount() == 0) 
        {        
             // BLOB
            RecordEntry record = RecordEntry(BLOB);
            record.SetData("test_blob_data" + std::to_string(i));
            records.push_back(record);
        }
        else
        {
        // TUPLE
            // RecordEntry record = RecordEntry(5);
            // record.SetString(0, "field_1_" + std::to_string(i));
            // record.SetBigint(1, 123456789l);
            // record.SetDouble(2, 123.456d);
            // record.SetDouble(3, 654.32100d);
            // record.SetString(4, "field_2_" + std::to_string(i));
            // records.push_back(record);
        } 

    }
}  
}

Example of asynchronously writing data

using namespace aliyun;
using namespace aliyun::datahub;

// Connection settings and producer tuning knobs, populated by parse_args().
Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;
int gEpochNum = 10;                     // number of write batches
int gRecordNum = 10;                    // records per batch
int64_t gMaxBufferRecords = 0l;         // 0 means "keep the SDK default"
int64_t gMaxBufferSize = 0l;            // 0 means "keep the SDK default"
int64_t gMaxBufferTimeMs = 0l;          // 0 means "keep the SDK default"
int64_t gMaxRecordPackQueueLimit = 0l;  // 0 means "keep the SDK default"
StringVec gShardIds;                    // explicit shard list from -H (comma-separated)

/* Split src on every occurrence of the separator pre and append the
 * non-empty pieces to tar.
 * @param tar destination vector; existing elements are preserved.
 * @param src the string to split.
 * @param pre the separator (may be longer than one character). */
inline void StringSplit(StringVec& tar, const std::string& src, const std::string& pre)
{
    size_t start_pos = 0, end_pos = src.size();
    size_t pos = src.find(pre, start_pos);
    while (pos != std::string::npos)
    {
        std::string addStr = src.substr(start_pos, pos - start_pos);
        if (!addStr.empty())
        {
            tar.push_back(addStr);
        }
        // FIX: advance past the whole separator, not just one character;
        // the original "pos + 1" was wrong for multi-character separators.
        start_pos = pos + pre.size();
        pos = src.find(pre, start_pos);
    }
    // Append the tail after the last separator (or the whole string if none).
    std::string addStr = src.substr(start_pos, end_pos - start_pos);
    if (!addStr.empty())
    {
        tar.push_back(addStr);
    }
}

/* Print the command-line help for the asynchronous-producer sample. */
void Usage(const char* prog)
{
    // Show only the basename of the program path.
    const char* base = strrchr(prog, '/');
    if (base != NULL)
    {
        ++base;
    }
    else
    {
        base = prog;
    }
    fprintf(stdout,
        "%s Ver %s\n"
        "Usage: <programme> [options...]\n"
        "   -i --accessid           <string>          set access id [Mandatory]\n"
        "   -k --accesskey          <string>          set access key [Mandatory]\n"
        "   -e --endpoint           <string>          set endpoint [Mandatory]\n"
        "   -p --project            <string>          set project [Mandatory]\n"
        "   -t --topic              <string>          set topic [Mandatory]\n"
        "   -E --epochNum           <int>             set epoch num for write [Mandatory]\n"
        "   -N --recordNum          <int>             set record num for each epoch [Mandatory]\n"
        "   -R --maxBufferRecords   <int>             set max buffer record count\n"
        "   -S --maxBufferSize      <int>             set max buffer size\n"
        "   -M --maxBufferTimeMs    <int>             set max buffer time (ms)\n"
        "   -Q --maxRecordPackLimit <int>             set max record pack queue limit\n"
        "   -H --shardIds           <string>          set shards (split by ',')\n"
        "   -v --version                              show version\n"
        "   -h --help                                 show help message\n", base, PROG_VERSION);
}

/* Parse command-line options into the file-scope g* variables; exits with
 * status 1 when epochNum or recordNum is missing or non-positive. */
void parse_args(int argc, char* argv[])
{
    int opt_id;
    // Long-option table; each entry maps to the matching short option letter
    // in the option string passed to getopt_long below.
    static struct option long_opts[] =
    {
        { "accessid", 1, 0, 'i'},
        { "accesskey", 1, 0, 'k'},
        { "endpoint", 1, 0, 'e' },
        { "project", 1, 0, 'p' },
        { "topic", 1, 0, 't' },
        { "epochNum", 1, 0, 'E'},
        { "recordNum", 1, 0, 'N'},
        { "maxBufferRecords", 1, 0, 'R'},
        { "maxBufferSize", 1, 0, 'S'},      // FIX: was misspelled "mMaxBufferSize", so --maxBufferSize never matched
        { "maxBufferTimeMs", 1, 0, 'M'},
        { "maxRecordPackQueueLimit", 1, 0, 'Q'},
        { "shardIds", 1, 0, 'H'},
        { "version", 0, 0, 'v' },
        { "help", 0, 0, 'h' },
        { NULL, 0, 0, 0 }
    };

    while (1)
    {
        int ret = getopt_long(argc, argv, "i:k:e:p:t:E:N:R:S:M:Q:H:vh", long_opts, &opt_id);
        if (ret == -1)
        {
            break;  // no more options
        }
        switch (ret)
        {
        case 'i':
            gAccount.id = optarg;
            break;
        case 'k':
            gAccount.key = optarg;
            break;
        case 'e':
            gEndpoint = optarg;
            break;
        case 'p':
            gProjectName = optarg;
            break;
        case 't':
            gTopicName = optarg;
            break;
        case 'E':
            gEpochNum = std::atoi(optarg);
            break;
        case 'N':
            gRecordNum = std::atoi(optarg);
            break;
        case 'R':
            gMaxBufferRecords = (int64_t)std::atoi(optarg);
            break;
        case 'S':
            gMaxBufferSize = (int64_t)std::atoi(optarg);
            break;
        case 'M':
            gMaxBufferTimeMs = (int64_t)std::atoi(optarg);
            break;
        case 'Q':
            gMaxRecordPackQueueLimit = (int64_t)std::atoi(optarg);
            break;
        case 'H':
            // The shard list is comma-separated, e.g. "0,1,2".
            StringSplit(gShardIds, optarg, ",");
            break;
        case 'v':
        case 'h':
            // Both -v and -h show the usage text and terminate the program.
            Usage(argv[0]);
            exit(0);
            break;
        default:
            break;
        }
    }

    if (gEpochNum <= 0 || gRecordNum <= 0)
    {
        std::cerr << "Invalid parameter!" << std::endl;
        exit(1);
    }
}

/* Fill every field of record with a fixed demo value chosen by the field's
 * declared type in schema. */
inline void GenerateTupleRecord(const RecordSchema& schema, RecordEntry& record)
{
    for (int i = 0; i < schema.GetFieldCount(); i++)
    {
        const Field& field = schema.GetField(i);
        const FieldType& type = field.GetFieldType();
        switch (type)
        {
            case BIGINT:
                record.SetBigint(i, 1234l);
                break;
            case DOUBLE:
                record.SetDouble(i, 1.234);
                break;
            case BOOLEAN:
                record.SetBoolean(i, true);
                break;
            case TIMESTAMP:
                record.SetTimestamp(i, 1234l);
                break;
            case STRING:
                record.SetString(i, "1234");
                break;
            case DECIMAL:
                record.SetDecimal(i, "1234");
                break;
            case INTEGER:
                record.SetInteger(i, (int32_t)1234);
                break;
            case FLOAT:
                record.SetFloat(i, 1.234);
                break;
            case TINYINT:
                // FIX: 1234 does not fit in int8_t (range -128..127); the old
                // cast silently truncated it.  Use an in-range demo value.
                record.SetTinyint(i, (int8_t)123);
                break;
            case SMALLINT:
                record.SetSmallint(i, (int16_t)1234);
                break;
            default:
                break;
        }
    }
}

/* Fill records with gRecordNum demo entries that match the topic's type. */
inline void GenerateRecords(TopicMetaPtr topicMeta, RecordEntryVec& records)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    for (int idx = 0; idx < gRecordNum; ++idx)
    {
        if (topicMeta->GetRecordType() != "TUPLE")
        {
            // BLOB: one opaque byte payload per record.
            records.emplace_back(BLOB);
            records.back().SetData("test_blob_data" + std::to_string(idx));
        }
        else
        {
            // TUPLE: one field slot per schema column, filled with demo values.
            records.emplace_back(schema.GetFieldCount());
            GenerateTupleRecord(schema, records.back());
        }
    }
}

void AsyncProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
    DatahubProducer producer(project, topic, conf, shardIds);
    const TopicMetaPtr& topicMeta = producer.GetTopicMeta();

    std::vector<WriteResultFuturePtr> resultFutureVecs;
    std::map<std::string, uint64_t> writeRecordNum;
    for (const std::string& shardId : shardIds)
    {
        writeRecordNum[shardId] = 0;
    }

    try
    {
        // Generate data.
        RecordEntryVec records;
        GenerateRecords(topicMeta, records);
        for (int i = 0; i < gEpochNum; i++)
        {
            auto result = producer.WriteAsync(records);
            resultFutureVecs.push_back(result);
        }
    }
    catch(const std::exception& e)
    {
        std::cerr << "WriteAsync fail: " << e.what() << std::endl;
    }

    producer.Flush();

    for (auto it = resultFutureVecs.begin(); it != resultFutureVecs.end(); it++)
    {
        (*it)->wait();
        try
        {
            WriteResultPtr result = (*it)->get();
            writeRecordNum[result->GetShardId()] += gRecordNum;
        }
        catch (const std::exception& e)
        {
            std::cerr << "Write records fail: " << e.what() << std::endl;
        }
    }

    for (auto it = writeRecordNum.begin(); it != writeRecordNum.end(); it++)
    {
        std::cout << "Write " << it->second << " records to shard " << it->first << std::endl;
    }
}

// Entry point of the async-producer sample: parse flags, configure the
// producer, and run one asynchronous production session.
int main(int argc, char *argv[])
{
    parse_args(argc, argv);

    // Protobuf transport enabled, with a dedicated log file for this tool.
    ProducerConfiguration producerConf(gAccount, gEndpoint);
    producerConf.SetEnableProtobuf(true);
    producerConf.SetLogFilePath("./DatahubAsyncProducer.log");

    // Only override the async-buffer knobs the user explicitly set (> 0);
    // otherwise the SDK defaults stay in effect.
    if (gMaxBufferRecords > 0)
    {
        producerConf.SetMaxAsyncBufferRecords(gMaxBufferRecords);
    }
    if (gMaxBufferSize > 0)
    {
        producerConf.SetMaxAsyncBufferSize(gMaxBufferSize);
    }
    if (gMaxBufferTimeMs > 0)
    {
        producerConf.SetMaxAsyncBufferTimeMs(gMaxBufferTimeMs);
    }
    if (gMaxRecordPackQueueLimit > 0)
    {
        producerConf.SetMaxRecordPackQueueLimit(gMaxRecordPackQueueLimit);
    }

    AsyncProduce(gProjectName, gTopicName, producerConf, gShardIds);
    return 0;
}

Example of synchronously writing data

using namespace aliyun;
using namespace aliyun::datahub;

// Command-line driven configuration, filled in by parse_args().
Account gAccount;          // AccessKey id/secret used to authenticate
std::string gEndpoint;     // DataHub service endpoint
std::string gProjectName;  // target project name
std::string gTopicName;    // target topic name
int gEpochNum = 10;        // number of write rounds (batches)
int gRecordNum = 10;       // records generated per round
StringVec gShardIds;       // shard ids parsed from -H (comma separated)

// Split 'src' on every occurrence of delimiter 'pre' and append the
// non-empty pieces to 'tar'. Empty fragments (leading, trailing, or
// between adjacent delimiters) are skipped.
inline void StringSplit(StringVec& tar, const std::string& src, const std::string& pre)
{
    if (pre.empty())
    {
        // An empty delimiter cannot split anything (and would loop
        // forever below); treat it as "no split".
        if (!src.empty())
        {
            tar.push_back(src);
        }
        return;
    }

    size_t start_pos = 0;
    size_t pos = src.find(pre, start_pos);
    while (pos != std::string::npos)
    {
        std::string piece = src.substr(start_pos, pos - start_pos);
        if (!piece.empty())
        {
            tar.push_back(piece);
        }
        // Advance past the whole delimiter. was: 'pos + 1', which is
        // wrong for delimiters longer than one character.
        start_pos = pos + pre.size();
        pos = src.find(pre, start_pos);
    }

    // Trailing fragment after the last delimiter (or the whole string
    // when no delimiter was found).
    std::string tail = src.substr(start_pos);
    if (!tail.empty())
    {
        tar.push_back(tail);
    }
}

// Print the version banner and the command-line option summary to stdout.
// 'prog' is argv[0]; only the basename (text after the last '/') is shown.
void Usage(const char* prog)
{
    // strrchr() locates the last path separator; fall back to the whole
    // string when argv[0] contains no '/'.
    const char *start = strrchr(prog, '/');
    start = (start != NULL) ? start + 1 : prog;
    fprintf(stdout,
        "%s Ver %s\n"
        "Usage: <programme> [options...]\n"
        "   -i --accessid           <string>          set access id [Mandatory]\n"
        "   -k --accesskey          <string>          set access key [Mandatory]\n"
        "   -e --endpoint           <string>          set endpoint [Mandatory]\n"
        "   -p --project            <string>          set project [Mandatory]\n"
        "   -t --topic              <string>          set topic [Mandatory]\n"
        "   -E --epochNum           <int>             set epoch num for write [Mandatory]\n"
        "   -N --recordNum          <int>             set record num for each epoch [Mandatory]\n"
        "   -H --shardIds           <string>          set shards (split by ',')\n"
        "   -v --version                              show version\n"
        "   -h --help                                 show help message\n", start, PROG_VERSION);
}

// Parse command-line options into the g* globals (see Usage()).
// -v/-h print the usage text and exit(0). Exits with status 1 when a
// mandatory option is missing or a numeric option is non-positive.
void parse_args(int argc, char* argv[])
{
    int opt_id;
    // getopt_long table: every data option requires an argument (1).
    static struct option long_opts[] =
    {
        { "accessid", 1, 0, 'i'},
        { "accesskey", 1, 0, 'k'},
        { "endpoint", 1, 0, 'e' },
        { "project", 1, 0, 'p' },
        { "topic", 1, 0, 't' },
        { "epochNum", 1, 0, 'E'},
        { "recordNum", 1, 0, 'N'},
        { "shardIds", 1, 0, 'H'},
        { "version", 0, 0, 'v' },
        { "help", 0, 0, 'h' },
        { NULL, 0, 0, 0 }
    };

    while (1)
    {
        int ret = getopt_long(argc, argv, "i:k:e:p:t:E:N:H:vh", long_opts, &opt_id);
        if (ret == -1)
        {
            break;  // all options consumed
        }
        switch (ret)
        {
        case 'i':
            gAccount.id = optarg;
            break;
        case 'k':
            gAccount.key = optarg;
            break;
        case 'e':
            gEndpoint = optarg;
            break;
        case 'p':
            gProjectName = optarg;
            break;
        case 't':
            gTopicName = optarg;
            break;
        case 'E':
            gEpochNum = std::atoi(optarg);
            break;
        case 'N':
            gRecordNum = std::atoi(optarg);
            break;
        case 'H':
            // Comma-separated shard list, e.g. "0,1,2".
            StringSplit(gShardIds, optarg, ",");
            break;
        case 'v':
        case 'h':
            // Usage() prints the version line, so -v and -h share it.
            Usage(argv[0]);
            exit(0);
            break;
        default:
            break;
        }
    }

    // was: only gEpochNum/gRecordNum were validated, although the usage
    // text marks accessid/accesskey/endpoint/project/topic as [Mandatory]
    // too — a missing credential only surfaced as a runtime request error.
    if (gAccount.id.empty() || gAccount.key.empty() || gEndpoint.empty() ||
        gProjectName.empty() || gTopicName.empty() ||
        gEpochNum <= 0 || gRecordNum <= 0)
    {
        std::cerr << "Invalid parameter!" << std::endl;
        exit(1);
    }
}

// Populate every field of 'record' with a fixed sample value chosen by
// the field's declared type. Fields of types not listed below are left
// unset.
inline void GenerateTupleRecord(const RecordSchema& schema, RecordEntry& record)
{
    for (int i = 0; i < schema.GetFieldCount(); i++)
    {
        const Field& field = schema.GetField(i);
        const FieldType& type = field.GetFieldType();
        switch (type)
        {
            case BIGINT:
                record.SetBigint(i, 1234l);
                break;
            case DOUBLE:
                record.SetDouble(i, 1.234);
                break;
            case BOOLEAN:
                record.SetBoolean(i, true);
                break;
            case TIMESTAMP:
                record.SetTimestamp(i, 1234l);
                break;
            case STRING:
                record.SetString(i, "1234");
                break;
            case DECIMAL:
                record.SetDecimal(i, "1234");
                break;
            case INTEGER:
                record.SetInteger(i, (int32_t)1234);
                break;
            case FLOAT:
                // float literal avoids a silent double -> float narrowing
                record.SetFloat(i, 1.234f);
                break;
            case TINYINT:
                // was: (int8_t)1234 — 1234 does not fit in int8_t
                // (range -128..127) and silently wrapped to -46.
                record.SetTinyint(i, (int8_t)123);
                break;
            case SMALLINT:
                record.SetSmallint(i, (int16_t)1234);
                break;
            default:
                break;
        }
    }
}

// Build gRecordNum records matching the topic's record type and append
// them to 'records'. TUPLE topics get one schema-sized record per entry;
// BLOB topics get a small string payload tagged with the record index.
inline void GenerateRecords(const TopicMetaPtr& topicMeta, RecordEntryVec& records)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    // Hoist the loop-invariant type check out of the loop: the topic
    // type cannot change while a batch is being generated.
    const bool isTuple = (topicMeta->GetRecordType() == "TUPLE");
    for (int i = 0; i < gRecordNum; i++)
    {
        if (isTuple)
        {
            // TUPLE: allocate one slot per schema field, then fill it.
            records.emplace_back(schema.GetFieldCount());
            GenerateTupleRecord(schema, records.back());
        }
        else
        {
            // BLOB: raw payload; index appended so each record is distinct.
            records.emplace_back(BLOB);
            records.back().SetData("test_blob_data" + std::to_string(i));
        }
    }
}

void GeneralProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
    DatahubProducer producer(project, topic, conf, shardIds);
    const TopicMetaPtr& topicMeta = producer.GetTopicMeta();

    std::map<std::string, uint64_t> writeRecordNum;
    for (const std::string& shardId : shardIds)
    {
        writeRecordNum[shardId] = 0;
    }

    try
    {
        // Generate data.
        RecordEntryVec records;
        GenerateRecords(topicMeta, records);
        for (int i = 0; i < gEpochNum; i++)
        {
            std::string shardId = producer.Write(records);
            writeRecordNum[shardId] += records.size();
        }
    }
    catch(const std::exception& e)
    {
        std::cerr << "Write fail: " << e.what() << std::endl;
    }

    for (auto it = writeRecordNum.begin(); it != writeRecordNum.end(); it++)
    {
        std::cout << "Write " << it->second << " records to shard " << it->first << std::endl;
    }
}

// Entry point of the synchronous-producer sample: parse flags, configure
// the producer, and run one blocking production session.
int main(int argc, char *argv[])
{
    parse_args(argc, argv);

    // Protobuf transport enabled, with a dedicated log file for this tool.
    ProducerConfiguration producerConf(gAccount, gEndpoint);
    producerConf.SetEnableProtobuf(true);
    producerConf.SetLogFilePath("./DatahubGeneralProducer.log");

    GeneralProduce(gProjectName, gTopicName, producerConf, gShardIds);
    return 0;
}

Query metering information

void GetMeter()
{
    try
    {
        GetMeteringInfoResult GetMeteringInfo = Client.GetMeteringInfo(projectName, topicName, shardId);

    }
    catch(DatahubException e)
    {
          cerr << e << endl;
    }
}
Note

The features of asynchronously writing data and synchronously writing data are supported since version 2.25.0 of the SDK.

Manage subscriptions

DataHub allows servers to save the consumption offsets of a subscription. You can obtain highly available offset storage services by performing simple configurations.

Create a subscription

// Create a subscription on a topic; the server assigns its id.
void CreateSubscription()
{
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        const std::string subscriptionComment = "test_subscription";
        const CreateSubscriptionResult& result = client.CreateSubscription(projectName, topicName, subscriptionComment);
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Create sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

Delete a subscription

// Delete a subscription by id.
void DeleteSubscription()
{
    std::string projectName = "";
    std::string topicName = "";
    std::string subId = "";
    try
    {
        client.DeleteSubscription(projectName, topicName, subId);
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Delete sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

Update a subscription

// Update the comment of an existing subscription.
void UpdateSubscription()
{
    try
    {
        const std::string updateSubscriptionComment = "test_subscription_1";
        const std::string projectName = "";
        const std::string topicName = "";
        // was: 'subId' was used without being declared anywhere.
        const std::string subId = "";
        client.UpdateSubscription(projectName, topicName, subId, updateSubscriptionComment);
    }
    catch(const DatahubException& e)
    {
        // was: typo "Upate sub fail" in the error message.
        std::cerr << "Update sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

List subscriptions

 void ListSubscription() 
 {
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";

    try 
    {
       const ListSubscriptionResult& subscriptionResult = client.ListSubscription(projectName, topicName, 1, 20, subId));
       std::cout << subscriptionResult.GetTotalCount() << std::endl;
    }  
   catch(const DatahubException& e)
   {
      std::cerr << "list sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

Query a subscription

void GetSubscription() 
{
    try 
    {
    	const std::vector<SubscriptionEntry>& subscriptions = subscriptionResult.GetSubscriptions();
    	std::cout << subscriptions.size() << std::endl;

    	for (size_t i = 0; i < subscriptions.size(); ++i)
    	{
          SubscriptionEntry entry = subscriptions[i];
          std::cout <<  entry.GetSubId() << std::endl;
      }						
    }  
   catch(const DatahubException& e)
   {
      std::cerr << "Get sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

Update the status of a subscription

 void UpdateSubscriptionState()
 {
    try 
    {
        client.UpdateSubscriptionState(projectName, topicName, subId, SubscriptionState::OFFLINE);
        std::cout << getSubscriptionResult.GetComment() << std::endl;

    } 
   catch(const DatahubException& e)
   {
      std::cerr << "Update sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
	
}

Manage offsets

After a subscription is created, it is initially unconsumed. To use the offset storage feature of the subscription, perform operations on offsets.

Initialize an offset

 void InitSubscriptionOffsetSession()
 {
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    std::vector<std::string> shardIds;
    try 
    {
        const OpenSubscriptionOffsetSessionResult& offsetSessionResult =
        client.InitSubscriptionOffsetSession(projectName, topicName, subId, shardIds);
        std::cout << offsetSessionResult.GetOffsets().size() << std::endl;

    } 
   catch(const DatahubException& e)
   {
      std::cerr << "Init offset fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

Obtain an offset

// Obtain an offset.
void GetSubscriptionOffset() 
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    std::vector<std::string> shardIds;
    try 
    {
        const GetSubscriptionOffsetResult& getSubscriptionOffsetResult =
        client.GetSubscriptionOffset(projectName, topicName, subId, shardIds;
        std::cout << getSubscriptionOffsetResult.GetOffsets().size() << std::endl;        std::cout << getSubscriptionOffsetResult.GetOffsets().size() << std::endl;
        std::cout << getSubscriptionResult.GetComment() << std::endl;

    }  
   catch(const DatahubException& e)
   {
      std::cerr << "Get offset fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

Reset an offset

void ResetSubscriptionOffset() 
{
    try 
    {
        int64_t resetTimestamp = 0l;
        int64_t resetSequence = 0l;
        uint32_t resetBatchIndex = 0u;
        const std::string projectName = "";
        const std::string topicName = "";
        const std::string subId = "";
        std::map<std::string, SubscriptionOffset> resetSubscriptionOffsets;
        for (auto iter = offsets.begin(); iter != offsets.end(); ++iter)
        {
            SubscriptionOffset offset(resetTimestamp, resetSequence, resetBatchIndex);
            resetSubscriptionOffsets.insert(
            std::pair<std::string, SubscriptionOffset>(iter->first, offset));
        }

    client.ResetSubscriptionOffset(projectName, topicName, subId, resetSubscriptionOffsets);    }  
   catch(const DatahubException& e)
   {
      std::cerr << "Reset offset fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

Manage DataConnectors

A DataConnector in DataHub synchronizes streaming data from DataHub to other cloud services. You can use DataConnectors to synchronize data from DataHub topics to MaxCompute, Object Storage Service (OSS), ApsaraDB RDS for MySQL, Tablestore, Elasticsearch, and Function Compute in real time or near real-time mode. After DataConnectors are configured, the data that you write to DataHub can be used in other Alibaba Cloud services.

Create a DataConnector

  • The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to MaxCompute:

// Create a SINK_ODPS DataConnector that synchronizes this topic into a
// MaxCompute (ODPS) table, partitioned by system time in 15-minute ranges.
void CreateConnector()
{
    try
    {
        sdk::SinkOdpsConfig config;
        config.SetEndpoint("ODPS_ENDPOINT");
        config.SetProject("ODPS_PROJECT");
        config.SetTable("ODPS_TABLE");
        config.SetAccessId("ODPS_ACCESSID");
        config.SetAccessKey("ODPS_ACCESSKEY");
        config.SetPartitionMode(sdk::PartitionMode::SYSTEM_TIME);
        config.SetTimeRange(15);

        const std::string projectName = "";
        const std::string topicName = "";

        // Partition columns and their time formats: day / hour / minute.
        std::vector<std::pair<std::string, std::string> > partitionConfig;
        partitionConfig.push_back(std::pair<std::string, std::string>("ds", "%Y%m%d"));
        partitionConfig.push_back(std::pair<std::string, std::string>("hh", "%H"));
        partitionConfig.push_back(std::pair<std::string, std::string>("mm", "%M"));
        config.SetPartitionConfig(partitionConfig);

        // Topic fields to synchronize. was: 'fieldName1' was used without
        // being declared anywhere.
        const std::string fieldName1 = "";
        std::vector<std::string> columnFields;
        columnFields.push_back(fieldName1);

        const CreateConnectorResult& connectorResult = client.CreateConnector(projectName, topicName, sdk::ConnectorType::SINK_ODPS, columnFields, config);

        std::cout<<"Odps ConnectorId:" + connectorResult.GetConnectorId()<<std::endl;
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Create odpsConnector fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

Delete a DataConnector

// Delete a DataConnector by id.
void DeleteConnector()
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string connectorId = "";

    try
    {
        client.DeleteConnector(projectName, topicName, connectorId);
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Delete Connector fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

Query a DataConnector

// Query a DataConnector and, when it is an ODPS sink, print the number of
// partition-config entries.
// was: the result variable was named 'GetConnectorResult' (shadowing its
// own type) while the next line read the undeclared 'getConnectorResult',
// the dynamic_cast result was dereferenced without a null check, and the
// error message said "Delete topic fail".
void GetConnector()
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string connectorId = "";
    try
    {
        GetConnectorResult getConnectorResult = client.GetConnector(projectName, topicName, connectorId);
        // dynamic_cast yields NULL when the connector is not an ODPS sink.
        const sdk::SinkOdpsConfig* odpsConfig = dynamic_cast<const sdk::SinkOdpsConfig*>(getConnectorResult.GetConfig());
        if (odpsConfig != NULL)
        {
            std::cout << odpsConfig->GetPartitionConfig().size() << std::endl;
        }
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Get connector fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

List DataConnectors

// List the ids of all DataConnectors configured on a topic.
void ListConnector()
{
    try
    {
        const std::string projectName = "";
        const std::string topicName = "";
        const ListConnectorResult& listConnectorResult = client.ListConnector(projectName, topicName);
        std::cout << listConnectorResult.GetConnectorIds().size() << std::endl;
    }
    catch(const DatahubException& e)
    {
        std::cerr << "list connector fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}