すべてのプロダクト
Search
ドキュメントセンター

DataHub:DataHub SDK for Go

最終更新日:Jun 25, 2025

概要

クイックスタート

用語

詳細については、「用語」をご参照ください。

準備

  • DataHub にアクセスするには、Alibaba Cloud アカウントを使用する必要があります。 DataHub にアクセスするには、AccessKey ID と AccessKey シークレット、および DataHub へのアクセスに使用するエンドポイントを提供する必要があります。

  • Go 用 DataHub SDK パッケージを取得します。

go get -u -insecure github.com/aliyun/aliyun-datahub-sdk-go/datahub
  • DataHub クライアントを初期化します。

Go 用 DataHub SDK のすべての API 操作は、datahub.DataHub メソッドを使用して呼び出す必要があります。そのため、最初に DataHub オブジェクトを初期化する必要があります。 1. デフォルトパラメータを使用して DataHub オブジェクトを作成します。

import "github.com/aliyun/aliyun-datahub-sdk-go/datahub"
accessId := ""
accessKey := ""
endpoint := ""
dh := datahub.New(accessId, accessKey, endpoint)

2. カスタムパラメータを使用して DataHub オブジェクトを作成します。 次の表は、カスタマイズできるパラメータを示しています。

パラメータ

タイプ

デフォルト値

有効な値

説明

UserAgent

string

該当なし

該当なし

ユーザーのエージェント。

CompressorType

CompressorType

NOCOMPRESS

NOCOMPRESS、LZ4、DEFLATE、および ZLIB。 NOCOMPRESS の値は、圧縮が実行されないことを示します。

データ転送中に使用される圧縮形式。

EnableBinary

bool

true

true および false

DataHub からレコードを読み取るか、DataHub にレコードを書き込むときに、Protocol Buffer(PB)プロトコルを使用するかどうかを指定します。 DataHub で PB プロトコルを使用しない場合は、enable_pb パラメータを False に設定する必要があります。

HttpClient

*http.Client

datahub.DefaultHttpClient()

該当なし

詳細については、「net/http」をご参照ください。

endpoint := ""
accessId := ""
accessKey := ""
token := ""
account := datahub.NewAliyunAccount(accessId, accessKey)
// 一時的な AccessKey ベースの認証を実行します。
// account := datahub.NewStsCredential(accessId, accessKey, token)
config := datahub.NewDefaultConfig()
config.CompressorType = datahub.DEFLATE
config.EnableBinary = true;
config.HttpClient = datahub.DefaultHttpClient()
dh := datahub.NewClientWithConfig(endpoint, config, account)
  • Go 用 DataHub SDK では、Go モジュールを使用して依存関係を管理できます。

require (
    github.com/aliyun/aliyun-datahub-sdk-go/datahub v0.1.4
)

オフセット消費のサンプルコード

func OffsetConsume() {
    accessId := ""
    accessKey := ""
    endpoint := "https://dh-cn-hangzhou.aliyuncs.com"
    dh := datahub.New(accessId, accessKey, endpoint)

    projectName := ""
    topicName := ""
    subId := ""
    shardId := "0"
    shardIds := []string{"0"}

    session, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
    if err != nil {
        fmt.Println("サブスクリプションセッションのオープンに失敗しました", err) // サブスクリプションセッションのオープンに失敗しました
        return
    }

    offset := session.Offsets[shardId]
    var gc *datahub.GetCursorResult = nil

    // シーケンス番号が 0 より小さい場合、レコードは消費されません。 // シーケンス番号が0より小さい場合、レコードは消費されません
    if offset.Sequence < 0 {
        // トピックの Time to Live(TTL)内の最初のレコードのカーソルを取得します。 // トピックのTime to Live (TTL) 内の最初のレコードのカーソルを取得します
        gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
        if err != nil {
            fmt.Println("最も古いカーソルの取得に失敗しました", err) // 最も古いカーソルの取得に失敗しました
            return
        }
    } else {
        // 次のレコードのカーソルを取得します。 // 次のレコードのカーソルを取得します
        nextSequence := offset.Sequence + 1
        gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, nextSequence)

        if err != nil {
            // シーケンス番号に基づいてカーソルを取得した後に SeekOutOfRange エラーが返された場合、現在のカーソルのレコードは期限切れになります。 // シーケンス番号に基づいてカーソルを取得した後にSeekOutOfRangeエラーが返された場合、現在のカーソルのレコードは期限切れになります
            if _, ok := err.(*datahub.SeekOutOfRangeError); ok {
                fmt.Println("SeekOutOfRangeError のため、シーケンスによるカーソルの取得は成功しました。再試行します...", err) // SeekOutOfRangeErrorのため、シーケンスによるカーソルの取得は成功しました。再試行します...
                gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
                if err != nil {
                    fmt.Println("最も古いカーソルの取得に失敗しました", err) // 最も古いカーソルの取得に失敗しました
                    return
                }
            }
        }
    }

    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("トピックの取得に失敗しました", err) // トピックの取得に失敗しました
        return
    }

    // レコードを読み取り、オフセットを保存します。たとえば、タプルレコードを読み取り、1,000 レコードを読み取るたびにオフセットを保存します。 // レコードを読み取り、オフセットを保存します。たとえば、タプルレコードを読み取り、1,000レコードを読み取るたびにオフセットを保存します
    recordCount := 0
    limitNum := 100
    cursor := gc.Cursor
    for true {
        gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor, limitNum, topic.RecordSchema)

        if err != nil {
            fmt.Println("レコードの取得に失敗しました", err) // レコードの取得に失敗しました
            break
        }
        if gr.RecordCount == 0 {
            fmt.Println("データがありません。5 秒間スリープします...") // データがありません。5秒間スリープします...
            time.Sleep(time.Second * 5)
            continue
        }

        for _, record := range gr.Records {
            // レコードを表示します。 // レコードを表示します
            data, _ := record.(*datahub.TupleRecord)
            fmt.Println(data.Values)

            recordCount += 1
            // 1,000 レコードごとにオフセットを送信します。 // 1,000レコードごとにオフセットを送信します
            if recordCount%1000 == 0 {
                fmt.Println("オフセットをコミットします", record.GetSequence()) // オフセットをコミットします
                offset.Sequence = record.GetSequence()
                offset.Timestamp = record.GetSystemTime()

                offsetMap := map[string]datahub.SubscriptionOffset{shardId: offset}
                err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap)
                if err != nil {
                    if _, ok := err.(*datahub.SubscriptionOffsetResetError); ok {
                        fmt.Println("サブスクリプションがリセットされました。再度開きます...") // サブスクリプションがリセットされました。再度開きます...
                        // オフセットがリセットされます。セッションを再度開く必要があります。 // オフセットがリセットされます。セッションを再度開く必要があります
                        session, err = dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
                        if err != nil {
                            fmt.Println("サブスクリプションセッションの再開に失敗しました", err) // サブスクリプションセッションの再開に失敗しました
                            break
                        }
                        offset = session.Offsets[shardId]
                    } else if _, ok := err.(*datahub.SubscriptionOffsetResetError); ok {
                        fmt.Println("サブスクリプションは他で使用されています") // サブスクリプションは他で使用されています
                        break
                    } else {
                        fmt.Println("オフセットのコミットに失敗しました", err) // オフセットのコミットに失敗しました
                        break
                    }
                }
                recordCount = 0
            }
        }
        cursor = gr.NextCursor
    }
}

メソッド

プロジェクトの管理

プロジェクトは、DataHub でデータを管理するための基本単位です。 1 つのプロジェクトには複数のトピックが含まれています。 DataHub のプロジェクトは、MaxCompute のプロジェクトとは独立しています。 DataHub で MaxCompute プロジェクトを再利用することはできません。 DataHub でプロジェクトを作成する必要があります。

プロジェクトの作成

説明

CreateProject(projectName, comment string) error

  • パラメータ

    • projectName: プロジェクトの名前。

    • comment: プロジェクトに関するコメント。

  • 戻り値

  • エラー

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func createProjet(dh datahub.DataHub, projectName string) {
    if err := dh.CreateProject(projectName, "project comment"); err != nil { // project comment
        fmt.Println("プロジェクトの作成に失敗しました") // プロジェクトの作成に失敗しました
        fmt.Println(err)
        return
    }
    fmt.Println("作成に成功しました") // 作成に成功しました
}

