All Products
Search
Document Center

DataHub:Go SDK usage guide

Last Updated:Oct 21, 2025

This topic describes the environment setup required to use the DataHub Go SDK and the error types you may encounter.

Environment preparation

Prerequisites

  • You have an Alibaba Cloud account with an AccessKey ID and AccessKey Secret. For details, see Create an AccessKey pair.

  • You have obtained the DataHub service endpoint.

  • You have installed the DataHub Go SDK using the following command:

    go get -u github.com/aliyun/aliyun-datahub-sdk-go/datahub

Initialize the DataHub client

All API operations in the DataHub Go SDK are implemented through the datahub.DataHub interface. The first step is to create a DataHub client instance.

Create a DataHub object with default parameters

Code example:

import "github.com/aliyun/aliyun-datahub-sdk-go/datahub"
accessId := "your-accesskey-id"
accessKey := "your-accesskey-secret"
endpoint := "your-DataHub-service-endpoint"
dh := datahub.New(accessId, accessKey, endpoint)

Create a DataHub object with custom configurations

Parameters:

Parameter

Parameter type

Default value

Valid values

Description

UserAgent

string

-

-

User agent string.

CompressorType

CompressorType

NOCOMPRESS

NOCOMPRESS (no compression), LZ4, DEFLATE, ZLIB

Compression format for data transmission.

EnableBinary

bool

true

true/false

Whether to use the Protobuf protocol, which is mainly used when putting or getting records. If your DataHub version does not support Protobuf, set EnableBinary to false.

HttpClient

*http.Client

datahub.DefaultHttpClient()

-

For more information, see net/http.

Code example:

endpoint := "your-DataHub-service-endpoint"
accessId := "your-accesskey-id"
accessKey := "your-accesskey-secret"
// Authenticate with a long-term AccessKey pair.
account := datahub.NewAliyunAccount(accessId, accessKey)
// Temporary AK (STS) authentication: declare the token only when you use it,
// because an unused local variable is a compile error in Go.
// token := "your-sts-security-token"
// account := datahub.NewStsCredential(accessId, accessKey, token)
config := datahub.NewDefaultConfig()
config.CompressorType = datahub.DEFLATE
config.EnableBinary = true
config.HttpClient = datahub.DefaultHttpClient()
dh := datahub.NewClientWithConfig(endpoint, config, account)

More operations

The DataHub Go SDK supports dependency management with Go modules. To pin the SDK version, add the following to your go.mod file:

// Pin the DataHub Go SDK version in your go.mod file.
require github.com/aliyun/aliyun-datahub-sdk-go/datahub v0.1.4

Offset consumption code example

