Tunnel Service memungkinkan Anda mengonsumsi data dalam tabel. Topik ini menjelaskan cara memulai dengan Tunnel Service menggunakan Tablestore SDK untuk Go. Sebelum menggunakan Tunnel Service, pastikan Anda memahami catatan penggunaan Tunnel Service.
Catatan penggunaan
Secara default, sistem memulai thread pool untuk membaca dan memproses data berdasarkan TunnelWorkerConfig. Jika Anda ingin memulai beberapa TunnelWorkers pada satu server, disarankan menggunakan TunnelWorkerConfig yang sama untuk mengonfigurasi semua TunnelWorkers.
TunnelWorker memerlukan periode pemanasan untuk inisialisasi, yang ditentukan oleh parameter HeartbeatInterval di TunnelWorkerConfig. Nilai default adalah 30 detik.
Ketika klien TunnelWorker dimatikan karena keluar tak terduga atau terminasi manual, TunnelWorker secara otomatis mendaur ulang sumber daya melalui salah satu metode berikut: melepaskan thread pool, secara otomatis memanggil metode shutdown yang didaftarkan untuk kelas Channel, dan menutup tunnel.
Periode retensi log inkremen di tunnel sama dengan periode retensi log Stream. Log Stream dapat disimpan hingga tujuh hari, sehingga log inkremen di tunnel juga dapat disimpan hingga tujuh hari.
Jika Anda membuat tunnel untuk mengonsumsi data diferensial atau inkremen, perhatikan hal-hal berikut:
Selama konsumsi data penuh, jika tunnel gagal menyelesaikan konsumsi data penuh dalam periode retensi log inkremen (paling lama tujuh hari), kesalahan
OTSTunnelExpiredakan terjadi ketika tunnel mulai mengonsumsi log inkremen. Akibatnya, tunnel tidak dapat mengonsumsi log inkremen.Jika Anda memperkirakan bahwa tunnel tidak dapat menyelesaikan konsumsi data penuh dalam jangka waktu tertentu, hubungi dukungan teknis Tablestore.
Selama konsumsi data inkremen, jika tunnel gagal menyelesaikan konsumsi log inkremen dalam periode retensi log inkremen (paling lama tujuh hari), tunnel mungkin mengonsumsi data dari data terbaru yang tersedia. Dalam kasus ini, data tertentu mungkin tidak dikonsumsi.
Setelah tunnel kedaluwarsa, Tablestore dapat menonaktifkan tunnel. Jika tunnel tetap dinonaktifkan selama lebih dari 30 hari, tunnel tersebut akan dihapus. Anda tidak dapat memulihkan tunnel yang telah dihapus.
Prasyarat
Tabel data telah dibuat. Untuk informasi lebih lanjut, lihat Gunakan Konsol Tablestore, Gunakan CLI Tablestore, dan Gunakan SDK Tablestore.
Titik akhir dari instance tempat tabel data berada diperoleh. Untuk informasi lebih lanjut, lihat Dapatkan Titik Akhir dari Instance Tablestore.
Kredensial akses dikonfigurasi. Untuk informasi lebih lanjut, lihat Konfigurasikan Kredensial Akses.
Memulai dengan Tunnel Service
Inisialisasi instance TunnelClient.
Saat menginisialisasi instance TunnelClient, Anda dapat menggunakan kredensial akses jangka panjang atau kredensial akses sementara untuk autentikasi.
Gunakan kredensial akses jangka panjang untuk inisialisasi
Pastikan variabel lingkungan
TABLESTORE_ACCESS_KEY_IDdanTABLESTORE_ACCESS_KEY_SECRETdikonfigurasi. Variabel lingkungan TABLESTORE_ACCESS_KEY_ID menentukan ID AccessKey dari akun Alibaba Cloud atau pengguna RAM Anda. Variabel lingkungan TABLESTORE_ACCESS_KEY_SECRET menentukan rahasia AccessKey dari akun Alibaba Cloud atau pengguna RAM Anda.PeringatanAkun Alibaba Cloud memiliki akses penuh ke semua sumber daya akun tersebut. Kebocoran pasangan AccessKey akun Alibaba Cloud menimbulkan ancaman kritis bagi sistem. Oleh karena itu, disarankan menggunakan pasangan AccessKey dari pengguna RAM yang diberi izin minimum yang diperlukan untuk menginisialisasi instance TunnelClient.
// Setel parameter endpoint ke titik akhir dari instance Tablestore. Contoh: https://instance.cn-hangzhou.ots.aliyuncs.com. // Tentukan nama instance. // Tentukan ID AccessKey dan rahasia AccessKey dari akun Alibaba Cloud atau pengguna RAM Anda. endpoint := "yourEndpoint" instance := "yourInstance" accessKeyId := os.Getenv("TABLESTORE_ACCESS_KEY_ID") accessKeySecret := os.Getenv("TABLESTORE_ACCESS_KEY_SECRET") tunnelClient := tunnel.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)Gunakan kredensial akses sementara untuk inisialisasi
Jika Anda ingin menggunakan Tablestore SDK untuk Go untuk sementara mengakses Tablestore, Anda dapat menggunakan Security Token Service (STS) untuk menghasilkan kredensial akses sementara. Untuk informasi lebih lanjut, lihat Konfigurasikan Kredensial Akses Sementara.
Klien tunnel menyediakan operasi NewTunnelClientWithToken yang dapat Anda panggil untuk menginisialisasi instance TunnelClient berdasarkan kredensial akses sementara. Topik ini menyediakan kode contoh untuk menginisialisasi instance TunnelClient menggunakan kredensial akses sementara yang dapat diperbarui secara berkala. Untuk informasi lebih lanjut, lihat Lampiran: Kode Contoh untuk Menginisialisasi Instance TunnelClient Menggunakan Kredensial Akses Sementara.
Buat tunnel.
req := &tunnel.CreateTunnelRequest{ TableName: "<TABLE_NAME>", TunnelName: "<TUNNEL_NAME>", Type: tunnel.TunnelTypeBaseStream, // Buat tunnel BaseAndStream. } resp, err := tunnelClient.CreateTunnel(req) if err != nil { log.Fatal("create test tunnel failed", err) } log.Println("tunnel id is", resp.TunnelId)Tentukan fungsi panggilan balik kustom untuk memulai konsumsi data otomatis.
// Tentukan fungsi panggilan balik kustom. func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error { fmt.Println("informasi yang ditentukan pengguna", ctx.CustomValue) for _, rec := range records { fmt.Println("detail rekaman tunnel:", rec.String()) } fmt.Println("konsumsi sekelompok rekaman selesai") return nil } // Konfigurasikan fungsi panggilan balik. Informasi tentang fungsi panggilan balik diteruskan ke SimpleProcessFactory. Konfigurasikan TunnelWorkerConfig untuk konsumen. workConfig := &tunnel.TunnelWorkerConfig{ ProcessorFactory: &tunnel.SimpleProcessFactory{ CustomValue: "nilai antarmuka kustom pengguna {}", ProcessFunc: exampleConsumeFunction, }, } // Gunakan TunnelDaemon untuk terus-menerus mengonsumsi tunnel yang ditentukan. tunnelId := "<TUNNEL_ID>" daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig) log.Fatal(daemon.Run())
Lampiran: Kode contoh untuk menginisialisasi instance TunnelClient menggunakan kredensial akses sementara
import (
otscommon "github.com/aliyun/aliyun-tablestore-go-sdk/common"
"github.com/aliyun/aliyun-tablestore-go-sdk/tunnel"
"sync"
"time"
)
type RefreshClient struct {
lastRefresh time.Time
refreshIntervalInMin int
}
func NewRefreshClient(intervalInMin int) *RefreshClient {
return &RefreshClient{
refreshIntervalInMin: intervalInMin,
}
}
func (c *RefreshClient) IsExpired() bool {
now := time.Now()
if c.lastRefresh.IsZero() || now.Sub(c.lastRefresh) > time.Duration(c.refreshIntervalInMin)*time.Minute {
return true
}
return false
}
func (c *RefreshClient) Update() {
c.lastRefresh = time.Now()
}
type clientCredentials struct {
accessKeyID string
accessKeySecret string
securityToken string
}
func newClientCredentials(accessKeyID string, accessKeySecret string, securityToken string) *clientCredentials {
return &clientCredentials{accessKeyID: accessKeyID, accessKeySecret: accessKeySecret, securityToken: securityToken}
}
func (c *clientCredentials) GetAccessKeyID() string {
return c.accessKeyID
}
func (c *clientCredentials) GetAccessKeySecret() string {
return c.accessKeySecret
}
func (c *clientCredentials) GetSecurityToken() string {
return c.securityToken
}
type OTSCredentialsProvider struct {
refresh *RefreshClient
cred *clientCredentials
lock sync.Mutex
}
func NewOTSCredentialsProvider() *OTSCredentialsProvider {
return &OTSCredentialsProvider{
// Modifikasi siklus pembaruan untuk kredensial akses sementara berdasarkan kebutuhan bisnis Anda. Siklus pembaruan harus lebih pendek daripada periode validitas kredensial akses sementara.
refresh: NewRefreshClient(30),
}
}
func (p *OTSCredentialsProvider) renewCredentials() error {
if p.cred == nil || p.refresh.IsExpired() {
// Dapatkan kredensial akses sementara. Anda dapat memanggil operasi AssumeRole dari RAM untuk mendapatkan ID AccessKey, rahasia AccessKey, token keamanan, dan periode validitas kredensial akses sementara.
// Konfigurasikan parameter berikut. Untuk informasi tentang SDK RAM, lihat dokumentasi RAM.
// resp, err := GetUserOtsStsToken()
accessKeyId := ""
accessKeySecret := ""
stsToken := ""
p.cred = newClientCredentials(accessKeyId, accessKeySecret, stsToken)
p.refresh.Update()
}
return nil
}
func (p *OTSCredentialsProvider) GetCredentials() otscommon.Credentials {
p.lock.Lock()
defer p.lock.Unlock()
if err := p.renewCredentials(); err != nil {
// log error
if p.cred == nil {
return newClientCredentials("", "", "")
}
}
return p.cred
}
// NewTunnelClientWithToken digunakan untuk menginisialisasi instance TunnelClient dengan fitur pembaruan kredensial akses sementara.
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string) tunnel.TunnelClient {
return tunnel.NewTunnelClientWithToken(
endpoint,
instanceName,
"",
"",
"",
nil,
tunnel.SetCredentialsProvider(NewOTSCredentialsProvider()),
)
}