プロジェクトの削除

DeleteProject メソッドを呼び出して、プロジェクトを削除できます。

説明

DeleteProject(projectName string) error

  • パラメータ

    • projectName: プロジェクトの名前。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func deleteProject(dh datahub.DataHub, projectName string) {
    if err := dh.DeleteProject("123"); err != nil {
        fmt.Println("プロジェクトの削除に失敗しました") // プロジェクトの削除に失敗しました
        fmt.Println(err)
        return
    }
    fmt.Println("プロジェクトの削除に成功しました") // プロジェクトの削除に成功しました
}

プロジェクトの一覧表示

ListProject メソッドを呼び出して、プロジェクトを一覧表示できます。

説明

ListProject() (*ListProjectResult, error)

  • パラメータ: なし

  • 戻り値

1. type ListProjectResult struct {
2.     ProjectNames []string `json:"ProjectNames"`
3. }
  • エラー

    • AuthorizationFailedError

    • DatahubClientError

  • サンプルコード

func listProject(dh datahub.DataHub, projectName string) {
    lp, err := dh.ListProject()
    if err != nil {
        fmt.Println("プロジェクトリストの取得に失敗しました") // プロジェクトリストの取得に失敗しました
        fmt.Println(err)
        return
    }
    fmt.Println("プロジェクトリストの取得に成功しました") // プロジェクトリストの取得に成功しました
    for _, projectName := range lp.ProjectNames {
        fmt.Println(projectName)
    }
}

プロジェクトの照会

GetProject メソッドを呼び出して、プロジェクトを照会できます。

説明

GetProject(projectName string) (*GetProjectResult, error)

  • パラメータ

    • projectName: プロジェクトの名前。

  • 戻り値

type GetProjectResult struct {
    CreateTime     int64  `json:"CreateTime"`
    LastModifyTime int64  `json:"LastModifyTime"`
    Comment        string `json"Comment"`
}
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func getProject(dh datahub.DataHub, projectName string) {
    gp, err := dh.GetProject(projectName)
    if err != nil {
        fmt.Println("プロジェクトメッセージの取得に失敗しました") // プロジェクトメッセージの取得に失敗しました
        fmt.Println(err)
        return
    }
    fmt.Println("プロジェクトメッセージの取得に成功しました") // プロジェクトメッセージの取得に成功しました
    fmt.Println(*gp)
}

プロジェクトの更新

説明

UpdateProject(projectName, comment string) error

  • パラメータ

    • projectName: プロジェクトの名前。

    • comment: プロジェクトに関するコメント。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

1. func updateProject(dh datahub.DataHub, projectName string) {
2.     if err := dh.UpdateProject(projectName, "new project comment"); err != nil { // new project comment
3.         fmt.Println("プロジェクトコメントの更新に失敗しました") // プロジェクトコメントの更新に失敗しました
4.         fmt.Println(err)
5.         return
6.     }
7.     fmt.Println("プロジェクトコメントの更新に成功しました") // プロジェクトコメントの更新に成功しました
8. }

トピックの管理

トピックは、DataHub でのデータのサブスクライブと公開の最小単位です。トピックを使用して、さまざまなタイプのストリーミングデータを区別できます。 2 つのタイプのトピック(タプルと BLOB)がサポートされています。タプルトピックには、データベースのデータレコードに似たレコードが含まれています。各レコードには複数の列が含まれています。バイナリデータのブロックを BLOB トピックにレコードとして書き込むことができます。

トピックの作成

タプルトピック

説明

CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) error

タプルトピックに書き込まれるデータは特定の形式です。タプルトピックのレコードスキーマを指定する必要があります。次の表は、サポートされているデータ型を示しています。

タイプ

説明

値の範囲

BIGINT

8 バイトの符号付き整数。

-9223372036854775807 ~ 9223372036854775807。

DOUBLE

倍精度浮動小数点数。長さは 8 バイトです。

-1.0 _10^308 ~ 1.0 _10^308。

BOOLEAN

ブール型。

True および False、true および false、または 0 および 1。

TIMESTAMP

タイムスタンプのタイプ。

マイクロ秒単位の精度のタイムスタンプ。

STRING

文字列。 UTF-8 エンコーディングのみがサポートされています。

STRING 型の列のすべての値のサイズは 2 MB を超えることはできません。

DECIMAL

s

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • comment: トピックに関するコメント。

    • lifeCycle: データの TTL。単位:日。この時間より前に書き込まれたデータにはアクセスできません。

    • recordSchema: このトピックのレコードスキーマ。

  • 戻り値

  • エラー

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func Example_CreateTupleTopic(dh datahub.DataHub, projectName, topicName string) {
    // レコードスキーマを新規作成します。
    recordSchema := datahub.NewRecordSchema()
    recordSchema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
        AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
        AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
        AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
        AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN})
    // トピックを作成します。
    if err := dh.CreateTupleTopic(projectName, topicName, "topic comment", 5, 7, recordSchema); err != nil {
        // トピックの作成に失敗しました。
        fmt.Println("create topic failed")
        fmt.Println(err)
        return
    }
    // トピックの作成に成功しました。
    fmt.Println("create topic successful")
}

BLOB トピック

説明

CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) エラー

  • パラメーター

    • projectName: プロジェクト名。

    • topicName: トピック名。

    • comment: トピックに関するコメント。

    • lifeCycle: データの TTL。単位:日。その時間より前に書き込まれたデータにはアクセスできません。

  • 戻り値

  • エラー

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func Example_CreateBlobTopic(dh datahub.DataHub, projectName, topicName string) {
    // トピックコメント
    if err := dh.CreateBlobTopic(projectName, topicName, "topic comment", 5, 7); err != nil {
        fmt.Println("create topic failed")
        fmt.Println(err)
        return
    }
    fmt.Println("create topic successful")
}

トピックの削除

説明

DeleteTopic(projectName, topicName string) エラー

  • パラメーター

    • projectName: プロジェクト名。

    • topicName: トピック名。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func ExampleDataHub_DeleteTopic(dh datahub.DataHub, projectName, topicName string) {
    // トピック削除に失敗した場合
    if err := dh.DeleteTopic(projectName, topicName); err != nil {
        fmt.Println("delete failed")
        fmt.Println(err)
        return
    }
    // トピック削除に成功した場合
    fmt.Println("delete successful")
}

トピックの一覧

説明

ListTopic(projectName string) (*ListTopicResult, error) プロジェクト名に基づいてトピックを一覧表示します。

  • パラメーター

    • projectName: プロジェクトの名前。

  • 戻り値

type ListTopicResult struct {
    TopicNames [] string `json:"TopicNames"` // トピック名
}
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプル コード

func ExampleDataHub_ListTopic(dh datahub.DataHub, projectName, topicName string) {
    lt, err := dh.ListTopic(projectName)
    if err != nil {
        // トピックリストの取得に失敗しました
        fmt.Println("get topic list failed")
        fmt.Println(err)
        return
    }
    // トピックリストの取得に成功しました
    fmt.Println("get topic list successful")
    fmt.Println(lt)
}

トピックの更新

説明

UpdateTopic(projectName、topicName、comment string) エラー

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • comment: トピックに関するコメント。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func ExampleDataHub_UpdateTopic(dh datahub.DataHub, projectName, topicName string) {
    // トピックコメントの更新に失敗しました
    if err := dh.UpdateTopic(projectName, topicName, "new topic comment"); err != nil {
        fmt.Println("update topic comment failed")
        fmt.Println(err)
        return
    }
    // トピックコメントの更新に成功しました
    fmt.Println("update topic comment successful")
}

スキーマの管理

スキーマは、DataHub におけるデータ ストレージの名前と種類を示します。スキーマは、タプル トピックを作成し、トピックからレコードを読み書きするときに使用されます。タプル トピックのデータは、ネットワーク経由で文字列として送信されます。そのため、データ型の変換にはスキーマが必要です。スキーマは、フィールド オブジェクトのスライスです。フィールドには、フィールド名、フィールド タイプ、ブール値の 3 つの パラメーターが含まれています。値が True の場合は、フィールドを空のままにすることができます。値が False の場合は、フィールド値を指定する必要があります。

スキーマをクエリする

