All Products
Search
Document Center

DataHub SDK for C++

Last Updated: Aug 06, 2021

DataHub SDK for C++

Installation

Prerequisites

GCC 4.9.2 is used. DataHub SDK for C++ supports only GCC 4.9.2 for compilation. Before you install and use DataHub SDK for C++, make sure that the compilation environment is as required.

Download DataHub SDK for C++

Initialization

You can use an Alibaba Cloud account to access DataHub. To access DataHub, you must provide your AccessKey ID and AccessKey secret, and the endpoint that is used to access DataHub. The following sample code provides an example on how to create a DataHub client by using a DataHub endpoint:

     /* Configuration */
    Account account;
    account.id = "";
    account.key = "=";

    std::string projectName = "test_project";
    std::string topicName = "test_cpp";
    std::string comment = "test";

    std::string endpoint = "";
    Configuration conf(account, endpoint);

    /* Datahub Client */
    DatahubClient client(conf);

Manage projects

A project is a basic unit for managing data in DataHub. A project contains multiple topics. The projects in DataHub are independent of those in MaxCompute. You cannot reuse MaxCompute projects in DataHub. You must create projects in DataHub.

Create a project

Syntax: CreateProjectResult CeateProject(string projectName, string comment). When you create a project, you must set the project name and enter the project description. The project name must be 3 to 32 characters in length, and can contain letters, digits, and underscores (_). The project name must start with a letter and is not case-sensitive.

  • Parameters

    • projectName: the name of the project.

    • comment: the comments on the project.

  • Sample code

void CreateProject()
{
    try
    {
        client.CreateProject(projectName, comment);
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }
}

Delete a project

Syntax: DeleteProjectResult DeleteProject(string projectName). Make sure that the project contains no topic before you delete the project.

  • Parameters

    • projectName: the name of the project.

  • Sample code

void DeleteProject()
{
    try
    {
        client.DeleteProject(projectName);
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }
}

Update a project

Syntax: UpdateProjectResult UpdateProject(string projectName, string comment). You can update only the comments on a project.

  • Parameters

    • projectName: the name of the project.

    • comment: the comments on the project.

  • Sample code

    void UpdateProject()
    {
      try
      {
          client.UpdateProject(projectName,comment);
      }
      catch(DatahubException e)
      {
         cerr << e << endl;
      }
    }

Lists projects

Syntax: ListProjectResult ListProject(). The return result of the ListProject method is a ListProjectResult object, which contains a list of project names.

  • Sample code

    void ListProject()
    {
      try
      {
          client.ListProject();
          std::cout<<listProjectResult.GetProjectNames().size()<<std::endl;
      }
      catch(DatahubException e)
      {
         cerr << e << endl;
      }
    }

Query a project

Syntax: GetProjectResult GetProject(string projectName)

  • Parameters

    • projectName: the name of the project.

  • Sample code

    void GetProject()
    {
      try
      {
         GetProjectResult projectResult = client.GetProject(projectName);
         std::cout<<projectResult.GetProject()<<std::endl;
         std::cout<<projectResult.GetComment()<<std::endl;
         std::cout<<projectResult.GetCreator()<<std::endl;
         std::cout<<projectResult.GetCreateTime()<<std::endl;
         std::cout<<projectResult.GetLastModifyTime()<<std::endl;    
      }
      catch(DatahubException e)
      {
         cerr << e << endl;
      }
    }

Manage topics

A topic is the smallest unit for data subscription and publishing in DataHub. You can use topics to distinguish different types of streaming data. Two types of topics are supported: tuple and blob.

  1. You can write a block of binary data as a record to blob topics.

  2. Tuple topics contain records that are similar to data records in databases. Each record contains multiple columns. You must specify record schemas for tuple topics because the data in tuple topics is transmitted as strings over the network. Therefore, schemas are required for data type conversion. The following table describes the data types that are supported.

Type

Description

Value range

BIGINT

An eight-byte signed integer.

