すべてのプロダクト
Search
ドキュメントセンター

DataHub:C++ 用 DataHub SDK

最終更新日:Jan 12, 2025

インストール

前提条件

GCC 4.9.2 が使用されています。C++ 用 DataHub SDK は、コンパイルに GCC 4.9.2 のみをサポートしています。 C++ 用 DataHub SDK をインストールして使用する前に、コンパイル環境が要件を満たしていることを確認してください。

SDK をダウンロードします。

初期化

Alibaba Cloud アカウントを使用して DataHub にアクセスできます。DataHub にアクセスするには、AccessKey ID と AccessKey Secret、および DataHub へのアクセスに使用されるエンドポイントを提供する必要があります。次のサンプルコードは、エンドポイントを使用して DataHub クライアントを作成する方法の例を示しています。エンドポイントの詳細については、エンドポイントをご参照ください。

     /* 設定 */
    Account account;
    account.id = "";
    account.key = "=";

    std::string projectName = "test_project";
    std::string topicName = "test_cpp";
    std::string comment = "test";

    std::string endpoint = "";
    Configuration conf(account, endpoint);

    /* Datahub クライアント */
    DatahubClient client(conf);

プロジェクトの管理

プロジェクトは、DataHub でデータを管理するための基本単位です。1 つのプロジェクトには複数のトピックが含まれています。DataHub のプロジェクトは、MaxCompute のプロジェクトとは独立しています。MaxCompute プロジェクトを DataHub で再利用することはできません。DataHub でプロジェクトを作成する必要があります。

プロジェクトの作成

void CreateProject()
{
    std::string projectName = "";
    std::string comment = "";
    try
    {
        client.CreateProject(projectName, comment);
    }
    catch(const DatahubException& e)
    {
        std::cerr << "プロジェクトの作成に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl;  // プロジェクトの作成失敗
    }
}

プロジェクトの削除

void DeleteProject()
{
    std::string projectName = "";
    try
    {
        client.DeleteProject(projectName);
    }
    catch(const DatahubException& e)
    {
        std::cerr << "プロジェクトの削除に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // プロジェクトの削除失敗
    }
}

プロジェクトの更新

void UpdateProject()
{
  std::string projectName = "";
  std::string comment = "";
  try
  {
      client.UpdateProject(projectName,comment);
  }
  catch(const DatahubException& e)
  {
      std::cerr << "プロジェクトの更新に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // プロジェクトの更新失敗
  }
}

プロジェクトのクエリ

void ListProject()
{
  try
  {
      const ListProjectResult& listProjectResult = client.ListProject();
      std::cout<<listProjectResult.GetProjectNames().size()<<std::endl;
  }
  catch(const DatahubException& e)
  {
      std::cerr << "プロジェクトのリスト取得に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // プロジェクトリストの取得失敗
  }
}

プロジェクトの照会

void GetProject()
{
  std::string projectName = "";
  try
  {
     const GetProjectResult& projectResult = client.GetProject(projectName);
     std::cout<<projectResult.GetProject()<<std::endl;
     std::cout<<projectResult.GetComment()<<std::endl;
     std::cout<<projectResult.GetCreator()<<std::endl;
     std::cout<<projectResult.GetCreateTime()<<std::endl;
     std::cout<<projectResult.GetLastModifyTime()<<std::endl;    
  }
  catch(const DatahubException& e)
  {
      std::cerr << "プロジェクトの取得に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // プロジェクトの取得失敗
  }
}

トピックの管理

トピックは、DataHub でのデータのサブスクライブとパブリッシュの最小単位です。トピックを使用して、さまざまな種類のストリーミングデータを区別できます。2 種類のトピック (タプルと BLOB) がサポートされています。

  1. バイナリデータのブロックを BLOB トピックのレコードとして書き込むことができます。

  2. タプルトピックには、データベースのデータレコードに似たレコードが含まれています。各レコードには複数の列が含まれています。タプルトピックのレコードスキーマを指定する必要があります。これは、タプルトピックのデータがネットワーク経由で文字列として送信されるためです。したがって、データ型の変換にはスキーマが必要です。次の表に、サポートされているデータ型を示します。

説明

値の範囲

BIGINT

8 バイトの符号付き整数。

-9223372036854775807 ~ 9223372036854775807

DOUBLE

倍精度浮動小数点数。長さは 8 バイトです。

-1.0 _10^308 ~ 1.0 _10^308

BOOLEAN

ブール型。

True と False、true と false、または 0 と 1。

TIMESTAMP

タイムスタンプの型。

マイクロ秒単位の精度のタイムスタンプ。

STRING

文字列。STRING 型のデータは、UTF-8 のみを使用してエンコードできます。

STRING 型の列のすべての値のサイズは 2 MB を超えることはできません。

TINYINT

1 バイトの整数。

-128 ~ 127

SMALLINT

2 バイトの整数。

-32768 ~ 32767

INTEGER

4 バイトの整数。

-2147483648 ~ 2147483647

FLOAT

4 バイトの単精度浮動小数点数。

-3.40292347_10^38 ~ 3.40292347_10^38

タプルトピックの作成

void CreateTupleTopic(){
    RecordSchema schema;
    std::string fieldName1 = "a";
    std::string fieldName2 = "b";
    std::string fieldName3 = "c";
    std::string fieldComment1 = "フィールド 1 のコメント"; // field1 comment
    std::string fieldComment2 = "フィールド 2 のコメント"; // field2 comment
    std::string fieldComment3 = "フィールド 3 のコメント"; // field3 comment
    schema.AddField(Field(fieldName1/*フィールド名*/, BIGINT, true, fieldComment1)); // Fieldname
    schema.AddField(Field(fieldName2/*フィールド名*/, DOUBLE, true, fieldComment2)); // Fieldname
    schema.AddField(Field(fieldName3/*フィールド名*/, STRING, true, fieldComment3)); // Fieldname

    /* トピックの作成 */
    int shardCount = 3;
    int lifeCycle = 7;
    RecordType type = TUPLE;
    std::string projectName = "";
    std::string topicName = "";

    try
    {
        client.CreateTopic(projectName, topicName, shardCount, lifeCycle, type, schema, comment);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "トピックの作成に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // トピックの作成失敗

    }
}