// OffsetConsume demonstrates offset-based consumption through a subscription.
// It resumes reading shard "0" of a topic from the offset stored in the
// subscription, and it commits the consumed offset back every 1000 records.
//
// Fill in accessId, accessKey, projectName, topicName, and subId before
// running. The function reads until an unrecoverable error occurs, and it
// prints the error it stops on.
func OffsetConsume() {
	accessId := ""
	accessKey := ""
	endpoint := "https://dh-cn-hangzhou.aliyuncs.com"
	dh := datahub.New(accessId, accessKey, endpoint)

	projectName := ""
	topicName := ""
	subId := ""
	shardId := "0"
	shardIds := []string{"0"}

	session, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
	if err != nil {
		fmt.Println("Open subscription session failed", err)
		return
	}
	offset := session.Offsets[shardId]

	gc, err := resumeCursor(dh, projectName, topicName, shardId, offset)
	if err != nil {
		return // resumeCursor has already printed the cause.
	}

	topic, err := dh.GetTopic(projectName, topicName)
	if err != nil {
		fmt.Println("Get topic failed", err)
		return
	}

	// Read records and save offsets. This example reads Tuple data and
	// commits the offset every 1000 records.
	recordCount := 0
	limitNum := 100
	cursor := gc.Cursor
readLoop:
	for {
		gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor, limitNum, topic.RecordSchema)
		if err != nil {
			fmt.Println("Get records failed", err)
			return
		}
		if gr.RecordCount == 0 {
			fmt.Println("No data, sleep 5 seconds...")
			time.Sleep(5 * time.Second)
			continue
		}
		// Advance first so that the offset-reset handling below can
		// replace the cursor with one derived from the new offset.
		cursor = gr.NextCursor

		for _, record := range gr.Records {
			// Process the data (print only in this example).
			data, ok := record.(*datahub.TupleRecord)
			if !ok {
				fmt.Println("Skip non-tuple record", record.GetSequence())
				continue
			}
			fmt.Println(data.Values)

			// Commit offset information every 1000 records.
			recordCount++
			if recordCount%1000 != 0 {
				continue
			}
			recordCount = 0

			fmt.Println("Commit offset", record.GetSequence())
			offset.Sequence = record.GetSequence()
			offset.Timestamp = record.GetSystemTime()

			offsetMap := map[string]datahub.SubscriptionOffset{shardId: offset}
			err = dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap)
			switch err.(type) {
			case nil:
				// Committed successfully.
			case *datahub.SubscriptionOffsetResetError:
				// The subscription offset has been reset. Reopen the session to
				// obtain the new offset and seek to it. The rest of the current
				// batch is discarded because it belongs to the old position.
				fmt.Println("Subscription reset, will reopen...")
				session, err = dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
				if err != nil {
					fmt.Println("Reopen subscription session failed", err)
					return
				}
				offset = session.Offsets[shardId]
				if gc, err = resumeCursor(dh, projectName, topicName, shardId, offset); err != nil {
					return // resumeCursor has already printed the cause.
				}
				cursor = gc.Cursor
				continue readLoop
			case *datahub.SubscriptionSessionInvalidError:
				// Another client has taken over the subscription session.
				fmt.Println("Subscription used by other one")
				return
			default:
				fmt.Println("Commit offset failed", err)
				return
			}
		}
	}
}

// resumeCursor returns the cursor from which OffsetConsume reads shardId,
// given the subscription offset committed for that shard:
//
//   - offset.Sequence < 0 means no data has been consumed yet, so reading
//     starts at the first record within the lifecycle (OLDEST);
//   - otherwise reading starts at the record after offset.Sequence. If that
//     position has expired (SeekOutOfRangeError), it falls back to OLDEST.
//
// Every failure is printed before it is returned.
func resumeCursor(dh datahub.DataHub, projectName, topicName, shardId string, offset datahub.SubscriptionOffset) (*datahub.GetCursorResult, error) {
	if offset.Sequence >= 0 {
		gc, err := dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, offset.Sequence+1)
		if err == nil {
			return gc, nil
		}
		// Getting a cursor by SEQUENCE may fail with SeekOutOfRangeError,
		// which indicates that the data at that position has expired. Any
		// other error is fatal for this example.
		if _, ok := err.(*datahub.SeekOutOfRangeError); !ok {
			fmt.Println("Get cursor by sequence failed", err)
			return nil, err
		}
		fmt.Println("Get cursor by sequence failed for SeekOutOfRangeError, will retry...")
	}
	gc, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
	if err != nil {
		fmt.Println("Get oldest cursor failed", err)
		return nil, err
	}
	return gc, nil
}

Error types

The DataHub Go SDK categorizes various error types. You can use type assertions to determine the specific error type and handle it accordingly. Among the defined error types, the following are retryable:

  • DatahubClientError

  • LimitExceededError

  • ServiceTemporaryUnavailableError

All other errors are non-retryable. Note that DatahubClientError may include some retryable cases such as server busy or server unavailable. It is recommended to implement retry logic in your code for retryable errors, but be sure to strictly limit the number of retries.

Error type

Error code

Description

InvalidParameterError

InvalidParameter

InvalidCursor

Invalid parameters.

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

The resource does not exist. This may occur if a request is sent right after a Split/Merge operation.

ResourceExistError

