トンネルサービスを使用すると、テーブル内のデータを使用できます。このトピックでは、Tablestore SDK for Go を使用してトンネルサービスを開始する方法について説明します。トンネルサービスを使用する前に、トンネルサービスの使用上の注意をよく理解しておいてください。
使用上の注意
デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを起動します。単一のサーバーで複数の TunnelWorker を起動する場合は、すべての TunnelWorker の設定に同じ TunnelWorkerConfig を使用することをお勧めします。
TunnelWorker には、初期化のためのウォームアップ期間が必要です。これは、TunnelWorkerConfig の HeartbeatInterval パラメーターで指定します。デフォルト値:30。単位:秒。
予期しない終了または手動による終了が原因で TunnelWorker クライアントがシャットダウンされた場合、TunnelWorker は次のいずれかの方法を使用してリソースを自動的にリサイクルします。スレッドプールを解放し、Channel クラスに登録したシャットダウンメソッドを自動的に呼び出し、トンネルをシャットダウンします。
トンネル内の増分ログの保存期間は、Stream ログの保存期間と同じです。Stream ログは最大 7 日間保存できます。したがって、トンネル内の増分ログは最大 7 日間保存できます。
差分データまたは増分データを使用するためにトンネルを作成する場合は、次の点に注意してください。
完全データ使用中に、トンネルが増分ログの保存期間(最大 7 日間)内に完全データの使用を完了できなかった場合、トンネルが増分ログの使用を開始するときに
OTSTunnelExpiredエラーが発生します。その結果、トンネルは増分ログを使用できません。指定されたタイムウィンドウ内にトンネルが完全データ使用を完了できないと予想される場合は、Tablestore テクニカルサポートに
増分データ使用中に、トンネルが増分ログの保存期間(最大 7 日間)内に増分ログの使用を完了できなかった場合、トンネルは利用可能な最新のデータからデータを使用する可能性があります。この場合、特定のデータが使用されない可能性があります。
トンネルの有効期限が切れると、Tablestore はトンネルを無効にする場合があります。トンネルが無効状態のまま 30 日以上経過すると、トンネルは削除されます。削除されたトンネルは復元できません。
前提条件
データテーブルが作成されていること。詳細については、「Tablestore コンソールを使用する」、「Tablestore CLI を使用する」、および「Tablestore SDK を使用する」をご参照ください。
データテーブルが存在するインスタンスのエンドポイントが取得されていること。詳細については、「Tablestore インスタンスのエンドポイントを取得する」をご参照ください。
アクセス認証情報が設定されていること。詳細については、「アクセス認証情報を設定する」をご参照ください。
トンネルサービスの開始方法
TunnelClient インスタンスを初期化します。
TunnelClient インスタンスを初期化するときに、認証に長期アクセス認証情報または一時アクセス認証情報を使用できます。
初期化に長期アクセス認証情報を使用する
TABLESTORE_ACCESS_KEY_ID環境変数とTABLESTORE_ACCESS_KEY_SECRET環境変数が設定されていることを確認します。TABLESTORE_ACCESS_KEY_ID 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。TABLESTORE_ACCESS_KEY_SECRET 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。警告Alibaba Cloud アカウントは、アカウントのすべてのリソースへのフルアクセス権を持っています。Alibaba Cloud アカウントの AccessKey ペアの漏洩は、システムにとって重大な脅威となります。したがって、TunnelClient インスタンスを初期化するには、最小限の必要な権限が付与された RAM ユーザーの AccessKey ペアを使用することをお勧めします。
// endpoint パラメーターを Tablestore インスタンスのエンドポイントに設定します。例:https://instance.cn-hangzhou.ots.aliyuncs.com。 // インスタンスの名前を指定します。 // Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレットを指定します。 endpoint := "yourEndpoint" instance := "yourInstance" accessKeyId := os.Getenv("TABLESTORE_ACCESS_KEY_ID") accessKeySecret := os.Getenv("TABLESTORE_ACCESS_KEY_SECRET") tunnelClient := tunnel.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)初期化に一時アクセス認証情報を使用する
Tablestore SDK for Go を使用して Tablestore に一時的にアクセスする場合は、セキュリティトークンサービス(STS)を使用して一時アクセス認証情報を生成できます。詳細については、「一時アクセス認証情報を設定する」をご参照ください。
トンネルクライアントは、NewTunnelClientWithToken 操作を提供します。この操作を呼び出すと、一時アクセス認証情報に基づいて TunnelClient インスタンスを初期化できます。このトピックでは、定期的に更新できる一時アクセス認証情報を使用して TunnelClient インスタンスを初期化するためのサンプルコードを提供します。詳細については、「付録:一時アクセス認証情報を使用して TunnelClient インスタンスを初期化するためのサンプルコード」をご参照ください。
トンネルを作成します。
req := &tunnel.CreateTunnelRequest{ TableName: "<TABLE_NAME>", TunnelName: "<TUNNEL_NAME>", Type: tunnel.TunnelTypeBaseStream, // BaseAndStream トンネルを作成します。 } resp, err := tunnelClient.CreateTunnel(req) if err != nil { log.Fatal("テストトンネルの作成に失敗しました", err) } log.Println("トンネル ID は", resp.TunnelId)カスタムコールバック関数を指定して、自動データ使用を開始します。
// カスタムコールバック関数を指定します。 func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error { fmt.Println("ユーザー定義情報", ctx.CustomValue) for _, rec := range records { fmt.Println("トンネルレコードの詳細:", rec.String()) } fmt.Println("レコード使用のラウンドが完了しました") return nil } // コールバック関数を設定します。コールバック関数に関する情報は SimpleProcessFactory に渡されます。コンシューマーの TunnelWorkerConfig を設定します。 workConfig := &tunnel.TunnelWorkerConfig{ ProcessorFactory: &tunnel.SimpleProcessFactory{ CustomValue: "ユーザーカスタム interface{} 値", ProcessFunc: exampleConsumeFunction, }, } // TunnelDaemon を使用して、指定されたトンネルを継続的に使用します。 tunnelId := "<TUNNEL_ID>" daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig) log.Fatal(daemon.Run())
付録:一時アクセス認証情報を使用して TunnelClient インスタンスを初期化するためのサンプルコード
import (
otscommon "github.com/aliyun/aliyun-tablestore-go-sdk/common"
"github.com/aliyun/aliyun-tablestore-go-sdk/tunnel"
"sync"
"time"
)
type RefreshClient struct {
lastRefresh time.Time
refreshIntervalInMin int
}
func NewRefreshClient(intervalInMin int) *RefreshClient {
return &RefreshClient{
refreshIntervalInMin: intervalInMin,
}
}
func (c *RefreshClient) IsExpired() bool {
now := time.Now()
if c.lastRefresh.IsZero() || now.Sub(c.lastRefresh) > time.Duration(c.refreshIntervalInMin)*time.Minute {
return true
}
return false
}
func (c *RefreshClient) Update() {
c.lastRefresh = time.Now()
}
type clientCredentials struct {
accessKeyID string
accessKeySecret string
securityToken string
}
func newClientCredentials(accessKeyID string, accessKeySecret string, securityToken string) *clientCredentials {
return &clientCredentials{accessKeyID: accessKeyID, accessKeySecret: accessKeySecret, securityToken: securityToken}
}
func (c *clientCredentials) GetAccessKeyID() string {
return c.accessKeyID
}
func (c *clientCredentials) GetAccessKeySecret() string {
return c.accessKeySecret
}
func (c *clientCredentials) GetSecurityToken() string {
return c.securityToken
}
type OTSCredentialsProvider struct {
refresh *RefreshClient
cred *clientCredentials
lock sync.Mutex
}
func NewOTSCredentialsProvider() *OTSCredentialsProvider {
return &OTSCredentialsProvider{
// ビジネス要件に基づいて、一時アクセス認証情報の更新サイクルを変更します。更新サイクルは、一時アクセス認証情報の有効期間よりも短くする必要があります。
refresh: NewRefreshClient(30),
}
}
func (p *OTSCredentialsProvider) renewCredentials() error {
if p.cred == nil || p.refresh.IsExpired() {
// 一時アクセス認証情報を取得します。RAM の AssumeRole 操作を呼び出して、一時アクセス認証情報の AccessKey ID、AccessKey シークレット、セキュリティトークン、および有効期間を取得できます。
// 次のパラメーターを設定します。RAM SDK の詳細については、RAM のドキュメントを参照してください。
// resp, err := GetUserOtsStsToken()
accessKeyId := ""
accessKeySecret := ""
stsToken := ""
p.cred = newClientCredentials(accessKeyId, accessKeySecret, stsToken)
p.refresh.Update()
}
return nil
}
func (p *OTSCredentialsProvider) GetCredentials() otscommon.Credentials {
p.lock.Lock()
defer p.lock.Unlock()
if err := p.renewCredentials(); err != nil {
// エラーをログに記録します
if p.cred == nil {
return newClientCredentials("", "", "")
}
}
return p.cred
}
// NewTunnelClientWithToken は、一時アクセス認証情報を更新する機能を使用して TunnelClient インスタンスを初期化するために使用されます。
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string) tunnel.TunnelClient {
return tunnel.NewTunnelClientWithToken(
endpoint,
instanceName,
"",
"",
"",
nil,
tunnel.SetCredentialsProvider(NewOTSCredentialsProvider()),
)
}