BLOB トピックの作成

 void CreateBlobTopic()
{
    /* トピックの作成 */
    int shardCount = 3;
    int lifeCycle = 7;
    RecordType type = BLOB;
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        client.CreateTopic(projectName, topicName, shardCount, lifeCycle, type, comment);
    }
    catch (const DatahubException& e)
    {
        std::cerr << "トピックの作成に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // トピックの作成失敗
    }
}

}

トピックの削除

void DeleteTopic()
{
    std::string projectName = "";
    std::string topicName = "";
    try
    {
       client.DeleteTopic(projectName, topicName);  
    }
    catch(const DatahubException& e)
    {
        std::cerr << "トピックの削除に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // トピックの削除失敗
    }

}

トピックのクエリ

void ListTopic(){
    std::string projectName = "";
    try
    {
        const ListTopicResult& listTopicResult = client.ListTopic(projectName);
        std::cout<<listTopicResult.GetTopicNames().size()<<std::endl;
    }
    catch(const DatahubException& e)
    {
        std::cerr << "トピックの取得に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // トピックの取得失敗
    }

}

トピックの更新

void UpdateTopic(){
    const std::string updateComment = "test1";
    int updateLifecycle = 7;
    std::string projectName = "";
    std::string topicName = "";
    try
    {
        client.UpdateTopic(projectName, topicName, updateLifecycle, updateProjectComment); // updateProjectComment seems incorrect, should be updateComment?
    }
    catch(const DatahubException& e)
    {
        std::cerr << "トピックの更新に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // トピックの更新失敗
    }

}

トピックの照会

void GetTopic(){
    std::string projectName = "";
    std::string topicName = "";

    try
    {
        const GetTopicResult& getTopicResult = client.GetTopic(projectName, topicName);
        cout<<getTopicResult.GetComment()<<endl;
    }
    catch(const DatahubException& e)
    {
        std::cerr << "トピックの取得に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // トピックの取得失敗
    }

}

シャードの管理

シャードは、トピックでのデータ送信に使用される同時トンネルです。各シャードには ID があります。シャードはさまざまな状態になる可能性があります。Opening: シャードが開始されています。Active: シャードが開始され、サービスの提供に使用できます。アクティブな各シャードはサーバーリソースを消費します。ビジネス要件に基づいてシャードを作成することをお勧めします。

シャードのリスト

void ListShard()
{
    try
    {
       const ListShardResult& lsr = client.ListShard(projectName, topicName);
       std::cout<<lsr.GetShards().size()<<std::endl;
       std::vector<ShardEntry> shards = lsr.GetShards();

       std::vector<std::string> shardIds;
       for (size_t i = 0; i < shards.size(); ++i)
       {		
            ShardEntry shardEntry = shards[i];
            shardIds.push_back(shardEntry.GetShardId());
       }
    } 
    catch(const DatahubException& e)
    {
        std::cerr << "シャードのリスト取得に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // シャードリストの取得失敗
    }
}

シャードの分割

void SplitShard()
{
  std::string projectName = "";
  std::string topicName = "";
  try
  {
    const SplitShardResult& ssr = client.SplitShard(projectName, topicName, "0", "00000000000000000000000000AAAAAA");
    std::cout<<ssr.GetChildShards().size()<<std::endl;
  }
    catch(const DatahubException& e)
    {
        std::cerr << "シャード ID の分割に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // シャード ID の分割失敗
    }
}

シャードのマージ

void MergeShard()
{
  std::string projectName = "";
  std::string topicName = "";

  try
  {
    const MergeShardResult& msr = client.MergeShard(projectName, topicName, "0", "1");
    std::cout<<msr.GetChildShard().GetShardId()<<std::endl;
    std::cout<<msr.GetChildShard().GetBeginHashKey()<<std::endl;
    std::cout<<msr.GetChildShard().GetEndHashKey()<<std::endl;
  }
   catch(const DatahubException& e)
   {
        std::cerr << "シャードのマージに失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // シャードのマージ失敗
   }
	
}

データの読み取りと書き込み

アクティブなシャードと閉じられたシャードからデータを読み取ることができます。ただし、アクティブなシャードにのみデータを書き込むことができます。

データの読み取り

  1. カーソルの取得

トピックからデータを読み取るには、シャードと、データの読み取りを開始するカーソルを指定します。カーソルは、OLDEST、LATEST、SEQUENCE、SYSTEM_TIME の方法で取得できます。

  • OLDEST: 指定されたシャードの最も古い有効なレコードを指すカーソル。

  • LATEST: 指定されたシャードの最新のレコードを指すカーソル。

  • SEQUENCE: 指定されたシーケンス番号のレコードを指すカーソル。

  • SYSTEM_TIME: タイムスタンプ値が指定されたタイムスタンプ値以上である最初のレコードを指すカーソル。

void GetCursor()
{
    try
   {
      const GetCursorResult& r1 = client.GetCursor(projectName, topicName, "0", CURSOR_TYPE_OLDEST);
      std::string cursor = r1.GetCursor();
   } 
    catch(const DatahubException& e)
    {
        std::cerr << "カーソルの取得に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // カーソルの取得失敗
    }
	
}
  1. データの読み取り

