全部產品
Search
文件中心

DataHub:發布/訂閱操作

更新時間:Jul 15, 2025

本文為您展示DataHub的 GO SDK的發布/訂閱操作。

發布資料

向某個topic下發布資料記錄時,每條資料記錄需要指定該topic下的一個shard, 因此一般需要通過 listShard 介面查看下當前topic下的shard列表。使用PutRecords介面時注意檢查返回結果是否資料發布失敗的情況。

說明

伺服器2.12版本及之後版本開始支援PutRecordsByShard介面,低版本請使用PutRecords介面。

參數說明

參數名

參數類型

參數說明

projectName

String

專案名稱。

topicName

string

Topic名稱

shardId

string

id of shard

records

Records list to written.

返回樣本

type PutRecordsResult struct {
    FailedRecordCount int            `json:"FailedRecordCount"`
    FailedRecords     []FailedRecord `json:"FailedRecords"`
}

錯誤說明

錯誤類名

錯誤碼

錯誤說明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

訪問的資源不存在(註:進行Split/Merge操作後,立即發送其他請求,有可能會拋出該異常 )。

AuthorizationFailedError

Unauthorized

Authorization 簽名解析異常,檢查AK是否填寫正確。

DatahubClientError

-

其他所有,並且是所有異常的基類。

InvalidParameterError

InvalidParameter

InvalidCursor

非法參數。

程式碼範例

// put tuple data
func putTupleData() {
    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("get topic failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get topic successful")
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record1.ShardId = "0"
    record1.SetValueByName("field1", "TEST1")
    record1.SetValueByName("field2", 1)
    //you can add some attributes when put record
    record1.SetAttribute("attribute", "test attribute")
    records[0] = record1
    record2 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record2.ShardId = "1"
    record2.SetValueByName("field1", datahub.String("TEST2"))
    record2.SetValueByName("field2", datahub.Bigint(2))
    records[1] = record2
    record3 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record3.ShardId = "2"
    record3.SetValueByName("field1", datahub.String("TEST3"))
    record3.SetValueByName("field2", datahub.Bigint(3))
    records[2] = record3
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        result, err := dh.PutRecords(projectName, topicName, records)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("put record failed")
                fmt.Println(err)
                return
            }
        }
        fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
        for _, v := range result.FailedRecords {
            fmt.Println(v)
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("put records failed ")
    }
}
// put blob data
func putBlobData() {
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
    record1.ShardId = "0"
    records[0] = record1
    record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
    record2.ShardId = "1"
    record2.SetAttribute("attribute", "test attribute")
    records[1] = record2
    record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
    record3.ShardId = "2"
    records[2] = record3
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        result, err := dh.PutRecords(projectName, blobTopicName, records)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("put record failed")
                fmt.Println(err)
                return
            }
        }
        fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
        for _, v := range result.FailedRecords {
            fmt.Println(v)
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("put records failed ")
    }
}
// put data by shard
func putDataByShard() {
    shardId := "0"
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
    records[0] = record1
    record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
    record2.SetAttribute("attribute", "test attribute")
    records[1] = record2
    record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
    records[2] = record3
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        if err := dh.PutRecordsByShard(projectName, blobTopicName, shardId, records); err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("put record failed")
                fmt.Println(err)
                return
            }
        }
    }
    if retryNum >= maxReTry {
        fmt.Printf("put records failed ")
    }else {
        fmt.Println("put record successful")
    }
}

除了資料本身以外,在進行資料發布時,還可以添加和資料相關的額外資訊,例如資料擷取情境等。添加方式為下:

record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record1.SetAttribute("attribute","test attribute")
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.SetAttribute("attribute","test attribute")

訂閱資料

訂閱一個topic下的資料,同樣需要指定對應的shard,同時需要指定讀取遊標位置,通過 getCursor 介面擷取。

參數說明

參數名

參數類型

參數說明

projectName

String

專案名稱。

topicName

string

Topic名稱。

shardId

string

id of shard

ctype

CursorType

Which type used to get cursor

param

int64

Parameter used to get cursor.when use SEQUENCE and SYSTEM_TIME need to be set.

說明

ctype可通過以下四種方式獲得:

  • OLDEST:表示擷取的cursor指向當前有效資料中時間最久遠的record

  • LATEST:表示擷取的cursor指向當前最新的record

  • SEQUENCE: 表示擷取的cursor指向該序列的record

  • SYSTEM_TIME: 表示擷取的cursor指向該時間之後接收到的第一條record

返回樣本