get_topic メソッドを呼び出して、作成されたタプル トピックのスキーマに関する情報を取得できます。

  • サンプルコード

func getSchema(dh datahub.DataHub, projectName, topicName string) {
    gt, err := dh.GetTopic(projectName, "topic_test")
    if err != nil {
        // トピックの取得に失敗しました
        fmt.Println("get topic failed")
        fmt.Println(err)
        return
    } else {
        // スキーマを取得
        schema := gt.RecordSchema
        fmt.Println(schema)
    }
}

スキーマを定義する

タプル トピックを作成するには、スキーマを定義する必要があります。スキーマを初期化するには、次の方法を使用できます。

  • スキーマを作成する

func createSchema1() {
    fields := []datahub.Field{
        {"bigint_field", datahub.BIGINT, true}, // bigint_field を定義します。必須フィールドです。
        {"timestamp_field", datahub.TIMESTAMP, false}, // timestamp_field を定義します。
        {"string_field", datahub.STRING, false}, // string_field を定義します。
        {"double_field", datahub.DOUBLE, false}, // double_field を定義します。
        {"boolean_field", datahub.BOOLEAN, true}, // boolean_field を定義します。必須フィールドです。
        {"decimal_field", datahub.DECIMAL, false}, // decimal_field を定義します。
    }
    schema := datahub.RecordSchema{
        fields,
    }
    fmt.Println(schema)
}
  • スキーマパラメーターを 1 つずつ設定します

func createSchema2() {
    recordSchema := datahub.NewRecordSchema()
    recordSchema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}). // bigint_field を追加します。
        AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}). // timestamp_field を追加します。
        AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}). // string_field を追加します。
        AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}). // double_field を追加します。
        AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN}). // boolean_field を追加します。
        AddField(datahub.Field{Name: "decimal_field", Type: datahub.DECIMAL}) // decimal_field を追加します。
}
  • JSON 文字列を使用してスキーマを定義する

func createSchema3() {
    str := ""
    schema, err := datahub.NewRecordSchemaFromJson(str)
    if err != nil {
        // recordSchemaの作成に失敗しました
        fmt.Println("create recordSchema failed")
        fmt.Println(err)
        return
    }
    // recordSchemaの作成に成功しました
    fmt.Println("create recordSchema successful")
    fmt.Println(schema)
}

JSON 文字列は次の形式です。"{"fields":[{"type":"BIGINT","name":"a"},{"type":"STRING","name":"b"}]}"。

シャードの管理

シャードは、トピックでのデータ送信に使用される同時トンネルです。各シャードには ID があります。シャードはさまざまな状態になる可能性があります。Opening: シャードが開始されています。Active: シャードが開始され、サービスの提供に使用できます。アクティブな各シャードはサーバーリソースを消費します。必要に応じてシャードを作成することをお勧めします。シャードをマージおよび分割できます。データ量が増加した場合は、シャードを分割してデータ送信用のトンネルを増やすことができます。これにより、トピックへの同時データ書き込みが増加します。データ量が減少した場合は、シャードをマージしてサーバーリソースを節約できます。たとえば、大規模なオンラインプロモーションでデータ量が急激に増加すると、各シャードにデータ書き込みが過負荷になります。この場合は、シャードを分割してデータ書き込みの効率を向上させることができます。オンラインプロモーションが終了すると、データ量が減少します。この場合は、シャードをマージできます。

シャードの一覧表示

説明

ListShard(projectName, topicName string) (*ListShardResult, error) // プロジェクト名とトピック名を指定してシャード一覧を取得します。

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

  • 戻り値

1. type SplitShardResult struct {
2.     NewShards []ShardEntry `json:"NewShards"` // 新しいシャード
3. }
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func ExampleDataHub_ListShard() {
    // シャードリストの取得に失敗しました
    ls, err := dh.ListShard(projectName, topicName)
    if err != nil {
        fmt.Println("get shard list failed")
        fmt.Println(err)
        return
    }
    // シャードリストの取得に成功しました
    fmt.Println("get shard list successful")
    for _, shard := range ls.Shards {
        fmt.Println(shard)
    }
}

シャードを分割する

アクティブなシャードのみを分割できます。シャードを分割すると、2 つの新しいシャードが生成され、元のシャードは閉じられます。シャードを分割するときは、分割キーが必要です。SplitShard メソッドを呼び出すと、DataHub は自動的に分割キーを生成します。または、SplitShardWithSplitKey メソッドを呼び出して、特定の要件を満たす分割キーを指定することもできます。詳細については、「用語」トピックのシャードの管理の用語をご参照ください。

説明

SplitShard(projectName, topicName, shardId string) (SplitShardResult, error) SplitShardWithSplitKey(projectName, topicName, shardId, splitKey string) (SplitShardResult, error) // シャードを分割します。

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • shardId: 分割対象のシャードの ID。

    • splitKey: シャードの分割に使用する分割キー。

  • 戻り値

type SplitShardResult struct {
    NewShards []ShardEntry `json:"NewShards"` // 新しいシャード
}
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプル コード

func ExampleDataHub_SplitShard() {
    // 分割するシャードの ID。
    shardId := "0"
    ss, err := dh.SplitShard(projectName, topicName, shardId)
    if err != nil {
        fmt.Println("split shard failed")
        fmt.Println(err)
        return
    }
    fmt.Println("split shard successful")
    fmt.Println(ss)
    // 分割後、関連操作を実行する前に、すべてのシャードの状態が準備完了になるまで待機する必要があります。
    dh.WaitAllShardsReady(projectName, topicName)
}

シャードのマージ

マージされる 2 つのシャードは、隣接しており、アクティブである必要があります。

説明

MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error) projectName、topicName、shardId、adjacentShardId を指定してシャードをマージします。

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • shardId: マージするシャードの ID。

    • adjacentShardId: 指定したシャードに隣接するシャードの ID。

  • サンプルコード

  • 戻り値

type MergeShardResult struct {
    ShardId      string `json:"ShardId"` // シャードID
    BeginHashKey string `json:"BeginHashKey"` // 開始ハッシュキー
    EndHashKey   string `json:"EndHashKey"` // 終了ハッシュキー
}
  • エラー

    • ResourceNotFoundError

    • InvalidOperationError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

    • ShardSealedError

func ExampleDataHub_MergeShard() {
    shardId := "3"
    adjacentShardId := "4"
    ms, err := dh.MergeShard(projectName, topicName, shardId, adjacentShardId)
    if err != nil {
        fmt.Println("merge shard failed")
        fmt.Println(err)
        return
    }
    fmt.Println("merge shard successful")
    fmt.Println(ms)
    // 分割後、関連する操作を実行する前に、すべてのシャードの状態が準備完了になるまで待機する必要があります。
    dh.WaitAllShardsReady(projectName, topicName)
}

レコードの公開とサブスクライブ

アクティブなシャードとクローズされたシャードのレコードをサブスクライブできます。ただし、レコードをパブリッシュできるのはアクティブなシャードのみです。クローズされたシャードにレコードをパブリッシュすると、ShardSealedError エラーが報告されます。クローズされたシャードから読み取ったレコードが最後まで到達した場合も、ShardSealedError エラーが報告されます。これは、新しいレコードが存在しないことを示します。

レコードを公開する

特定のトピックにレコードをパブリッシュする場合、各レコードに対してこのトピック内のシャードを指定する必要があります。そのため、ListShard メソッドを呼び出して、このトピック内のシャードを表示する必要があります。PutRecords メソッドを呼び出した後、レコードのパブリッシュが失敗したかどうかを確認します。

説明

PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) // PutRecords は、複数のレコードをトピックに書き込みます。 PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) error // PutRecordsByShard は、複数のレコードを特定のシャードに書き込みます。

DataHub V2.12 以降では、PutRecordsByShard メソッドを呼び出すことができます。 V2.12 より前のバージョンでは、PutRecords メソッドを呼び出します。

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • shardId: シャードの ID。

    • records: 書き込むレコードのリスト。

  • 戻り値

type PutRecordsResult struct {
    FailedRecordCount int            `json:"FailedRecordCount"` // 失敗したレコードの数
    FailedRecords     []FailedRecord `json:"FailedRecords"`     // 失敗したレコード
}
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプル コード