-9223372036854775807 to 9223372036854775807.

DOUBLE

A double-precision floating-point number. It is eight bytes in length.

-1.0 10^308 to 1.0 10^308.

BOOLEAN

The Boolean type.

True and False, true and false, or 0 and 1.

TIMESTAMP

The type of timestamp.

A timestamp that is accurate to microseconds.

STRING

A string. Only UTF-8 encoding is supported.

The size of all values in a column of the STRING type cannot exceed 1 MB.

Create a tuple topic

Syntax: CreateTopicResult CreateTopic(string projectName, string topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, string comment)

  • Parameters

    • projectName: the name of the project in which you want to create the topic.

    • topicName: the name of the topic.

    • shardCount: the number of initial shards in the topic.

    • lifeCycle: the time to live (TTL) of the data. Unit: days. The data that is written before that time is not accessible.

    • recordType: the type of record that you want to write. Valid values: TUPLE and BLOB.

    • recordSchema: the record schema for the topic.

    • comment: the comments on the topic.

  • Sample code

void CreateTupleTopic(){
    try
    {
        client.CreateTopic(projectName, topicName, shardCount,lifeCycle,RecordType.TUPLE, schema,comment);
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }
}

Create a blob topic

Syntax: CreateTopicResult CreateTopic(string projectName, string topicName, int shardCount, int lifeCycle, RecordType recordType, string comment)

  • Parameters

    • projectName: the name of the project in which you want to create the topic.

    • topicName: the name of the topic.

    • shardCount: the number of initial shards in the topic.

    • lifeCycle: the TTL of the data. Unit: days. The data that is written before that time is not accessible.

    • recordType: the type of the record you want to write. Valid values: TUPLE and BLOB.

    • comment: the comments on the topic.

  • Sample code

void CreateBlobTopic()
{
    try
    {
        client.CreateTopic(projectName,topicName,shardCount,lifeCycle,RecordType.BLOB,comment);
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }

}

Delete a topic

Syntax: DeleteTopicResult DeleteTopic(string projectName, string topicName). Make sure that the topic contains no subscription or DataConnector before you delete the topic.

  • Parameters

    • projectName: the name of the project in which you want to delete the topic.

    • topicName: the name of the topic.

  • Sample code

void DeleteTopic()
{
    try
    {
         ListTopicResult listTopicResult = client.ListTopic(projectName);
        cout<<listTopicResult.GetTopicNames().size()<<std::endl;   
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }

}

List topics

Syntax: ListTopicResult ListTopic(string projectName)

  • Parameters

    • projectName: the name of the project in which you want to list topics.

  • Sample code

void ListTopic(){
    try
    {
         ListTopicResult listTopicResult = client.ListTopic(projectName);
         cout<<listTopicResult.GetTopicNames().size()<<std::endl;        
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }

}

Update a topic

Syntax: UpdateTopicResult UpdateTopic(string projectName, string topicName,int updateLifecycle, string comment)

  • Parameters

    • projectName: the name of the project in which you want to update the topic.

    • topicName: the name of the topic.

    • comment: the comments to be updated.

    • updateLifecycle: the TTL of the topic.

  • Sample code

void GetTopic(){
    try
    {
        client.UpdateTopic(projectName, topicName, updateLifecycle, updateProjectComment);
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }

}

Query a topic

Syntax: GetTopicResult GetTopic(string projectName, tring topicName)

  • Parameters

    • projectName: the name of the project in which you want to query the topic.

    • topicName: the name of the topic.

  • Sample code

void GetTopic(){
    try
    {
        GetTopicResult getTopicResult = client.GetTopic(projectName, topicName);
        cout<<getTopicResult.GetComment()<<endl;
    }
    catch(DatahubException e)
    {
       cerr << e << endl;
    }

}

Manage shards

Shards are concurrent tunnels used for data transmission in a topic. Each shard has an ID. A shard can be in different states. Opening: The shard is being started. Active: The shard is started and can be used to provide services. Each active shard consumes server resources. We recommend that you create shards as needed.