ResourceAlreadyExist

ProjectAlreadyExist

TopicAlreadyExist

ConnectorAlreadyExist

The resource already exists. Returned when you try to create a resource that already exists.

SeekOutOfRangeError

SeekOutOfRange

Returned by GetCursor when the specified sequence is out of the valid range (usually because the data has expired) or the specified timestamp is in the future.

AuthorizationFailedError

Unauthorized

Authorization signature parsing failed. Check your AccessKey ID and AccessKey Secret.

NoPermissionError

NoPermission

OperationDenied

No permission. Typically caused by incorrect RAM configuration or insufficient RAM user authorization.

NewShardSealedError

InvalidShardOperation

The shard is in the CLOSED state, which means it is readable but not writable. This error is returned if you attempt to write data to a CLOSED shard or continue reading after its last record.

LimitExceededError

LimitExceeded

API usage exceeded the allowed limits. Refer to Limits.

SubscriptionOfflineError

SubscriptionOffline

The subscription is offline and unavailable.

SubscriptionSessionInvalidError

OffsetSessionChanged

OffsetSessionClosed

Subscription session exception. Subscriptions establish sessions to commit offsets. If the session is used by another client, this error may occur.

SubscriptionOffsetResetError

OffsetReseted

The subscription offset has been reset.

MalformedRecordError

MalformedRecord

ShardNotReady

Invalid record format. Possible causes include schema mismatch, non-UTF-8 characters, or using protobuf when the server does not support it.

ServiceTemporaryUnavailableError

-

The service is temporarily unavailable, typically because of network issues such as a dropped connection. Retrying is usually effective.

DatahubClientError

All other

Base error type for all other errors. A retry may succeed once other causes have been ruled out.

DatahubClientError

DatahubClientError is the base error type for all errors in DataHub. Any error not explicitly defined in the table above will be of this type. It may include retryable scenarios such as server busy or server unavailable.

You can implement custom retry logic in your code to handle these cases, but again, make sure to limit retry attempts to avoid infinite loops or resource exhaustion.

// DatahubClientError is the base error type of the DataHub Go SDK. Errors that
// do not map to a more specific type are reported as this type.
// The json tags map the keys "ErrorCode" and "ErrorMessage" to the Code and
// Message fields; presumably these are the keys of the service's JSON error
// response — confirm against the SDK.
type DatahubClientError struct {
    StatusCode int    `json:"StatusCode"`   // HTTP status code of the response
    RequestId  string `json:"RequestId"`    // Request ID used to trace the request
    Code       string `json:"ErrorCode"`    // DataHub error code, e.g. "InvalidParameter"
    Message    string `json:"ErrorMessage"` // Error message describing the error code
}

Error example

func example_error() {
    accessId := ""
    accessKey := ""
    endpoint := ""
    projectName := "datahub_go_test"
    maxRetry := 3
    dh := datahub.New(accessId, accessKey, endpoint)
    if err := dh.CreateProject(projectName, "project comment"); err != nil {
        if _, ok := err.(*datahub.InvalidParameterError); ok {
            fmt.Println("invalid parameter,please check your input parameter")
        } else if _, ok := err.(*datahub.ResourceExistError); ok {
            fmt.Println("project already exists")
        } else if _, ok := err.(*datahub.AuthorizationFailedError); ok {
            fmt.Println("accessId or accessKey err,please check your accessId and accessKey")
        } else if _, ok := err.(*datahub.LimitExceededError); ok {
            fmt.Println("limit exceed, so retry")
            for i := 0; i < maxRetry; i++ {
                // wait 5 seconds
                time.Sleep(5 * time.Second)
                if err := dh.CreateProject(projectName, "project comment"); err != nil {
                    fmt.Println("create project failed")
                    fmt.Println(err)
                } else {
                    fmt.Println("create project successful")
                    break
                }
            }
        } else {
            fmt.Println("unknown error")
            fmt.Println(err)
        }
    } else {
        fmt.Println("create project successful")
    }
}