All Products
Search
Document Center

DataHub:DataHub SDK for Go

Last Updated:Dec 26, 2022

Overview

Quick start

DataHub-related terms

For more information, see Terms.

Preparations

  • You must use an Alibaba Cloud account to access DataHub. To access DataHub, you must provide your AccessKey ID and AccessKey secret, and the endpoint that is used to access DataHub.

  • Obtain the DataHub SDK for Go package.

go get -u github.com/aliyun/aliyun-datahub-sdk-go/datahub
  • Initialize the DataHub client.

All API operations of DataHub SDK for Go must be called by using datahub.DataHub methods. Therefore, you must first initialize a DataHub object.

1. Use default parameters to create a DataHub object.

import "github.com/aliyun/aliyun-datahub-sdk-go/datahub"
// Fill in your AccessKey pair and the DataHub endpoint, then create a client
// that uses the default parameters.
accessId, accessKey, endpoint := "", "", ""
dh := datahub.New(accessId, accessKey, endpoint)

2. Use custom parameters to create a DataHub object. The following table describes the parameters that can be customized.

Parameter

Type

Default value

Valid value

Description

UserAgent

string

N/A

N/A

The agent of the user.

CompressorType

CompressorType

NOCOMPRESS

NOCOMPRESS, LZ4, DEFLATE, and ZLIB. A value of NOCOMPRESS indicates that no compression is performed.

The compression format that is used during data transmission.

EnableBinary

bool

true

true and false

Specifies whether the protocol buffer (PB) protocol is used when you read records from or write records to DataHub. If your DataHub service does not support the PB protocol, you must set EnableBinary to false.

HttpClient

*http.Client

datahub.DefaultHttpClient()

N/A

For more information, see net/http.

endpoint := ""
accessId := ""
accessKey := ""
// Authenticate with a permanent AccessKey pair.
account := datahub.NewAliyunAccount(accessId, accessKey)
// To authenticate with a temporary (STS) AccessKey pair instead, declare the
// security token and use NewStsCredential. The token is declared only on this
// path so that it is never an unused variable:
// token := ""
// account := datahub.NewStsCredential(accessId, accessKey, token)
config := datahub.NewDefaultConfig()
config.CompressorType = datahub.DEFLATE
config.EnableBinary = true
config.HttpClient = datahub.DefaultHttpClient()
dh := datahub.NewClientWithConfig(endpoint, config, account)
  • DataHub SDK for Go supports Go modules for dependency management. Add the following requirement to your go.mod file:

require (
    github.com/aliyun/aliyun-datahub-sdk-go/datahub v0.1.4
)

Sample code for offset consumption

// OffsetConsume consumes tuple records from shard "0" of a topic through a
// DataHub subscription.
//
// It resumes after the offset stored in the subscription. If nothing has been
// consumed yet, it starts from the oldest retained record. Every record is
// printed, and the offset is committed after every 1,000 records. The function
// returns on the first error it cannot recover from.
func OffsetConsume() {
	accessId := ""
	accessKey := ""
	endpoint := "https://dh-cn-hangzhou.aliyuncs.com"
	dh := datahub.New(accessId, accessKey, endpoint)

	projectName := ""
	topicName := ""
	subId := ""
	shardId := "0"
	shardIds := []string{"0"}

	session, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
	if err != nil {
		fmt.Println("Open subscription session failed", err)
		return
	}

	offset := session.Offsets[shardId]
	var gc *datahub.GetCursorResult

	if offset.Sequence < 0 {
		// A negative sequence number means no record has been consumed yet, so
		// start from the first record within the time to live (TTL) of the topic.
		gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
		if err != nil {
			fmt.Println("Get oldest cursor failed", err)
			return
		}
	} else {
		// Resume from the record that follows the last committed one.
		nextSequence := offset.Sequence + 1
		gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, nextSequence)
		if err != nil {
			// Only SeekOutOfRangeError (the record at nextSequence has expired)
			// is recoverable here. Any other error leaves gc nil, so stop
			// instead of dereferencing it below.
			if _, ok := err.(*datahub.SeekOutOfRangeError); !ok {
				fmt.Println("Get cursor by sequence failed", err)
				return
			}
			fmt.Println("Get cursor by sequence failed for SeekOutOfRangeError, will retry...")
			gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
			if err != nil {
				fmt.Println("Get oldest cursor failed", err)
				return
			}
		}
	}

	topic, err := dh.GetTopic(projectName, topicName)
	if err != nil {
		fmt.Println("Get topic failed", err)
		return
	}

	// Read tuple records in batches of limitNum and commit the offset after
	// every 1,000 records.
	recordCount := 0
	limitNum := 100
	cursor := gc.Cursor
	for {
		gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor, limitNum, topic.RecordSchema)
		if err != nil {
			fmt.Println("Get records failed", err)
			return
		}
		if gr.RecordCount == 0 {
			fmt.Println("No data, sleep 5 seconds...")
			time.Sleep(time.Second * 5)
			continue
		}

		for _, record := range gr.Records {
			// Check the assertion: an unchecked one yields a nil pointer for
			// non-tuple records and panics on data.Values.
			if data, ok := record.(*datahub.TupleRecord); ok {
				fmt.Println(data.Values)
			} else {
				fmt.Printf("record type is not TupleRecord, is %T\n", record)
			}

			recordCount++
			if recordCount%1000 != 0 {
				continue
			}

			fmt.Println("Commit offset", record.GetSequence())
			offset.Sequence = record.GetSequence()
			offset.Timestamp = record.GetSystemTime()
			offsetMap := map[string]datahub.SubscriptionOffset{shardId: offset}
			if err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
				// A plain break here would only leave the inner range loop and
				// keep consuming, so every fatal case returns instead.
				switch err.(type) {
				case *datahub.SubscriptionOffsetResetError:
					// The offset was reset; reopen the session to obtain the new offset.
					// NOTE(review): reading continues from the current cursor rather
					// than from the reset position - confirm whether the cursor
					// should be re-derived from the new offset here.
					fmt.Println("Subscription reset, will reopen...")
					newSession, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
					if err != nil {
						fmt.Println("Reopen subscription session failed", err)
						return
					}
					offset = newSession.Offsets[shardId]
				case *datahub.SubscriptionSessionInvalidError:
					fmt.Println("Subscription used by other one")
					return
				default:
					fmt.Println("Commit offset failed", err)
					return
				}
			}
			recordCount = 0
		}
		cursor = gr.NextCursor
	}
}

Methods

Manage projects

A project is a basic unit for managing data in DataHub. A project contains multiple topics. The projects in DataHub are independent of those in MaxCompute. You cannot reuse MaxCompute projects in DataHub. You must create projects in DataHub.

Create a project

Note

CreateProject(projectName, comment string) error

  • Parameters

    • projectName: the name of the project.

    • comment: the comments on the project.

  • Return result

  • Errors

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// createProjet creates the project projectName with a fixed comment and
// prints whether the call succeeded. (The function name keeps its original
// spelling so that existing callers still compile.)
func createProjet(dh datahub.DataHub, projectName string) {
	err := dh.CreateProject(projectName, "project comment")
	if err == nil {
		fmt.Println("create successful")
		return
	}
	fmt.Println("create project failed")
	fmt.Println(err)
}

Delete a project

You can call the DeleteProject method to delete a project.

Note

DeleteProject(projectName string) error

  • Parameters

    • projectName: the name of the project.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// deleteProject deletes the project named projectName and prints whether the
// call succeeded.
func deleteProject(dh datahub.DataHub, projectName string) {
	// Delete the project the caller asked for, not a hard-coded name.
	if err := dh.DeleteProject(projectName); err != nil {
		fmt.Println("delete project failed")
		fmt.Println(err)
		return
	}
	fmt.Println("delete project successful")
}

List projects

You can call the ListProject method to list projects.

Note

ListProject() (*ListProjectResult, error)

  • Parameters: none

  • Return result

// ListProjectResult is returned by ListProject and holds the names of the
// projects.
type ListProjectResult struct {
	ProjectNames []string `json:"ProjectNames"`
}
  • Errors

    • AuthorizationFailedError

    • DatahubClientError

  • Sample code

// listProject prints the name of every project returned by ListProject.
// The projectName parameter is accepted but not used.
func listProject(dh datahub.DataHub, projectName string) {
	result, err := dh.ListProject()
	if err != nil {
		fmt.Println("get project list failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get project list successful")
	names := result.ProjectNames
	for i := range names {
		fmt.Println(names[i])
	}
}

Query a project

You can call the GetProject method to query a project.

Note

GetProject(projectName string) (*GetProjectResult, error)

  • Parameters

    • projectName: the name of the project.

  • Return result

// GetProjectResult is returned by GetProject and holds the metadata of a
// project: its creation time, last modification time and comment.
type GetProjectResult struct {
	CreateTime     int64  `json:"CreateTime"`
	LastModifyTime int64  `json:"LastModifyTime"`
	Comment        string `json:"Comment"`
}
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// getProject fetches the metadata of projectName and prints it, or prints the
// error if the lookup fails.
func getProject(dh datahub.DataHub, projectName string) {
	project, err := dh.GetProject(projectName)
	if err == nil {
		fmt.Println("get project message successful")
		fmt.Println(*project)
		return
	}
	fmt.Println("get project message failed")
	fmt.Println(err)
}

Update a project

Note

UpdateProject(projectName, comment string) error

  • Parameters

    • projectName: the name of the project.

    • comment: the comments on the project.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

1. func updateProject(dh datahub.DataHub, projectName string) {
2.     if err := dh.UpdateProject(projectName, "new project comment"); err != nil {
3.         fmt.Println("update project comment failed")
4.         fmt.Println(err)
5.         return
6.     }
7.     fmt.Println("update project comment successful")
8. }

Manage topics

A topic is the smallest unit for data subscription and publishing in DataHub. You can use topics to distinguish different types of streaming data. Two types of topics are supported: tuple and blob. Tuple topics contain records that are similar to data records in databases. Each record contains multiple columns. You can write a block of binary data as a record to blob topics.

Create a topic

Tuple topic

Note

CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) error

The data that is written to tuple topics is in a specific format. You must specify record schemas for tuple topics. The following table describes the supported data types.

Type

Description

Value range

BIGINT

An eight-byte signed integer.

-9223372036854775807 to 9223372036854775807.

DOUBLE

A double-precision floating-point number. It is eight bytes in length.

$-1.0 \times 10^{308}$ to $1.0 \times 10^{308}$.

BOOLEAN

The Boolean type.

True and False, true and false, or 0 and 1.

TIMESTAMP

The type of timestamp.

A timestamp that is accurate to microseconds.

STRING

A string. Only UTF-8 encoding is supported.

The size of all values in a column of the STRING type cannot exceed 2 MB.

DECIMAL

A decimal number.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • comment: the comments on the topic.

    • shardCount: the number of shards in the topic.

    • lifeCycle: the TTL of the data. Unit: days. Data that is older than the TTL is no longer accessible.

    • recordSchema: the record schema for this topic.

  • Return result

  • Errors

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// Example_CreateTupleTopic defines a five-field record schema and creates a
// tuple topic that uses it, with 5 shards and a 7-day lifecycle.
func Example_CreateTupleTopic(dh datahub.DataHub, projectName, topicName string) {
	const (
		shardCount = 5
		lifeCycle  = 7 // days
	)

	schema := datahub.NewRecordSchema()
	schema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
		AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
		AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
		AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
		AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN})

	err := dh.CreateTupleTopic(projectName, topicName, "topic comment", shardCount, lifeCycle, schema)
	if err == nil {
		fmt.Println("create topic successful")
		return
	}
	fmt.Println("create topic failed")
	fmt.Println(err)
}

Blob topic

Note

CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • comment: the comments on the topic.

    • shardCount: the number of shards in the topic.

    • lifeCycle: the TTL of the data. Unit: days. Data that is older than the TTL is no longer accessible.

  • Return result

  • Errors

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// Example_CreateBlobTopic creates a blob topic with 5 shards and a 7-day
// lifecycle, then prints whether the call succeeded.
func Example_CreateBlobTopic(dh datahub.DataHub, projectName, topicName string) {
	const (
		shardCount = 5
		lifeCycle  = 7 // days
	)
	err := dh.CreateBlobTopic(projectName, topicName, "topic comment", shardCount, lifeCycle)
	if err == nil {
		fmt.Println("create topic successful")
		return
	}
	fmt.Println("create topic failed")
	fmt.Println(err)
}

Delete a topic

Note

DeleteTopic(projectName, topicName string) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// ExampleDataHub_DeleteTopic deletes topicName from projectName and prints
// whether the call succeeded.
func ExampleDataHub_DeleteTopic(dh datahub.DataHub, projectName, topicName string) {
	err := dh.DeleteTopic(projectName, topicName)
	if err == nil {
		fmt.Println("delete successful")
		return
	}
	fmt.Println("delete failed")
	fmt.Println(err)
}

List topics

Note

ListTopic(projectName string) (*ListTopicResult, error)

  • Parameters

    • projectName: the name of the project.

  • Return result

// ListTopicResult is returned by ListTopic and holds the names of the topics
// in a project.
type ListTopicResult struct {
	TopicNames []string `json:"TopicNames"`
}
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// ExampleDataHub_ListTopic prints the topics of projectName, or the error if
// listing fails. The topicName parameter is accepted but not used.
func ExampleDataHub_ListTopic(dh datahub.DataHub, projectName, topicName string) {
	topics, err := dh.ListTopic(projectName)
	if err == nil {
		fmt.Println("get topic list successful")
		fmt.Println(topics)
		return
	}
	fmt.Println("get topic list failed")
	fmt.Println(err)
}

Update a topic

Note

UpdateTopic(projectName, topicName, comment string) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • comment: the comments on the topic.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// ExampleDataHub_UpdateTopic replaces the comment of topicName and prints
// whether the call succeeded.
func ExampleDataHub_UpdateTopic(dh datahub.DataHub, projectName, topicName string) {
	const newComment = "new topic comment"
	err := dh.UpdateTopic(projectName, topicName, newComment)
	if err == nil {
		fmt.Println("update topic comment successful")
		return
	}
	fmt.Println("update topic comment failed")
	fmt.Println(err)
}

Manage schemas

A schema describes the names and data types of the fields that are stored in a DataHub tuple topic. Schemas are used when you create tuple topics and when you read records from or write records to topics. Data in tuple topics is transmitted as strings over the network. Therefore, schemas are required for data type conversion. A schema is a slice of Field objects. A field contains three parameters: the field name (Name), the field type (Type), and a Boolean value (AllowNull). A value of true indicates that the field can be left empty. A value of false indicates that the field value must be specified.

Query a schema

You can call the GetTopic method to obtain the schema of an existing tuple topic.

  • Sample code

// getSchema fetches the topic named topicName and prints its record schema,
// or prints the error if the lookup fails.
func getSchema(dh datahub.DataHub, projectName, topicName string) {
	// Look up the topic the caller asked for instead of a hard-coded name.
	gt, err := dh.GetTopic(projectName, topicName)
	if err != nil {
		fmt.Println("get topic failed")
		fmt.Println(err)
		return
	}
	fmt.Println(gt.RecordSchema)
}

Define a schema

To create a tuple topic, you must define a schema. You can use the following methods to initialize a schema:

  • Create a schema

// createSchema1 builds a six-field record schema directly from a slice of
// fields and prints it.
func createSchema1() {
	// Keyed composite literals satisfy go vet's composites check for imported
	// struct types and keep compiling if the struct gains new fields.
	fields := []datahub.Field{
		{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true},
		{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false},
		{Name: "string_field", Type: datahub.STRING, AllowNull: false},
		{Name: "double_field", Type: datahub.DOUBLE, AllowNull: false},
		{Name: "boolean_field", Type: datahub.BOOLEAN, AllowNull: true},
		{Name: "decimal_field", Type: datahub.DECIMAL, AllowNull: false},
	}
	schema := datahub.RecordSchema{
		Fields: fields,
	}
	fmt.Println(schema)
}
  • Set schema parameters one by one

// createSchema2 builds a six-field record schema by adding the fields one at a
// time with AddField.
func createSchema2() {
	schema := datahub.NewRecordSchema()
	schema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
		AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
		AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
		AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
		AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN}).
		AddField(datahub.Field{Name: "decimal_field", Type: datahub.DECIMAL})
}
  • Define a schema by using a JSON string

// createSchema3 parses a record schema from its JSON representation and
// prints it, or prints the parse error.
func createSchema3() {
	// A two-field schema in the documented JSON format. An empty string can
	// never be parsed, so the example needs a real schema to demonstrate success.
	str := `{"fields":[{"type":"BIGINT","name":"a"},{"type":"STRING","name":"b"}]}`
	schema, err := datahub.NewRecordSchemaFromJson(str)
	if err != nil {
		fmt.Println("create recordSchema failed")
		fmt.Println(err)
		return
	}
	fmt.Println("create recordSchema successful")
	fmt.Println(schema)
}

The JSON string is in the following format: {"fields":[{"type":"BIGINT","name":"a"},{"type":"STRING","name":"b"}]}.

Manage shards

Shards are concurrent tunnels used for data transmission in a topic. Each shard has an ID. A shard can be in different states. Opening: The shard is being started. Active: The shard is started and can be used to provide services. Each active shard consumes server resources. We recommend that you create shards as needed. You can merge and split shards. If the data amount increases, you can split shards to provide more tunnels for data transmission. This allows more concurrent data writes to topics. If the data amount decreases, you can merge shards to save server resources. For example, as the data amount sharply increases in a massive online promotion, each shard is overloaded with data writes. In this case, you can split shards to improve the efficiency in data writing. After the online promotion ends, the data amount decreases. In this case, you can merge shards.

List shards

Note

ListShard(projectName, topicName string) (*ListShardResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

  • Return result

1. type SplitShardResult struct {
2.     NewShards []ShardEntry `json:"NewShards"`
3. }
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// ExampleDataHub_ListShard prints every shard of the topic, or the error if
// listing fails.
func ExampleDataHub_ListShard() {
	result, err := dh.ListShard(projectName, topicName)
	if err == nil {
		fmt.Println("get shard list successful")
		for i := range result.Shards {
			fmt.Println(result.Shards[i])
		}
		return
	}
	fmt.Println("get shard list failed")
	fmt.Println(err)
}

Split a shard

You can split only active shards. After a shard is split, two new shards are generated, and the original shard is closed. When you split a shard, a split key is required. If you call the SplitShard method, DataHub automatically generates a split key. Alternatively, you can call the SplitShardWithSplitKey method to specify a split key to meet your dedicated requirements. For more information, see the Hash key range term in the Terms topic.

Note

SplitShard(projectName, topicName, shardId string) (SplitShardResult, error)

SplitShardWithSplitKey(projectName, topicName, shardId, splitKey string) (SplitShardResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard to be split.

    • splitKey: the split key that is used to split the shard.

  • Return result

// SplitShardResult is returned by SplitShard and SplitShardWithSplitKey.
// NewShards holds the shards that the split generated.
type SplitShardResult struct {
    NewShards []ShardEntry `json:"NewShards"`
}
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// ExampleDataHub_SplitShard splits shard "0" of the topic, prints the result,
// and then waits for the topic's shards to become ready.
func ExampleDataHub_SplitShard() {
	// ID of the active shard to split.
	const shardId = "0"
	result, err := dh.SplitShard(projectName, topicName, shardId)
	if err != nil {
		fmt.Println("split shard failed")
		fmt.Println(err)
		return
	}
	fmt.Println("split shard successful")
	fmt.Println(result)
	// The new shards must be ready before further operations on the topic.
	dh.WaitAllShardsReady(projectName, topicName)
}

Merge shards

The two shards to be merged must be adjacent and active.

Note

MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard to be merged.

    • adjacentShardId: the ID of the shard that is adjacent to the specified shard.

  • Sample code

  • Return result

// MergeShardResult is returned by MergeShard.
// NOTE(review): presumably describes the shard produced by the merge (its ID
// and hash key range) - confirm against the SDK.
type MergeShardResult struct {
    ShardId      string `json:"ShardId"`
    BeginHashKey string `json:"BeginHashKey"`
    EndHashKey   string `json:"EndHashKey"`
}
  • Errors

    • ResourceNotFoundError

    • InvalidOperationError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

    • ShardSealedError

// ExampleDataHub_MergeShard merges shard "3" with its adjacent shard "4",
// prints the result, and then waits for the topic's shards to become ready.
// Both shards must be adjacent and active.
func ExampleDataHub_MergeShard() {
	shardId := "3"
	adjacentShardId := "4"
	ms, err := dh.MergeShard(projectName, topicName, shardId, adjacentShardId)
	if err != nil {
		fmt.Println("merge shard failed")
		fmt.Println(err)
		return
	}
	fmt.Println("merge shard successful")
	fmt.Println(ms)
	// After merging, wait for all shards to be ready before you perform
	// further operations on the topic.
	dh.WaitAllShardsReady(projectName, topicName)
}

Record publishing and subscription

You can subscribe to the records of active and closed shards. However, you can publish records only to active shards. If you publish records to a closed shard, the ShardSealedError error is reported. When you reach the end of the records in a closed shard, the ShardSealedError error is also reported, which indicates that no new records exist.

Publish records

If you want to publish records to a specific topic, you must specify a shard in this topic for each record. Therefore, call the ListShard method first to view the shards in this topic. After you call the PutRecords method, check whether any records failed to be published.

Note

PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error)

PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) error

You can call the PutRecordsByShard method for DataHub V2.12 and later. For versions earlier than V2.12, call the PutRecords method.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • records: the list of records to be written.

  • Return result

// PutRecordsResult is returned by PutRecords. A call that returns no error can
// still reject individual records: FailedRecordCount is the number of records
// that were not written, and FailedRecords describes each of them.
type PutRecordsResult struct {
    FailedRecordCount int            `json:"FailedRecordCount"`
    FailedRecords     []FailedRecord `json:"FailedRecords"`
}
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// putTupleData writes three tuple records, one to each of shards "0", "1" and
// "2", with PutRecords. Throttled requests (LimitExceededError) are retried up
// to three times, 5 seconds apart. Any other error aborts the write.
func putTupleData() {
	topic, err := dh.GetTopic(projectName, topicName)
	if err != nil {
		fmt.Println("get topic failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get topic successful")

	first := datahub.NewTupleRecord(topic.RecordSchema, 0)
	first.ShardId = "0"
	first.SetValueByName("field1", "TEST1")
	first.SetValueByName("field2", 1)
	// Optional key/value attributes can be attached to a record before it is put.
	first.SetAttribute("attribute", "test attribute")

	second := datahub.NewTupleRecord(topic.RecordSchema, 0)
	second.ShardId = "1"
	second.SetValueByName("field1", datahub.String("TEST2"))
	second.SetValueByName("field2", datahub.Bigint(2))

	third := datahub.NewTupleRecord(topic.RecordSchema, 0)
	third.ShardId = "2"
	third.SetValueByName("field1", datahub.String("TEST3"))
	third.SetValueByName("field2", datahub.Bigint(3))

	records := []datahub.IRecord{first, second, third}

	const maxRetries = 3
	attempt := 0
	// The post statement runs only on continue (throttled); success breaks out
	// without counting, so attempt reaches maxRetries only when all tries were throttled.
	for ; attempt < maxRetries; attempt++ {
		result, err := dh.PutRecords(projectName, topicName, records)
		if err == nil {
			fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
			for _, failed := range result.FailedRecords {
				fmt.Println(failed)
			}
			break
		}
		if _, throttled := err.(*datahub.LimitExceededError); !throttled {
			fmt.Println("put record failed")
			fmt.Println(err)
			return
		}
		fmt.Println("maybe qps exceed limit,retry")
		time.Sleep(5 * time.Second)
	}
	if attempt >= maxRetries {
		fmt.Printf("put records failed ")
	}
}
// putBlobData writes three blob records, one to each of shards "0", "1" and
// "2" of blobTopicName, with PutRecords. Throttled requests
// (LimitExceededError) are retried up to three times, 5 seconds apart. Any
// other error aborts the write.
func putBlobData() {
	first := datahub.NewBlobRecord([]byte("blob test1"), 0)
	first.ShardId = "0"

	second := datahub.NewBlobRecord([]byte("blob test2"), 0)
	second.ShardId = "1"
	second.SetAttribute("attribute", "test attribute")

	third := datahub.NewBlobRecord([]byte("blob test3"), 0)
	third.ShardId = "2"

	records := []datahub.IRecord{first, second, third}

	const maxRetries = 3
	attempt := 0
	// The post statement runs only on continue (throttled); success breaks out
	// without counting.
	for ; attempt < maxRetries; attempt++ {
		result, err := dh.PutRecords(projectName, blobTopicName, records)
		if err == nil {
			fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
			for _, failed := range result.FailedRecords {
				fmt.Println(failed)
			}
			break
		}
		if _, throttled := err.(*datahub.LimitExceededError); !throttled {
			fmt.Println("put record failed")
			fmt.Println(err)
			return
		}
		fmt.Println("maybe qps exceed limit,retry")
		time.Sleep(5 * time.Second)
	}
	if attempt >= maxRetries {
		fmt.Printf("put records failed ")
	}
}
// putDataByShard writes three blob records directly to shard "0" of
// blobTopicName with PutRecordsByShard. Throttled requests
// (LimitExceededError) are retried up to three times, 5 seconds apart. Any
// other error aborts the write.
func putDataByShard() {
	shardId := "0"
	records := make([]datahub.IRecord, 3)
	record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
	records[0] = record1
	record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
	record2.SetAttribute("attribute", "test attribute")
	records[1] = record2
	record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
	records[2] = record3
	maxReTry := 3
	retryNum := 0
	for retryNum < maxReTry {
		err := dh.PutRecordsByShard(projectName, blobTopicName, shardId, records)
		if err == nil {
			// Stop after a successful write; without this break the loop would
			// resend the same records forever.
			break
		}
		if _, ok := err.(*datahub.LimitExceededError); !ok {
			fmt.Println("put record failed")
			fmt.Println(err)
			return
		}
		fmt.Println("maybe qps exceed limit,retry")
		retryNum++
		time.Sleep(5 * time.Second)
	}
	if retryNum >= maxReTry {
		fmt.Printf("put records failed ")
	} else {
		fmt.Println("put record successful")
	}
}

When you publish records, you can add record-related information, such as the information about record collection scenarios. The following sample code provides an example on how to add record-related information:

record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record1.SetAttribute("attribute","test attribute")
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.SetAttribute("attribute","test attribute")

Subscribe to records

To subscribe to the records of a topic, you must also specify a shard and the cursor for reading the records. You can call the GetCursor method to obtain the cursor.

Note

GetCursor(projectName, topicName, shardId string, ctype CursorType, param ...int64) (*GetCursorResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • ctype: the method that is used to obtain the cursor. You can obtain the cursor by using the following methods: OLDEST, LATEST, SEQUENCE, and SYSTEM_TIME.

      • OLDEST: the cursor that points to the earliest valid record in the specified shard.

      • LATEST: the cursor that points to the latest record in the specified shard.

      • SEQUENCE: the cursor that points to the record of the specified sequence number.

      • SYSTEM_TIME: the cursor that points to the first record whose timestamp value is greater than or equal to the specified timestamp value.

    • param: the parameter used to obtain the cursor when you use the SEQUENCE or SYSTEM_TIME method.

  • Return result

// GetCursorResult is returned by GetCursor. Cursor is the value to pass to
// GetTupleRecords or GetBlobRecords as the position to start reading from.
// NOTE(review): RecordTime and Sequence presumably describe the record that
// the cursor points to - confirm against the SDK.
type GetCursorResult struct {
    Cursor     string `json:"Cursor"`
    RecordTime int64  `json:"RecordTime"`
    Sequence   int64  `json:"Sequence"`
}
  • Errors

    • ResourceNotFoundError

    • SeekOutOfRangeError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

    • ShardSealedError

  • Sample code

// cursor obtains cursors for shard "0" by using the OLDEST, LATEST and
// SEQUENCE methods. For each call it prints the result, or the error if the
// call failed.
func cursor(dh datahub.DataHub, projectName, topicName string) {
	shardId := "0"

	// report handles every GetCursor call the same way, so no result is
	// printed without its error first being checked.
	report := func(gr *datahub.GetCursorResult, err error) {
		if err != nil {
			fmt.Println("get cursor failed")
			fmt.Println(err)
			return
		}
		fmt.Println(gr)
	}

	// OLDEST: the earliest valid record in the shard.
	report(dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST))
	// LATEST: the latest record in the shard.
	report(dh.GetCursor(projectName, topicName, shardId, datahub.LATEST))
	// SEQUENCE: the record with the given sequence number.
	var seq int64 = 10
	report(dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, seq))
}

To read records from the specified shard, you must specify a cursor from which records start to be read and the maximum number of records to be read. If the number of records from the specified cursor to the end of the shard is less than the maximum value, the records that are actually read are returned.

Read records from a tuple topic

Note

GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • cursor: the cursor from which data starts to be read.

    • limit: the maximum number of records to be read.

    • recordSchema: the record schema for the topic.

  • Return result

// GetRecordsResult is the result returned by GetTupleRecords.
type GetRecordsResult struct {
    // NextCursor presumably is the cursor from which the next read should
    // continue — verify against the SDK.
    NextCursor    string        `json:"NextCursor"`
    // RecordCount is the number of records in this response.
    RecordCount   int           `json:"RecordCount"`
    // StartSequence is serialized as "StartSeq"; presumably the sequence
    // number of the first returned record — confirm.
    StartSequence int64         `json:"StartSeq"`
    // Records holds the returned records; for a tuple topic the sample
    // type-asserts each one to *datahub.TupleRecord.
    Records       []IRecord     `json:"Records"`
    // RecordSchema is excluded from JSON (tag "-").
    RecordSchema  *RecordSchema `json:"-"`
}
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// getTupleData reads up to 100 records from shard "1" of a TUPLE topic,
// starting at the OLDEST cursor, and prints the values of each record.
//
// It relies on dh, projectName, and topicName being defined by the
// surrounding program. The topic is fetched first because GetTupleRecords
// needs its RecordSchema. A LimitExceededError is retried up to maxReTry
// times with a 5-second pause between attempts; any other error aborts.
func getTupleData() {
    shardId := "1"
    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("get topic failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get topic successful")
    cursor, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get cursor successful")
    limitNum := 100
    maxReTry := 3
    for retryNum := 0; retryNum < maxReTry; retryNum++ {
        gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor.Cursor, limitNum, topic.RecordSchema)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); !ok {
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
            fmt.Println("maybe qps exceed limit,retry")
            // Only back off when another attempt will actually follow.
            if retryNum+1 < maxReTry {
                time.Sleep(5 * time.Second)
            }
            continue
        }
        fmt.Println("get record successful")
        for _, record := range gr.Records {
            data, ok := record.(*datahub.TupleRecord)
            if !ok {
                fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
            } else {
                fmt.Println(data.Values)
            }
        }
        return
    }
    // Reached only when every attempt hit LimitExceededError.
    fmt.Println("get records failed")
}

Read records from a blob topic

Note

GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • cursor: the cursor from which data starts to be read.

    • limit: the maximum number of records to be read.

  • Return result

// GetRecordsResult is the result returned by GetBlobRecords.
type GetRecordsResult struct {
    // NextCursor presumably is the cursor from which the next read should
    // continue — verify against the SDK.
    NextCursor    string        `json:"NextCursor"`
    // RecordCount is the number of records in this response.
    RecordCount   int           `json:"RecordCount"`
    // StartSequence is serialized as "StartSeq"; presumably the sequence
    // number of the first returned record — confirm.
    StartSequence int64         `json:"StartSeq"`
    // Records holds the returned records; for a blob topic the sample
    // type-asserts each one to *datahub.BlobRecord.
    Records       []IRecord     `json:"Records"`
    // RecordSchema is excluded from JSON (tag "-").
    RecordSchema  *RecordSchema `json:"-"`
}
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// getBlobData reads up to 100 records from shard "1" of a BLOB topic,
// starting at the OLDEST cursor, and prints the stored data of each record.
//
// It relies on dh, projectName, and blobTopicName being defined by the
// surrounding program. A LimitExceededError is retried up to maxReTry times
// with a 5-second pause between attempts; any other error aborts.
func getBlobData() {
    shardId := "1"
    cursor, err := dh.GetCursor(projectName, blobTopicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get cursor successful")
    limitNum := 100
    maxReTry := 3
    for retryNum := 0; retryNum < maxReTry; retryNum++ {
        gr, err := dh.GetBlobRecords(projectName, blobTopicName, shardId, cursor.Cursor, limitNum)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); !ok {
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
            fmt.Println("maybe qps exceed limit,retry")
            // Only back off when another attempt will actually follow.
            if retryNum+1 < maxReTry {
                time.Sleep(5 * time.Second)
            }
            continue
        }
        fmt.Println("get record successful")
        for _, record := range gr.Records {
            data, ok := record.(*datahub.BlobRecord)
            if !ok {
                fmt.Printf("record type is not BlobRecord, is %v\n", reflect.TypeOf(record))
            } else {
                fmt.Println(data.StoreData)
            }
        }
        return
    }
    // Reached only when every attempt hit LimitExceededError.
    fmt.Println("get records failed")
}

Query metering information

Metering information refers to the statistics on the resource usage of shards. These statistics are updated every hour.

Note

GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

  • Return result

// GetMeterInfoResult is the result returned by GetMeterInfo for one shard.
type GetMeterInfoResult struct {
    // ActiveTime is a usage statistic of the shard; unit not stated here —
    // TODO confirm (seconds?).
    ActiveTime int64 `json:"ActiveTime"`
    // Storage is the shard's storage usage; presumably bytes — verify.
    Storage    int64 `json:"Storage"`
}
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// meter prints the metering information of shard "0" in the given topic.
// On failure, the error is printed alongside the failure message, as in
// the other samples.
func meter(dh datahub.DataHub, projectName, topicName string) {
    shardId := "0"
    gmi, err := dh.GetMeterInfo(projectName, topicName, shardId)
    if err != nil {
        fmt.Println("get meter information failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get meter information successful")
    fmt.Println(gmi)
}

Manage DataConnectors

A DataConnector in DataHub synchronizes streaming data from DataHub to other cloud services. You can use DataConnectors to synchronize data from DataHub topics to MaxCompute, Object Storage Service (OSS), Elasticsearch, AnalyticDB for MySQL, ApsaraDB RDS for MySQL, Function Compute, Tablestore, and DataHub in real-time or near real-time mode. After DataConnectors are configured, the data you write to DataHub can be used in other Alibaba Cloud services. The following examples describe how to use a DataConnector to synchronize data from DataHub to MaxCompute. For more information about MaxCompute configurations, see Synchronize data to MaxCompute. In DataHub V2.14.0 and later, the connectorType parameter is replaced by the connectorId parameter for all DataConnector-related methods except the createConnector method. Code written for versions earlier than V2.14.0 still works with V2.14.0 and later, provided that you convert the connectorType value to a string and pass it where a connectorId is expected, as shown in the following sample.

  • Sample code

1. gcr, err := dh.GetConnector(projectName, topicName, string(datahub.SinkOdps))

Create a DataConnector

Note

CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (*CreateConnectorResult, error)

CreateConnectorWithStartTime(projectName, topicName string, cType ConnectorType, columnFields []string, sinkStartTime int64, config interface{}) (*CreateConnectorResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • cType: the type of DataConnector that you want to create.

    • columnFields: the fields that you want to synchronize.

    • sinkStartTime: the start time of the synchronization.

    • config: the configuration details of the specified type of DataConnector.

  • Return result

// CreateConnectorResult is the result returned by CreateConnector and
// CreateConnectorWithStartTime.
type CreateConnectorResult struct {
    // ConnectorId identifies the new DataConnector; it is the connectorId
    // argument that the other DataConnector methods accept.
    ConnectorId string `json:"ConnectorId"`
 }
  • Errors

    • ResourceNotFoundError

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// createConnector creates a DataConnector that synchronizes the field1 and
// field2 columns of the topic to the MaxCompute (ODPS) table
// datahub_test.datahub_go_example, and prints the ID of the new connector.
//
// Partitioning uses datahub.SystemTimeMode with the partition columns
// ds (%Y%m%d), hh (%H), and mm (%M). Fill in the endpoint and the
// MaxCompute AccessKey pair before running.
func createConnector(dh datahub.DataHub, projectName, topicName string) {
    odpsEndpoint := ""
    odpsProject := "datahub_test"
    odpsTable := "datahub_go_example"
    odpsAccessId := ""
    odpsAccessKey := ""
    odpsTimeRange := 60
    odpsPartitionMode := datahub.SystemTimeMode
    connectorType := datahub.SinkOdps
    odpsPartitionConfig := datahub.NewPartitionConfig()
    odpsPartitionConfig.AddConfig("ds", "%Y%m%d")
    odpsPartitionConfig.AddConfig("hh", "%H")
    odpsPartitionConfig.AddConfig("mm", "%M")
    sinkOdpsConfig := datahub.SinkOdpsConfig{
        Endpoint:        odpsEndpoint,
        Project:         odpsProject,
        Table:           odpsTable,
        AccessId:        odpsAccessId,
        AccessKey:       odpsAccessKey,
        TimeRange:       odpsTimeRange,
        PartitionMode:   odpsPartitionMode,
        PartitionConfig: *odpsPartitionConfig,
    }
    fields := []string{"field1", "field2"}
    // CreateConnector returns (result, error). sinkOdpsConfig is already a
    // struct value, so it is passed as-is rather than dereferenced.
    ccr, err := dh.CreateConnector(projectName, topicName, connectorType, fields, sinkOdpsConfig)
    if err != nil {
        fmt.Println("create odps connector failed")
        fmt.Println(err)
        return
    }
    fmt.Println("create odps connector successful")
    fmt.Println(ccr.ConnectorId)
}

List DataConnectors

Note

ListConnector(projectName, topicName string) (*ListConnectorResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

  • Return result

// ListConnectorResult is the result returned by ListConnector.
type ListConnectorResult struct {
    // ConnectorIds lists the DataConnector IDs of the topic; note that the
    // JSON key is "Connectors", not "ConnectorIds".
    ConnectorIds []string `json:"Connectors"`
}
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// listConnector prints the ListConnector result for the topic, or the error
// when the listing cannot be retrieved.
func listConnector(dh datahub.DataHub, projectName, topicName string) {
    connectors, listErr := dh.ListConnector(projectName, topicName)
    if listErr != nil {
        fmt.Printf("get connector list failed\n%v\n", listErr)
        return
    }
    fmt.Printf("get connector list successful\n%v\n", connectors)
}

Query a DataConnector

Note

GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • connectorId: the ID of the DataConnector.

  • Return result

// GetConnectorResult is the result returned by GetConnector.
type GetConnectorResult struct {
    // CreateTime and LastModifyTime are timestamps; unit not shown here —
    // TODO confirm.
    CreateTime     int64             `json:"CreateTime"`
    LastModifyTime int64             `json:"LastModifyTime"`
    ConnectorId    string            `json:"ConnectorId"`
    ClusterAddress string            `json:"ClusterAddress"`
    // Type is the connector type, for example datahub.SinkOdps.
    Type           ConnectorType     `json:"Type"`
    State          ConnectorState    `json:"State"`
    // ColumnFields lists the topic fields that are synchronized.
    ColumnFields   []string          `json:"ColumnFields"`
    // ExtraConfig is serialized under the JSON key "ExtraInfo".
    ExtraConfig    map[string]string `json:"ExtraInfo"`
    Creator        string            `json:"Creator"`
    Owner          string            `json:"Owner"`
    // Config holds the type-specific configuration; for an ODPS connector
    // the updateConnector sample type-asserts it to datahub.SinkOdpsConfig.
    Config         interface{}       `json:"Config"`
}
  • Errors

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// getConnector fetches the DataConnector identified by connectorId and
// prints its details. On failure, the error is printed instead.
func getConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    gcr, err := dh.GetConnector(projectName, topicName, connectorId)
    if err != nil {
        fmt.Println("get odps connector failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get odps connector successful")
    fmt.Println(*gcr)
}

Update DataConnector configurations

Note

UpdateConnector(projectName, topicName, connectorId string, config interface{}) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • connectorId: the ID of the DataConnector.

    • config: the configuration details of the specified type of DataConnector.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// updateConnector changes the TimeRange of an ODPS (MaxCompute) DataConnector
// to 200. It first reads the current configuration, so that only TimeRange is
// modified. It stops early if the connector cannot be read or if its config
// is not a datahub.SinkOdpsConfig.
func updateConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    current, err := dh.GetConnector(projectName, topicName, connectorId)
    if err != nil {
        fmt.Printf("get odps connector failed\n%v\n", err)
        return
    }

    odpsConfig, isOdps := current.Config.(datahub.SinkOdpsConfig)
    if !isOdps {
        fmt.Println("convert config to SinkOdpsConfig failed")
        return
    }

    odpsConfig.TimeRange = 200
    err = dh.UpdateConnector(projectName, topicName, connectorId, odpsConfig)
    if err != nil {
        fmt.Printf("update odps config failed\n%v\n", err)
        return
    }
    fmt.Println("update odps config successful")
}

Delete a DataConnector

Note

DeleteConnector(projectName, topicName, connectorId string) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • connectorId: the ID of the DataConnector.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

 // deleteConnector removes the DataConnector identified by connectorId and
 // reports whether the deletion succeeded.
 func deleteConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    err := dh.DeleteConnector(projectName, topicName, connectorId)
    if err != nil {
        fmt.Printf("delete odps connector failed\n%v\n", err)
        return
    }
    fmt.Println("delete odps connector successful")
 }

Query the shard status of a DataConnector

You can query the status of all shards in a topic or the status of the specified shard in a topic.

Note

GetConnectorShardStatus(projectName, topicName, connectorId string) (*GetConnectorShardStatusResult, error)

GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (*ConnectorShardStatusEntry, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • connectorId: the ID of the DataConnector.

  • Return result

// getConnectorShardStatus
// GetConnectorShardStatusResult is returned by GetConnectorShardStatus.
// ShardStatus maps a shard ID to that shard's status (JSON key "ShardStatusInfos").
type GetConnectorShardStatusResult struct {
    ShardStatus map[string]ConnectorShardStatusEntry `json:"ShardStatusInfos"`
}
// GetConnectorShardStatusByShard
// ConnectorShardStatusEntry is the status of one shard of a DataConnector;
// GetConnectorShardStatusByShard returns it for a single shard.
// NOTE(review): the units of the time fields are not stated here — confirm.
type ConnectorShardStatusEntry struct {
    StartSequence    int64               `json:"StartSequence"`
    EndSequence      int64               `json:"EndSequence"`
    CurrentSequence  int64               `json:"CurrentSequence"`
    CurrentTimestamp int64               `json:"CurrentTimestamp"`
    UpdateTime       int64               `json:"UpdateTime"`
    State            ConnectorShardState `json:"State"`
    LastErrorMessage string              `json:"LastErrorMessage"`
    // DiscardCount presumably counts records discarded during sync — verify.
    DiscardCount     int64               `json:"DiscardCount"`
    DoneTime         int64               `json:"DoneTime"`
    WorkerAddress    string              `json:"WorkerAddress"`
}
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// getConnectorShardStatus prints the state of every shard handled by the
// DataConnector, then the full status entry of shard "0".
// It stops at the first failed lookup.
func getConnectorShardStatus(dh datahub.DataHub, projectName, topicName, connectorId string) {
    all, err := dh.GetConnectorShardStatus(projectName, topicName, connectorId)
    if err != nil {
        fmt.Printf("get connector shard status failed\n%v\n", err)
        return
    }
    fmt.Println("get connector shard status successful")
    for id, entry := range all.ShardStatus {
        fmt.Println(id, entry.State)
    }

    single, err := dh.GetConnectorShardStatusByShard(projectName, topicName, connectorId, "0")
    if err != nil {
        fmt.Printf("get connector shard status failed\n%v\n", err)
        return
    }
    fmt.Println("get connector shard status successful")
    fmt.Println(*single)
}

Restart a shard for a DataConnector

You can restart all shards in a topic or restart the specified shard in a topic.

Note

ReloadConnector(projectName, topicName, connectorId string) error

ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • connectorId: the ID of the DataConnector.

    • shardId: the ID of the shard.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// reloadConnector restarts all shards of the DataConnector, then restarts
// shard "2" on its own. Each step reports its outcome, and the sequence
// stops at the first failure.
func reloadConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    steps := []func() error{
        func() error { return dh.ReloadConnector(projectName, topicName, connectorId) },
        func() error { return dh.ReloadConnectorByShard(projectName, topicName, connectorId, "2") },
    }
    for _, reload := range steps {
        if err := reload(); err != nil {
            fmt.Printf("reload connector shard failed\n%v\n", err)
            return
        }
        fmt.Println("reload connector shard successful")
    }
}

Add a field

You can add a field to be synchronized by using a DataConnector, provided that both the DataHub topic and MaxCompute table contain the specified field.

Note

AppendConnectorField(projectName, topicName, connectorId, fieldName string) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • connectorId: the ID of the DataConnector.

    • fieldName: the name of the field.

  • Return result

  • Errors

    • ResourceNotFoundError

    • InvalidParameterError

  • Sample code

// appendConnectorField adds the field "field2" to the set of fields that the
// DataConnector synchronizes. Both the topic and the destination table must
// already contain the field.
func appendConnectorField(dh datahub.DataHub, projectName, topicName, connectorId string) {
    if err := dh.AppendConnectorField(projectName, topicName, connectorId, "field2"); err != nil {
        fmt.Println("append field failed")
        fmt.Println(err)
        return
    }
    fmt.Println("append field successful")
}

Update the status of a DataConnector

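A DataConnector can be in the CONNECTOR_PAUSED or CONNECTOR_RUNNING state, which indicate that the DataConnector is stopped or running, respectively. In the sample code, these states are set by using datahub.ConnectorStopped and datahub.ConnectorRunning.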

Note

UpdateConnectorState(projectName, topicName, connectorId string, state ConnectorState) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • connectorId: the ID of the DataConnector.

    • state: the state of the DataConnector that you want to update.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// updateConnectorState stops the DataConnector and then sets it back to
// running, reporting the outcome of each transition. It stops at the first
// failed transition.
func updateConnectorState(dh datahub.DataHub, projectName, topicName, connectorId string) {
    transitions := []datahub.ConnectorState{datahub.ConnectorStopped, datahub.ConnectorRunning}
    for _, target := range transitions {
        if err := dh.UpdateConnectorState(projectName, topicName, connectorId, target); err != nil {
            fmt.Printf("update connector state failed\n%v\n", err)
            return
        }
        fmt.Println("update connector state successful")
    }
}

Update the offset of a DataConnector

Note

UpdateConnectorOffset(projectName, topicName, connectorId, shardId string, offset ConnectorOffset) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • connectorId: the ID of the DataConnector.

    • offset: the offset of the DataConnector.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// updateConnectorOffset moves the synchronization offset of shard "10" to
// timestamp 1565864139000 / sequence 104.
//
// As in the reference sample, the connector is stopped before the offset is
// changed. If stopping fails, the function reports the error and does not
// touch the offset. Once the connector has been stopped, it is always set
// back to running on return, and a failure to restart it is reported
// instead of being silently ignored.
func updateConnectorOffset(dh datahub.DataHub, projectName, topicName, connectorId string) {
    shardId := "10"
    offset := datahub.ConnectorOffset{
        Timestamp: 1565864139000,
        Sequence:  104,
    }
    if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorStopped); err != nil {
        fmt.Println("stop connector failed")
        fmt.Println(err)
        return
    }
    defer func() {
        if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorRunning); err != nil {
            fmt.Println("restart connector failed")
            fmt.Println(err)
        }
    }()
    if err := dh.UpdateConnectorOffset(projectName, topicName, connectorId, shardId, offset); err != nil {
        fmt.Println("update connector offset failed")
        fmt.Println(err)
        return
    }
    fmt.Println("update connector offset successful")
}

Query the completion time of a DataConnector

You can query the completion time of a DataConnector only when the DataConnector is used to synchronize data from DataHub to MaxCompute.

Note

GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • connectorId: the ID of the DataConnector.

  • Return result

// GetConnectorDoneTimeResult is the result returned by GetConnectorDoneTime.
// DoneTime is the completion time of the DataConnector; its unit is not
// stated in this document — TODO confirm.
type GetConnectorDoneTimeResult struct {
    DoneTime int64 `json:"DoneTime"`
}
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// doneTime prints the completion time reported for the DataConnector, or the
// error if it cannot be retrieved.
func doneTime(dh datahub.DataHub, projectName, topicName, connectorId string) {
    result, queryErr := dh.GetConnectorDoneTime(projectName, topicName, connectorId)
    if queryErr != nil {
        fmt.Printf("get connector done time failed\n%v\n", queryErr)
        return
    }
    fmt.Println("get connector done time successful")
    fmt.Println(result.DoneTime)
}

Manage subscriptions

DataHub allows the server to save the consumption offsets of a subscription. You can obtain highly available offset storage services by performing simple configurations.

Create a subscription

Note

CreateSubscription(projectName, topicName, comment string) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • comment: the comments on the subscription.

  • Return result

  • Errors

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// createSubscription creates a subscription with the comment "sub comment"
// and prints the result. It relies on dh, projectName, and topicName being
// defined by the surrounding program.
func createSubscription() {
    created, createErr := dh.CreateSubscription(projectName, topicName, "sub comment")
    if createErr != nil {
        fmt.Printf("create subscription failed\n%v\n", createErr)
        return
    }
    fmt.Println("create subscription successful")
    fmt.Println(*created)
}

Delete a subscription

Note

DeleteSubscription(projectName, topicName, subId string) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// delSubscription deletes the subscription with a fixed sample ID from the
// topic. On failure, the error is printed alongside the failure message, as
// in the other samples.
func delSubscription(dh datahub.DataHub, projectName, topicName string) {
    subId := "1565577384801DCN0O"
    if err := dh.DeleteSubscription(projectName, topicName, subId); err != nil {
        fmt.Println("delete subscription failed")
        fmt.Println(err)
        return
    }
    fmt.Println("delete subscription successful")
}

Query a subscription

Note

GetSubscription(projectName, topicName, subId string) (*GetSubscriptionResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

  • Return result

 // GetSubscriptionResult is the result returned by GetSubscription. It embeds
 // SubscriptionEntry, so the entry's fields are accessible directly on it.
 type GetSubscriptionResult struct {
     SubscriptionEntry
 }
  • Errors

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// getSubscription prints the details of the subscription with a fixed sample
// ID, or the error if it cannot be fetched.
func getSubscription(dh datahub.DataHub, projectName, topicName string) {
    const sampleSubId = "1565577384801DCN0O"
    sub, err := dh.GetSubscription(projectName, topicName, sampleSubId)
    if err != nil {
        fmt.Printf("get subscription failed\n%v\n", err)
        return
    }
    fmt.Printf("get subscription successful\n%v\n", sub)
}

List subscriptions

You can use the pageIndex and pageSize parameters to list a specific range of subscriptions. For example, you can set the pageIndex and pageSize parameters to 1 and 10 to list the first 10 subscriptions. For another example, you can set the pageIndex and pageSize parameters to 2 and 5 to list the sixth to tenth subscriptions.

Note

ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • pageIndex: the number of the page to return.

    • pageSize: the number of entries to return on each page.

  • Return result

// ListSubscriptionResult is the result returned by ListSubscription.
type ListSubscriptionResult struct {
    // TotalCount presumably is the total number of subscriptions on the
    // topic, not just those on this page — confirm.
    TotalCount    int64               `json:"TotalCount"`
    // Subscriptions holds the entries of the requested page.
    Subscriptions []SubscriptionEntry `json:"Subscriptions"`
}
  • Errors

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// listSubscription prints the first page (five entries per page) of the
// topic's subscriptions, one per line.
func listSubscription(dh datahub.DataHub, projectName, topicName string) {
    const (
        firstPage = 1
        perPage   = 5
    )
    page, err := dh.ListSubscription(projectName, topicName, firstPage, perPage)
    if err != nil {
        fmt.Printf("get subscription list failed\n%v\n", err)
        return
    }
    fmt.Println("get subscription list successful")
    for i := range page.Subscriptions {
        fmt.Println(page.Subscriptions[i])
    }
}

Update a subscription

You can update only the comments on a subscription.

Note

UpdateSubscription(projectName, topicName, subId, comment string) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • comment: the comments on the subscription.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// updateSubscription replaces the comment of a sample subscription with
// "new sub comment"; the comment is the only updatable property.
func updateSubscription(dh datahub.DataHub, projectName, topicName string) {
    const sampleSubId = "1565580329258VXSY8"
    newComment := "new sub comment"
    err := dh.UpdateSubscription(projectName, topicName, sampleSubId, newComment)
    if err != nil {
        fmt.Printf("update subscription comment failed\n%v\n", err)
        return
    }
    fmt.Println("update subscription comment successful")
}

Update the status of a subscription

A subscription can be in the SUB_OFFLINE or SUB_ONLINE state, which indicates that the subscription is offline or online, respectively.

Note

UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • state: the state that you want to update.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// updateSubState takes a sample subscription offline (datahub.SUB_OFFLINE)
// and reports whether the state change succeeded.
func updateSubState(dh datahub.DataHub, projectName, topicName string) {
    const sampleSubId = "1565580329258VXSY8"
    err := dh.UpdateSubscriptionState(projectName, topicName, sampleSubId, datahub.SUB_OFFLINE)
    if err != nil {
        fmt.Printf("update subscription state failed\n%v\n", err)
        return
    }
    fmt.Println("update subscription state successful")
}

Manage offsets

After a subscription is created, it is initially unconsumed. To use the offset storage feature of the subscription, perform the following operations on offsets:

Initialize an offset

To use the subscription to manage offsets, you must first initialize the subscription. You cannot use multiple threads to concurrently consume the data of a subscription. To consume the same data by using multiple threads, use a different subscription for each thread. Each call to the OpenSubscriptionSession method increases the value of the SessionId parameter by one and invalidates the previous session. Offsets can no longer be committed through the previous session.

Note

OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • shardIds: the IDs of shards.

  • Return result

// OpenSubscriptionSessionResult is the result returned by
// OpenSubscriptionSession. Offsets is keyed by shard ID (for example "0").
type OpenSubscriptionSessionResult struct {
    Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
// SubscriptionOffset
// SubscriptionOffset is the stored consumption offset of one shard.
type SubscriptionOffset struct {
    // Timestamp and Sequence identify the offset's record; set both when
    // committing an offset.
    Timestamp int64  `json:"Timestamp"`
    Sequence  int64  `json:"Sequence"`
    // VersionId (JSON key "Version") increases by one each time the offset
    // is reset; it must match the server's value when committing.
    VersionId int64  `json:"Version"`
    // SessionId increases by one with each OpenSubscriptionSession call. It
    // is nil in offsets obtained from GetSubscriptionOffset, which therefore
    // cannot be committed.
    SessionId *int64 `json:"SessionId"`
}
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// openOffset opens a session on shards "0", "1", and "2" of a sample
// subscription and prints the returned offsets. Opening a session
// invalidates any session that was previously open on the subscription.
func openOffset(dh datahub.DataHub, projectName, topicName string) {
    const sampleSubId = "1565580329258VXSY8"
    shards := []string{"0", "1", "2"}
    session, err := dh.OpenSubscriptionSession(projectName, topicName, sampleSubId, shards)
    if err != nil {
        fmt.Printf("open session failed\n%v\n", err)
        return
    }
    fmt.Printf("open session successful\n%v\n", session)
}

Obtain an offset

You can call the GetSubscriptionOffset method to query the current offset of a subscription. Unlike the offset information returned by the OpenSubscriptionSession method, the offset information returned by the GetSubscriptionOffset method has a SessionId value of nil, so it cannot be used to commit offsets. The GetSubscriptionOffset method is generally used only to view offset information.

Note

GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • shardIds: the IDs of shards.

  • Return result

// NOTE(review): GetSubscriptionOffset is declared to return
// *GetSubscriptionOffsetResult, but the struct below is named
// OpenSubscriptionSessionResult; it looks copied from the
// OpenSubscriptionSession section. Confirm the actual type name in the SDK.
// Offsets is keyed by shard ID.
type OpenSubscriptionSessionResult struct {
    Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
// SubscriptionOffset
// SubscriptionOffset is the stored consumption offset of one shard. When it
// comes from GetSubscriptionOffset, SessionId is nil, so it cannot be
// committed.
type SubscriptionOffset struct {
    Timestamp int64  `json:"Timestamp"`
    Sequence  int64  `json:"Sequence"`
    // VersionId is serialized under the JSON key "Version".
    VersionId int64  `json:"Version"`
    SessionId *int64 `json:"SessionId"`
}
  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// getOffset prints the current offsets of shards "0", "1", and "2" for a
// sample subscription. It is read-only: the returned offsets carry a nil
// SessionId and cannot be committed.
func getOffset(dh datahub.DataHub, projectName, topicName string) {
    const sampleSubId = "1565580329258VXSY8"
    shards := []string{"0", "1", "2"}
    offsets, err := dh.GetSubscriptionOffset(projectName, topicName, sampleSubId, shards)
    if err != nil {
        fmt.Printf("get session failed\n%v\n", err)
        return
    }
    fmt.Printf("get session successful\n%v\n", offsets)
}

Update an offset

When you update an offset, DataHub verifies the values of the versionId and sessionId parameters. Make sure that the values are the same as those in the current session. Otherwise, the update fails. When you update an offset, you must set the Timestamp and Sequence parameters for the offset. Otherwise, the offset that is obtained after the update may be invalid. If the Timestamp and Sequence parameters are not set as required, the offset is updated based on the Timestamp parameter. We recommend that you update an offset based on the Timestamp and Sequence parameters that correspond to the record of the offset.

Note

CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • offsets: the offset map of shards.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// updateOffset opens a session on shards "0", "1", and "2" of subscription
// subId and commits a new offset (sequence 900, timestamp 1565593166690)
// for shard "0". It relies on dh, projectName, topicName, and subId being
// defined by the surrounding program.
//
// If the session cannot be opened, the function returns at once, so that
// the nil result is never dereferenced. The commit starts from the offset
// returned by the session so that VersionId and SessionId match the current
// session; otherwise the server rejects the update.
func updateOffset() {
    shardIds := []string{"0", "1", "2"}
    oss, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
    if err != nil {
        fmt.Println("open session failed")
        fmt.Println(err)
        return
    }
    fmt.Println("open session successful")
    fmt.Println(oss)
    offset, ok := oss.Offsets["0"]
    if !ok {
        fmt.Println("no offset returned for shard 0")
        return
    }
    // Set both Timestamp and Sequence so that the committed offset is valid.
    offset.Sequence = 900
    offset.Timestamp = 1565593166690
    offsetMap := map[string]datahub.SubscriptionOffset{
        "0": offset,
    }
    if err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
        switch err.(type) {
        case *datahub.SubscriptionOfflineError:
            fmt.Println("the subscription has offline")
        case *datahub.SubscriptionSessionInvalidError:
            fmt.Println("the subscription is open elsewhere")
        case *datahub.SubscriptionOffsetResetError:
            fmt.Println("the subscription is reset elsewhere")
        default:
            fmt.Println(err)
        }
        fmt.Println("update offset failed")
        return
    }
    fmt.Println("update offset successful")
}

Reset an offset

You can reset an offset to a specific point in time. After an offset is reset, the value of the VersionId parameter for the offset increases by one. The previous session becomes invalid, and offsets can no longer be committed through it.

Note

ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) error

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • offsets: the offset map of shards.

  • Return result

  • Errors

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • Sample code

// resetOffset resets the offset of shard "1" in subscription
// "1565580329258VXSY8" to the point in time identified by the timestamp
// 1565593166690, and prints whether the reset succeeded.
func resetOffset(dh datahub.DataHub, projectName, topicName string) {
    const subId = "1565580329258VXSY8"

    // Only the timestamp is set: the reset targets a point in time.
    target := map[string]datahub.SubscriptionOffset{
        "1": {Timestamp: 1565593166690},
    }

    err := dh.ResetSubscriptionOffset(projectName, topicName, subId, target)
    if err == nil {
        fmt.Println("reset offset successful")
        return
    }
    fmt.Println("reset offset failed")
    fmt.Println(err)
}

Error types

This section describes the types of errors related to DataHub SDK for Go. You can determine the type of an error from its error message and handle the error based on its type. Only LimitExceededError, ServiceTemporaryUnavailableError, and some DatahubClientError errors can be resolved by retries. The retryable DatahubClientError errors include, for example, those caused by a busy or unavailable server. We recommend that you add retry logic for these errors and limit the number of retries.

Error type

Error message

Description

InvalidParameterError

InvalidParameter, InvalidCursor

The error message returned because a specified parameter is invalid.

ResourceNotFoundError

ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo

The error message returned because the resource to be accessed does not exist. This error can also be returned if you send a request immediately after you split or merge shards.

ResourceExistError

ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist

The error message returned because the resource already exists. If the resource that you want to create already exists, this error message is returned.

SeekOutOfRangeError

SeekOutOfRange

The error message returned because, when you obtain a cursor, the specified sequence number is invalid or the specified timestamp is later than the current time. A sequence number becomes invalid when the record that it points to has expired.

AuthorizationFailedError

Unauthorized

The error message returned because an error occurs when the authorization signature is being parsed. Check whether the AccessKey pair is valid.

NoPermissionError

NoPermission, OperationDenied

The error message returned because you do not have permissions. Check whether the RAM configurations are valid or the RAM user is authorized.

NewShardSealedError

InvalidShardOperation

The error message returned because the shard is closed and data cannot be read from or written to the shard. If you continue to write data to the shard or continue to read data after the last data record is read from the shard, this error message is returned.

LimitExceededError

LimitExceeded

The error message returned because the limits of DataHub SDK for Go have been exceeded. For more information, see Limits.

SubscriptionOfflineError

SubscriptionOffline

The error message returned because the subscription is offline and cannot be used.

SubscriptionSessionInvalidError

OffsetSessionChanged, OffsetSessionClosed

The error message returned because the subscription session is abnormal. When a subscription is used, a session is established to submit offsets. If the subscription is also used on another client, this error message is returned.

SubscriptionOffsetResetError

OffsetReseted

The error message returned because the offset of a subscription is reset.

MalformedRecordError

MalformedRecord and ShardNotReady

The error message returned because the record format is invalid. This may be caused because the schema is invalid, non-UTF-8 characters exist, or the client uses the PB protocol but the server does not support the PB protocol.

ServiceTemporaryUnavailableError

N/A

The error message returned because a network error, such as network disconnection, occurs. Try again.

DatahubClientError

All other errors. This error type is the base type of all DataHub errors.

The error message returned because the error does not fall into any of the preceding error types. Some errors of this type, such as those caused by a busy or unavailable server, can be resolved by retries. However, the number of retries must be limited.

DatahubClientError

DatahubClientError is the base error type of DataHub. All other DataHub error types are derived from it, and any error that does not match one of the error types above is returned as a DatahubClientError. Some DatahubClientError errors, such as those that occur because the server is busy or unavailable, can be resolved by retries, so you can add retry logic to your code for them.

// DatahubClientError is the base DataHub error type. It describes a failed
// DataHub request: the HTTP status, the request ID, and the DataHub error code
// and message. The JSON tags give the key each field is (un)marshalled as;
// note that Code and Message use the keys "ErrorCode" and "ErrorMessage".
type DatahubClientError struct {
    StatusCode int    `json:"StatusCode"`   // HTTP status code of the response
    RequestId  string `json:"RequestId"`    // Request ID, used to trace the request
    Code       string `json:"ErrorCode"`    // DataHub error code, e.g. "LimitExceeded"
    Message    string `json:"ErrorMessage"` // Human-readable message for the error code
}

The following sample code shows how to handle the different error types returned by DataHub SDK for Go, including retrying an operation that fails with LimitExceededError:

// example_error shows how to branch on the typed errors returned by the
// DataHub SDK for Go, using CreateProject as the example call.
//
// LimitExceededError is treated as retryable. The call is retried up to
// maxRetry times with a fixed 5-second pause between attempts. Retrying stops
// as soon as the call succeeds, or fails with an error that is not a
// LimitExceededError (such an error will not go away by waiting). A final
// message is printed if every retry is used up.
func example_error() {
    accessId := ""
    accessKey := ""
    endpoint := ""
    projectName := "datahub_go_test"
    maxRetry := 3
    dh := datahub.New(accessId, accessKey, endpoint)

    err := dh.CreateProject(projectName, "project comment")
    if err == nil {
        fmt.Println("create project successful")
        return
    }

    switch err.(type) {
    case *datahub.InvalidParameterError:
        fmt.Println("invalid parameter,please check your input parameter")
    case *datahub.ResourceExistError:
        fmt.Println("project already exists")
    case *datahub.AuthorizationFailedError:
        fmt.Println("accessId or accessKey err,please check your accessId and accessKey")
    case *datahub.LimitExceededError:
        fmt.Println("limit exceed, so retry")
        for i := 0; i < maxRetry; i++ {
            // wait 5 seconds so the limit has a chance to reset
            time.Sleep(5 * time.Second)
            err = dh.CreateProject(projectName, "project comment")
            if err == nil {
                fmt.Println("create project successful")
                return
            }
            fmt.Println("create project failed")
            fmt.Println(err)
            // Only throttling is worth waiting out; any other error
            // (e.g. the project now exists) is final.
            if _, ok := err.(*datahub.LimitExceededError); !ok {
                return
            }
        }
        fmt.Println("create project failed after retries")
    default:
        fmt.Println("unknown error")
        fmt.Println(err)
    }
}