void sub(DatahubClient& client)
{
    std::string projectName = "";
    std::string topicName = "";
    std::string shardId = "0";
    std::string cursor;
    RecordSchema schema;
    OpenSubscriptionOffsetSessionResult osor = client.InitSubscriptionOffsetSession(projectName, topicName, subId, {shardId});
    SubscriptionOffset subscription = osor.GetOffsets().at(shardId);
    if (subscription.GetSequence() < 0)
    {
        cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_OLDEST).GetCursor();
    }
    else
    {
        int64_t nextSequence = subscription.GetSequence() + 1;
        try
        {
            cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_SEQUENCE, nextSequence).GetCursor();
        }
        catch (const DatahubException& e)
        {
            cursor = client.GetCursor(projectName, topicName, shardId, CURSOR_TYPE_OLDEST).GetCursor();
        }
    }

    int64_t readTotal = 0;
    int fetchNum = 10;
    while (true)
    {
        try
        {
            GetRecordResult grr = client.GetRecord(projectName, topicName, shardId, cursor, fetchNum, subId);
            if (grr.GetRecordCount() <= 0)
            {
                std::cout << "null を読み取りました。1 秒間待機します。" << std::endl; // Read null, wait for 1s.
                sleep(1);
                continue;
            }
            
            for (auto recordResult : grr.GetRecords())
            {
                ProcessRecords(recordResult);
                if (++readTotal % 1000 == 0)
                {
                    subscription.SetSequence(recordResult.GetSequence());
                    subscription.SetTimestamp(recordResult.GetSystemTime());
                    std::map<std::string, SubscriptionOffset> offsets;
                    offsets[shardId] = subscription;
                    try
                    {
                        client.UpdateSubscriptionOffset(projectName, topicName, subId, offsets);
                    }
                    catch (const DatahubException& e)
                    {
                        std::cerr << "サブスクリプションオフセットの更新に失敗しました。リクエスト ID: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // Update subscription offset fail.
                        throw ;
                    }
                }
            }
            cursor = grr.GetNextCursor();
        }
        catch (const DatahubException& e)
        {
            std::cerr << "レコードの取得に失敗しました。リクエスト ID: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // Get record fail.
        }
    }

inline void ProcessRecords(const RecordResult& record)
{
    if (schema.GetFieldCount() == 0)       // BLOB
    {
        int len = 0;
        const char* data = record.GetData(len);
        printf("%s\n", data);
    }
    else                                   // TUPLE
    {
        for (int j = 0; j < schema.GetFieldCount(); ++j)
        {
            const Field &field = schema.GetField(j);
            const FieldType fieldType = field.GetFieldType();
            switch (fieldType)
            {
                case BIGINT:
                    printf("%ld\n", record.GetBigint(j));
                    break;
                case DOUBLE:
                    printf("%.15lf\n", record.GetDouble(j));
                    break;
                case STRING:
                    printf("%s\n", record.GetString(j).c_str());
                    break;
                default:
                    break;
            }
        }
    }
} 
}

共同消費データの読み取りの例

using namespace aliyun;
using namespace aliyun::datahub;

std::atomic_int gTotalNum(0);

Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;
std::string gSubId;
bool gAutoAck;
int32_t gFetchNum;
int64_t gSessionTimeoutMs;

void Usage(const char* prog)
{
    const char *start = strrchr(prog, '/');
    start = (start != NULL) ? start + 1 : prog;
    fprintf(stdout,
        "%s Ver %s\n"
        "使用方法: <プログラム> [オプション...]\n"  // Usage
        "   -i --accessid           <string>          アクセス ID を設定します [必須]\n" // set access id [Mandatory]
        "   -k --accesskey          <string>          アクセスキーを設定します [必須]\n" // set access key [Mandatory]
        "   -e --endpoint           <string>          エンドポイントを設定します [必須]\n" // set endpoint [Mandatory]
        "   -p --project            <string>          プロジェクトを設定します [必須]\n" // set project [Mandatory]
        "   -t --topic              <string>          トピックを設定します [必須]\n" // set topic [Mandatory]
        "   -s --subId              <string>          サブスクリプション ID を設定します [必須]\n" // set subscription id [Mandatory]
        "   -a --autoAck            <bool>            自動 ACK オフセットを設定します\n" // set auto ack offset
        "   -f --fetchNum           <int>             フェッチ制限数を設定します\n" // set fetch limit num
        "   -S --sessionTimeoutMs   <int>             セッションタイムアウトを設定します (ミリ秒)\n" // set session timeout (ms)
        "   -v --version                              バージョンを表示します\n" // show version
        "   -h --help                                 ヘルプメッセージを表示します\n", start, PROG_VERSION); // show help message
}

void parse_args(int argc, char* argv[])
{
    int opt_id;
    static struct option long_opts[] =
    {
        { "accessid", 1, 0, 'i'},
        { "accesskey", 1, 0, 'k'},
        { "endpoint", 1, 0, 'e' },
        { "project", 1, 0, 'p' },
        { "topic", 1, 0, 't' },
        { "subId", 1, 0, 's' },
        { "autoAck", 1, 0, 'a'},
        { "fetchNum", 1, 0, 'f'},
        { "sessionTimeoutMs", 1, 0, 'S'},
        { "version", 0, 0, 'v' },
        { "help", 0, 0, 'h' },
        { NULL, 0, 0, 0 }
    };

    while (1)
    {
        int ret = getopt_long(argc, argv, "i:k:e:p:t:s:a:f:S:vh", long_opts, &opt_id);
        if (ret == -1)
        {
            break;
        }
        switch (ret)
        {
        case 'i':
            gAccount.id = optarg;
            break;
        case 'k':
            gAccount.key = optarg;
            break;
        case 'e':
            gEndpoint = optarg;
            break;
        case 'p':
            gProjectName = optarg;
            break;
        case 't':
            gTopicName = optarg;
            break;
        case 's':
            gSubId = optarg;
            break;
        case 'a':
            gAutoAck = (bool)std::atoi(optarg);
            break;
        case 'f':
            gFetchNum = std::atoi(optarg);
            break;
        case 'S':
            gSessionTimeoutMs = (int64_t)std::atoi(optarg);
            break;
        case 'v':
        case 'h':
            Usage(argv[0]);
            exit(0);
            break;
        default:
            break;
        }
    }
}

