Tablestore: Getting started

Last Updated: Aug 17, 2023

This topic describes how to use Tunnel Service with Tablestore SDK for Go. Before you use Tunnel Service, read the following usage notes.

Usage notes

  • By default, the system starts a thread pool to read and process data based on TunnelWorkerConfig. If you want to start multiple TunnelWorkers on a single server, we recommend that you configure the TunnelWorkers to share the same TunnelWorkerConfig.

  • If you create a differential tunnel to consume full and incremental data, the incremental logs of the tunnel are retained for up to seven days. The exact expiration time of the incremental logs is the same as that of the logs in the stream of the data table. If the tunnel does not finish consuming full data within seven days, an OTSTunnelExpired error occurs when the tunnel starts to consume incremental data, and the tunnel cannot consume incremental data. If you estimate that the tunnel cannot finish consuming full data within seven days, contact Tablestore technical support or join DingTalk group 23307953 for assistance.

  • TunnelWorker needs a warm-up period for initialization. The length of this period is determined by the heartbeatIntervalInSec parameter in TunnelWorkerConfig, which you can set by using the setHeartbeatIntervalInSec method in TunnelWorkerConfig. Default value: 30. Minimum value: 5. Unit: seconds.

  • When the mode switches from the full channel to the incremental channel, the full channel is closed and the incremental channel is started. This process requires another period of time for initialization, which is also specified by the heartbeatIntervalInSec parameter.

  • When the TunnelWorker client is shut down due to an unexpected exit or manual termination, TunnelWorker automatically recycles resources in the following ways: it releases the thread pool, calls the shutdown method that you registered for the Channel class, and shuts down the tunnel.
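
The limits in the notes above can be checked before a worker is started. The following Go sketch is illustrative only: normalizeHeartbeat, fullSyncFitsRetention, and the constant names are hypothetical helpers, not part of Tablestore SDK for Go, and falling back to the default or the minimum value is a design choice of this sketch rather than documented SDK behavior. It applies the documented default (30 seconds) and minimum (5 seconds) to a heartbeat interval, and estimates whether full data can be consumed within the seven-day retention window at an assumed constant throughput.

```go
package main

import (
	"fmt"
	"time"
)

// Values taken from the usage notes; the identifiers are hypothetical.
const (
	defaultHeartbeat = 30 * time.Second   // documented default interval
	minHeartbeat     = 5 * time.Second    // documented minimum interval
	logRetention     = 7 * 24 * time.Hour // documented upper bound for incremental logs
)

// normalizeHeartbeat returns the default for an unset (zero or negative)
// interval and raises values below the minimum to the minimum.
func normalizeHeartbeat(d time.Duration) time.Duration {
	switch {
	case d <= 0:
		return defaultHeartbeat
	case d < minHeartbeat:
		return minHeartbeat
	default:
		return d
	}
}

// fullSyncFitsRetention reports whether totalRows can be consumed at a
// constant rowsPerSecond before the seven-day retention window ends.
func fullSyncFitsRetention(totalRows, rowsPerSecond int64) bool {
	if rowsPerSecond <= 0 {
		return false
	}
	// Round the estimate up to whole seconds.
	seconds := (totalRows + rowsPerSecond - 1) / rowsPerSecond
	return seconds < int64(logRetention/time.Second)
}

func main() {
	fmt.Println(normalizeHeartbeat(0))
	fmt.Println(normalizeHeartbeat(2 * time.Second))
	fmt.Println(fullSyncFitsRetention(1_000_000_000, 1000))
}
```

Real throughput is rarely constant, so an estimate close to seven days should be treated as a reason to contact technical support, not as a guarantee.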

Prerequisites

  • Tunnel Service is installed. For more information, see Install Tunnel Service.

  • The endpoint that you want to use is obtained. For more information, see Obtain an endpoint.

  • An AccessKey pair is configured. For more information, see Configure an AccessKey pair.

  • The AccessKey pair is configured in environment variables. For more information, see Configure environment variables.

    The OTS_AK_ENV environment variable specifies the AccessKey ID of an Alibaba Cloud account or a RAM user. The OTS_SK_ENV environment variable specifies the AccessKey secret of an Alibaba Cloud account or a RAM user. Specify the AccessKey pair based on your requirements.
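
Because the sample code in this topic reads the AccessKey pair from OTS_AK_ENV and OTS_SK_ENV, it can help to fail early when either variable is missing. The following is a minimal sketch that uses only the Go standard library. The loadAccessKey helper is a hypothetical name and is not part of Tablestore SDK for Go.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// loadAccessKey reads the AccessKey pair through getenv and returns an
// error if either variable is empty. Passing getenv as a parameter keeps
// the helper easy to test without changing the process environment.
func loadAccessKey(getenv func(string) string) (id, secret string, err error) {
	id = getenv("OTS_AK_ENV")
	secret = getenv("OTS_SK_ENV")
	if id == "" || secret == "" {
		return "", "", errors.New("OTS_AK_ENV and OTS_SK_ENV must both be set")
	}
	return id, secret, nil
}

func main() {
	id, _, err := loadAccessKey(os.Getenv)
	if err != nil {
		fmt.Println("credentials not configured:", err)
		return
	}
	// Never print the secret; the ID length is enough for a sanity check.
	fmt.Println("using an AccessKey ID of length", len(id))
}
```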

Use Tunnel Service

  1. Initialize a Tunnel client instance.

    When you initialize a Tunnel client instance, you can authenticate requests with the AccessKey pair of your Alibaba Cloud account or a RAM user, or with temporary access credentials that you obtained from Security Token Service (STS). An AccessKey pair consists of an AccessKey ID and an AccessKey secret. Temporary access credentials consist of a temporary AccessKey ID, a temporary AccessKey secret, and a security token.

    • Use the AccessKey pair of your Alibaba Cloud account or a RAM user to initialize a Tunnel client instance.

      Important
      • The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations in Tablestore is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M.

      • If the AccessKey pair of an Alibaba Cloud account is leaked, the resources that belong to the account are exposed to potential risks. To ensure account security, we recommend that you create an AccessKey pair for a RAM user instead of an Alibaba Cloud account. For more information, see Create an AccessKey pair.

      The following sample code shows how to use the AccessKey ID and AccessKey secret that you obtained to initialize a Tunnel client instance:

      // Set the endpoint to that of the Tablestore instance. Example: https://instance.cn-hangzhou.ots.aliyun.com.
      endpoint := "https://instance.cn-hangzhou.ots.aliyun.com"
      // Specify the name of the instance. "instance" is a placeholder that matches the example endpoint.
      instance := "instance"
      // Read the AccessKey ID and AccessKey secret of your Alibaba Cloud account or a RAM user from the environment variables.
      accessKeyId := os.Getenv("OTS_AK_ENV")
      accessKeySecret := os.Getenv("OTS_SK_ENV")
      tunnelClient := tunnel.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)

    • Use the temporary access credentials that you obtained from STS to initialize a Tunnel client instance.

      Note

      If you want to authorize temporary access, you can use the following method to initialize a Tunnel client instance. For more information, see Configure user permissions.

      The tunnel package provides the NewTunnelClientWithToken function. You can call this function to initialize a Tunnel client instance by using temporary access credentials. This topic provides sample code for initializing a Tunnel client instance by using temporary access credentials that are periodically refreshed. For more information, see Appendix: Complete sample code for initializing a Tunnel client instance by using temporary access credentials.

  2. Create a tunnel.

    req := &tunnel.CreateTunnelRequest{
       TableName:  "testTable",
       TunnelName: "testTunnel",
       Type:       tunnel.TunnelTypeBaseStream, // Create a BaseAndStream tunnel. 
    }
    resp, err := tunnelClient.CreateTunnel(req)
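    if err != nil {
       // log.Fatal does not insert a space between a string and an error, so the separator is written explicitly.
       log.Fatal("create test tunnel failed: ", err)
    }
    log.Println("tunnel id is", resp.TunnelId)
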
  3. Specify a custom callback function to start automatic data consumption.

    // Specify a custom callback function. 
    func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error {
        fmt.Println("user-defined information", ctx.CustomValue)
        for _, rec := range records {
            fmt.Println("tunnel record detail:", rec.String())
        }
        fmt.Println("a round of records consumption finished")
        return nil
    }
    
    // Configure the callback function. Information about the callback function is passed to SimpleProcessFactory. Configure TunnelWorkerConfig for the consumer. 
    workConfig := &tunnel.TunnelWorkerConfig{
       ProcessorFactory: &tunnel.SimpleProcessFactory{
          CustomValue: "user custom interface{} value",
          ProcessFunc: exampleConsumeFunction,
       },
    }
    
    // Use TunnelDaemon to continuously consume the tunnel that was created in the previous step.
    daemon := tunnel.NewTunnelDaemon(tunnelClient, resp.TunnelId, workConfig)
    log.Fatal(daemon.Run())

Appendix: Complete sample code for initializing a Tunnel client instance by using temporary access credentials

Note

For more information about how to call the AssumeRole operation of Resource Access Management (RAM) to obtain temporary access credentials from STS, see STS SDK overview and AssumeRole.

import (
    "sync"
    "time"

    otscommon "github.com/aliyun/aliyun-tablestore-go-sdk/common"
    "github.com/aliyun/aliyun-tablestore-go-sdk/tunnel"
)

type RefreshClient struct {
    lastRefresh          time.Time
    refreshIntervalInMin int
}

func NewRefreshClient(intervalInMin int) *RefreshClient {
    return &RefreshClient{
        refreshIntervalInMin: intervalInMin,
    }
}

func (c *RefreshClient) IsExpired() bool {
    now := time.Now()
    if c.lastRefresh.IsZero() || now.Sub(c.lastRefresh) > time.Duration(c.refreshIntervalInMin)*time.Minute {
        return true
    }

    return false
}

func (c *RefreshClient) Update() {
    c.lastRefresh = time.Now()
}

type clientCredentials struct {
    accessKeyID     string
    accessKeySecret string
    securityToken   string
}

func newClientCredentials(accessKeyID string, accessKeySecret string, securityToken string) *clientCredentials {
    return &clientCredentials{accessKeyID: accessKeyID, accessKeySecret: accessKeySecret, securityToken: securityToken}
}

func (c *clientCredentials) GetAccessKeyID() string {
    return c.accessKeyID
}

func (c *clientCredentials) GetAccessKeySecret() string {
    return c.accessKeySecret
}

func (c *clientCredentials) GetSecurityToken() string {
    return c.securityToken
}

type OTSCredentialsProvider struct {
    refresh *RefreshClient
    cred    *clientCredentials
    lock    sync.Mutex
}

func NewOTSCredentialsProvider() *OTSCredentialsProvider {
    return &OTSCredentialsProvider{
        // Modify the refresh cycle for temporary access credentials based on your business requirements. The refresh cycle must be shorter than the validity period of the temporary access credentials. 
        refresh: NewRefreshClient(30),
    }
}

func (p *OTSCredentialsProvider) renewCredentials() error {
    if p.cred == nil || p.refresh.IsExpired() {
        // Obtain temporary access credentials. You can call the AssumeRole operation of RAM to obtain the AccessKey ID, AccessKey secret, security token, and validity period of the temporary access credentials. 
        // Configure the following parameters. For information about RAM SDKs, see the documentation of RAM.  
        // resp, err := GetUserOtsStsToken()
        accessKeyId := ""
        accessKeySecret := ""
        stsToken := ""
        p.cred = newClientCredentials(accessKeyId, accessKeySecret, stsToken)
        p.refresh.Update()
    }

    return nil
}

func (p *OTSCredentialsProvider) GetCredentials() otscommon.Credentials {
    p.lock.Lock()
    defer p.lock.Unlock()

    if err := p.renewCredentials(); err != nil {
        // log error
        if p.cred == nil {
            return newClientCredentials("", "", "")
        }
    }

    return p.cred
}

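// NewTunnelClientWithToken initializes a Tunnel client instance that refreshes STS tokens through a credentials provider.
// The accessId, accessKey, and token parameters are not used in this sample: empty strings are passed as the static
// credentials, and OTSCredentialsProvider supplies the actual credentials.
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string) tunnel.TunnelClient {
    return tunnel.NewTunnelClientWithToken(
        endpoint,
        instanceName,
        "",
        "",
        "",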
        nil,
        tunnel.SetCredentialsProvider(NewOTSCredentialsProvider()),
    )
}
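
The refresh logic in the appendix can be tried out without the SDK or a real STS call. The following sketch uses only the Go standard library and mirrors the same idea: fetch credentials on first use, reuse them until the refresh interval has elapsed, and fall back to the cached value if a refresh fails. The names creds, refreshingProvider, and countFetches are hypothetical, and the clock and the fetch function are injected so that the behavior can be checked deterministically.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// creds is a stand-in for the clientCredentials type in the appendix.
type creds struct{ id, secret, token string }

// refreshingProvider caches credentials and fetches new ones once the
// refresh interval has elapsed.
type refreshingProvider struct {
	mu          sync.Mutex
	interval    time.Duration
	now         func() time.Time      // injectable clock
	fetch       func() (creds, error) // placeholder for the STS call
	cached      *creds
	lastRefresh time.Time
}

// get returns valid credentials. If a refresh fails, it returns the
// previously cached credentials (if any) together with the error.
func (p *refreshingProvider) get() (creds, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.cached == nil || p.now().Sub(p.lastRefresh) > p.interval {
		c, err := p.fetch()
		if err != nil {
			if p.cached == nil {
				return creds{}, err
			}
			return *p.cached, err
		}
		p.cached, p.lastRefresh = &c, p.now()
	}
	return *p.cached, nil
}

// countFetches calls get at the given minute offsets on a fake clock and
// returns how many times the fetch function ran.
func countFetches(intervalMin int, offsetsMin []int) int {
	start := time.Date(2023, 8, 17, 0, 0, 0, 0, time.UTC)
	current := start
	fetches := 0
	p := &refreshingProvider{
		interval: time.Duration(intervalMin) * time.Minute,
		now:      func() time.Time { return current },
		fetch: func() (creds, error) {
			fetches++
			return creds{id: fmt.Sprintf("sts-id-%d", fetches)}, nil
		},
	}
	for _, m := range offsetsMin {
		current = start.Add(time.Duration(m) * time.Minute)
		if _, err := p.get(); err != nil {
			return -1
		}
	}
	return fetches
}

func main() {
	// With a 30-minute interval, only the calls at minute 0 and minute 31 fetch.
	fmt.Println(countFetches(30, []int{0, 10, 29, 31, 45})) // prints 2
}
```

As in the appendix, the refresh interval must stay shorter than the validity period of the temporary access credentials. Otherwise the cached credentials can expire before the next refresh.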