// タプルデータの書き込み
func putTupleData() {
    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("トピックの取得に失敗しました")
        fmt.Println(err)
        return
    }
    fmt.Println("トピックの取得に成功しました")
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record1.ShardId = "0"
    record1.SetValueByName("field1", "TEST1")
    record1.SetValueByName("field2", 1)
    // レコードを書き込む際に属性を追加できます
    record1.SetAttribute("attribute", "test attribute")
    records[0] = record1
    record2 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record2.ShardId = "1"
    record2.SetValueByName("field1", datahub.String("TEST2"))
    record2.SetValueByName("field2", datahub.Bigint(2))
    records[1] = record2
    record3 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record3.ShardId = "2"
    record3.SetValueByName("field1", datahub.String("TEST3"))
    record3.SetValueByName("field2", datahub.Bigint(3))
    records[2] = record3
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        result, err := dh.PutRecords(projectName, topicName, records)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("qps 制限を超えている可能性があります。再試行します")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("レコードの書き込みに失敗しました")
                fmt.Println(err)
                return
            }
        }
        fmt.Printf("書き込みに成功した数は %d、書き込みに失敗したレコード数は %d です\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
        for _, v := range result.FailedRecords {
            fmt.Println(v)
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("レコードの書き込みに失敗しました")
    }
}
// BLOB データの書き込み
func putBlobData() {
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
    record1.ShardId = "0"
    records[0] = record1
    record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
    record2.ShardId = "1"
    record2.SetAttribute("attribute", "test attribute")
    records[1] = record2
    record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
    record3.ShardId = "2"
    records[2] = record3
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        result, err := dh.PutRecords(projectName, blobTopicName, records)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("qps 制限を超えている可能性があります。再試行します")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("レコードの書き込みに失敗しました")
                fmt.Println(err)
                return
            }
        }
        fmt.Printf("書き込みに成功した数は %d、書き込みに失敗したレコード数は %d です\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
        for _, v := range result.FailedRecords {
            fmt.Println(v)
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("レコードの書き込みに失敗しました")
    }
}
// シャードごとのデータ書き込み
func putDataByShard() {
    shardId := "0"
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
    records[0] = record1
    record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
    record2.SetAttribute("attribute", "test attribute")
    records[1] = record2
    record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
    records[2] = record3
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        if err := dh.PutRecordsByShard(projectName, blobTopicName, shardId, records); err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("qps 制限を超えている可能性があります。再試行します")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("レコードの書き込みに失敗しました")
                fmt.Println(err)
                return
            }
        }
    }
    if retryNum >= maxReTry {
        fmt.Printf("レコードの書き込みに失敗しました")
    }else {
        fmt.Println("レコードの書き込みに成功しました")
    }
}

レコードを公開するときに、レコード収集シナリオに関する情報など、レコード関連の情報を追加できます。次のサンプルコードは、レコード関連の情報を追加する方法の例を示しています。

record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
// test attribute を設定します。
record1.SetAttribute("attribute","test attribute")
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
// test attribute を設定します。
record2.SetAttribute("attribute","test attribute")

レコードのサブスクライブ

トピックのレコードをサブスクライブするには、シャードとレコードを読み取るためのカーソルも指定する必要があります。 GetCursor メソッドを呼び出してカーソルを取得できます。

説明

GetCursor(projectName, topicName, shardId string, ctype CursorType, param …int64) (*GetCursorResult, error) // 指定されたプロジェクト、トピック、シャード、カーソルタイプ、およびパラメータに基づいてカーソルを取得します。

  • パラメーター

    • projectName: プロジェクト名。

    • topicName: トピック名。

    • shardId: シャードの ID。

    • ctype: カーソルを取得するために使用されるメソッド。 OLDEST、LATEST、SEQUENCE、および SYSTEM_TIME のメソッドを使用してカーソルを取得できます。

      • OLDEST: 指定されたシャード内の最も古い有効なレコードを指すカーソル。

      • LATEST: 指定されたシャード内の最新のレコードを指すカーソル。

      • SEQUENCE: 指定されたシーケンス番号のレコードを指すカーソル。

      • SYSTEM_TIME: タイムスタンプ値が指定されたタイムスタンプ値以上である最初のレコードを指すカーソル。

    • param: SEQUENCE または SYSTEM_TIME メソッドを使用する場合に、カーソルを取得するために使用されるパラメーター。

  • 戻り値

type GetCursorResult struct {
    Cursor     string `json:"カーソル"` // カーソル
    RecordTime int64  `json:"レコード時間"` // レコード時間
    Sequence   int64  `json:"シーケンス"` // シーケンス
}
  • エラー

    • ResourceNotFoundError

    • SeekOutOfRangeError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

    • ShardSealedError

  • サンプルコード

func cursor(dh datahub.DataHub, projectName, topicName string) {
    shardId := "0"
    gr, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
    if err != nil {
        // カーソル取得失敗
        fmt.Println("get cursor failed")
        fmt.Println(err)
    }else{
        fmt.Println(gr)
    }
    gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.LATEST)
    fmt.Println(err)
    fmt.Println(gr)
    var seq int64 = 10
    gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, seq)
    if err != nil {
        // カーソル取得失敗
        fmt.Println("get cursor failed")
        fmt.Println(err)
    }else{
        fmt.Println(gr)
    }
}

指定したシャードからレコードを読み取るには、レコードの読み取りを開始するカーソルと、読み取るレコードの最大数を指定する必要があります。指定したカーソルからシャードの末尾までのレコード数が最大値より少ない場合は、実際に読み取られたレコードが返されます。

タプル トピックからレコードを読み取る

説明

GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema _RecordSchema) (_GetRecordsResult, error) //プロジェクト名、トピック名、シャード ID、カーソル文字列、制限、レコードスキーマを指定してタプルレコードを取得します。

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • shardId: シャードの ID。

    • cursor: データの読み取りを開始するカーソル。

    • limit: 読み取るレコードの最大数。

    • recordSchema: トピックのレコードスキーマ。

  • 戻り値

type GetRecordsResult struct {
    NextCursor    string        `json:"NextCursor"`
    RecordCount   int           `json:"RecordCount"`
    StartSequence int64         `json:"StartSeq"`
    Records       []IRecord     `json:"Records"`
    RecordSchema  *RecordSchema `json:"-"` // レコードスキーマ
}
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func getTupleData() {
    shardId := "1"
    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        // トピックの取得に失敗しました
        fmt.Println("get topic failed")
        return
    }
    // トピックの取得に成功しました
    fmt.Println("get topic successful")
    cursor, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
    if err != nil {
        // カーソルの取得に失敗しました
        fmt.Println("get cursor failed")
        fmt.Println(err)
        return
    }
    // カーソルの取得に成功しました
    fmt.Println("get cursor successful")
    limitNum := 100
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor.Cursor, limitNum, topic.RecordSchema)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                // qps 制限を超えている可能性があります。再試行します
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                // レコードの取得に失敗しました
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
        }
        // レコードの取得に成功しました
        fmt.Println("get record successful")
        for _, record := range gr.Records {
            data, ok := record.(*datahub.TupleRecord)
            if !ok {
                // レコードタイプが TupleRecord ではありません。
                fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
            } else {
                fmt.Println(data.Values)
            }
        }
        break
    }
    if retryNum >= maxReTry {
        // レコードの取得に失敗しました
        fmt.Printf("get records failed ")
    }
}

BLOB トピックからレコードを読み取る

説明

GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) プロジェクト名、トピック名、シャード ID、カーソル文字列、制限整数を受け取り、GetRecordsResult とエラーを返します。

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • shardId: シャードの ID。

    • cursor: データの読み取りを開始するカーソル。

    • limit: 読み取るレコードの最大サイズ。

  • 戻り値

type GetRecordsResult struct {
    NextCursor    string        `json:"NextCursor"`
    RecordCount   int           `json:"RecordCount"`
    StartSequence int64         `json:"StartSeq"`
    Records       []IRecord     `json:"Records"`
    RecordSchema  *RecordSchema `json:"-"` // レコードスキーマ
}
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプル コード