List shards

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

  • Sample code

void ListShard()
{
    try
    {
        ListShardResult listShardResult = client.ListShard(projectName, topicName);

    } 
    catch(DatahubException e)
    {
          cerr << e << endl;
    }
}

Split a shard

Syntax: SplitShardResult splitShard(String projectName, String topicName, String shardId) or SplitShardResult splitShard(String projectName, String topicName, String shardId, String splitKey). You can split an active shard of a specified topic. After a shard is split, two active new shards are generated, whereas the original shard is closed. You can use the default split key or specify a split key to split a shard.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard to be split.

    • splitKey: the split key that is used to split the shard.

  • Sample code

    void SplitShard()
    {
      try
      {
           SplitShardResult splitShardResult = Client.SplitShard(projectName, topicName, shardId);
          for (ShardEntry entry : splitShardResult.getNewShards()) 
          {
              std::cout << "shardId is: " << entry << std::endl;
          }
      }
      catch(DatahubException e)
      {
          cerr << e << endl;
      }
    }

Merge shards

The two active shards to be merged in a topic must be adjacent. For more information about the two adjacent shards of a shard, see the result that is returned by the ListShard method.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard to be merged.

    • adjacentShardId: the ID of the shard that is adjacent to the specified shard.

  • Sample code

    void MergeShard()
    {
      try
      {
          MergeShardResult MergeShardResult = Client.MergeShard(projectName, topicName, shardId, adjacentShardId);
    
      }
      catch(DatahubException e)
      {
          cerr << e << endl;
      }
    }

Read and write data

You can read data from active and closed shards. However, you can write data only to active shards.

Read data

Obtain a cursor

To read data from a topic, specify a shard and the cursor from which data starts to be read. You can obtain the cursor by using the following methods: OLDEST, LATEST, SEQUENCE, and SYSTEM_TIME.

  • OLDEST: the cursor that points to the earliest valid record in the specified shard.

  • LATEST: the cursor that points to the latest record in the specified shard.

  • SEQUENCE: the cursor that points to the record of the specified sequence number.

  • SYSTEM_TIME: the cursor that points to the first record whose timestamp value is greater than or equal to the specified timestamp value.

Syntax: GetCursorResult GetCursor(String projectName, String topicName, String shardId, CursorType type) or GetCursorResult GetCursor(String projectName, String topicName, String shardId, int64_t timestamp)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • CursorType: the method that is used to obtain the cursor.

      Sample code

      void GetCursor()
      {
      try
      {
         GetCursorResult r1 = client.GetCursor(projectName, topicName, "0", OLDEST);
         std::string cursor = r1.GetCursor();
      } 
      catch(DatahubException e)
      {
         cerr << e << endl;
      }
      }

      Data reading method

      Syntax: GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • cursor: the cursor from which data starts to be read.

    • limit: the maximum number of records to be read.

  • Sample code

    • Read data from a tuple topic

      void GetRecord()
      {
      try
      {
         GetRecordResult r2 = client.GetRecord(projectName, topicName, "0", cursor, 1000);
         int count = r2.GetRecordCount();
         std::cout << "read record count is: " << count << std::endl;
         for (int i = 0; i < count; ++i)
         {
             const RecordResult& recordResult = r2.GetRecord(i);
             for (int j = 0; j < schema.GetFieldCount(); ++j)
             {
                 const Field& field = schema.GetField(j);
                 const FieldType fieldType = field.GetFieldType();
                 switch(fieldType)
                 {
                     case BIGINT:
                         std::cout << recordResult.GetBigint(j) << std::endl;
                         break;
                     case DOUBLE:
                         printf("%.15lf\n", recordResult.GetDouble(j));
                         break;
                     case STRING:
                         std::cout << recordResult.GetString(j) << std::endl;
                         break;
                     default:
                     break;
                 }
             }
         }
      
      } 
      catch(DatahubException e)
      {
           cerr << e << endl;
      }
      }

      Write data

      Syntax: PutRecordsResult putRecords(String projectName, String topicName, vector records,string compress) or PutRecordsResult putRecordByShard(String projectName, String topicName, String shardId,vector records)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • records: the list of records to be written.

  • Sample code

