このトピックでは、DataHub Go SDK を使用してパブリッシュ/サブスクライブ操作を実行する方法を説明します。
データのパブリッシュ
データレコードを topic にパブリッシュするときは、各レコードに shard ID を指定します。したがって、事前に ListShard API を呼び出して、トピック配下のシャードのリストを取得することをお勧めします。PutRecords API を使用するときは、必ず戻り値の結果を確認して、書き込みに失敗したレコードがあるかどうかを判断してください。
DataHub サーバーバージョン 2.12 以降、PutRecordsByShard API がサポートされています。それ以前のバージョンでは、PutRecords を使用してください。
パラメーター
パラメーター | タイプ | 説明 |
projectName | string | プロジェクト名。 |
topicName | string | トピック名。 |
shardId | string | シャード ID。 |
records | / | 書き込むレコードのリスト。 |
レスポンスの例
type PutRecordsResult struct {
FailedRecordCount int `json:"FailedRecordCount"`
FailedRecords []FailedRecord `json:"FailedRecords"`
}エラータイプ
クラス名 | エラーコード | 説明 |
ResourceNotFoundError |
| リクエストされたリソースは存在しません。(注: これは、Split/Merge 操作の直後に別のリクエストが送信された場合に発生する可能性があります。) |
AuthorizationFailedError |
| 権限付与署名の解析に失敗しました。AccessKey が正しいかどうかを確認してください。 |
InvalidParameterError |
| 無効なパラメーター。 |
DatahubClientError | - | 他のすべての例外の基本クラス。 |
コード例
// タプルデータをパブリッシュ
func putTupleData() {
topic, err := dh.GetTopic(projectName, topicName)
if err != nil {
fmt.Println("get topic failed")
fmt.Println(err)
return
}
fmt.Println("get topic successful")
records := make([]datahub.IRecord, 3)
record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record1.ShardId = "0"
record1.SetValueByName("field1", "TEST1")
record1.SetValueByName("field2", 1)
// レコードをパブリッシュする際に属性を追加できます
record1.SetAttribute("attribute", "test attribute")
records[0] = record1
record2 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record2.ShardId = "1"
record2.SetValueByName("field1", datahub.String("TEST2"))
record2.SetValueByName("field2", datahub.Bigint(2))
records[1] = record2
record3 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record3.ShardId = "2"
record3.SetValueByName("field1", datahub.String("TEST3"))
record3.SetValueByName("field2", datahub.Bigint(3))
records[2] = record3
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
result, err := dh.PutRecords(projectName, topicName, records)
if err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("put record failed")
fmt.Println(err)
return
}
}
fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
for _, v := range result.FailedRecords {
fmt.Println(v)
}
break
}
if retryNum >= maxReTry {
fmt.Printf("put records failed ")
}
}
// BLOB データをパブリッシュ
func putBlobData() {
records := make([]datahub.IRecord, 3)
record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
record1.ShardId = "0"
records[0] = record1
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.ShardId = "1"
record2.SetAttribute("attribute", "test attribute")
records[1] = record2
record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
record3.ShardId = "2"
records[2] = record3
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
result, err := dh.PutRecords(projectName, blobTopicName, records)
if err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("put record failed")
fmt.Println(err)
return
}
}
fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
for _, v := range result.FailedRecords {
fmt.Println(v)
}
break
}
if retryNum >= maxReTry {
fmt.Printf("put records failed ")
}
}
// シャードごとにデータをパブリッシュ
func putDataByShard() {
shardId := "0"
records := make([]datahub.IRecord, 3)
record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
records[0] = record1
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.SetAttribute("attribute", "test attribute")
records[1] = record2
record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
records[2] = record3
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
if err := dh.PutRecordsByShard(projectName, blobTopicName, shardId, records); err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("put record failed")
fmt.Println(err)
return
}
}
}
if retryNum >= maxReTry {
fmt.Printf("put records failed ")
}else {
fmt.Println("put record successful")
}
}データ自体に加えて、データ収集などのデータに関連する追加情報をパブリッシュ中にアタッチできます。この情報は次のように追加できます。
record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record1.SetAttribute("attribute","test attribute")
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.SetAttribute("attribute","test attribute")データのサブスクライブ
トピック配下のデータをサブスクライブするには、対応するシャードと読み取りを開始するカーソル位置を指定します。カーソルは GetCursor API を使用して取得できます。
パラメーター
パラメーター | タイプ | 説明 |
projectName | string | プロジェクト名。 |
topicName | string | トピック名。 |
shardId | string | シャード ID。 |
ctype | CursorType | カーソルを取得するために使用されるメソッド。 |
param | int64 | カーソルタイプで必要なパラメーター。これは |
ctype パラメーターは、次の値をサポートしています。
OLDEST: カーソルは、現在利用可能な最も古い有効なレコードを指します。
LATEST: カーソルは、最新のレコードを指します。
SEQUENCE: カーソルは、指定されたシーケンス番号を持つレコードを指します。
SYSTEM_TIME: カーソルは、指定されたタイムスタンプの後に受信した最初のレコードを指します。
レスポンスの例
type GetCursorResult struct {
Cursor string `json:"Cursor"`
RecordTime int64 `json:"RecordTime"`
Sequence int64 `json:"Sequence"`
}エラーの説明
クラス名 | エラーコード | 説明 |
ResourceNotFoundError |
| リクエストされたリソースは存在しません。(注: これは、Split/Merge 操作の直後にリクエストが送信された場合に発生する可能性があります。) |
SeekOutOfRangeError |
|
|
AuthorizationFailedError |
| 権限付与署名の解析が異常です。AccessKey が正しく入力されているか確認してください。 |
InvalidParameterError |
| 無効なパラメーター |
ShardSealedError | - | - |
DatahubClientError | - | 他のすべての例外の基本クラス。 |
コード例
特定のシャードからデータを読み取るには、開始カーソルと読み取るレコードの最大数を指定します。カーソルとシャードの末尾の間で利用可能なレコード数が指定された制限よりも少ない場合、実際に利用可能なレコード数のみが返されます。
func cursor(dh datahub.DataHub, projectName, topicName string) {
shardId := "0"
gr, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
if err != nil {
fmt.Println("get cursor failed")
fmt.Println(err)
}else{
fmt.Println(gr)
}
gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.LATEST)
fmt.Println(err)
fmt.Println(gr)
var seq int64 = 10
gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, seq)
if err != nil {
fmt.Println("get cursor failed")
fmt.Println(err)
}else{
fmt.Println(gr)
}
}Tuple トピックデータ
パラメーター
パラメーター | タイプ | 説明 |
projectName | String | プロジェクト名。 |
topicName | string | トピック名。 |
shardId | string | シャード ID。 |
cursor | string | データの読み取りに使用される開始カーソル。 |
limit | int | 読み取るレコードの最大数。 |
recordSchema | RecordSchema | トピックのレコードスキーマ。 |
レスポンスの例
type GetRecordsResult struct {
NextCursor string `json:"NextCursor"`
RecordCount int `json:"RecordCount"`
StartSequence int64 `json:"StartSeq"`
Records []IRecord `json:"Records"`
RecordSchema *RecordSchema `json:"-"`
}エラーの説明
クラス名 | エラーコード | 説明 |
ResourceNotFoundError |
| リクエストされたリソースは存在しません。(注: これは、Split/Merge 操作の直後に別のリクエストが送信された場合に発生する可能性があります。) |
AuthorizationFailedError |
| 権限付与署名の解析に失敗しました。AccessKey が正しいかどうかを確認してください。 |
InvalidParameterError |
| 無効なパラメーター。 |
DatahubClientError | - | 他のすべての例外の基本クラス。 |
コード例
func getTupleData() {
shardId := "1"
topic, err := dh.GetTopic(projectName, topicName)
if err != nil {
fmt.Println("get topic failed")
return
}
fmt.Println("get topic successful")
cursor, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
if err != nil {
fmt.Println("get cursor failed")
fmt.Println(err)
return
}
fmt.Println("get cursor successful")
limitNum := 100
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor.Cursor, limitNum, topic.RecordSchema)
if err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("get record failed")
fmt.Println(err)
return
}
}
fmt.Println("get record successful")
for _, record := range gr.Records {
data, ok := record.(*datahub.TupleRecord)
if !ok {
fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
} else {
fmt.Println(data.Values)
}
}
break
}
if retryNum >= maxReTry {
fmt.Printf("get records failed ")
}
}Blob トピックデータ
パラメーター
パラメーター | タイプ | 説明 |
projectName | String | プロジェクト名。 |
topicName | string | トピック名。 |
shardId | string | シャード ID。 |
cursor | string | データの読み取りに使用される開始カーソル。 |
limit | int | 読み取るレコードの最大数。 |
レスポンスの例
type GetRecordsResult struct {
NextCursor string `json:"NextCursor"`
RecordCount int `json:"RecordCount"`
StartSequence int64 `json:"StartSeq"`
Records []IRecord `json:"Records"`
RecordSchema *RecordSchema `json:"-"`
}エラーの説明
クラス名 | エラーコード | 説明 |
ResourceNotFoundError |
| リクエストされたリソースは存在しません。(注: これは、Split/Merge 操作の直後に別のリクエストが送信された場合に発生する可能性があります。) |
AuthorizationFailedError |
| 権限付与署名の解析に失敗しました。AccessKey が正しいかどうかを確認してください。 |
InvalidParameterError |
| 無効なパラメーター。 |
DatahubClientError | - | 他のすべての例外の基本クラス。 |
コード例
func getBlobData() {
shardId := "1"
cursor, err := dh.GetCursor(projectName, blobTopicName, shardId, datahub.OLDEST)
if err != nil {
fmt.Println("get cursor failed")
fmt.Println(err)
return
}
fmt.Println("get cursor successful")
limitNum := 100
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
gr, err := dh.GetBlobRecords(projectName, blobTopicName, shardId, cursor.Cursor, limitNum)
if err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("get record failed")
fmt.Println(err)
return
}
}
fmt.Println("get record successful")
for _, record := range gr.Records {
data, ok := record.(*datahub.BlobRecord)
if !ok {
fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
} else {
fmt.Println(data.StoreData)
}
}
break
}
if retryNum >= maxReTry {
fmt.Printf("get records failed ")
}
}