type GetCursorResult struct {
    Cursor     string `json:"Cursor"`
    RecordTime int64  `json:"RecordTime"`
    Sequence   int64  `json:"Sequence"`
}

錯誤說明

錯誤類名

錯誤碼

錯誤說明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

訪問的資源不存在(註:進行Split/Merge操作後,立即發送其他請求,有可能會拋出該異常 )。

SeekOutOfRangeError

SeekOutOfRange

getCursor時,給定的sequence不在有效範圍內(通常資料已到期),或給定的timestamp大於目前時間。

AuthorizationFailedError

Unauthorized

Authorization 簽名解析異常,檢查AK是否填寫正確。

DatahubClientError

-

其他所有,並且是所有異常的基類

InvalidParameterError

InvalidParameter

InvalidCursor

非法參數

ShardSealedError

程式碼範例

從指定shard讀取資料,需要指定從哪個cursor開始讀,並指定讀取的上限資料條數,如果從cursor到shard結尾的資料條數少於Limit,則返回實際的資料條數的有資料。

func cursor(dh datahub.DataHub, projectName, topicName string) {
    shardId := "0"
    gr, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
    }else{
        fmt.Println(gr)
    }
    gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.LATEST)
    fmt.Println(err)
    fmt.Println(gr)
    var seq int64 = 10
    gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, seq)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
    }else{
        fmt.Println(gr)
    }
}

Tuple topic data

參數說明

參數名

參數類型

參數說明

projectName

String

專案名稱RecordSchema

topicName

string

Topic名稱。

shardId

string

id of shard

cursor

string

The start cursor used to read data.

limit

int

Max record size to read.

recordSchema

RecordSchema

RecordSchema for the topic.

返回樣本

type GetRecordsResult struct {
    NextCursor    string        `json:"NextCursor"`
    RecordCount   int           `json:"RecordCount"`
    StartSequence int64         `json:"StartSeq"`
    Records       []IRecord     `json:"Records"`
    RecordSchema  *RecordSchema `json:"-"`
}

錯誤說明

錯誤類名

錯誤碼

錯誤說明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

訪問的資源不存在(註:進行Split/Merge操作後,立即發送其他請求,有可能會拋出該異常 )。

AuthorizationFailedError

Unauthorized

Authorization 簽名解析異常,檢查AK是否填寫正確。

DatahubClientError

-

其他所有,並且是所有異常的基類。

InvalidParameterError

InvalidParameter

InvalidCursor

非法參數。

程式碼範例

func getTupleData() {
    shardId := "1"
    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("get topic failed")
        return
    }
    fmt.Println("get topic successful")
    cursor, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get cursor successful")
    limitNum := 100
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor.Cursor, limitNum, topic.RecordSchema)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
        }
        fmt.Println("get record successful")
        for _, record := range gr.Records {
            data, ok := record.(*datahub.TupleRecord)
            if !ok {
                fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
            } else {
                fmt.Println(data.Values)
            }
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("get records failed ")
    }
}

Blob topic data

參數說明

參數名

參數類型

參數說明

projectName

String

專案名稱。

topicName

string

Topic名稱。

shardId

string

id of shard

cursor

string

The start cursor used to read data.

limit

int

Max record size to read.

返回樣本

type GetRecordsResult struct {
    NextCursor    string        `json:"NextCursor"`
    RecordCount   int           `json:"RecordCount"`
    StartSequence int64         `json:"StartSeq"`
    Records       []IRecord     `json:"Records"`
    RecordSchema  *RecordSchema `json:"-"`
}

錯誤說明

錯誤類名

錯誤碼

錯誤說明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

訪問的資源不存在(註:進行Split/Merge操作後,立即發送其他請求,有可能會拋出該異常 )。

AuthorizationFailedError

Unauthorized

Authorization 簽名解析異常,檢查AK是否填寫正確。

DatahubClientError

-

其他所有,並且是所有異常的基類。

InvalidParameterError

InvalidParameter

InvalidCursor

非法參數。

程式碼範例

func getBlobData() {
    shardId := "1"
    cursor, err := dh.GetCursor(projectName, blobTopicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get cursor successful")
    limitNum := 100
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        gr, err := dh.GetBlobRecords(projectName, blobTopicName, shardId, cursor.Cursor, limitNum)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
        }
        fmt.Println("get record successful")
        for _, record := range gr.Records {
            data, ok := record.(*datahub.BlobRecord)
            if !ok {
                fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
            } else {
                fmt.Println(data.StoreData)
            }
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("get records failed ")
    }
}