func getBlobData() {
    shardId := "1"
    cursor, err := dh.GetCursor(projectName, blobTopicName, shardId, datahub.OLDEST)
    if err != nil {
        // カーソル取得失敗
        fmt.Println("get cursor failed")
        fmt.Println(err)
        return
    }
    // カーソル取得成功
    fmt.Println("get cursor successful")
    limitNum := 100
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        gr, err := dh.GetBlobRecords(projectName, blobTopicName, shardId, cursor.Cursor, limitNum)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                // qps制限を超えている可能性があります。再試行します。
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                // レコード取得失敗
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
        }
        // レコード取得成功
        fmt.Println("get record successful")
        for _, record := range gr.Records {
            data, ok := record.(*datahub.BlobRecord)
            if !ok {
                // レコードタイプが TupleRecord ではありません。
                fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
            } else {
                fmt.Println(data.StoreData)
            }
        }
        break
    }
    if retryNum >= maxReTry {
        // レコード取得失敗
        fmt.Printf("get records failed ")
    }
}

メーターリング情報のクエリ

メータリング情報は、シャードのリソース使用状況に関する統計情報を指し、1時間ごとに更新されます。

説明

GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error) プロジェクト名、トピック名、シャード ID を指定してメーター情報を取得します。

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • shardId: シャードの ID。

  • 戻り値

type GetMeterInfoResult struct {
    ActiveTime int64 `json:"ActiveTime"` // アクティブ時間
    Storage    int64 `json:"Storage"`    // ストレージ
}
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func meter(dh datahub.DataHub, projectName, topicName string) {
    shardId := "0"
    gmi, err := dh.GetMeterInfo(projectName, topicName, shardId)
    if err != nil {
        // メーター情報の取得に失敗しました
        fmt.Println("get meter information failed")
        return
    }
    // メーター情報の取得に成功しました
    fmt.Println("get meter information successful")
    fmt.Println(gmi)
}

DataConnectors の管理

DataHub の DataConnector は、ストリーミングデータを DataHub から他のクラウドサービスに同期します。DataConnector を使用すると、DataHub トピックから MaxCompute、オブジェクトストレージサービス(OSS)、Elasticsearch、AnalyticDB for MySQL、ApsaraDB RDS for MySQL、Function Compute、Tablestore、および DataHub に、リアルタイムまたはニアリアルタイムモードでデータを同期できます。DataConnector を構成すると、DataHub に書き込んだデータを他の Alibaba Cloud サービスで使用できます。次の例では、DataConnector を使用して DataHub から MaxCompute にデータを同期する方法について説明します。MaxCompute の構成の詳細については、「MaxCompute へのデータの同期」をご参照ください。DataHub V2.14.0 以降では、createConnector メソッドを除くすべての DataConnector 関連メソッドで、connectorType パラメーターが connectorId パラメーターに置き換えられています。ただし、connectorType パラメーターの値が文字列に変換されている場合は、DataHub V2.14.0 以降でも、バージョンが V2.14.0 より前の DataHub のメソッドを呼び出すことができます。

  • サンプルコード

1. gcr, err := dh.GetConnector(projectName, topicName, string(datahub.SinkOdps)) // プロジェクト名、トピック名、およびシンクタイプに基づいてコネクタを取得します。

DataConnector を作成する

説明

CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (CreateConnectorResult, error) /* コネクタを作成します */ CreateConnectorWithStartTime(projectName, topicName string, cType ConnectorType, columnFields []string, sinkStartTime int64, config interface{}) (CreateConnectorResult, error) /* 開始時刻を指定してコネクタを作成します */

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • cType: 作成する DataConnector の種類。

    • columnFields: 同期するフィールド。

    • sinkStartTime: 同期の開始時刻。

    • config: 指定された種類の DataConnector の構成の詳細。

  • 戻り値

type CreateConnectorResult struct {
    ConnectorId string `json:"ConnectorId"` // コネクタ ID
 }
  • エラー

    • ResourceNotFoundError

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func createConnector(dh datahub.DataHub, projectName, topicName string) {
    odpsEndpoint := ""
    odpsProject := "datahub_test"
    odpsTable := "datahub_go_example"
    odpsAccessId := ""
    odpsAccessKey := "="
    odpsTimeRange := 60
    odpsPartitionMode := datahub.SystemTimeMode
    connectorType := datahub.SinkOdps
    odpsPartitionConfig := datahub.NewPartitionConfig()
    odpsPartitionConfig.AddConfig("ds", "%Y%m%d")
    odpsPartitionConfig.AddConfig("hh", "%H")
    odpsPartitionConfig.AddConfig("mm", "%M")
    sinkOdpsConfig := datahub.SinkOdpsConfig{
        Endpoint:        odpsEndpoint,
        Project:         odpsProject,
        Table:           odpsTable,
        AccessId:        odpsAccessId,
        AccessKey:       odpsAccessKey,
        TimeRange:       odpsTimeRange,
        PartitionMode:   odpsPartitionMode,
        PartitionConfig: *odpsPartitionConfig,
    }
    fileds := []string{"field1", "field2"}
    if err := dh.CreateConnector(projectName, topicName, connectorType, fileds, *sinkOdpsConfig); err != nil {
        fmt.Println("create odps connector failed") // ODPS コネクタの作成に失敗しました
        fmt.Println(err)
        return
    }
    fmt.Println("create odps connector successful") // ODPS コネクタの作成に成功しました
}

DataConnectors の一覧表示

説明

ListConnector(projectName, topicName string) (*ListConnectorResult, error) // コネクタの一覧を取得します。

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

  • 戻り値

type ListConnectorResult struct {
    ConnectorIds []string `json:"Connectors"` // コネクタの ID のリスト
}
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func listConnector(dh datahub.DataHub, projectName, topicName string) {
    lc, err := dh.ListConnector(projectName, topicName)
    if err != nil {
        // コネクタ一覧の取得に失敗しました
        fmt.Println("get connector list failed")
        fmt.Println(err)
        return
    }
    // コネクタ一覧の取得に成功しました
    fmt.Println("get connector list successful")
    fmt.Println(lc)
}

DataConnector のクエリ

説明

GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error) コネクタを取得します

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • connectorId: DataConnector の ID。

  • 戻り値

type GetConnectorResult struct {
    CreateTime     int64             `json:"CreateTime"`
    LastModifyTime int64             `json:"LastModifyTime"`
    ConnectorId    string            `json:"ConnectorId"`
    ClusterAddress string            `json:"ClusterAddress"`
    Type           ConnectorType     `json:"Type"`
    State          ConnectorState    `json:"State"`
    ColumnFields   []string          `json:"ColumnFields"`
    ExtraConfig    map[string]string `json:"ExtraInfo"`
    Creator        string            `json:"Creator"`
    Owner          string            `json:"Owner"`
    Config         interface{}       `json:"Config"`
}
  • エラー

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func getConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    gcr, err := dh.GetConnector(projectName, topicName, connectorId)
    if err != nil {
        fmt.Println("ODPS コネクタの取得に失敗しました") // Translated comment
        fmt.Println(err)
        return
    }
    fmt.Println("ODPS コネクタの取得に成功しました") // Translated comment
    fmt.Println(*gcr)
}

DataConnector 構成の更新

説明

UpdateConnector(projectName, topicName, connectorId string, config interface{}) error コネクタを更新します

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • connectorId: DataConnector の ID。

    • config: 指定されたタイプの DataConnector の構成の詳細。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func updateConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    gc, err := dh.GetConnector(projectName, topicName, connectorId)
    if err != nil {
        fmt.Println("get odps connector failed")
        fmt.Println(err)
        return
    }
    config, ok := gc.Config.(datahub.SinkOdpsConfig)
    if !ok {
        fmt.Println("convert config to SinkOdpsConfig failed")
        return
    }
    // configを変更します
    config.TimeRange = 200
    if err := dh.UpdateConnector(projectName, topicName, connectorId, config); err != nil {
        fmt.Println("update odps config failed")
        fmt.Println(err)
        return
    }
    fmt.Println("update odps config successful")
}

データコネクタの削除

説明

DeleteConnector(projectName、topicName、connectorId string) エラー

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • connectorId: DataConnector の ID。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

 func deleteConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
     if err := dh.DeleteConnector(projectName, topicName, connectorId); err != nil {
         fmt.Println("delete odps connector failed") // ODPS コネクタの削除に失敗しました
         fmt.Println(err)
         return
     }
    fmt.Println("delete odps connector successful") // ODPS コネクタの削除に成功しました
 }

