すべてのプロダクト
Search
ドキュメントセンター

DataHub:パブリッシュ/サブスクライブ操作

最終更新日:Oct 28, 2025

このトピックでは、DataHub Go SDK を使用してパブリッシュ/サブスクライブ操作を実行する方法を説明します。

データのパブリッシュ

データレコードを topic にパブリッシュするときは、各レコードに shard ID を指定します。したがって、事前に ListShard API を呼び出して、トピック配下のシャードのリストを取得することをお勧めします。PutRecords API を使用するときは、必ず戻り値の結果を確認して、書き込みに失敗したレコードがあるかどうかを判断してください。

説明

DataHub サーバーバージョン 2.12 以降、PutRecordsByShard API がサポートされています。それ以前のバージョンでは、PutRecords を使用してください。

パラメーター

パラメーター

タイプ

説明

projectName

string

プロジェクト名。

topicName

string

トピック名。

shardId

string

シャード ID。

records

/

書き込むレコードのリスト。

レスポンスの例

type PutRecordsResult struct {
    FailedRecordCount int            `json:"FailedRecordCount"`
    FailedRecords     []FailedRecord `json:"FailedRecords"`
}

エラータイプ

クラス名

エラーコード

説明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

リクエストされたリソースは存在しません。(注: これは、Split/Merge 操作の直後に別のリクエストが送信された場合に発生する可能性があります。)

AuthorizationFailedError

Unauthorized

権限付与署名の解析に失敗しました。AccessKey が正しいかどうかを確認してください。

InvalidParameterError

InvalidParameter

InvalidCursor

無効なパラメーター。

DatahubClientError

-

他のすべての例外の基本クラス。

コード例

// タプルデータをパブリッシュ
func putTupleData() {
    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("get topic failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get topic successful")
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record1.ShardId = "0"
    record1.SetValueByName("field1", "TEST1")
    record1.SetValueByName("field2", 1)
    // レコードをパブリッシュする際に属性を追加できます
    record1.SetAttribute("attribute", "test attribute")
    records[0] = record1
    record2 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record2.ShardId = "1"
    record2.SetValueByName("field1", datahub.String("TEST2"))
    record2.SetValueByName("field2", datahub.Bigint(2))
    records[1] = record2
    record3 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record3.ShardId = "2"
    record3.SetValueByName("field1", datahub.String("TEST3"))
    record3.SetValueByName("field2", datahub.Bigint(3))
    records[2] = record3
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        result, err := dh.PutRecords(projectName, topicName, records)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("put record failed")
                fmt.Println(err)
                return
            }
        }
        fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
        for _, v := range result.FailedRecords {
            fmt.Println(v)
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("put records failed ")
    }
}
// BLOB データをパブリッシュ
func putBlobData() {
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
    record1.ShardId = "0"
    records[0] = record1
    record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
    record2.ShardId = "1"
    record2.SetAttribute("attribute", "test attribute")
    records[1] = record2
    record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
    record3.ShardId = "2"
    records[2] = record3
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        result, err := dh.PutRecords(projectName, blobTopicName, records)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("put record failed")
                fmt.Println(err)
                return
            }
        }
        fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
        for _, v := range result.FailedRecords {
            fmt.Println(v)
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("put records failed ")
    }
}
// シャードごとにデータをパブリッシュ
func putDataByShard() {
    shardId := "0"
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
    records[0] = record1
    record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
    record2.SetAttribute("attribute", "test attribute")
    records[1] = record2
    record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
    records[2] = record3
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        if err := dh.PutRecordsByShard(projectName, blobTopicName, shardId, records); err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("put record failed")
                fmt.Println(err)
                return
            }
        }
    }
    if retryNum >= maxReTry {
        fmt.Printf("put records failed ")
    }else {
        fmt.Println("put record successful")
    }
}

データ自体に加えて、データ収集などのデータに関連する追加情報をパブリッシュ中にアタッチできます。この情報は次のように追加できます。

record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record1.SetAttribute("attribute","test attribute")
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.SetAttribute("attribute","test attribute")

データのサブスクライブ

トピック配下のデータをサブスクライブするには、対応するシャードと読み取りを開始するカーソル位置を指定します。カーソルは GetCursor API を使用して取得できます。

パラメーター

パラメーター

タイプ

説明

projectName

string

プロジェクト名。

topicName

string

トピック名。

shardId

string

シャード ID。

ctype

CursorType

カーソルを取得するために使用されるメソッド。

param

int64

カーソルタイプで必要なパラメーター。これは SEQUENCE または SYSTEM_TIME を使用するときに設定する必要があります。

説明

ctype パラメーターは、次の値をサポートしています。

  • OLDEST: カーソルは、現在利用可能な最も古い有効なレコードを指します。

  • LATEST: カーソルは、最新のレコードを指します。

  • SEQUENCE: カーソルは、指定されたシーケンス番号を持つレコードを指します。

  • SYSTEM_TIME: カーソルは、指定されたタイムスタンプの後に受信した最初のレコードを指します。

