全部产品
Search
文档中心

Tablestore:Memulai

更新时间:Jul 02, 2025

Tunnel Service memungkinkan Anda mengonsumsi data dalam tabel. Topik ini menjelaskan cara memulai dengan Tunnel Service menggunakan Tablestore SDK untuk Go. Sebelum menggunakan Tunnel Service, pastikan Anda memahami catatan penggunaan Tunnel Service.

Catatan penggunaan

  • Secara default, sistem memulai thread pool untuk membaca dan memproses data berdasarkan TunnelWorkerConfig. Jika Anda ingin memulai beberapa TunnelWorkers pada satu server, disarankan menggunakan TunnelWorkerConfig yang sama untuk mengonfigurasi semua TunnelWorkers.

  • TunnelWorker memerlukan periode pemanasan untuk inisialisasi, yang ditentukan oleh parameter HeartbeatInterval di TunnelWorkerConfig. Nilai default adalah 30 detik.

  • Ketika klien TunnelWorker dimatikan karena keluar tak terduga atau terminasi manual, TunnelWorker secara otomatis mendaur ulang sumber daya melalui salah satu metode berikut: melepaskan thread pool, secara otomatis memanggil metode shutdown yang didaftarkan untuk kelas Channel, dan menutup tunnel.

  • Periode retensi log inkremen di tunnel sama dengan periode retensi log Stream. Log Stream dapat disimpan hingga tujuh hari, sehingga log inkremen di tunnel juga dapat disimpan hingga tujuh hari.

  • Jika Anda membuat tunnel untuk mengonsumsi data diferensial atau inkremen, perhatikan hal-hal berikut:

    • Selama konsumsi data penuh, jika tunnel gagal menyelesaikan konsumsi data penuh dalam periode retensi log inkremen (paling lama tujuh hari), kesalahan OTSTunnelExpired akan terjadi ketika tunnel mulai mengonsumsi log inkremen. Akibatnya, tunnel tidak dapat mengonsumsi log inkremen.

      Jika Anda memperkirakan bahwa tunnel tidak dapat menyelesaikan konsumsi data penuh dalam jangka waktu tertentu, hubungi dukungan teknis Tablestore.

    • Selama konsumsi data inkremen, jika tunnel gagal menyelesaikan konsumsi log inkremen dalam periode retensi log inkremen (paling lama tujuh hari), tunnel mungkin mengonsumsi data dari data terbaru yang tersedia. Dalam kasus ini, data tertentu mungkin tidak dikonsumsi.

  • Setelah tunnel kedaluwarsa, Tablestore dapat menonaktifkan tunnel. Jika tunnel tetap dinonaktifkan selama lebih dari 30 hari, tunnel tersebut akan dihapus. Anda tidak dapat memulihkan tunnel yang telah dihapus.

Prasyarat

Memulai dengan Tunnel Service

  1. Inisialisasi instance TunnelClient.

    Saat menginisialisasi instance TunnelClient, Anda dapat menggunakan kredensial akses jangka panjang atau kredensial akses sementara untuk autentikasi.

    • Gunakan kredensial akses jangka panjang untuk inisialisasi

      Pastikan variabel lingkungan TABLESTORE_ACCESS_KEY_ID dan TABLESTORE_ACCESS_KEY_SECRET dikonfigurasi. Variabel lingkungan TABLESTORE_ACCESS_KEY_ID menentukan ID AccessKey dari akun Alibaba Cloud atau pengguna RAM Anda. Variabel lingkungan TABLESTORE_ACCESS_KEY_SECRET menentukan rahasia AccessKey dari akun Alibaba Cloud atau pengguna RAM Anda.

      Peringatan

      Akun Alibaba Cloud memiliki akses penuh ke semua sumber daya akun tersebut. Kebocoran pasangan AccessKey akun Alibaba Cloud menimbulkan ancaman kritis bagi sistem. Oleh karena itu, disarankan menggunakan pasangan AccessKey dari pengguna RAM yang diberi izin minimum yang diperlukan untuk menginisialisasi instance TunnelClient.

      // Setel parameter endpoint ke titik akhir dari instance Tablestore. Contoh: https://instance.cn-hangzhou.ots.aliyuncs.com. 
      // Tentukan nama instance. 
      // Tentukan ID AccessKey dan rahasia AccessKey dari akun Alibaba Cloud atau pengguna RAM Anda. 
      endpoint := "yourEndpoint"
      instance := "yourInstance"
      accessKeyId := os.Getenv("TABLESTORE_ACCESS_KEY_ID")
      accessKeySecret := os.Getenv("TABLESTORE_ACCESS_KEY_SECRET")
      tunnelClient := tunnel.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)                    
    • Gunakan kredensial akses sementara untuk inisialisasi

      1. Jika Anda ingin menggunakan Tablestore SDK untuk Go untuk sementara mengakses Tablestore, Anda dapat menggunakan Security Token Service (STS) untuk menghasilkan kredensial akses sementara. Untuk informasi lebih lanjut, lihat Konfigurasikan Kredensial Akses Sementara.

      2. Klien tunnel menyediakan operasi NewTunnelClientWithToken yang dapat Anda panggil untuk menginisialisasi instance TunnelClient berdasarkan kredensial akses sementara. Topik ini menyediakan kode contoh untuk menginisialisasi instance TunnelClient menggunakan kredensial akses sementara yang dapat diperbarui secara berkala. Untuk informasi lebih lanjut, lihat Lampiran: Kode Contoh untuk Menginisialisasi Instance TunnelClient Menggunakan Kredensial Akses Sementara.

  2. Buat tunnel.

    req := &tunnel.CreateTunnelRequest{
       TableName:  "<TABLE_NAME>",
       TunnelName: "<TUNNEL_NAME>",
       Type:       tunnel.TunnelTypeBaseStream, // Buat tunnel BaseAndStream. 
    }
    resp, err := tunnelClient.CreateTunnel(req)
    if err != nil {
       log.Fatal("create test tunnel failed", err)
    }
    log.Println("tunnel id is", resp.TunnelId)
  3. Tentukan fungsi panggilan balik kustom untuk memulai konsumsi data otomatis.

    // Tentukan fungsi panggilan balik kustom. 
    func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error {
        fmt.Println("informasi yang ditentukan pengguna", ctx.CustomValue)
        for _, rec := range records {
            fmt.Println("detail rekaman tunnel:", rec.String())
        }
        fmt.Println("konsumsi sekelompok rekaman selesai")
        return nil
    }
    
    // Konfigurasikan fungsi panggilan balik. Informasi tentang fungsi panggilan balik diteruskan ke SimpleProcessFactory. Konfigurasikan TunnelWorkerConfig untuk konsumen. 
    workConfig := &tunnel.TunnelWorkerConfig{
       ProcessorFactory: &tunnel.SimpleProcessFactory{
          CustomValue: "nilai antarmuka kustom pengguna {}",
          ProcessFunc: exampleConsumeFunction,
       },
    }
    
    // Gunakan TunnelDaemon untuk terus-menerus mengonsumsi tunnel yang ditentukan. 
    tunnelId := "<TUNNEL_ID>"
    daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig)
    log.Fatal(daemon.Run())