void PutRecord()
{
    try{
        std::vector<RecordEntry> records;
        for (int32_t i = 0; i < 100; ++i)
        {
            RecordEntry record(schema.GetFieldCount());
            record.SetShardId("0");
            for (int i = 0; i < schema.GetFieldCount(); ++i)
            {
                const Field& field = schema.GetField(i);
                const FieldType fieldType = field.GetFieldType();
                switch(fieldType)
                {
                    case BIGINT:
                           record.SetBigint(i, 1);
                           break;
                    case DOUBLE:
                        record.SetDouble(i, 117.120799999999);
                        break;
                    case STRING:
                        record.SetString(i, "345");
                        break;
                    default:
                        break;
                }
            }
            records.push_back(record);
        }
    PutRecordResult put_ret0 = client.PutRecord(projectName, topicName, records, "lz4");
    std::cout<<put_ret0.GetFailedRecordCount()<<std::endl;
        } 
    catch(DatahubException e)
    {
          cerr << e << endl;
    }
}

Query metering information

Syntax: GetMeterInfoResult GetMeteringInfo(String projectName, String topicName, String day) or GetMeterInfoResult GetMeteringInfo(String projectName, String topicName, String shardId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • day: the date on which the topic expires.

    • shardId: the ID of the shard.

Sample code

void GetMeter()
{
    try
    {
        GetMeteringInfoResult GetMeteringInfo = Client.GetMeteringInfo(projectName, topicName, shardId);

    }
    catch(DatahubException e)
    {
          cerr << e << endl;
    }
}

Manage subscriptions

DataHub allows the server to save the consumption offsets of a subscription. You can obtain highly available offset storage services by performing simple configurations.

Create a subscription

Syntax:CreateSubscriptionResult CreateSubscription(String projectName, String topicName, String comment)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • comment: the comments on the subscription.

  • Sample code

void CreateSubscription()
{
  try
  {
     CreateSubscriptionResult createSubscriptionResult = client.CreateSubscription(projectName, topicName, subscriptionComment);

  } 
  catch(DatahubException e)
  {
     cerr << e << endl;
  }
}

Delete a subscription

Syntax: DeleteSubscriptionResult DeleteSubscription(string projectName, string topicName, string subId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

  • Sample code

void DeleteSubscription() 
{
  try 
  {
      client.DeleteSubscription(projectName, topicName, subId);

  } 
  catch(DatahubException e)
  {
     cerr << e << endl;
  }
}

Update a subscription

Syntax: UpdateSubscriptionResult UpdateSubscription(string projectName, string topicName, string subId, String comment). You can call the UpdateSubscription method to update only the comments on an existing subscription.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • comment: the comments to be updated.

  • Sample code

void UpdateSubscription()
{
    try 
    {
        client.UpdateSubscription(projectName, topicName, subId, SubscriptionComment);

      }
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

List subscriptions

Syntax: ListSubscriptionResult ListSubscription(string projectName, string topicName, int pageNum, int pageSize). The pageNum and pageSize parameters of the ListSubscription method specify the range of subscriptions to be listed. For example, you can set the pageNum and pageSize parameters to 1 and 10 to list the first 10 subscriptions. For another example, you can set the pageNum and pageSize parameters to 2 and 5 to list the sixth to tenth subscriptions.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • pageNum: the number of the page to return.

    • pageSize: the number of entries to return on each page.

  • Sample code

 void ListSubscription() 
 {
    try 
    {
        ListSubscriptionResult subscriptionResult = client.ListSubscription(projectName, topicName, 1, 20, subId);

    }  
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Query a subscription

Syntax: GetSubscriptionResult GetSubscription(string projectName, string topicName, string subId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

void GetSubscription() 
{
    try 
    {
        GetSubscriptionResult  getSubscriptionResult = client.GetSubscription(projectName, topicName, subId);
        std::cout << getSubscriptionResult.GetComment() << std::endl;
    }  
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Update the status of a subscription

Syntax: UpdateSubscriptionStateResult UpdateSubscriptionState(string projectName, string topicName, string subId, SubscriptionState state). A subscription can be in the OFFLINE or ONLINE state, which indicates an offline or online subscription.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • state: the state to be updated.

  • Sample code

 void UpdateSubscriptionState()
 {
    try 
    {
        client.UpdateSubscriptionState(projectName, topicName, subId, SubscriptionState::OFFLINE);
        std::cout << getSubscriptionResult.GetComment() << std::endl;

    } 
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Manage offsets

After a subscription is created, it is initially unconsumed. To use the offset storage feature of the subscription, perform the following operations on offsets:

Initialize an offset

Syntax: OpenSubscriptionOffsetSessionResult InitSubscriptionOffsetSession(string projectName, string topicName, string subId, StringVec shardIds). To initialize an offset, you need to call the InitSubscriptionOffsetSession method only once. The second time you call this method, a new consumption session ID is generated. In this case, the previous session becomes invalid, and you cannot submit offsets.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • shardIds: the IDs of shards.

Sample code

 void InitSubscriptionOffsetSession()
 {
    try 
    {
        OpenSubscriptionOffsetSessionResult offsetSessionResult =
        client.InitSubscriptionOffsetSession(projectName, topicName, createSubscriptionResult.GetSubId(), shardIds);
        std::cout << offsetSessionResult.GetOffsets().size() << std::endl;

    } 
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Obtain an offset

Syntax: GetSubscriptionOffsetResult GetSubscriptionOffset(string projectName, string topicName, string subId, StringVec shardIds). After you call the GetSubscriptionOffset method, a GetSubscriptionOffsetResult object is returned.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • shardIds: the IDs of shards.

  • Sample code

// Obtain an offset.
void GetSubscriptionOffset() 
{
    try 
    {
        GetSubscriptionOffsetResult  getSubscriptionOffsetResult =client.GetSubscriptionOffset(projectName, topicName,subId, shardIds);
        std::cout << getSubscriptionOffsetResult.GetOffsets().size() << std::endl;
        std::cout << getSubscriptionResult.GetComment() << std::endl;

    }  
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Reset an offset

Syntax: ResetSubscriptionOffsetResult ResetSubscriptionOffset(string projectName, string topicName, string subId, Map offsets). You can reset an offset to a specific point in time. If multiple records are involved at this point in time, the reset offset points to the first record that is involved at this point in time. After an offset is reset, the offset information changes, and the version ID is updated. If a task that is running submits offsets by using the previous version ID, the SubscriptionOffsetResetException error is reported. You can call the GetSubscriptionOffset method to obtain a new version ID.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • offsets: the offset map of shards.

Sample code

void ResetSubscriptionOffset() 
{
    try 
    {
        client.ResetSubscriptionOffset(projectName, topicName, SubId, resetSubscriptionOffsets);
    }  
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Manage DataConnectors

A DataConnector in DataHub synchronizes streaming data from DataHub to other cloud services. You can use DataConnectors to synchronize data from DataHub topics to MaxCompute, Object Storage Service (OSS), ApsaraDB RDS for MySQL, Tablestore, Elasticsearch, and Function Compute in real-time or near real-time mode. After DataConnectors are configured, the data you write to DataHub can be used in other Alibaba Cloud services.

Create a DataConnector

Syntax: CreateConnectorResult CreateConnector(string projectName, string topicName, ConnectorType connectorType, List columnFields, SinkConfig config) or CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, int64_t sinkStartTime, StringVec columnFields, SinkConfig config)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType: the type of DataConnector that you want create.

    • columnFields: the fields that you want to synchronize.

    • sinkStartTime: the time at which data starts to be synchronized to DataHub. Unit: milliseconds.

    • config: the configuration details of the specific type of DataConnector.

  • The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to MaxCompute:

void  CreateConnector() 
{
    SinkOdpsConfig config = new SinkOdpsConfig() {{
        setEndpoint(odps_endpoint);
        setProject(odps_project);
        setTable(odps_table);
        setAccessId(odps_accessId);
        setAccessKey(odps_accessKey);
        setPartitionMode(PartitionMode.SYSTEM_TIME);
        setTimeRange(60);
    }};
    // Specify the partition format.
    SinkOdpsConfig.PartitionConfig partitionConfig = new SinkOdpsConfig.PartitionConfig() {{
        addConfig("ds", "%Y%m%d");
        addConfig("hh", "%H");
        addConfig("mm", "%M");
    }};
    config.setPartitionConfig(partitionConfig);
    try 
    {
        CreateConnectorResult  connectorResult = client.CreateConnector(projectName, topicName, sdk::ConnectorType::SINK_ODPS, columnFields, config);
        std::cout<<"Odps ConnectorId:" + connectorResult.GetConnectorId()<<std::endl;
    } 
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Delete a DataConnector

Syntax: DeleteConnectorResult DeleteConnector(string projectName, string topicName, string connectorId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • connectorId: the DataConnector ID of the topic.

  • Sample code

void DeleteConnector() 
{
    try 
    {
         client.DeleteConnector(projectName, topicName, connectorId);
      } 
    catch(DatahubException e)
    {
          cerr << e << endl;
    }
}

Query a DataConnector

Syntax: GetConnectorResult getConnectorResult (projectName,topicName,ConnectorType.SINK_ODPS)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType: the type of DataConnector that you want query.

  • Sample code

void GetConnector() 
{
    try 
    {
        GetConnectorResult GetConnectorResult = client.GetConnector(projectName, topicName, connectorId);
    } 
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Update a DataConnector

Syntax: UpdateConnectorResult UpdateConnector(string projectName, string topicName, string connectorId,SinkConfig config)

  • Parameters

    • projectName: the name of the project.

    • topicName : the name of the topic.

    • connectorId: the DataConnector ID of the topic.

    • config: the configuration details of the specific type of DataConnector.

  • Sample code

 void UpdateConnector() 
 {
    SinkOdpsConfig updateConfig;
    updateConfig.SetEndpoint("ODPS_ENDPOINT");
    updateConfig.SetProject("ODPS_PROJECT");
    updateConfig.SetTable("ODPS_TABLE");
    updateConfig.SetAccessId("ODPS_ACCESSID");
    updateConfig.SetAccessKey("ODPS_ACCESSKEY");
    updateConfig.SetPartitionMode(sdk::PartitionMode::EVENT_TIME);
    updateConfig.SetTimeRange(30);
    updateConfig.SetPartitionConfig(updatePartitionConfig);
       try 
    {
         GetConnectorResult updateConnectorResult = client.GetConnector(projectName, topicName, connectorId);
      } 
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
}

Update the status of a DataConnector

Syntax: UpdateConnectorStateResult UpdateConnectorState(string projectName, string topicName, string connectorId, ConnectorState connectorState)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • connectorId: the DataConnector ID of the topic.

    • connectorState: the state of the DataConnector. Valid values: PAUSED and RUNNING.

  • Sample code

 void UpdateConnectorState() 
 {
    try 
    {
        client.UpdateConnectorState(projectName, topicName, connectorId, sdk::ConnectorState::CONNECTOR_STOPPED);
    } 
    catch(DatahubException e)
    {
        cerr << e << endl;
    }
  }

Update the offset of a DataConnector

Syntax: UpdateConnectorOffsetResult UpdateConnectorOffset(string projectName, String topicName, ConnectorType connectorType, String shardId, ConnectorOffset offset)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType: the type of DataConnector.

    • shardId: the ID of the shard. If the shardID parameter is set to null, the offsets of all shards are updated.

    • offset: the offset of the DataConnector.

  • Sample code

void UpdateConnectorOffset() 
{
    ConnectorOffset offset = new ConnectorOffset() {{
        setSequence(10);
        setTimestamp(1000);
    }};
    try 
    {
        // Before you update the offset of a DataConnector, stop the DataConnector.
        Client.UpdateConnectorState(projectName, topicName, connectorId, ConnectorState.STOPPED);
           Client.UpdateConnectorOffset(projectName, topicName, connectorId, connectorOffset1);
        Client.UpdateConnectorState(projectName, topicName, connectorId, ConnectorState.RUNNING);
    } 
    catch(DatahubException e){
          cerr << e << endl;
    }
}

List DataConnectors

Note

Syntax: ListConnectorResult ListConnector(string projectName, string topicName)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

  • Sample code

void ListConnector() {
  try {
       const ListConnectorResult& listConnectorResult = client.ListConnector(projectName, topicName);
        std::cout << listConnectorResult.GetConnectorIds().size() << std::endl;

        std::string connectorId = listConnectorResult.GetConnectorIds()[0];
  } catch(std::exception& e){
          cerr << e << endl;
    }
 }

Query the shard status of a DataConnector

Note

Syntax 1: ConnectorShardStatusEntry GetConnectorShardStatusByShard(string projectName, string topicName, string connectorId,string shardId)

Syntax 2: ConnectorShardStatusEntry GetConnectorShardStatus(string projectName, string topicName, string connectorId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • connectorId: the DataConnector ID of the topic.

  • Sample code

 void GetConnectorShardStatusByShard() {
    try {
         const GetConnectorShardStatusByShardResult& shardStatusByShardResult = client.GetConnectorShardStatusByShard(projectName, topicName, connectorId, "0");

    sdk::ConnectorShardStatusEntry statuUpdateSubscriptionsEntry = shardStatusByShardResult.GetStatusEntry();

    std::cout << (statusEntry.GetState()>= sdk::ConnectorShardState::CONTEXT_HANG && statusEntry.GetState() <= sdk::ConnectorShardState::CONTEXT_FINISHED) << std::endl;
  }  catch(std::exception& e){
          cerr << e << endl;
    }
}

Restart a DataConnector

Note

Syntax 1: ReloadConnectorResult ReloadConnector(string projectName, string topicName, string connectorId)

Syntax 2: ReloadConnectorResult ReloadConnector(string projectName, string topicName, string connectorId, string shardId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • connectorId: the DataConnector ID of the topic.

  • Sample code

 void ReloadConnector() {
    try {
    client.ReloadConnector(projectName, topicName, connectorId);

  }  catch(std::exception& e){
          cerr << e << endl;
    }
}

Query the completion time of a DataConnector

Note

Syntax: GetConnectorDoneTimeResult GetConnectorDoneTime(string projectName, string topicName, string connectorId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • connectorId: the DataConnector ID of the topic.

  • Sample code

 void GetDoneTime() {
    try {
        const GetConnectorDoneTimeResult& doneTimeResult = client.GetConnectorDoneTime(projectName, topicName, connectorId);
        std::cout << doneTimeResult.GetDoneTime() << std::endl;

  }  catch(std::exception& e){
          cerr << e << endl;
    }

}

Add a field

Note

Syntax: AppendConnectorFieldResult AppendConnectorField(string projectName, string topicName, string connectorId, String fieldName)

You can add a field to be synchronized by using a DataConnector, provided that a MaxCompute table contains the field.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • fieldName: the name of the field to be added. The field can be set to null.

    • connectorId: the DataConnector ID of the topic.

  • Sample code

 void AppendConnectorField() {
    try {
        client.AppendConnectorField(projectName, topicName, connectorId, fieldName2);

  } catch(std::exception& e){
          cerr << e << endl;
    }

}