inline void ProcessRecords(const TopicMetaPtr& topicMeta, const RecordResultPtr& recordPtr)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    if (schema.GetFieldCount() == 0)        // BLOB
    {
        int len = 0;
        const char* data = recordPtr->GetData(len);
        printf("%s\n", data);
    }
    else                                    // TUPLE
    {
        for (int j = 0; j < schema.GetFieldCount(); ++j)
        {
            const Field& field = schema.GetField(j);
            const FieldType fieldType = field.GetFieldType();
            switch (fieldType)
            {
                case BIGINT:
                    printf("%ld ", recordPtr->GetBigint(j));
                    break;
                case INTEGER:
                    printf("%d ", recordPtr->GetInteger(j));
                    break;
                case SMALLINT:
                    printf("%hd ", recordPtr->GetSmallint(j));
                    break;
                case TINYINT:
                    printf("%hhd ", recordPtr->GetTinyint(j));
                    break;
                case DOUBLE:
                    printf("%.5lf ", recordPtr->GetDouble(j));
                    break;
                case FLOAT:
                    printf("%.5f ", recordPtr->GetFloat(j));
                    break;
                case STRING:
                    printf("%s ", recordPtr->GetString(j).c_str());
                    break;
                case BOOLEAN:
                    printf("%s ", recordPtr->GetBoolean(j) ? "true" : "false");
                    break;
                case TIMESTAMP:
                    printf("%ld ", recordPtr->GetTimestamp(j));
                    break;
                case DECIMAL:
                    printf("%s ", recordPtr->GetDecimal(j).c_str());
                    break;
                default:
                    break;
            }
        }
        printf("\n");
    }
}

void CollaborativeConsume(const std::string& project, const std::string& topic, const std::string& subId, const ConsumerConfiguration& conf)
{
    DatahubConsumer consumer(project, topic, subId, conf);
    const TopicMetaPtr& topicMeta = consumer.GetTopicMeta();

    uint64_t readRecordNum = 0;
    try{
        while (true)
        {
            auto recordPtr = consumer.Read(60000);
            if (recordPtr == nullptr)
            {
                break;
            }
            // データを処理します。 // Process data.
            ProcessRecords(topicMeta, recordPtr);
            if (!gAutoAck)
            {
                recordPtr->GetMessageKey()->Ack();      // auto_ack の値が false に設定されている場合、Ack 関数はデータの処理後に実行されます。 // If the value of auto_ack is set to false, the Ack function is executed after the data is processed.
            }
            readRecordNum++;
        }
    }
    catch (const std::exception& e)
    {
        std::cerr << "読み取りに失敗しました: " << e.what() << std::endl; // Read fail
    }
    std::cout << readRecordNum << " レコードを合計で読み取りました" << std::endl; // Read ... records total
}

int main(int argc, char* argv[])
{
    parse_args(argc, argv);

    ConsumerConfiguration consumerConf(gAccount, gEndpoint);
    consumerConf.SetLogFilePath("./DatahubCollaborativeConsumer.log");
    consumerConf.SetAutoAckOffset(gAutoAck);
    if (gFetchNum > 0)
    {
        consumerConf.SetFetchLimitNum(gFetchNum);
    }
    if (gSessionTimeoutMs > 0)
    {
        consumerConf.SetSessionTimeout(gSessionTimeoutMs);
    }

    CollaborativeConsume(gProjectName, gTopicName, gSubId, consumerConf);

    return 0;
}
説明

共同消費は、SDK のバージョン 2.25.0 以降でサポートされている新機能です。SDK の以前のバージョンではサポートされていません。

データの書き込み

void pub(DatahubClient& client)
{
    std::vector<RecordEntry> records;
    GenerateRecords(records);

    try
    {
        PutRecordResult prr = client.PutRecord(projectName, topicName, records);
        if (prr.GetFailedRecordCount() > 0)
        {
            pubRetry(client, prr.GetFailedRecords(), 3);
        }
    }
    catch (const DatahubException& e)
    {
        std::cerr << "レコードの書き込みに失敗しました。リクエスト ID: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // Put records fail.
    }
}

void pubRetry(DatahubClient& client, const std::vector<RecordEntry>& records, int retryTimes)
{
    if (retryTimes <= 0)
    {
        std::cout << "レコードの書き込み再試行に失敗しました。レコードサイズ: " << records.size() << std::endl; // Put record retry fail.
        return ;
    }

    try
    {
        PutRecordResult prr = client.PutRecord(projectName, topicName, records);
        if (prr.GetFailedRecordCount() > 0)
        {
            pubRetry(client, prr.GetFailedRecords(), retryTimes-1);
        }
        else
        {
            std::cout << "レコードの書き込み再試行に成功しました。" << std::endl; // Put records retry success.
        }
    }
    catch (const DatahubException& e)
    {
        std::cerr << "書き込み再試行時にレコードの書き込みに失敗しました。リクエスト ID: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // Put records fail when put retry.
    }
inline void GenerateRecords(RecordEntryVec& records)
{
    for (int i = 0; i < 10; i++)
    {   
        if (schema.GetFieldCount() == 0) 
        {        
             // BLOB
            RecordEntry record = RecordEntry(BLOB);
            record.SetData("test_blob_data" + std::to_string(i));
            records.push_back(record);
        }
        else
        {
        // TUPLE
            // RecordEntry record = RecordEntry(5);
            // record.SetString(0, "field_1_" + std::to_string(i));
            // record.SetBigint(1, 123456789l);
            // record.SetDouble(2, 123.456d);
            // record.SetDouble(3, 654.32100d);
            // record.SetString(4, "field_2_" + std::to_string(i));
            // records.push_back(record);
        } 

    }
}  
}

データの非同期書き込みの例

using namespace aliyun;
using namespace aliyun::datahub;

Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;
int gEpochNum = 10;
int gRecordNum = 10;
int64_t gMaxBufferRecords = 0l;
int64_t gMaxBufferSize = 0l;
int64_t gMaxBufferTimeMs = 0l;
int64_t gMaxRecordPackQueueLimit = 0l;
StringVec gShardIds;

inline void StringSplit(StringVec& tar, const std::string& src, const std::string& pre)
{
    size_t start_pos = 0, end_pos = src.size();
    size_t pos = src.find(pre, start_pos);
    while (pos != std::string::npos)
    {
        std::string addStr = src.substr(start_pos, pos - start_pos);
        if (!addStr.empty())
        {
            tar.push_back(addStr);
        }
        start_pos = pos + 1;
        pos = src.find(pre, start_pos);
    }
    std::string addStr = src.substr(start_pos, end_pos - start_pos);
    if (!addStr.empty())
    {
        tar.push_back(addStr);
    }
}