Lampiran: Kode contoh untuk menginisialisasi instance TunnelClient menggunakan kredensial akses sementara

import (
    otscommon "github.com/aliyun/aliyun-tablestore-go-sdk/common"
    "github.com/aliyun/aliyun-tablestore-go-sdk/tunnel"
    "sync"
    "time"
)

type RefreshClient struct {
    lastRefresh          time.Time
    refreshIntervalInMin int
}

func NewRefreshClient(intervalInMin int) *RefreshClient {
    return &RefreshClient{
        refreshIntervalInMin: intervalInMin,
    }
}

func (c *RefreshClient) IsExpired() bool {
    now := time.Now()
    if c.lastRefresh.IsZero() || now.Sub(c.lastRefresh) > time.Duration(c.refreshIntervalInMin)*time.Minute {
        return true
    }

    return false
}

func (c *RefreshClient) Update() {
    c.lastRefresh = time.Now()
}

type clientCredentials struct {
    accessKeyID     string
    accessKeySecret string
    securityToken   string
}

func newClientCredentials(accessKeyID string, accessKeySecret string, securityToken string) *clientCredentials {
    return &clientCredentials{accessKeyID: accessKeyID, accessKeySecret: accessKeySecret, securityToken: securityToken}
}

func (c *clientCredentials) GetAccessKeyID() string {
    return c.accessKeyID
}

func (c *clientCredentials) GetAccessKeySecret() string {
    return c.accessKeySecret
}

func (c *clientCredentials) GetSecurityToken() string {
    return c.securityToken
}

type OTSCredentialsProvider struct {
    refresh *RefreshClient
    cred    *clientCredentials
    lock    sync.Mutex
}

func NewOTSCredentialsProvider() *OTSCredentialsProvider {
    return &OTSCredentialsProvider{
        // Modifikasi siklus pembaruan untuk kredensial akses sementara berdasarkan kebutuhan bisnis Anda. Siklus pembaruan harus lebih pendek daripada periode validitas kredensial akses sementara. 
        refresh: NewRefreshClient(30),
    }
}

func (p *OTSCredentialsProvider) renewCredentials() error {
    if p.cred == nil || p.refresh.IsExpired() {
        // Dapatkan kredensial akses sementara. Anda dapat memanggil operasi AssumeRole dari RAM untuk mendapatkan ID AccessKey, rahasia AccessKey, token keamanan, dan periode validitas kredensial akses sementara. 
        // Konfigurasikan parameter berikut. Untuk informasi tentang SDK RAM, lihat dokumentasi RAM.  
        // resp, err := GetUserOtsStsToken()
        accessKeyId := ""
        accessKeySecret := ""
        stsToken := ""
        p.cred = newClientCredentials(accessKeyId, accessKeySecret, stsToken)
        p.refresh.Update()
    }

    return nil
}

func (p *OTSCredentialsProvider) GetCredentials() otscommon.Credentials {
    p.lock.Lock()
    defer p.lock.Unlock()

    if err := p.renewCredentials(); err != nil {
        // log error
        if p.cred == nil {
            return newClientCredentials("", "", "")
        }
    }

    return p.cred
}

// NewTunnelClientWithToken digunakan untuk menginisialisasi instance TunnelClient dengan fitur pembaruan kredensial akses sementara. 
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string) tunnel.TunnelClient {
    return tunnel.NewTunnelClientWithToken(
        endpoint,
        instanceName,
        "",
        "",
        "",
        nil,
        tunnel.SetCredentialsProvider(NewOTSCredentialsProvider()),
    )
}