DataConnector のシャードステータスをクエリする

トピック内のすべてのシャードのステータス、またはトピック内の指定されたシャードのステータスをクエリできます。

説明

GetConnectorShardStatus(projectName, topicName, connectorId string) (GetConnectorShardStatusResult, error) コネクタシャードステータスを取得します。 GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (ConnectorShardStatusEntry, error) シャード ID 別にコネクタシャードステータスを取得します。

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • shardId: シャードの ID。

    • connectorId: DataConnector の ID。

  • 戻り値

// getConnectorShardStatus
type GetConnectorShardStatusResult struct {
    ShardStatus map[string]ConnectorShardStatusEntry `json:"ShardStatusInfos"`
}
// GetConnectorShardStatusByShard
type ConnectorShardStatusEntry struct {
    StartSequence    int64               `json:"StartSequence"`
    EndSequence      int64               `json:"EndSequence"`
    CurrentSequence  int64               `json:"CurrentSequence"`
    CurrentTimestamp int64               `json:"CurrentTimestamp"`
    UpdateTime       int64               `json:"UpdateTime"`
    State            ConnectorShardState `json:"State"`
    LastErrorMessage string              `json:"LastErrorMessage"`
    DiscardCount     int64               `json:"DiscardCount"`
    DoneTime         int64               `json:"DoneTime"`
    WorkerAddress    string              `json:"WorkerAddress"`
}
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func getConnectorShardStatus(dh datahub.DataHub, projectName, topicName, connectorId string) {
    gcs, err := dh.GetConnectorShardStatus(projectName, topicName, connectorId)
    if err != nil {
        fmt.Println("コネクタシャードステータスの取得に失敗しました") // Translated comment
        fmt.Println(err)
        return
    }
    fmt.Println("コネクタシャードステータスの取得に成功しました") // Translated comment
    for shard, status := range gcs.ShardStatus {
        fmt.Println(shard, status.State)
    }
    shardId := "0"
    gc, err := dh.GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId)
    if err != nil {
        fmt.Println("コネクタシャードステータスの取得に失敗しました") // Translated comment
        fmt.Println(err)
        return
    }
    fmt.Println("コネクタシャードステータスの取得に成功しました") // Translated comment
    fmt.Println(*gc)
}

DataConnector のシャードを再起動する

トピック内のすべてのシャードを再起動するか、トピック内の指定されたシャードを再起動できます。

説明

ReloadConnector(projectName, topicName, connectorId string) エラー ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) エラー

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • connectorId: DataConnector の ID。

    • shardId: シャードの ID。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func reloadConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    if err := dh.ReloadConnector(projectName, topicName, connectorId); err != nil {
        fmt.Println("コネクタシャードの再読み込みに失敗しました") // Translated comment
        fmt.Println(err)
        return
    }
    fmt.Println("コネクタシャードの再読み込みに成功しました") // Translated comment
    shardId := "2"
    if err := dh.ReloadConnectorByShard(projectName, topicName, connectorId, shardId); err != nil {
        fmt.Println("コネクタシャードの再読み込みに失敗しました") // Translated comment
        fmt.Println(err)
        return
    }
    fmt.Println("コネクタシャードの再読み込みに成功しました") // Translated comment
}

フィールドを追加

DataConnector を使用して同期するフィールドを追加できます。ただし、DataHub トピックと MaxCompute テーブルの両方に指定されたフィールドが含まれている必要があります。

説明

AppendConnectorField(projectName、topicName、connectorId、fieldName string) エラー

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • connectorId: DataConnector の ID。

    • fieldName: フィールドの名前。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • InvalidParameterError

  • サンプルコード

func appendConnectorField(dh datahub.DataHub, projectName, topicName, connectorId string) {
    if err := dh.AppendConnectorField(projectName, topicName, connectorId, "field2"); err != nil {
        fmt.Println("フィールドの追加に失敗しました") // Translated comment
        fmt.Println(err)
        return
    }
    fmt.Println("フィールドの追加に成功しました") // Translated comment
}

DataConnector のステータスを更新する

DataConnector は、CONNECTOR_PAUSED または CONNECTOR_RUNNING 状態になる可能性があります。これは、DataConnector が停止しているか実行中であるかを示します。

説明

UpdateConnectorState(projectName、topicName、connectorId string、state ConnectorState) エラー

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • connectorId: DataConnector の ID。

    • state: 更新する DataConnector の状態。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func updateConnectorState(dh datahub.DataHub, projectName, topicName, connectorId string) {
    // コネクタの状態の更新に失敗しました
    if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorStopped); err != nil {
        fmt.Println("update connector state failed")
        fmt.Println(err)
        return
    }
    // コネクタの状態の更新に成功しました
    fmt.Println("update connector state successful")
    // コネクタの状態の更新に失敗しました
    if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorRunning); err != nil {
        fmt.Println("update connector state failed")
        fmt.Println(err)
        return
    }
    // コネクタの状態の更新に成功しました
    fmt.Println("update connector state successful")
}

DataConnector のオフセットを更新する

説明

UpdateConnectorOffset(projectName、topicName、connectorId、shardId string、offset ConnectorOffset) エラー

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • shardId: シャードの ID。

    • connectorId: DataConnector の ID。

    • offset: DataConnector のオフセット。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func updateConnectorOffset(dh datahub.DataHub, projectName, topicName, connectorId string) {
    shardId := "10"
    offset := datahub.ConnectorOffset{
        Timestamp: 1565864139000,
        Sequence:  104,
    }
    dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorStopped)
    defer dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorRunning)
    if err := dh.UpdateConnectorOffset(projectName, topicName, connectorId, shardId, offset); err != nil {
        fmt.Println("コネクタオフセットの更新に失敗しました") // 翻訳: update connector offset failed
        fmt.Println(err)
        return
    }
    fmt.Println("コネクタオフセットの更新に成功しました") // 翻訳: update connector offset successful
}

DataConnector の完了時間のクエリ

DataConnector が DataHub から MaxCompute にデータを同期するために使用されている場合にのみ、DataConnector の完了時間をクエリできます。

説明

GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error) プロジェクト名、トピック名、コネクタ ID を指定して、コネクタの完了時刻を取得します。

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • connectorId: DataConnector の ID。

  • 戻り値

1. type GetConnectorDoneTimeResult struct { // GetConnectorDoneTimeResult 型の構造体
2.     DoneTime int64 `json:"DoneTime"`
3. }
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func doneTime(dh datahub.DataHub, projectName, topicName, connectorId string) {
    // コネクタの完了時刻を取得します。
    gcd, err := dh.GetConnectorDoneTime(projectName, topicName, connectorId)
    if err != nil {
        // コネクタの完了時刻の取得に失敗しました。
        fmt.Println("get connector done time failed")
        fmt.Println(err)
        return
    }
    // コネクタの完了時刻の取得に成功しました。
    fmt.Println("get connector done time successful")
    fmt.Println(gcd.DoneTime)
}

サブスクリプションの管理

DataHub では、サーバーがサブスクリプションの消費オフセットを保存できます。簡単な構成を実行することで、可用性の高いオフセットストレージサービスを取得できます。

サブスクリプションの作成

説明

CreateSubscription(projectName、topicName、comment string) エラー

  • パラメーター

    • projectName: プロジェクト名。

    • topicName: トピック名。

    • comment: サブスクリプションに関するコメント。

  • 戻り値

  • エラー

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func createSubscription() {
    csr, err := dh.CreateSubscription(projectName, topicName, "サブスクリプションコメント") // サブスクリプションコメント
    if err != nil {
        fmt.Println("create subscription failed")
        fmt.Println(err)
        return
    }
    fmt.Println("create subscription successful")
    fmt.Println(*csr)
}

サブスクリプションの削除

説明

DeleteSubscription(projectName、topicName、subId string) エラー

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • subId: サブスクリプションの ID。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func delSubscription(dh datahub.DataHub, projectName, topicName string) {
    subId := "1565577384801DCN0O"
    if err := dh.DeleteSubscription(projectName, topicName, subId); err != nil {
        // サブスクリプションの削除に失敗しました
        fmt.Println("delete subscription failed")
        return
    }
    // サブスクリプションの削除に成功しました
    fmt.Println("delete subscription successful")
}