void Usage(const char* prog)
{
    const char *start = strrchr(prog, '/');
    start = (start != NULL) ? start + 1 : prog;
    fprintf(stdout,
        "%s Ver %s\n"
        "使用方法: <プログラム> [オプション...]\n" // Usage
        "   -i --accessid           <string>          アクセス ID を設定します [必須]\n" // set access id
        "   -k --accesskey          <string>          アクセスキーを設定します [必須]\n" // set access key
        "   -e --endpoint           <string>          エンドポイントを設定します [必須]\n" // set endpoint
        "   -p --project            <string>          プロジェクトを設定します [必須]\n" // set project
        "   -t --topic              <string>          トピックを設定します [必須]\n" // set topic
        "   -E --epochNum           <int>             書き込みのエポック数を設定します [必須]\n" // set epoch num for write
        "   -N --recordNum          <int>             各エポックのレコード数を設定します [必須]\n" // set record num for each epoch
        "   -R --maxBufferRecords   <int>             最大バッファレコード数を設定します\n" // set max buffer record count
        "   -S --maxBufferSize      <int>             最大バッファサイズを設定します\n" // set max buffer size
        "   -M --maxBufferTimeMs    <int>             最大バッファ時間を設定します (ミリ秒)\n" // set max buffer time (ms)
        "   -Q --maxRecordPackLimit <int>             最大レコードパックキュー制限を設定します\n" // set max record pack queue limit
        "   -H --shardIds           <string>          シャードを設定します (, で区切る)\n" // set shards (split by ',')
        "   -v --version                              バージョンを表示します\n" // show version
        "   -h --help                                 ヘルプメッセージを表示します\n", start, PROG_VERSION); // show help message
}

void parse_args(int argc, char* argv[])
{
    int opt_id;
    static struct option long_opts[] =
    {
        { "accessid", 1, 0, 'i'},
        { "accesskey", 1, 0, 'k'},
        { "endpoint", 1, 0, 'e' },
        { "project", 1, 0, 'p' },
        { "topic", 1, 0, 't' },
        { "epochNum", 1, 0, 'E'},
        { "recordNum", 1, 0, 'N'},
        { "maxBufferRecords", 1, 0, 'R'},
        { "mMaxBufferSize", 1, 0, 'S'},
        { "maxBufferTimeMs", 1, 0, 'M'},
        { "maxRecordPackQueueLimit", 1, 0, 'Q'},
        { "shardIds", 1, 0, 'H'},
        { "version", 0, 0, 'v' },
        { "help", 0, 0, 'h' },
        { NULL, 0, 0, 0 }
    };

    while (1)
    {
        int ret = getopt_long(argc, argv, "i:k:e:p:t:E:N:R:S:M:Q:H:vh", long_opts, &opt_id);
        if (ret == -1)
        {
            break;
        }
        switch (ret)
        {
        case 'i':
            gAccount.id = optarg;
            break;
        case 'k':
            gAccount.key = optarg;
            break;
        case 'e':
            gEndpoint = optarg;
            break;
        case 'p':
            gProjectName = optarg;
            break;
        case 't':
            gTopicName = optarg;
            break;
        case 'E':
            gEpochNum = std::atoi(optarg);
            break;
        case 'N':
            gRecordNum = std::atoi(optarg);
            break;
        case 'R':
            gMaxBufferRecords = (int64_t)std::atoi(optarg);
            break;
        case 'S':
            gMaxBufferSize = (int64_t)std::atoi(optarg);
            break;
        case 'M':
            gMaxBufferTimeMs = (int64_t)std::atoi(optarg);
            break;
        case 'Q':
            gMaxRecordPackQueueLimit = (int64_t)std::atoi(optarg);
            break;
        case 'H':
            StringSplit(gShardIds, optarg, ",");
            break;
        case 'v':
        case 'h':
            Usage(argv[0]);
            exit(0);
            break;
        default:
            break;
        }
    }

    if (gEpochNum <= 0 || gRecordNum <= 0)
    {
        std::cerr << "無効なパラメータです!" << std::endl; // Invalid parameter!
        exit(1);
    }
}

inline void GenerateTupleRecord(const RecordSchema& schema, RecordEntry& record)
{
    for (int i = 0; i < schema.GetFieldCount(); i++)
    {
        const Field& field = schema.GetField(i);
        const FieldType& type = field.GetFieldType();
        switch (type)
        {
            case BIGINT:
                record.SetBigint(i, 1234l);
                break;
            case DOUBLE:
                record.SetDouble(i, 1.234);
                break;
            case BOOLEAN:
                record.SetBoolean(i, true);
                break;
            case TIMESTAMP:
                record.SetTimestamp(i, 1234l);
                break;
            case STRING:
                record.SetString(i, "1234");
                break;
            case DECIMAL:
                record.SetDecimal(i, "1234");
                break;
            case INTEGER:
                record.SetInteger(i, (int32_t)1234);
                break;
            case FLOAT:
                record.SetFloat(i, 1.234);
                break;
            case TINYINT:
                record.SetTinyint(i, (int8_t)1234);
                break;
            case SMALLINT:
                record.SetSmallint(i, (int16_t)1234);
                break;
            default:
                break;
        }
    }
}

inline void GenerateRecords(TopicMetaPtr topicMeta, RecordEntryVec& records)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    for (int i = 0; i < gRecordNum; i++)
    {
        if (topicMeta->GetRecordType() == "TUPLE")
        {
            // TUPLE
            records.emplace_back(schema.GetFieldCount());
            GenerateTupleRecord(schema, records.back());
        }
        else
        {
            // BLOB
            records.emplace_back(BLOB);
            records.back().SetData("test_blob_data" + std::to_string(i));
        }
    }
}

void AsyncProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
    DatahubProducer producer(project, topic, conf, shardIds);
    const TopicMetaPtr& topicMeta = producer.GetTopicMeta();

    std::vector<WriteResultFuturePtr> resultFutureVecs;
    std::map<std::string, uint64_t> writeRecordNum;
    for (const std::string& shardId : shardIds)
    {
        writeRecordNum[shardId] = 0;
    }

    try
    {
        // データを生成します。 // Generate data
        RecordEntryVec records;
        GenerateRecords(topicMeta, records);
        for (int i = 0; i < gEpochNum; i++)
        {
            auto result = producer.WriteAsync(records);
            resultFutureVecs.push_back(result);
        }
    }
    catch(const std::exception& e)
    {
        std::cerr << "WriteAsync に失敗しました: " << e.what() << std::endl; // WriteAsync fail
    }

    producer.Flush();

    for (auto it = resultFutureVecs.begin(); it != resultFutureVecs.end(); it++)
    {
        (*it)->wait();
        try
        {
            WriteResultPtr result = (*it)->get();
            writeRecordNum[result->GetShardId()] += gRecordNum;
        }
        catch (const std::exception& e)
        {
            std::cerr << "レコードの書き込みに失敗しました: " << e.what() << std::endl; // Write records fail
        }
    }

    for (auto it = writeRecordNum.begin(); it != writeRecordNum.end(); it++)
    {
        std::cout << it->second << " レコードをシャード " << it->first << " に書き込みました" << std::endl; // Write ... records to shard ...
    }
}

int main(int argc, char *argv[])
{
    parse_args(argc, argv);

    ProducerConfiguration producerConf(gAccount, gEndpoint);
    producerConf.SetEnableProtobuf(true);
    producerConf.SetLogFilePath("./DatahubAsyncProducer.log");
    if (gMaxBufferRecords > 0)
    {
        producerConf.SetMaxAsyncBufferRecords(gMaxBufferRecords);
    }
    if (gMaxBufferSize > 0)
    {
        producerConf.SetMaxAsyncBufferSize(gMaxBufferSize);
    }
    if (gMaxBufferTimeMs > 0)
    {
        producerConf.SetMaxAsyncBufferTimeMs(gMaxBufferTimeMs);
    }
    if (gMaxRecordPackQueueLimit > 0)
    {
        producerConf.SetMaxRecordPackQueueLimit(gMaxRecordPackQueueLimit);
    }

    AsyncProduce(gProjectName, gTopicName, producerConf, gShardIds);

    return 0;
}

データの同期書き込みの例

using namespace aliyun;
using namespace aliyun::datahub;

Account gAccount;
std::string gEndpoint;
std::string gProjectName;
std::string gTopicName;
int gEpochNum = 10;
int gRecordNum = 10;
StringVec gShardIds;

inline void StringSplit(StringVec& tar, const std::string& src, const std::string& pre)
{
    size_t start_pos = 0, end_pos = src.size();
    size_t pos = src.find(pre, start_pos);
    while (pos != std::string::npos)
    {
        std::string addStr = src.substr(start_pos, pos - start_pos);
        if (!addStr.empty())
        {
            tar.push_back(addStr);
        }
        start_pos = pos + 1;
        pos = src.find(pre, start_pos);
    }
    std::string addStr = src.substr(start_pos, end_pos - start_pos);
    if (!addStr.empty())
    {
        tar.push_back(addStr);
    }
}

void Usage(const char* prog)
{
    const char *start = strrchr(prog, '/');
    start = (start != NULL) ? start + 1 : prog;
    fprintf(stdout,
        "%s Ver %s\n"
        "使用方法: <プログラム> [オプション...]\n" // Usage
        "   -i --accessid           <string>          アクセス ID を設定します [必須]\n" // set access id
        "   -k --accesskey          <string>          アクセスキーを設定します [必須]\n" // set access key
        "   -e --endpoint           <string>          エンドポイントを設定します [必須]\n" // set endpoint
        "   -p --project            <string>          プロジェクトを設定します [必須]\n" // set project
        "   -t --topic              <string>          トピックを設定します [必須]\n" // set topic
        "   -E --epochNum           <int>             書き込みのエポック数を設定します [必須]\n" // set epoch num for write
        "   -N --recordNum          <int>             各エポックのレコード数を設定します [必須]\n" // set record num for each epoch
        "   -H --shardIds           <string>          シャードを設定します (, で区切る)\n" // set shards (split by ',')
        "   -v --version                              バージョンを表示します\n" // show version
        "   -h --help                                 ヘルプメッセージを表示します\n", start, PROG_VERSION); // show help message
}

void parse_args(int argc, char* argv[])
{
    int opt_id;
    static struct option long_opts[] =
    {
        { "accessid", 1, 0, 'i'},
        { "accesskey", 1, 0, 'k'},
        { "endpoint", 1, 0, 'e' },
        { "project", 1, 0, 'p' },
        { "topic", 1, 0, 't' },
        { "epochNum", 1, 0, 'E'},
        { "recordNum", 1, 0, 'N'},
        { "shardIds", 1, 0, 'H'},
        { "version", 0, 0, 'v' },
        { "help", 0, 0, 'h' },
        { NULL, 0, 0, 0 }
    };

    while (1)
    {
        int ret = getopt_long(argc, argv, "i:k:e:p:t:E:N:H:vh", long_opts, &opt_id);
        if (ret == -1)
        {
            break;
        }
        switch (ret)
        {
        case 'i':
            gAccount.id = optarg;
            break;
        case 'k':
            gAccount.key = optarg;
            break;
        case 'e':
            gEndpoint = optarg;
            break;
        case 'p':
            gProjectName = optarg;
            break;
        case 't':
            gTopicName = optarg;
            break;
        case 'E':
            gEpochNum = std::atoi(optarg);
            break;
        case 'N':
            gRecordNum = std::atoi(optarg);
            break;
        case 'H':
            StringSplit(gShardIds, optarg, ",");
            break;
        case 'v':
        case 'h':
            Usage(argv[0]);
            exit(0);
            break;
        default:
            break;
        }
    }

    if (gEpochNum <= 0 || gRecordNum <= 0)
    {
        std::cerr << "無効なパラメータです!" << std::endl; // Invalid parameter!
        exit(1);
    }
}

inline void GenerateTupleRecord(const RecordSchema& schema, RecordEntry& record)
{
    for (int i = 0; i < schema.GetFieldCount(); i++)
    {
        const Field& field = schema.GetField(i);
        const FieldType& type = field.GetFieldType();
        switch (type)
        {
            case BIGINT:
                record.SetBigint(i, 1234l);
                break;
            case DOUBLE:
                record.SetDouble(i, 1.234);
                break;
            case BOOLEAN:
                record.SetBoolean(i, true);
                break;
            case TIMESTAMP:
                record.SetTimestamp(i, 1234l);
                break;
            case STRING:
                record.SetString(i, "1234");
                break;
            case DECIMAL:
                record.SetDecimal(i, "1234");
                break;
            case INTEGER:
                record.SetInteger(i, (int32_t)1234);
                break;
            case FLOAT:
                record.SetFloat(i, 1.234);
                break;
            case TINYINT:
                record.SetTinyint(i, (int8_t)1234);
                break;
            case SMALLINT:
                record.SetSmallint(i, (int16_t)1234);
                break;
            default:
                break;
        }
    }
}