レスポンスの例

type GetCursorResult struct {
    Cursor     string `json:"Cursor"`
    RecordTime int64  `json:"RecordTime"`
    Sequence   int64  `json:"Sequence"`
}

エラーの説明

クラス名

エラーコード

説明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

リクエストされたリソースは存在しません。(注: これは、Split/Merge 操作の直後にリクエストが送信された場合に発生する可能性があります。)

SeekOutOfRangeError

SeekOutOfRange

GetCursor を呼び出すとき、指定されたシーケンスが有効範囲外 (通常は期限切れ) であるか、指定されたタイムスタンプが現在の時刻より後です。

AuthorizationFailedError

Unauthorized

権限付与署名の解析が異常です。AccessKey が正しく入力されているか確認してください。

InvalidParameterError

InvalidParameter

InvalidCursor

無効なパラメーター

ShardSealedError

-

-

DatahubClientError

-

他のすべての例外の基本クラス。

コード例

特定のシャードからデータを読み取るには、開始カーソルと読み取るレコードの最大数を指定します。カーソルとシャードの末尾の間で利用可能なレコード数が指定された制限よりも少ない場合、実際に利用可能なレコード数のみが返されます。

func cursor(dh datahub.DataHub, projectName, topicName string) {
    shardId := "0"
    gr, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
    }else{
        fmt.Println(gr)
    }
    gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.LATEST)
    fmt.Println(err)
    fmt.Println(gr)
    var seq int64 = 10
    gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, seq)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
    }else{
        fmt.Println(gr)
    }
}

Tuple トピックデータ

パラメーター

パラメーター

タイプ

説明

projectName

String

プロジェクト名。

topicName

string

トピック名。

shardId

string

シャード ID。

cursor

string

データの読み取りに使用される開始カーソル。

limit

int

読み取るレコードの最大数。

recordSchema

RecordSchema

トピックのレコードスキーマ。

レスポンスの例

type GetRecordsResult struct {
    NextCursor    string        `json:"NextCursor"`
    RecordCount   int           `json:"RecordCount"`
    StartSequence int64         `json:"StartSeq"`
    Records       []IRecord     `json:"Records"`
    RecordSchema  *RecordSchema `json:"-"`
}

エラーの説明

クラス名

エラーコード

説明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

リクエストされたリソースは存在しません。(注: これは、Split/Merge 操作の直後に別のリクエストが送信された場合に発生する可能性があります。)

AuthorizationFailedError

Unauthorized

権限付与署名の解析に失敗しました。AccessKey が正しいかどうかを確認してください。

InvalidParameterError

InvalidParameter

InvalidCursor

無効なパラメーター。

DatahubClientError

-

他のすべての例外の基本クラス。

コード例

func getTupleData() {
    shardId := "1"
    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("get topic failed")
        return
    }
    fmt.Println("get topic successful")
    cursor, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get cursor successful")
    limitNum := 100
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor.Cursor, limitNum, topic.RecordSchema)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
        }
        fmt.Println("get record successful")
        for _, record := range gr.Records {
            data, ok := record.(*datahub.TupleRecord)
            if !ok {
                fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
            } else {
                fmt.Println(data.Values)
            }
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("get records failed ")
    }
}

Blob トピックデータ

パラメーター

パラメーター

タイプ

説明

projectName

String

プロジェクト名。

topicName

string

トピック名。

shardId

string

シャード ID。

cursor

string

データの読み取りに使用される開始カーソル。

limit

int

読み取るレコードの最大数。

レスポンスの例

type GetRecordsResult struct {
    NextCursor    string        `json:"NextCursor"`
    RecordCount   int           `json:"RecordCount"`
    StartSequence int64         `json:"StartSeq"`
    Records       []IRecord     `json:"Records"`
    RecordSchema  *RecordSchema `json:"-"`
}

エラーの説明

クラス名

エラーコード

説明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

リクエストされたリソースは存在しません。(注: これは、Split/Merge 操作の直後に別のリクエストが送信された場合に発生する可能性があります。)

AuthorizationFailedError

Unauthorized

権限付与署名の解析に失敗しました。AccessKey が正しいかどうかを確認してください。

InvalidParameterError

InvalidParameter

InvalidCursor

無効なパラメーター。

DatahubClientError

-

他のすべての例外の基本クラス。

コード例

func getBlobData() {
    shardId := "1"
    cursor, err := dh.GetCursor(projectName, blobTopicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get cursor successful")
    limitNum := 100
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        gr, err := dh.GetBlobRecords(projectName, blobTopicName, shardId, cursor.Cursor, limitNum)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
        }
        fmt.Println("get record successful")
        for _, record := range gr.Records {
            data, ok := record.(*datahub.BlobRecord)
            if !ok {
                fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
            } else {
                fmt.Println(data.StoreData)
            }
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("get records failed ")
    }
}