本文為您展示DataHub的 GO SDK的發布/訂閱操作。
發布資料
向某個topic下發布資料記錄時,每條資料記錄需要指定該topic下的一個shard, 因此一般需要通過 listShard 介面查看下當前topic下的shard列表。使用PutRecords介面時注意檢查返回結果是否資料發布失敗的情況。
伺服器2.12版本及之後版本開始支援PutRecordsByShard介面,低版本請使用PutRecords介面。
參數說明
參數名 | 參數類型 | 參數說明 |
projectName | String | 專案名稱。 |
topicName | string | Topic名稱 |
shardId | string | id of shard |
records | Records list to written. |
返回樣本
type PutRecordsResult struct {
FailedRecordCount int `json:"FailedRecordCount"`
FailedRecords []FailedRecord `json:"FailedRecords"`
}錯誤說明
錯誤類名 | 錯誤碼 | 錯誤說明 |
ResourceNotFoundError |
| 訪問的資源不存在(註:進行Split/Merge操作後,立即發送其他請求,有可能會拋出該異常 )。 |
AuthorizationFailedError |
| Authorization 簽名解析異常,檢查AK是否填寫正確。 |
DatahubClientError | - | 其他所有,並且是所有異常的基類。 |
InvalidParameterError |
| 非法參數。 |
程式碼範例
// put tuple data
func putTupleData() {
topic, err := dh.GetTopic(projectName, topicName)
if err != nil {
fmt.Println("get topic failed")
fmt.Println(err)
return
}
fmt.Println("get topic successful")
records := make([]datahub.IRecord, 3)
record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record1.ShardId = "0"
record1.SetValueByName("field1", "TEST1")
record1.SetValueByName("field2", 1)
//you can add some attributes when put record
record1.SetAttribute("attribute", "test attribute")
records[0] = record1
record2 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record2.ShardId = "1"
record2.SetValueByName("field1", datahub.String("TEST2"))
record2.SetValueByName("field2", datahub.Bigint(2))
records[1] = record2
record3 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record3.ShardId = "2"
record3.SetValueByName("field1", datahub.String("TEST3"))
record3.SetValueByName("field2", datahub.Bigint(3))
records[2] = record3
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
result, err := dh.PutRecords(projectName, topicName, records)
if err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("put record failed")
fmt.Println(err)
return
}
}
fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
for _, v := range result.FailedRecords {
fmt.Println(v)
}
break
}
if retryNum >= maxReTry {
fmt.Printf("put records failed ")
}
}
// put blob data
func putBlobData() {
records := make([]datahub.IRecord, 3)
record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
record1.ShardId = "0"
records[0] = record1
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.ShardId = "1"
record2.SetAttribute("attribute", "test attribute")
records[1] = record2
record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
record3.ShardId = "2"
records[2] = record3
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
result, err := dh.PutRecords(projectName, blobTopicName, records)
if err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("put record failed")
fmt.Println(err)
return
}
}
fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
for _, v := range result.FailedRecords {
fmt.Println(v)
}
break
}
if retryNum >= maxReTry {
fmt.Printf("put records failed ")
}
}
// put data by shard
func putDataByShard() {
shardId := "0"
records := make([]datahub.IRecord, 3)
record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
records[0] = record1
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.SetAttribute("attribute", "test attribute")
records[1] = record2
record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
records[2] = record3
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
if err := dh.PutRecordsByShard(projectName, blobTopicName, shardId, records); err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("put record failed")
fmt.Println(err)
return
}
}
}
if retryNum >= maxReTry {
fmt.Printf("put records failed ")
}else {
fmt.Println("put record successful")
}
}除了資料本身以外,在進行資料發布時,還可以添加和資料相關的額外資訊,例如資料擷取情境等。添加方式為下:
record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record1.SetAttribute("attribute","test attribute")
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.SetAttribute("attribute","test attribute")訂閱資料
訂閱一個topic下的資料,同樣需要指定對應的shard,同時需要指定讀取遊標位置,通過 getCursor 介面擷取。
參數說明
參數名 | 參數類型 | 參數說明 |
projectName | String | 專案名稱。 |
topicName | string | Topic名稱。 |
shardId | string | id of shard |
ctype | CursorType | Which type used to get cursor |
param | int64 | Parameter used to get cursor.when use SEQUENCE and SYSTEM_TIME need to be set. |
ctype可通過以下四種方式獲得:
OLDEST:表示擷取的cursor指向當前有效資料中時間最久遠的record
LATEST:表示擷取的cursor指向當前最新的record
SEQUENCE: 表示擷取的cursor指向該序列的record
SYSTEM_TIME: 表示擷取的cursor指向該時間之後接收到的第一條record
返回樣本
type GetCursorResult struct {
Cursor string `json:"Cursor"`
RecordTime int64 `json:"RecordTime"`
Sequence int64 `json:"Sequence"`
}錯誤說明
錯誤類名 | 錯誤碼 | 錯誤說明 |
ResourceNotFoundError |
| 訪問的資源不存在(註:進行Split/Merge操作後,立即發送其他請求,有可能會拋出該異常 )。 |
SeekOutOfRangeError |
| getCursor時,給定的sequence不在有效範圍內(通常資料已到期),或給定的timestamp大於目前時間。 |
AuthorizationFailedError |
| Authorization 簽名解析異常,檢查AK是否填寫正確。 |
DatahubClientError | - | 其他所有,並且是所有異常的基類 |
InvalidParameterError |
| 非法參數 |
ShardSealedError |
程式碼範例
從指定shard讀取資料,需要指定從哪個cursor開始讀,並指定讀取的上限資料條數,如果從cursor到shard結尾的資料條數少於Limit,則返回實際的資料條數的有資料。
func cursor(dh datahub.DataHub, projectName, topicName string) {
shardId := "0"
gr, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
if err != nil {
fmt.Println("get cursor failed")
fmt.Println(err)
}else{
fmt.Println(gr)
}
gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.LATEST)
fmt.Println(err)
fmt.Println(gr)
var seq int64 = 10
gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, seq)
if err != nil {
fmt.Println("get cursor failed")
fmt.Println(err)
}else{
fmt.Println(gr)
}
}Tuple topic data
參數說明
參數名 | 參數類型 | 參數說明 |
projectName | String | 專案名稱RecordSchema |
topicName | string | Topic名稱。 |
shardId | string | id of shard |
cursor | string | The start cursor used to read data. |
limit | int | Max record size to read. |
recordSchema | RecordSchema | RecordSchema for the topic. |
返回樣本
type GetRecordsResult struct {
NextCursor string `json:"NextCursor"`
RecordCount int `json:"RecordCount"`
StartSequence int64 `json:"StartSeq"`
Records []IRecord `json:"Records"`
RecordSchema *RecordSchema `json:"-"`
}錯誤說明
錯誤類名 | 錯誤碼 | 錯誤說明 |
ResourceNotFoundError |
| 訪問的資源不存在(註:進行Split/Merge操作後,立即發送其他請求,有可能會拋出該異常 )。 |
AuthorizationFailedError |
| Authorization 簽名解析異常,檢查AK是否填寫正確。 |
DatahubClientError | - | 其他所有,並且是所有異常的基類。 |
InvalidParameterError |
| 非法參數。 |
程式碼範例
func getTupleData() {
shardId := "1"
topic, err := dh.GetTopic(projectName, topicName)
if err != nil {
fmt.Println("get topic failed")
return
}
fmt.Println("get topic successful")
cursor, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
if err != nil {
fmt.Println("get cursor failed")
fmt.Println(err)
return
}
fmt.Println("get cursor successful")
limitNum := 100
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor.Cursor, limitNum, topic.RecordSchema)
if err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("get record failed")
fmt.Println(err)
return
}
}
fmt.Println("get record successful")
for _, record := range gr.Records {
data, ok := record.(*datahub.TupleRecord)
if !ok {
fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
} else {
fmt.Println(data.Values)
}
}
break
}
if retryNum >= maxReTry {
fmt.Printf("get records failed ")
}
}Blob topic data
參數說明
參數名 | 參數類型 | 參數說明 |
projectName | String | 專案名稱。 |
topicName | string | Topic名稱。 |
shardId | string | id of shard |
cursor | string | The start cursor used to read data. |
limit | int | Max record size to read. |
返回樣本
type GetRecordsResult struct {
NextCursor string `json:"NextCursor"`
RecordCount int `json:"RecordCount"`
StartSequence int64 `json:"StartSeq"`
Records []IRecord `json:"Records"`
RecordSchema *RecordSchema `json:"-"`
}錯誤說明
錯誤類名 | 錯誤碼 | 錯誤說明 |
ResourceNotFoundError |
| 訪問的資源不存在(註:進行Split/Merge操作後,立即發送其他請求,有可能會拋出該異常 )。 |
AuthorizationFailedError |
| Authorization 簽名解析異常,檢查AK是否填寫正確。 |
DatahubClientError | - | 其他所有,並且是所有異常的基類。 |
InvalidParameterError |
| 非法參數。 |
程式碼範例
func getBlobData() {
shardId := "1"
cursor, err := dh.GetCursor(projectName, blobTopicName, shardId, datahub.OLDEST)
if err != nil {
fmt.Println("get cursor failed")
fmt.Println(err)
return
}
fmt.Println("get cursor successful")
limitNum := 100
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
gr, err := dh.GetBlobRecords(projectName, blobTopicName, shardId, cursor.Cursor, limitNum)
if err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("get record failed")
fmt.Println(err)
return
}
}
fmt.Println("get record successful")
for _, record := range gr.Records {
data, ok := record.(*datahub.BlobRecord)
if !ok {
fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
} else {
fmt.Println(data.StoreData)
}
}
break
}
if retryNum >= maxReTry {
fmt.Printf("get records failed ")
}
}