サブスクリプションのクエリ

説明

GetSubscription(projectName、topicName、subId string) (*GetSubscriptionResult, error)

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • subId: サブスクリプションの ID。

  • 戻り値

 type GetSubscriptionResult struct {
     SubscriptionEntry
 }
  • エラー

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func getSubscription(dh datahub.DataHub, projectName, topicName string) {
    subId := "1565577384801DCN0O"
    gs, err := dh.GetSubscription(projectName, topicName, subId)
    if err != nil {
        // サブスクリプションの取得に失敗しました
        fmt.Println("get subscription failed")
        fmt.Println(err)
        return
    }
    // サブスクリプションの取得に成功しました
    fmt.Println("get subscription successful")
    fmt.Println(gs)
}

サブスクリプションの一覧表示

pageIndex パラメーターと pageSize パラメーターを使用して、特定の範囲のサブスクリプションを一覧表示できます。たとえば、pageIndex パラメーターと pageSize パラメーターを 1 と 10 に設定して、最初の 10 件のサブスクリプションを一覧表示できます。別の例として、pageIndex パラメーターと pageSize パラメーターを 2 と 5 に設定して、6 番目から 10 番目のサブスクリプションを一覧表示できます。

説明

ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error) プロジェクト名、トピック名、ページインデックス、ページサイズを指定してサブスクリプションを一覧表示します。

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • pageIndex: 返すページの番号。 pageSize: 各ページに返すエントリの番号。

  • 戻り値

type ListSubscriptionResult struct {
    TotalCount    int64               `json:"TotalCount"` // 全体の購読数
    Subscriptions []SubscriptionEntry `json:"Subscriptions"` // 購読エントリ
}
  • エラー

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func listSubscription(dh datahub.DataHub, projectName, topicName string) {
    pageIndex := 1
    pageSize := 5
    ls, err := dh.ListSubscription(projectName, topicName, pageIndex, pageSize)
    if err != nil {
        // サブスクリプション一覧の取得に失敗しました
        fmt.Println("get subscription list failed")
        fmt.Println(err)
        return
    }
    // サブスクリプション一覧の取得に成功しました
    fmt.Println("get subscription list successful")
    for _, sub := range ls.Subscriptions {
        fmt.Println(sub)
    }
}

サブスクリプションの更新

サブスクリプションのコメントのみを更新できます。

説明

UpdateSubscription(projectName、topicName、subId、comment string) エラー

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • subId: サブスクリプションの ID。

    • comment: サブスクリプションのコメント。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func updateSubscription(dh datahub.DataHub, projectName, topicName string) {
    subId := "1565580329258VXSY8"
    if err := dh.UpdateSubscription(projectName, topicName, subId, "new sub comment"); err != nil { // サブスクリプションコメントの更新に失敗しました
        fmt.Println("update subscription comment failed")
        fmt.Println(err)
        return
    }
    fmt.Println("update subscription comment successful") // サブスクリプションコメントの更新に成功しました
}

サブスクリプションの状態を更新する

サブスクリプションは、SUB_OFFLINE または SUB_ONLINE 状態になる可能性があり、それぞれオフラインまたはオンラインのサブスクリプションを示します。

説明

UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) エラー

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • subId: サブスクリプションの ID。

    • state: 更新する状態。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func updateSubState(dh datahub.DataHub, projectName, topicName string) {
    subId := "1565580329258VXSY8"
    if err := dh.UpdateSubscriptionState(projectName, topicName, subId, datahub.SUB_OFFLINE); err != nil {
        // サブスクリプション状態の更新に失敗しました
        fmt.Println("update subscription state failed")
        fmt.Println(err)
        return
    }
    // サブスクリプション状態の更新に成功しました
    fmt.Println("update subscription state successful")
}

オフセットの管理

サブスクリプションが作成された後、最初は消費されていません。サブスクリプションのオフセットストレージ機能を使用するには、オフセットに対して次の操作を実行します。

オフセットを初期化します

オフセットの管理にサブスクリプションを使用するには、最初にサブスクリプションを初期化する必要があります。複数のスレッドを使用してサブスクリプションのデータを同時に消費することはできません。複数のスレッドを使用して同じデータを消費するには、異なるサブスクリプションを使用する必要があります。 OpenSubscriptionSession メソッドを呼び出すと、SessionId パラメーターの値が 1 ずつ増加します。以前のセッションは無効になり、オフセットを更新できなくなります。

説明

OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error) サブスクリプションセッションを開きます

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • subId: サブスクリプションの ID。

    • shardIds: シャードの ID。

  • 戻り値

type OpenSubscriptionSessionResult struct {
    Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
// SubscriptionOffset サブスクリプションオフセット
type SubscriptionOffset struct {
    Timestamp int64  `json:"Timestamp"`
    Sequence  int64  `json:"Sequence"`
    VersionId int64  `json:"Version"`
    SessionId *int64 `json:"SessionId"`
}
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプル コード

func openOffset(dh datahub.DataHub, projectName, topicName string) {
    // subId を設定します。
    subId := "1565580329258VXSY8"
    // shardId のリストを設定します。
    shardIds := []string{"0", "1", "2"}
    // サブスクリプションセッションを開きます。
    oss, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
    if err != nil {
        // セッションのオープンに失敗しました。
        fmt.Println("open session failed")
        fmt.Println(err)
        return
    }
    // セッションのオープンに成功しました。
    fmt.Println("open session successful")
    fmt.Println(oss)
}

オフセットを取得する

GetSubscriptionOffset メソッドを呼び出して、サブスクリプションの現在のオフセットを照会できます。 OpenSubscriptionSession メソッドを呼び出して取得したオフセット情報とは異なり、GetSubscriptionOffset メソッドを呼び出して取得したオフセット情報には、値が nil の SessionId パラメーターが含まれています。この場合、オフセットを送信することはできません。一般的に、GetSubscriptionOffset メソッドはオフセット情報を表示するために呼び出されます。

説明

GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error) // プロジェクト名、トピック名、サブスクリプション ID、シャード ID のリストを指定して、サブスクリプションのオフセットを取得します。

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • subId: サブスクリプションの ID。

    • shardIds: シャードの ID。

  • 戻り値

type OpenSubscriptionSessionResult struct {
    Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
// サブスクリプションオフセット
type SubscriptionOffset struct {
    Timestamp int64  `json:"Timestamp"`
    Sequence  int64  `json:"Sequence"`
    VersionId int64  `json:"Version"`
    SessionId *int64 `json:"SessionId"`
}
  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプル コード

func getOffset(dh datahub.DataHub, projectName, topicName string) {
    // サブスクリプションIDを取得します。
    subId := "1565580329258VXSY8"
    // シャードIDのリストを取得します。
    shardIds := []string{"0", "1", "2"}
    // サブスクリプションオフセットを取得します。
    gss, err := dh.GetSubscriptionOffset(projectName, topicName, subId, shardIds)
    if err != nil {
        // セッションの取得に失敗しました。
        fmt.Println("get session failed")
        fmt.Println(err)
        return
    }
    // セッションの取得に成功しました。
    fmt.Println("get session successful")
    fmt.Println(gss)
}

オフセットの更新

オフセットを更新すると、DataHub は versionId パラメーターと sessionId パラメーターの値を確認します。これらの値が現在のセッションの値と同じであることを確認してください。同じでない場合、更新は失敗します。オフセットを更新する場合は、オフセットの Timestamp パラメーターと Sequence パラメーターを設定する必要があります。設定しない場合、更新後に取得されるオフセットが無効になる可能性があります。Timestamp パラメーターと Sequence パラメーターが要件に従って設定されていない場合、オフセットは Timestamp パラメーターに基づいて更新されます。オフセットのレコードに対応する Timestamp パラメーターと Sequence パラメーターに基づいてオフセットを更新することをお勧めします。

説明

CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) エラー

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • subId: サブスクリプションの ID。

    • offsets: シャードのオフセットマップ。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func updateOffset() {
    shardIds := []string{"0", "1", "2"}
    oss, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
    if err != nil {
        fmt.Println("open session failed")
        fmt.Println(err)
    }
    fmt.Println("open session successful")
    fmt.Println(oss)
    offset := oss.Offsets["0"]
    // オフセットメッセージを設定します
    offset.Sequence = 900
    offset.Timestamp = 1565593166690
    offsetMap := map[string]datahub.SubscriptionOffset{
        "0": offset,
    }
    if err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
        if _, ok := err.(*datahub.SubscriptionOfflineError); ok {
            fmt.Println("the subscription has offline")
        } else if _, ok := err.(*datahub.SubscriptionSessionInvalidError); ok {
            fmt.Println("the subscription is open elsewhere")
        } else if _, ok := err.(*datahub.SubscriptionOffsetResetError); ok {
            fmt.Println("the subscription is reset elsewhere")
        } else {
            fmt.Println(err)
        }
        fmt.Println("update offset failed")
        return
    }
    fmt.Println("update offset successful")
}

