This topic describes the environment setup required to use the DataHub Go SDK and the error types you may encounter.
Environment preparation
Prerequisites
You have an Alibaba Cloud account with an AccessKey ID and AccessKey Secret. For details, see Create an AccessKey pair.
You have obtained the DataHub service endpoint.
You have installed the DataHub Go SDK using the following command:
go get -u -insecure github.com/aliyun/aliyun-datahub-sdk-go/datahub
Initialize the DataHub client
All API operations in the DataHub Go SDK are implemented through the datahub.DataHub interface. The first step is to create a DataHub client instance.
Create a DataHub object with default parameters
Code example:
import "github.com/aliyun/aliyun-datahub-sdk-go/datahub"
accessId := "your-accesskey-id"
accessKey := "your-accesskey-secret"
endpoint := "your-DataHub-service-endpoint"
dh := datahub.New(accessId, accessKey, endpoint)
Create a DataHub object with custom configurations
Parameters:
Parameter | Parameter type | Default value | Valid values | Description |
UserAgent | string | - | - | User agent string. |
CompressorType | CompressorType | NOCOMPRESS | NOCOMPRESS (no compression), LZ4, DEFLATE, ZLIB | Compression format for data transmission. |
EnableBinary | bool | true | true/false | Whether to use the Protobuf protocol, which is mainly used when putting or getting records. If the DataHub version does not support Protobuf, set this parameter to false. |
HttpClient | *http.Client | datahub.DefaultHttpClient() | - | For more information, see net/http. |
Code example:
endpoint := "your-DataHub-service-endpoint"
accessId := "your-accesskey-id"
accessKey := "your-accesskey-secret"
account := datahub.NewAliyunAccount(accessId, accessKey)
// To authenticate with temporary STS credentials instead, supply a security token:
// token := "your-security-token"
// account := datahub.NewStsCredential(accessId, accessKey, token)
config := datahub.NewDefaultConfig()
config.CompressorType = datahub.DEFLATE
config.EnableBinary = true
config.HttpClient = datahub.DefaultHttpClient()
dh := datahub.NewClientWithConfig(endpoint, config, account)
More operations
The DataHub Go SDK supports package management using Go modules.
require (
	github.com/aliyun/aliyun-datahub-sdk-go/datahub v0.1.4
)
Offset consumption code example
import (
	"fmt"
	"time"

	"github.com/aliyun/aliyun-datahub-sdk-go/datahub"
)

func OffsetConsume() {
	accessId := ""
	accessKey := ""
	endpoint := "https://dh-cn-hangzhou.aliyuncs.com"
	dh := datahub.New(accessId, accessKey, endpoint)

	projectName := ""
	topicName := ""
	subId := ""
	shardId := "0"
	shardIds := []string{"0"}

	session, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
	if err != nil {
		fmt.Println("Open subscription session failed", err)
		return
	}
	offset := session.Offsets[shardId]

	var gc *datahub.GetCursorResult
	// A sequence less than 0 indicates that no data has been consumed yet.
	if offset.Sequence < 0 {
		// Get the cursor of the first record within the lifecycle.
		gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
		if err != nil {
			fmt.Println("Get oldest cursor failed", err)
			return
		}
	} else {
		// Get the cursor of the next record.
		nextSequence := offset.Sequence + 1
		gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, nextSequence)
		if err != nil {
			// Getting a cursor by SEQUENCE may return SeekOutOfRangeError,
			// which means the data at that sequence has expired.
			if _, ok := err.(*datahub.SeekOutOfRangeError); !ok {
				// Any other error leaves no usable cursor, so stop here.
				fmt.Println("Get cursor by sequence failed", err)
				return
			}
			fmt.Println("Get cursor by sequence failed with SeekOutOfRangeError, falling back to the oldest cursor...")
			gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
			if err != nil {
				fmt.Println("Get oldest cursor failed", err)
				return
			}
		}
	}

	topic, err := dh.GetTopic(projectName, topicName)
	if err != nil {
		fmt.Println("Get topic failed", err)
		return
	}

	// Read records and save offsets. This example reads Tuple data and commits the offset every 1000 records.
	recordCount := 0
	limitNum := 100
	cursor := gc.Cursor
	for {
		gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor, limitNum, topic.RecordSchema)
		if err != nil {
			fmt.Println("Get records failed", err)
			break
		}
		if gr.RecordCount == 0 {
			fmt.Println("No data, sleep 5 seconds...")
			time.Sleep(time.Second * 5)
			continue
		}
		for _, record := range gr.Records {
			// Process the data (print only in this example).
			data, ok := record.(*datahub.TupleRecord)
			if !ok {
				fmt.Println("Unexpected record type, skipping")
				continue
			}
			fmt.Println(data.Values)
			recordCount++
			// Commit the offset every 1000 records.
			if recordCount%1000 == 0 {
				fmt.Println("Commit offset", record.GetSequence())
				offset.Sequence = record.GetSequence()
				offset.Timestamp = record.GetSystemTime()
				offsetMap := map[string]datahub.SubscriptionOffset{shardId: offset}
				err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap)
				if err != nil {
					if _, ok := err.(*datahub.SubscriptionOffsetResetError); ok {
						fmt.Println("Subscription reset, will reopen...")
						// The offset has been reset, so the session must be reopened.
						session, err = dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
						if err != nil {
							fmt.Println("Reopen subscription session failed", err)
							return // stop consuming
						}
						offset = session.Offsets[shardId]
					} else if _, ok := err.(*datahub.SubscriptionSessionInvalidError); ok {
						fmt.Println("Subscription is in use by another client")
						return // stop consuming
					} else {
						fmt.Println("Commit offset failed", err)
						return // stop consuming
					}
				}
				recordCount = 0
			}
		}
		cursor = gr.NextCursor
	}
}
Error types
The DataHub Go SDK defines a distinct type for each category of error. You can use a type assertion to determine the specific error type and handle it accordingly. Among the defined error types, the following are retryable:
DatahubClientError
LimitExceededError
ServiceTemporaryUnavailableError
All other errors are non-retryable. DatahubClientError is retryable only in some cases, such as when the server is busy or unavailable. Implement retry logic for retryable errors, and strictly limit the number of retries.
Class name | Error code | Description |
InvalidParameterError | | Invalid parameters. |
ResourceNotFoundError | | The resource does not exist. This may occur if a request is sent right after a Split/Merge operation. |
ResourceExistError | | The resource already exists. Thrown when trying to create a resource that already exists. |
SeekOutOfRangeError | | When calling |
AuthorizationFailedError | | Authorization signature parsing failed. Check your AccessKey ID and AccessKey Secret. |
NoPermissionError | | No permission. Typically caused by incorrect RAM configuration or insufficient RAM user authorization. |
NewShardSealedError | | The shard is in the CLOSED state, which means it is readable but not writable. If you attempt to write data to a CLOSED shard or continue reading after the last record, this exception will be thrown. |
LimitExceededError | | API usage exceeded the allowed limits. Refer to Limits. |
SubscriptionOfflineError | | The subscription is offline and unavailable. |
SubscriptionSessionInvalidError | | Subscription session exception. Subscriptions establish sessions to commit offsets. If the session is used by another client, this error may occur. |
SubscriptionOffsetResetError | | The subscription offset has been reset. |
MalformedRecordError | | Invalid record format. Possible causes include schema mismatch, non-UTF-8 characters, or using protobuf when the server does not support it. |
ServiceTemporaryUnavailableError | - | Temporary service unavailability, typically caused by network issues such as connection dropped. Retry is usually effective. |
DatahubClientError | All other | Base class for all other errors. Retry may succeed if other conditions are ruled out. |
DatahubClientError
DatahubClientError is the base error type for all errors in DataHub. Any error not explicitly defined in the table above is of this type. It may include retryable scenarios such as server busy or server unavailable.
You can implement custom retry logic in your code to handle these cases. Limit the number of retry attempts to avoid infinite loops or resource exhaustion.
type DatahubClientError struct {
	StatusCode int    `json:"StatusCode"`   // HTTP status code
	RequestId  string `json:"RequestId"`    // Request ID used to trace the request
	Code       string `json:"ErrorCode"`    // DataHub error code
	Message    string `json:"ErrorMessage"` // Message describing the error code
}
Error example
import (
	"fmt"
	"time"

	"github.com/aliyun/aliyun-datahub-sdk-go/datahub"
)

func exampleError() {
	accessId := ""
	accessKey := ""
	endpoint := ""
	projectName := "datahub_go_test"
	maxRetry := 3
	dh := datahub.New(accessId, accessKey, endpoint)

	if err := dh.CreateProject(projectName, "project comment"); err != nil {
		if _, ok := err.(*datahub.InvalidParameterError); ok {
			fmt.Println("invalid parameter, please check your input parameters")
		} else if _, ok := err.(*datahub.ResourceExistError); ok {
			fmt.Println("project already exists")
		} else if _, ok := err.(*datahub.AuthorizationFailedError); ok {
			fmt.Println("authorization failed, please check your accessId and accessKey")
		} else if _, ok := err.(*datahub.LimitExceededError); ok {
			fmt.Println("limit exceeded, retrying")
			// Retry a bounded number of times, waiting 5 seconds before each attempt.
			for i := 0; i < maxRetry; i++ {
				time.Sleep(5 * time.Second)
				if err := dh.CreateProject(projectName, "project comment"); err != nil {
					fmt.Println("create project failed")
					fmt.Println(err)
				} else {
					fmt.Println("create project successful")
					break
				}
			}
		} else {
			fmt.Println("unknown error")
			fmt.Println(err)
		}
	} else {
		fmt.Println("create project successful")
	}
}
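The chain of type assertions in the example can also be written as a type switch, which some readers find easier to scan. The sketch below is illustrative only: the three error types are local stand-ins that borrow the names of the SDK types so the block compiles without the SDK, and classify is a hypothetical helper.

```go
package main

// Local stand-ins named after the SDK error types. They are not the SDK types.
type InvalidParameterError struct{ Msg string }
type ResourceExistError struct{ Msg string }
type LimitExceededError struct{ Msg string }

func (e *InvalidParameterError) Error() string { return e.Msg }
func (e *ResourceExistError) Error() string    { return e.Msg }
func (e *LimitExceededError) Error() string    { return e.Msg }

// classify maps an error to a short description of what the caller should do.
func classify(err error) string {
	switch err.(type) {
	case nil:
		return "ok"
	case *InvalidParameterError:
		return "check input parameters"
	case *ResourceExistError:
		return "already exists"
	case *LimitExceededError:
		return "retry later"
	default:
		return "unknown error"
	}
}
```

With the real SDK, the case labels would name the datahub package types instead, for example *datahub.LimitExceededError.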