inline void GenerateRecords(const TopicMetaPtr& topicMeta, RecordEntryVec& records)
{
    const RecordSchema& schema = topicMeta->GetRecordSchema();
    for (int i = 0; i < gRecordNum; i++)
    {
        if (topicMeta->GetRecordType() == "TUPLE")
        {
            // TUPLE
            records.emplace_back(schema.GetFieldCount());
            GenerateTupleRecord(schema, records.back());
        }
        else
        {
            // BLOB
            records.emplace_back(BLOB);
            records.back().SetData("test_blob_data" + std::to_string(i));
        }
    }
}

void GeneralProduce(const std::string& project, const std::string& topic, const ProducerConfiguration& conf, const StringVec& shardIds)
{
    DatahubProducer producer(project, topic, conf, shardIds);
    const TopicMetaPtr& topicMeta = producer.GetTopicMeta();

    std::map<std::string, uint64_t> writeRecordNum;
    for (const std::string& shardId : shardIds)
    {
        writeRecordNum[shardId] = 0;
    }

    try
    {
        // データを生成します。 // Generate data.
        RecordEntryVec records;
        GenerateRecords(topicMeta, records);
        for (int i = 0; i < gEpochNum; i++)
        {
            std::string shardId = producer.Write(records);
            writeRecordNum[shardId] += records.size();
        }
    }
    catch(const std::exception& e)
    {
        std::cerr << "書き込みに失敗しました: " << e.what() << std::endl; // Write fail
    }

    for (auto it = writeRecordNum.begin(); it != writeRecordNum.end(); it++)
    {
        std::cout << it->second << " レコードをシャード " << it->first << " に書き込みました" << std::endl; // Write ... records to shard ...
    }
}

int main(int argc, char *argv[])
{
    parse_args(argc, argv);

    ProducerConfiguration producerConf(gAccount, gEndpoint);
    producerConf.SetEnableProtobuf(true);
    producerConf.SetLogFilePath("./DatahubGeneralProducer.log");

    GeneralProduce(gProjectName, gTopicName, producerConf, gShardIds);

    return 0;
}

メータリング情報の照会

void GetMeter()
{
    try
    {
        GetMeteringInfoResult GetMeteringInfo = Client.GetMeteringInfo(projectName, topicName, shardId);

    }
    catch(DatahubException e)
    {
          cerr << e << endl;
    }
}
説明

データの非同期書き込みと同期書き込みの機能は、SDK のバージョン 2.25.0 以降でサポートされています。

サブスクリプションの管理

DataHub を使用すると、サーバーはサブスクリプションの消費オフセットを保存できます。単純な構成を実行することで、可用性の高いオフセットストレージサービスを取得できます。

サブスクリプションの作成

void CreateSubscription()
{ 
  std::string projectName = "";
  std::string topicName = "";
  try
  {
    const std::string subscriptionComment = "test_subscription";
    const CreateSubscriptionResult& createSubscriptionResult = client.CreateSubscription(projectName, topicName, subscriptionComment);
  } 
   catch(const DatahubException& e)
   {
        std::cerr << "サブスクリプションの作成に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // サブスクリプションの作成失敗
   }
}

サブスクリプションの削除

void DeleteSubscription() 
{
  std::string projectName = "";
  std::string topicName = "";
  std::string subId = "";
  try 
  {
     client.DeleteSubscription(projectName, topicName, subId);
  } 
   catch(const DatahubException& e)
   {
      std::cerr << "サブスクリプションの削除に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // サブスクリプションの削除失敗
   }
}

サブスクリプションの更新

void UpdateSubscription()
{
    try 
    {
        const std::string updateSubscriptionComment = "test_subscription_1";
      	const std::string projectName = "";
        const std::string topicName = "";
        client.UpdateSubscription(projectName, topicName, subId, updateSubscriptionComment);
      }
   catch(const DatahubException& e)
   {
      std::cerr << "サブスクリプションの更新に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // サブスクリプションの更新失敗
   }
}

サブスクリプションのリスト

 void ListSubscription() 
 {
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";    try 
    {
       const ListSubscriptionResult& subscriptionResult = client.ListSubscription(projectName, topicName, 1, 20, subId)); // Extra closing parenthesis
       std::cout << subscriptionResult.GetTotalCount() << std::endl;
    }  
   catch(const DatahubException& e)
   {
      std::cerr << "サブスクリプションのリスト取得に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // サブスクリプションリストの取得失敗
   }
}

サブスクリプションの照会

void GetSubscription() 
{
    try 
    {
    	const std::vector<SubscriptionEntry>& subscriptions = subscriptionResult.GetSubscriptions(); // subscriptionResult is not defined in this scope.  Assumed to be available from a broader context.
    	std::cout << subscriptions.size() << std::endl;

    	for (size_t i = 0; i < subscriptions.size(); ++i)
    	{
          SubscriptionEntry entry = subscriptions[i];
          std::cout <<  entry.GetSubId() << std::endl;
      }						
    }  
   catch(const DatahubException& e)
   {
      std::cerr << "サブスクリプションの取得に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // サブスクリプションの取得失敗
   }
}

サブスクリプションの状態の更新

 void UpdateSubscriptionState()
 {
    try 
    {
        client.UpdateSubscriptionState(projectName, topicName, subId, SubscriptionState::OFFLINE);
        std::cout << getSubscriptionResult.GetComment() << std::endl; // getSubscriptionResult is not defined in this scope. Assumed to be available from a broader context.

    } 
   catch(const DatahubException& e)
   {
      std::cerr << "サブスクリプションの更新に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // サブスクリプションの更新失敗
   }
	
}

オフセットの管理

