Tunnel Service allows you to consume the data in a table. This topic describes how to get started with Tunnel Service by using Tablestore SDK for Go. Before you use Tunnel Service, make sure that you are familiar with the usage notes of Tunnel Service.
Usage notes
By default, the system starts a thread pool to read and process data based on TunnelWorkerConfig. If you want to start multiple TunnelWorkers on a single server, we recommend that you use the same TunnelWorkerConfig to configure all TunnelWorkers.
TunnelWorker requires a warm-up period for initialization, which is specified by the HeartbeatInterval parameter in TunnelWorkerConfig. Default value: 30. Unit: seconds.
When the TunnelWorker client shuts down due to an unexpected exit or manual termination, TunnelWorker automatically recycles resources by using one of the following methods: release the thread pool, automatically call the shutdown method that you registered for the Channel class, and shut down the tunnel.
The retention period of incremental logs in tunnels is the same as the retention period of Stream logs. Stream logs can be retained for up to seven days. Therefore, incremental logs in tunnels can be retained for up to seven days.
If you create a tunnel to consume differential or incremental data, take note of the following items:
During full data consumption, if the tunnel fails to complete consumption of full data within the retention period of incremental logs (seven days at most), an
OTSTunnelExpired
error occurs when the tunnel starts to consume incremental logs. As a result, the tunnel cannot consume incremental logs.If you estimate that the tunnel cannot complete full data consumption within the specified time window, contact Tablestore technical support.
During incremental data consumption, if the tunnel fails to complete consumption of incremental logs within the retention period of incremental logs (seven days at most), the tunnel may consume data from the latest available data. In this case, specific data may not be consumed.
After a tunnel expires, Tablestore may disable the tunnel. If a tunnel remains in the disabled state for more than 30 days, the tunnel is deleted. You cannot restore a deleted tunnel.
Prerequisites
A data table is created. For more information, see Use the Tablestore console, Use the Tablestore CLI, and Use Tablestore SDKs.
The endpoint of the instance in which the data table resides is obtained. For more information, see Obtain an endpoint of a Tablestore instance.
Access credentials are configured. For more information, see Configure access credentials.
Get started with Tunnel Service
Initialize a TunnelClient instance.
When you initialize a TunnelClient instance, you can use long-term access credentials or a temporary access credentials for authentication.
Use long-term access credentials for initialization
Make sure that the
TABLESTORE_ACCESS_KEY_ID
andTABLESTORE_ACCESS_KEY_SECRET
environment variables are configured. The TABLESTORE_ACCESS_KEY_ID environment variable specifies the AccessKey ID of your Alibaba Cloud account or RAM user. The TABLESTORE_ACCESS_KEY_SECRET environment variable specifies the AccessKey secret of your Alibaba Cloud account or RAM user.WarningAn Alibaba Cloud account has full access to all resources of the account. Leaks of the Alibaba Cloud account AccessKey pair pose critical threats to the system. Therefore, we recommend that you use an AccessKey pair of a RAM user that is granted minimum required permissions to initialize a TunnelClient instance.
// Set the endpoint parameter to the endpoint of the Tablestore instance. Example: https://instance.cn-hangzhou.ots.aliyuncs.com. // Specify the name of the instance. // Specify the AccessKey ID and AccessKey secret of your Alibaba Cloud account or a RAM user. endpoint := "yourEndpoint" instance := "yourInstance" accessKeyId := os.Getenv("TABLESTORE_ACCESS_KEY_ID") accessKeySecret := os.Getenv("TABLESTORE_ACCESS_KEY_SECRET") tunnelClient := tunnel.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)
Use temporary access credentials for initialization
If you want to use Tablestore SDK for Go to temporarily access Tablestore, you can use Security Token Service (STS) to generate temporary access credentials. For more information, see Configure temporary access credentials.
A tunnel client provides the NewTunnelClientWithToken operation that you can call to initialize a TunnelClient instance based on temporary access credentials. This topic provides sample code for initializing a TunnelClient instance by using temporary access credentials that can be periodically refreshed. For more information, see Appendix: Sample code for initializing a TunnelClient instance by using temporary access credentials.
Create a tunnel.
req := &tunnel.CreateTunnelRequest{ TableName: "<TABLE_NAME>", TunnelName: "<TUNNEL_NAME>", Type: tunnel.TunnelTypeBaseStream, // Create a BaseAndStream tunnel. } resp, err := tunnelClient.CreateTunnel(req) if err != nil { log.Fatal("create test tunnel failed", err) } log.Println("tunnel id is", resp.TunnelId)
Specify a custom callback function to start automatic data consumption.
// Specify a custom callback function. func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error { fmt.Println("user-defined information", ctx.CustomValue) for _, rec := range records { fmt.Println("tunnel record detail:", rec.String()) } fmt.Println("a round of records consumption finished") return nil } // Configure the callback function. Information about the callback function is passed to SimpleProcessFactory. Configure TunnelWorkerConfig for the consumer. workConfig := &tunnel.TunnelWorkerConfig{ ProcessorFactory: &tunnel.SimpleProcessFactory{ CustomValue: "user custom interface{} value", ProcessFunc: exampleConsumeFunction, }, } // Use TunnelDaemon to continuously consume the specified tunnel. tunnelId := "<TUNNEL_ID>" daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig) log.Fatal(daemon.Run())
Appendix: Sample code for initializing a TunnelClient instance by using temporary access credentials
import (
otscommon "github.com/aliyun/aliyun-tablestore-go-sdk/common"
"github.com/aliyun/aliyun-tablestore-go-sdk/tunnel"
"sync"
"time"
)
type RefreshClient struct {
lastRefresh time.Time
refreshIntervalInMin int
}
func NewRefreshClient(intervalInMin int) *RefreshClient {
return &RefreshClient{
refreshIntervalInMin: intervalInMin,
}
}
func (c *RefreshClient) IsExpired() bool {
now := time.Now()
if c.lastRefresh.IsZero() || now.Sub(c.lastRefresh) > time.Duration(c.refreshIntervalInMin)*time.Minute {
return true
}
return false
}
func (c *RefreshClient) Update() {
c.lastRefresh = time.Now()
}
type clientCredentials struct {
accessKeyID string
accessKeySecret string
securityToken string
}
func newClientCredentials(accessKeyID string, accessKeySecret string, securityToken string) *clientCredentials {
return &clientCredentials{accessKeyID: accessKeyID, accessKeySecret: accessKeySecret, securityToken: securityToken}
}
func (c *clientCredentials) GetAccessKeyID() string {
return c.accessKeyID
}
func (c *clientCredentials) GetAccessKeySecret() string {
return c.accessKeySecret
}
func (c *clientCredentials) GetSecurityToken() string {
return c.securityToken
}
type OTSCredentialsProvider struct {
refresh *RefreshClient
cred *clientCredentials
lock sync.Mutex
}
func NewOTSCredentialsProvider() *OTSCredentialsProvider {
return &OTSCredentialsProvider{
// Modify the refresh cycle for temporary access credentials based on your business requirements. The refresh cycle must be shorter than the validity period of the temporary access credentials.
refresh: NewRefreshClient(30),
}
}
func (p *OTSCredentialsProvider) renewCredentials() error {
if p.cred == nil || p.refresh.IsExpired() {
// Obtain temporary access credentials. You can call the AssumeRole operation of RAM to obtain the AccessKey ID, AccessKey secret, security token, and validity period of the temporary access credentials.
// Configure the following parameters. For information about RAM SDKs, see the documentation of RAM.
// resp, err := GetUserOtsStsToken()
accessKeyId := ""
accessKeySecret := ""
stsToken := ""
p.cred = newClientCredentials(accessKeyId, accessKeySecret, stsToken)
p.refresh.Update()
}
return nil
}
func (p *OTSCredentialsProvider) GetCredentials() otscommon.Credentials {
p.lock.Lock()
defer p.lock.Unlock()
if err := p.renewCredentials(); err != nil {
// log error
if p.cred == nil {
return newClientCredentials("", "", "")
}
}
return p.cred
}
// NewTunnelClientWithToken is used to initialize a TunnelClient instance with the feature of refreshing temporary access credentials.
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string) tunnel.TunnelClient {
return tunnel.NewTunnelClientWithToken(
endpoint,
instanceName,
"",
"",
"",
nil,
tunnel.SetCredentialsProvider(NewOTSCredentialsProvider()),
)
}