全部产品
Search
文档中心

DataHub:C++ SDK

更新时间:Sep 06, 2023

安装

安装要求说明

DataHub C++ SDK目前必须使用GCC 4.9.2编译,使用前请检查编译环境是否适合,确认检查通过后再安装使用。

SDK下载

初始化

用户可以使用阿里云认证账号访问DataHub,并需要提供云账号AccessId和AccessKey,同时需要提供访问DataHub的服务地址。 以下代码用于使用域名列表新建DataHubClient:

     /* Configuration */
    Account account;
    account.id = "";
    account.key = "=";

    std::string projectName = "test_project";
    std::string topicName = "test_cpp";
    std::string comment = "test";

    std::string endpoint = "";
    Configuration conf(account, endpoint);

    /* Datahub Client */
    DatahubClient client(conf);

Project操作

项目(Project)是DataHub数据的基本组织单元,下面包含多个Topic。值得注意的是,DataHub的项目空间与MaxCompute的项目空间是相互独立的。用户在MaxCompute中创建的项目不能复用于DataHub,需要独立创建。

创建project

void CreateProject()
{
    std::string projectName = "";
    std::string comment = "";
    try
    {
        client.CreateProject(projectName, comment);
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Create project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

删除Project

void DeleteProject()
{
    std::string projectName = "";
    try
    {
        client.DeleteProject(projectName);
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Delete project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

更新Project

void UpdateProject()
{
  std::string projectName = "";
  std::string comment = "";
  try
  {
      client.UpdateProject(projectName,comment);
  }
  catch(const DatahubException& e)
  {
      std::cerr << "Update project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
  }
}

获取Project列表

void ListProject()
{
  try
  {
      const ListProjectResult& listProjectResult = client.ListProject();
      std::cout<<listProjectResult.GetProjectNames().size()<<std::endl;
  }
  catch(const DatahubException& e)
  {
      std::cerr << "List project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
  }
}

查询Project信息

void GetProject()
{
  std::string projectName = "";
  try
  {
     const GetProjectResult& projectResult = client.GetProject(projectName);
     std::cout<<projectResult.GetProject()<<std::endl;
     std::cout<<projectResult.GetComment()<<std::endl;
     std::cout<<projectResult.GetCreator()<<std::endl;
     std::cout<<projectResult.GetCreateTime()<<std::endl;
     std::cout<<projectResult.GetLastModifyTime()<<std::endl;    
  }
  catch(const DatahubException& e)
  {
      std::cerr << "get project fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
  }
}

Topic操作

Topic是 DataHub 订阅和发布的最小单位,用户可以用Topic来表示一类或者一种流数据。目前支持Tuple与Blob两种类型:

  1. Blob类型Topic支持写入一块二进制数据作为一个Record

  2. Tuple类型的Topic支持类似于数据库的记录的数据,每条记录包含多个列,需要指定Record Schema,因为网络传输中,数据都是以字符串的形式发送,需要schema来转换成对应的类型。目前支持以下几种数据类型:

类型

含义

值域

BIGINT

8字节有符号整型

-9223372036854775807 ~ 9223372036854775807

DOUBLE

8字节双精度浮点数

-1.0 _10^308 ~ 1.0 _10^308

BOOLEAN

布尔类型

True/False或true/false或0/1

TIMESTAMP

时间戳类型

表示到微秒的时间戳类型

STRING

字符串,只支持UTF-8编码

单个STRING列最长允许2MB

TINYINT

单字节整型

-128 ~127

SMALLINT

双字节整型

-32768 ~ 32767

INTEGER

4字节整型

-2147483648 ~ 2147483647

FLOAT

4字节单精度浮点数

-3.40292347_10^38 ~ 3.40292347_10^38

创建Tuple Topic

void CreateTupleTopic(){
    RecordSchema schema;
    std::string fieldName1 = "a";
    std::string fieldName2 = "b";
    std::string fieldName3 = "c";
    std::string fieldComment1 = "field1 comment";
    std::string fieldComment2 = "field2 comment";
    std::string fieldComment3 = "field3 comment";
    schema.AddField(Field(fieldName1/*Fieldname*/, BIGINT, true, fieldComment1));
    schema.AddField(Field(fieldName2/*Fieldname*/, DOUBLE, true, fieldComment2));
    schema.AddField(Field(fieldName3/*Fieldname*/, STRING, true, fieldComment3));

    /* Create Topic */
    int shardCount = 3;
    int lifeCycle = 7;
    RecordType type = TUPLE;
    std::string projectName = "";
    std::string topicName = "";

    try
    {
        client.CreateTopic(projectName, topicName, shardCount, lifeCycle, type, schema, comment);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Create topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;

    }
}

创建Blob Topic

 void CreateBlobTopic()
{
    /* Create Topic */
    int shardCount = 3;
    int lifeCycle = 7;
    RecordType type = BLOB;
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        client.CreateTopic(projectName, topicName, shardCount, lifeCycle, type, comment);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Create topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

}

删除Topic

void DeleteTopic()
{
    std::string projectName = "";
    std::string topicName = "";
    try
    {
       client.DeleteTopic(projectName, topicName);  
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Delete topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }

}

获取Topic列表

void ListTopic(){
    std::string projectName = "";
    try
    {
        const ListTopicResult& listTopicResult = client.ListTopic(projectName);
        std::cout<<listTopicResult.GetTopicNames().size()<<std::endl;
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Get topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }

}

更新Topic

void UpdateTopic(){
    const std::string updateComment = "test1";
    int updateLifecycle = 7;
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        client.UpdateTopic(projectName, topicName, updateLifecycle, updateProjectComment);
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Update topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }

}

查询Topic

void GetTopic(){
    std::string projectName = "";
    std::string topicName = "";

    try
    {
        const GetTopicResult& getTopicResult = client.GetTopic(projectName, topicName);
        cout<<getTopicResult.GetComment()<<endl;
    }
    catch(const DatahubException& e)
    {
        std::cerr << "Get topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }

}

Shard操作

Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态:Opening - 启动中,Active - 启动完成可服务。每个Shard启用以后会占用一定的服务端资源,建议按需申请Shard数量。

列出Shard列表

void ListShard()
{
    try
    {
       const ListShardResult& lsr = client.ListShard(projectName, topicName);
       std::cout<<lsr.GetShards().size()<<std::endl;
       std::vector<ShardEntry> shards = lsr.GetShards();

       std::vector<std::string> shardIds;
       for (size_t i = 0; i < shards.size(); ++i)
       {		
            ShardEntry shardEntry = shards[i];
            shardIds.push_back(shardEntry.GetShardId());
       }
    } 
    catch(const DatahubException& e)
    {
        std::cerr << "List shard fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

分裂Shard

void SplitShard()
{
  std::string projectName = "";
  std::string topicName = "";
  try
  {
    const SplitShardResult& ssr = client.SplitShard(projectName, topicName, "0", "00000000000000000000000000AAAAAA");
    std::cout<<ssr.GetChildShards().size()<<std::endl;
  }
    catch(const DatahubException& e)
    {
        std::cerr << "Split shardId fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

合并Shard

void MergeShard()
{
  std::string projectName = "";
  std::string topicName = "";

  try
  {
    const MergeShardResult& msr = client.MergeShard(projectName, topicName, "0", "1");
    std::cout<<msr.GetChildShard().GetShardId()<<std::endl;
    std::cout<<msr.GetChildShard().GetBeginHashKey()<<std::endl;
    std::cout<<msr.GetChildShard().GetEndHashKey()<<std::endl;
  }
   catch(const DatahubException& e)
   {
        std::cerr << "Merge sahrd fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
	
}

读写数据

状态为CLOSED和ACTIVE的shard都可以读取数据,不过只有状态为ACTIVE的shard可以写数据。

读数据

  1. 获取cursor

读取Topic下的数据,需要指定对应的shard,同时需要指定数据读取的游标位置Cursor。Cursor的获取方式有四种,分别是OLDEST, LATEST, SEQUENCE, SYSTEM_TIME。

  • OLDEST : 表示获取的cursor指向当前有效数据中时间最久远的record。

  • LATEST : 表示获取的cursor指向当前最新的record。

  • SEQUENCE : 表示获取的cursor指向该序列的record。

  • SYSTEM_TIME : 表示获取的cursor指向该大于等于该时间戳的第一条record。

void GetCursor()
{
    try
   {
      const GetCursorResult& r1 = client.GetCursor(projectName, topicName, "0", CURSOR_TYPE_OLDEST);
      std::string cursor = r1.GetCursor();
   } 
    catch(const DatahubException& e)
    {
        std::cerr << "Get cursor fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
	
}

  1. 读取数据

void sub(DatahubClient& client)
{
    std::string projectName = "";
    std::string topicName = "";
    std::string shardId = "0";
    std::string cursor;
    RecordSchema schema;
    OpenSubscriptionOffsetSessionResult osor = client.InitSubscriptionOffsetSession(projectName, topicName, subId, {shardId});
    SubscriptionOffset subscription = osor.GetOffsets().at(shardId);
    if (subscription.GetSequence() < 0)
    {
        cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_OLDEST).GetCursor();
    }
    else
    {
        int64_t nextSequence = subscription.GetSequence() + 1;
        try
        {
            cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_SEQUENCE, nextSequence).GetCursor();
        }
        catch (const DatahubException& e)
        {
            cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_OLDEST).GetCursor();
        }
    }

    int64_t readTotal = 0;
    int fetchNum = 10;
    while (true)
    {
        try
        {
            GetRecordResult grr = client.GetRecord(projectName, topicName, shardId, cursor, fetchNum, subId);
            if (grr.GetRecordCount() <= 0)
            {
                std::cout << "Read null, wait for 1s." << std::endl;
                sleep(1);
                continue;
            }
            
            for (auto recordResult : grr.GetRecords())
            {
                ProcessRecords(recordResult);
                if (++readTotal % 1000 == 0)
                {
                    subscription.SetSequence(recordResult.GetSequence());
                    subscription.SetTimestamp(recordResult.GetSystemTime());
                    std::map<std::string, SubscriptionOffset> offsets;
                    offsets[shardId] = subscription;
                    try
                    {
                        client.UpdateSubscriptionOffset(projectName, topicName, subId, offsets);
                    }
                    catch (const DatahubException& e)
                    {
                        std::cerr << "Update subscription offset fail. requestId: "<< e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
                        throw ;
                    }
                }
            }
            cursor = grr.GetNextCursor();
        }
        catch (const DatahubException& e)
        {
            std::cerr << "Get record fail. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
        }
    }

inline void ProcessRecords(const RecordResult& record)
{
    if (schema.GetFieldCount() == 0)       // BLOB
    {
        int len = 0;
        const char* data = record.GetData(len);
        printf("%s\n", data);
    }
    else                                   // TUPLE
    {
        for (int j = 0; j < schema.GetFieldCount(); ++j)
        {
            const Field &field = schema.GetField(j);
            const FieldType fieldType = field.GetFieldType();
            switch (fieldType)
            {
                case BIGINT:
                    printf("%ld\n", record.GetBigint(j));
                    break;
                case DOUBLE:
                    printf("%.15lf\n", record.GetDouble(j));
                    break;
                case STRING:
                    printf("%s\n", record.GetString(j).c_str());
                    break;
                default:
                    break;
            }
        }
    }
} 
}

写数据

void pub(DatahubClient& client)
{
    std::vector<RecordEntry> records;
    GenerateRecords(records);

    try
    {
        PutRecordResult prr = client.PutRecord(projectName, topicName, records);
        if (prr.GetFailedRecordCount() > 0)
        {
            pubRetry(client, prr.GetFailedRecords(), 3);
        }
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Put records fail. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
}

void pubRetry(DatahubClient& client, const std::vector<RecordEntry>& records, int retryTimes)
{
    if (retryTimes <= 0)
    {
        std::cout << "Put record retry fail. records size: " << records.size() << std::endl;
        return ;
    }

    try
    {
        PutRecordResult prr = client.PutRecord(projectName, topicName, records);
        if (prr.GetFailedRecordCount() > 0)
        {
            pubRetry(client, prr.GetFailedRecords(), retryTimes-1);
        }
        else
        {
            std::cout << "Put records retry success." << std::endl;
        }
    }
    catch (const DatahubException& e)
    {
        std::cerr << "Put records fail when put retry. requestId: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
    }
inline void GenerateRecords(RecordEntryVec& records)
{
    for (int i = 0; i < 10; i++)
    {   
        if (schema.GetFieldCount() == 0) 
        {        
             // BLOB
            RecordEntry record = RecordEntry(BLOB);
            record.SetData("test_blob_data" + std::to_string(i));
            records.push_back(record);
        }
        else
        {
        // TUPLE
            // RecordEntry record = RecordEntry(5);
            // record.SetString(0, "field_1_" + std::to_string(i));
            // record.SetBigint(1, 123456789l);
            // record.SetDouble(2, 123.456d);
            // record.SetDouble(3, 654.32100d);
            // record.SetString(4, "field_2_" + std::to_string(i));
            // records.push_back(record);
        } 

    }
}  
}

Meter操作

void GetMeter()
{
    try
    {
        GetMeteringInfoResult GetMeteringInfo = Client.GetMeteringInfo(projectName, topicName, shardId);

    }
    catch(DatahubException e)
    {
          cerr << e << endl;
    }
}

Subscribtion操作

订阅服务提供了服务端保存用户消费点位的功能,只需要通过简单配置和处理,就可以实现高可用的点位存储服务。

创建 Subscription

void CreateSubscription()
{ 
  std::string projectName = "";
  std::string topicName = "";
  try
  {
    const std::string subscriptionComment = "test_subscription";
    const CreateSubscriptionResult& createSubscriptionResult = client.CreateSubscription(projectName, topicName, subscriptionComment);
  } 
   catch(const DatahubException& e)
   {
        std::cerr << "Create sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

删除 Subscription

void DeleteSubscription() 
{
  std::string projectName = "";
  std::string topicName = "";
  std::string subId = "";
  try 
  {
     client.DeleteSubscription(projectName, topicName, subId);
  } 
   catch(const DatahubException& e)
   {
      std::cerr << "Delete sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

更新 Subscription

void UpdateSubscription()
{
    try 
    {
        const std::string updateSubscriptionComment = "test_subscription_1";
      	const std::string projectName = "";
        const std::string topicName = "";
        client.UpdateSubscription(projectName, topicName, subId, updateSubscriptionComment);
      }
   catch(const DatahubException& e)
   {
      std::cerr << "Upate sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

获取Subscription列表

 void ListSubscription() 
 {
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";

    try 
    {
       const ListSubscriptionResult& subscriptionResult = client.ListSubscription(projectName, topicName, 1, 20, subId));
       std::cout << subscriptionResult.GetTotalCount() << std::endl;
    }  
   catch(const DatahubException& e)
   {
      std::cerr << "list sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

查询 Subscription

void GetSubscription() 
{
    try 
    {
    	const std::vector<SubscriptionEntry>& subscriptions = subscriptionResult.GetSubscriptions();
    	std::cout << subscriptions.size() << std::endl;

    	for (size_t i = 0; i < subscriptions.size(); ++i)
    	{
          SubscriptionEntry entry = subscriptions[i];
          std::cout <<  entry.GetSubId() << std::endl;
      }						
    }  
   catch(const DatahubException& e)
   {
      std::cerr << "Get sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

更新 Subscription 状态

 void UpdateSubscriptionState()
 {
    try 
    {
        client.UpdateSubscriptionState(projectName, topicName, subId, SubscriptionState::OFFLINE);
        std::cout << getSubscriptionResult.GetComment() << std::endl;

    } 
   catch(const DatahubException& e)
   {
      std::cerr << "Update sub fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
	
}

offset 操作

一个subscription创建后,初始状态是未消费的,要使用subscription服务提供的点位存储功能,需要进行一些offset操作。

初始化offset

 void InitSubscriptionOffsetSession()
 {
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    std::vector<std::string> shardIds;
    try 
    {
        const OpenSubscriptionOffsetSessionResult& offsetSessionResult =
        client.InitSubscriptionOffsetSession(projectName, topicName, subId, shardIds);
        std::cout << offsetSessionResult.GetOffsets().size() << std::endl;

    } 
   catch(const DatahubException& e)
   {
      std::cerr << "Init offset fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

获取 offset

//获取点位
void GetSubscriptionOffset() 
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    std::vector<std::string> shardIds;
    try 
    {
        const GetSubscriptionOffsetResult& getSubscriptionOffsetResult =
        client.GetSubscriptionOffset(projectName, topicName, subId, shardIds;
        std::cout << getSubscriptionOffsetResult.GetOffsets().size() << std::endl;        std::cout << getSubscriptionOffsetResult.GetOffsets().size() << std::endl;
        std::cout << getSubscriptionResult.GetComment() << std::endl;

    }  
   catch(const DatahubException& e)
   {
      std::cerr << "Get offset fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

重置 offset

void ResetSubscriptionOffset() 
{
    try 
    {
        int64_t resetTimestamp = 0l;
        int64_t resetSequence = 0l;
        uint32_t resetBatchIndex = 0u;
        const std::string projectName = "";
        const std::string topicName = "";
        const std::string subId = "";
        std::map<std::string, SubscriptionOffset> resetSubscriptionOffsets;
        for (auto iter = offsets.begin(); iter != offsets.end(); ++iter)
        {
            SubscriptionOffset offset(resetTimestamp, resetSequence, resetBatchIndex);
            resetSubscriptionOffsets.insert(
            std::pair<std::string, SubscriptionOffset>(iter->first, offset));
        }

    client.ResetSubscriptionOffset(projectName, topicName, subId, resetSubscriptionOffsets);    }  
   catch(const DatahubException& e)
   {
      std::cerr << "Reset offset fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

Connector 操作

DataHub Connector是把DataHub服务中的流式数据同步到其他云产品中的功能,目前支持将Topic中的数据实时/准实时同步到MaxCompute(ODPS)、Oss、RDS&Mysql、TableStore、Oss、ElasticSearch、函数计算中。用户只需要向DataHub中写入一次数据,并在DataHub服务中配置好同步功能,便可以在其他云产品中使用这份数据。

创建 Connector

  • ODPS示例

void  CreateConnector() 
{
    try{
    sdk::SinkOdpsConfig config;
    config.SetEndpoint("ODPS_ENDPOINT");
    config.SetProject("ODPS_PROJECT");
    config.SetTable("ODPS_TABLE");
    config.SetAccessId("ODPS_ACCESSID");
    config.SetAccessKey("ODPS_ACCESSKEY");
    config.SetPartitionMode(sdk::PartitionMode::SYSTEM_TIME);
    config.SetTimeRange(15);
    const std::string projectName = "";
    const std::string topicName = "";
    std::vector<std::pair<std::string, std::string> > partitionConfig;
    partitionConfig.push_back(std::pair<std::string, std::string>("ds", "%Y%m%d"));
    partitionConfig.push_back(std::pair<std::string, std::string>("hh", "%H"));
    partitionConfig.push_back(std::pair<std::string, std::string>("mm", "%M"));
    config.SetPartitionConfig(partitionConfig);

    std::vector<std::string> columnFields;
    columnFields.push_back(fieldName1);

    const CreateConnectorResult& connectorResult = client.CreateConnector(projectName, topicName, sdk::ConnectorType::SINK_ODPS, columnFields, config);

    std::cout<<"Odps ConnectorId:" + connectorResult.GetConnectorId()<<std::endl;
    }
   catch(const DatahubException& e)
   {
      std::cerr << "Create odpsConnector fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

删除 Connector

void DeleteConnector() 
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string connectorId = "";

    try {
      client.DeleteConnector(projectName, topicName, connectorId);
      } 
   	catch(const DatahubException& e)
   {
      std::cerr << "Delete Connector fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

查询 Connector

void GetConnector() 
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string connectorId = "";
    try 
    {
        GetConnectorResult GetConnectorResult = client.GetConnector(projectName, topicName, connectorId);
        const sdk::SinkOdpsConfig* odpsConfig = dynamic_cast<const sdk::SinkOdpsConfig*>(getConnectorResult.GetConfig());
        std::cout << odpsConfig->GetPartitionConfig().size() << std::endl;
    } 
   catch(const DatahubException& e)
   {
      std::cerr << "Delete topic fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
}

列出 Connector列表

void ListConnector() {
  try {
       const std::string projectName = "";
       const std::string topicName = "";
       const ListConnectorResult& listConnectorResult = client.ListConnector(projectName, topicName);
       std::cout << listConnectorResult.GetConnectorIds().size() << std::endl;
  }  catch(const DatahubException& e)
   {
      std::cerr << "list connector fail: " << e.GetRequestId() << ", ErrorCode: " << e.GetErrorCode() << ", ErrorMessage: " << e.GetErrorMessage() << std::endl;
   }
 }