サブスクリプションが作成された後、最初は未消費です。サブスクリプションのオフセットストレージ機能を使用するには、オフセットに対する操作を実行します。

オフセットの初期化

 void InitSubscriptionOffsetSession()
 {
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    std::vector<std::string> shardIds;
    try 
    {
        const OpenSubscriptionOffsetSessionResult& offsetSessionResult =
        client.InitSubscriptionOffsetSession(projectName, topicName, subId, shardIds);
        std::cout << offsetSessionResult.GetOffsets().size() << std::endl;

    } 
   catch(const DatahubException& e)
   {
      std::cerr << "オフセットの初期化に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // オフセットの初期化失敗
   }
}

オフセットの取得

// オフセットを取得します。
void GetSubscriptionOffset() 
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string subId = "";
    std::vector<std::string> shardIds;
    try 
    {
        const GetSubscriptionOffsetResult& getSubscriptionOffsetResult =
        client.GetSubscriptionOffset(projectName, topicName, subId, shardIds); // Missing closing parenthesis and semicolon
        std::cout << getSubscriptionOffsetResult.GetOffsets().size() << std::endl;        
        std::cout << getSubscriptionResult.GetComment() << std::endl; // getSubscriptionResult is not defined in this scope. Assumed to be available from a broader context.

    }  
   catch(const DatahubException& e)
   {
      std::cerr << "オフセットの取得に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // オフセットの取得失敗
   }
}

オフセットのリセット

void ResetSubscriptionOffset() 
{
    try 
    {
        int64_t resetTimestamp = 0l;
        int64_t resetSequence = 0l;
        uint32_t resetBatchIndex = 0u;
        const std::string projectName = "";
        const std::string topicName = "";
        const std::string subId = "";
        std::map<std::string, SubscriptionOffset> resetSubscriptionOffsets;
        for (auto iter = offsets.begin(); iter != offsets.end(); ++iter) // 'offsets' is not defined in this scope. Assumed to be available from a broader context.
        {
            SubscriptionOffset offset(resetTimestamp, resetSequence, resetBatchIndex);
            resetSubscriptionOffsets.insert(
            std::pair<std::string, SubscriptionOffset>(iter->first, offset));
        }

    client.ResetSubscriptionOffset(projectName, topicName, subId, resetSubscriptionOffsets);    }  
   catch(const DatahubException& e)
   {
      std::cerr << "オフセットのリセットに失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // オフセットのリセット失敗
   }
}

DataConnector の管理

DataHub の DataConnector は、DataHub から他のクラウドサービスにストリーミングデータを同期します。DataConnector を使用すると、DataHub トピックから MaxCompute、オブジェクトストレージサービス (OSS)、ApsaraDB RDS for MySQL、Tablestore、Elasticsearch、および Function Compute に、リアルタイムまたはほぼリアルタイムモードでデータを同期できます。DataConnector が構成されると、DataHub に書き込んだデータを他の Alibaba Cloud サービスで使用できます。

DataConnector の作成

  • 次のサンプルコードは、DataHub から MaxCompute にデータを同期するための DataConnector を作成する方法の例を示しています。

void  CreateConnector() 
{
    try{
    sdk::SinkOdpsConfig config;
    config.SetEndpoint("ODPS_ENDPOINT");
    config.SetProject("ODPS_PROJECT");
    config.SetTable("ODPS_TABLE");
    config.SetAccessId("ODPS_ACCESSID");
    config.SetAccessKey("ODPS_ACCESSKEY");
    config.SetPartitionMode(sdk::PartitionMode::SYSTEM_TIME);
    config.SetTimeRange(15);
    const std::string projectName = "";
    const std::string topicName = "";
    std::vector<std::pair<std::string, std::string> > partitionConfig;
    partitionConfig.push_back(std::pair<std::string, std::string>("ds", "%Y%m%d"));
    partitionConfig.push_back(std::pair<std::string, std::string>("hh", "%H"));
    partitionConfig.push_back(std::pair<std::string, std::string>("mm", "%M"));
    config.SetPartitionConfig(partitionConfig);

    std::vector<std::string> columnFields;
    columnFields.push_back(fieldName1); // fieldName1 is not defined in this scope. Assumed to be available from a broader context.

    const CreateConnectorResult& connectorResult = client.CreateConnector(projectName, topicName, sdk::ConnectorType::SINK_ODPS, columnFields, config);

    std::cout<<"Odps ConnectorId:" + connectorResult.GetConnectorId()<<std::endl;
    }
   catch(const DatahubException& e)
   {
      std::cerr << "odpsConnector の作成に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // odpsConnector の作成失敗
   }
}

DataConnector の削除

void DeleteConnector() 
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string connectorId = "";

    try {
      client.DeleteConnector(projectName, topicName, connectorId);
      } 
   	catch(const DatahubException& e)
   {
      std::cerr << "Connector の削除に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // Connector の削除失敗
   }
}

DataConnector の照会

void GetConnector() 
{
    const std::string projectName = "";
    const std::string topicName = "";
    const std::string connectorId = "";
    try 
    {
        GetConnectorResult GetConnectorResult = client.GetConnector(projectName, topicName, connectorId);
        const sdk::SinkOdpsConfig* odpsConfig = dynamic_cast<const sdk::SinkOdpsConfig*>(getConnectorResult.GetConfig());
        std::cout << odpsConfig->GetPartitionConfig().size() << std::endl;
    } 
   catch(const DatahubException& e)
   {
      std::cerr << "トピックの削除に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // トピックの削除失敗 Looks like a copy/paste error. Should be "Connector の取得に失敗しました"
   }
}

DataConnector のリスト

void ListConnector() {
  try {
       const std::string projectName = "";
       const std::string topicName = "";
       const ListConnectorResult& listConnectorResult = client.ListConnector(projectName, topicName);
       std::cout << listConnectorResult.GetConnectorIds().size() << std::endl;
  }  catch(const DatahubException& e)
   {
      std::cerr << "コネクタのリスト取得に失敗しました: " << e.GetRequestId() << ", エラーコード: " << e.GetErrorCode() << ", エラーメッセージ: " << e.GetErrorMessage() << std::endl; // コネクタリストの取得失敗
   }
 }