オフセットをリセットする

特定の時点にオフセットをリセットできます。オフセットがリセットされると、リセットされたオフセットの VersionId パラメーターの値が 1 ずつ増加します。以前のセッションは無効になり、オフセットを更新できなくなります。

説明

ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) エラー

  • パラメーター

    • projectName: プロジェクトの名前。

    • topicName: トピックの名前。

    • subId: サブスクリプションの ID。

    • offsets: シャードのオフセットマップ。

  • 戻り値

  • エラー

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • サンプルコード

func resetOffset(dh datahub.DataHub, projectName, topicName string) {
    subId := "1565580329258VXSY8"
    offset := datahub.SubscriptionOffset{
        Timestamp: 1565593166690,
    }
    offsetMap := map[string]datahub.SubscriptionOffset{
        "1": offset,
    }
    if err := dh.ResetSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
        fmt.Println("オフセットのリセットに失敗しました") // reset offset failed
        fmt.Println(err)
        return
    }
    fmt.Println("オフセットのリセットに成功しました") // reset offset successful
}

エラーの種類

このセクションでは、DataHub SDK for Go に関連するエラーの種類について説明します。エラーの説明に基づいてエラーの種類を特定し、エラーの種類に基づいてエラーを処理できます。DatahubClientError、LimitExceededError、および ServiceTemporaryUnavailableError エラーのみが再試行によって解決できます。サーバーがビジー状態または使用不可であることが原因で発生するエラーなど、一部の DatahubClientError エラーは、再試行によって解決できます。再試行で解決できるエラーについては、コードに再試行ロジックを追加することをお勧めします。ただし、再試行回数は制限する必要があります。

エラータイプ

エラーメッセージ

説明

InvalidParameterError

InvalidParameter、InvalidCursor

指定されたパラメータが無効なために返されるエラーメッセージです。

ResourceNotFoundError

ResourceNotFound、NoSuchProject、NoSuchTopic、NoSuchShard、NoSuchSubscription、NoSuchConnector、NoSuchMeteringInfo

アクセスしようとするリソースが存在しないために返されるエラーメッセージです。 シャードを分割またはマージした後にすぐに別のリクエストを送信すると、このエラーメッセージが返されます。

ResourceExistError

ResourceAlreadyExist、ProjectAlreadyExist、TopicAlreadyExist、ConnectorAlreadyExist

リソースが既に存在するために返されるエラーメッセージです。 作成しようとするリソースが既に存在する場合、このエラーメッセージが返されます。

SeekOutOfRangeError

SeekOutOfRange

カーソルを取得するときに、指定されたシーケンス番号が無効であるか、指定されたタイムスタンプが現在のタイムスタンプより後であるために返されるエラーメッセージです。 カーソルのレコードの有効期限が切れているため、シーケンス番号が無効になる場合があります。

AuthorizationFailedError

Unauthorized

認証署名の解析中にエラーが発生したために返されるエラーメッセージです。 AccessKeyペアが有効かどうかを確認してください。

NoPermissionError

NoPermission、OperationDenied

権限がないために返されるエラーメッセージです。 RAM構成が有効かどうか、またはRAMユーザーが承認されているかどうかを確認してください。

NewShardSealedError

InvalidShardOperation

シャードが閉じられており、シャードからデータを読み書きできないために返されるエラーメッセージです。 シャードへのデータの書き込みを続行するか、シャードから最後のデータレコードが読み取られた後にデータの読み取りを続行すると、このエラーメッセージが返されます。

LimitExceededError

LimitExceeded

DataHub SDK for Go の制限を超えたために返されるエラーメッセージです。 詳細については、「制限」をご参照ください。

SubscriptionOfflineError

SubscriptionOffline

サブスクリプションがオフラインであり、使用できないために返されるエラーメッセージです。

SubscriptionSessionInvalidError

OffsetSessionChanged、OffsetSessionClosed

サブスクリプションセッションが異常であるために返されるエラーメッセージです。 サブスクリプションを使用すると、オフセットを送信するためのセッションが確立されます。 別のクライアントでもサブスクリプションが使用されている場合、このエラーメッセージが返されます。

SubscriptionOffsetResetError

OffsetReseted

サブスクリプションのオフセットがリセットされたために返されるエラーメッセージです。

MalformedRecordError

MalformedRecord and ShardNotReady

レコード形式が無効なために返されるエラーメッセージです。 これは、スキーマが無効であるか、UTF-8以外の文字が存在するか、クライアントがPBプロトコルを使用しているがサーバーがPBプロトコルをサポートしていないことが原因である可能性があります。

ServiceTemporaryUnavailableError

N/A

ネットワークの切断などのネットワークエラーが発生したために返されるエラーメッセージです。 再試行してください。

DatahubClientError

その他すべてのエラー。 このエラ―タイプはすべてのエラーの基本クラスです。

エラーが上記のエラ―タイプに該当しないために返されるエラーメッセージです。 このタイプのエラーは再試行によって解決できます。 ただし、再試行回数は制限する必要があります。

DatahubClientError

DatahubClientError は、DataHub の基本的なエラータイプです。DataHub のすべてのエラーはこのエラータイプに基づいて派生します。定義済みのエラータイプに含まれない DataHub のすべてのエラーは、DatahubClientError エラーです。DatahubClientError エラーには、サーバーがビジー状態または使用不可であるために発生するエラーなど、再試行によって解決できるエラーが含まれます。コードに再試行ロジックを追加できます。

type DatahubClientError struct {
    StatusCode int    `json:"StatusCode"`   // HTTPステータスコード
    RequestId  string `json:"RequestId"`    // リクエストを追跡するためのリクエスト ID
    Code       string `json:"ErrorCode"`    // DataHubエラーコード
    Message    string `json:"ErrorMessage"` // エラーコードのエラーメッセージ
}

次のサンプルコードは、error() メソッドの呼び出し例を示しています。

func example_error() {
    accessId := ""
    accessKey := ""
    endpoint := ""
    projectName := "datahub_go_test"
    maxRetry := 3
    dh := datahub.New(accessId, accessKey, endpoint)
    if err := dh.CreateProject(projectName, "project comment"); err != nil {
        if _, ok := err.(*datahub.InvalidParameterError); ok {
            // 無効なパラメーターです。入力パラメーターを確認してください。
            fmt.Println("invalid parameter,please check your input parameter")
        } else if _, ok := err.(*datahub.ResourceExistError); ok {
            // プロジェクトは既に存在します。
            fmt.Println("project already exists")
        } else if _, ok := err.(*datahub.AuthorizationFailedError); ok {
            // accessIdまたはaccessKeyエラーです。accessIdとaccessKeyを確認してください。
            fmt.Println("accessId or accessKey err,please check your accessId and accessKey")
        } else if _, ok := err.(*datahub.LimitExceededError); ok {
            // 制限を超えているため、再試行します。
            fmt.Println("limit exceed, so retry")
            for i := 0; i < maxRetry; i++ {
                // 5秒待機します。
                time.Sleep(5 * time.Second)
                if err := dh.CreateProject(projectName, "project comment"); err != nil {
                    // プロジェクトの作成に失敗しました。
                    fmt.Println("create project failed")
                    fmt.Println(err)
                } else {
                    // プロジェクトの作成に成功しました。
                    fmt.Println("create project successful")
                    break
                }
            }
        } else {
            // 不明なエラーです。
            fmt.Println("unknown error")
            fmt.Println(err)
        }
    } else {
        // プロジェクトの作成に成功しました。
        fmt.Println("create project successful")
    }
}