全部產品
Search
文件中心

Tablestore:通過SDK使用通道服務

更新時間:Feb 21, 2025

本文介紹如何通過 SDK 快速體驗通道服務。在使用通道服務前,您需要瞭解使用通道服務的注意事項、介面等資訊。

注意事項

  • TunnelWorkerConfig 中預設會啟動讀資料和處理資料的線程池。如果使用的是單台機器,當需要啟動多個 TunnelWorker 時,建議共用一個 TunnelWorkerConfig。

  • TunnelWorker 的初始化需要預熱時間,該值受 TunnelWorkerConfig 中的 heartbeatIntervalInSec 參數影響,可以通過 TunnelWorkerConfig 中的 setHeartbeatIntervalInSec 方法配置,預設為 30 s。

  • 當用戶端(TunnelWorker)沒有被正常 shutdown 時(例如異常退出或者手動結束),TunnelWorker 會自動進行資源的回收,包括釋放線程池,自動調用使用者在 Channel 上註冊的 shutdown 方法,關閉 Tunnel 串連等。

  • Tunnel 的增量日誌保留時間,其數值與資料表中 Stream 的日誌到期時間長度(最長時間長度為 7 天)保持一致,因此 Tunnel 的增量日誌最多保留 7 天。

  • 增量或者全量加增量類型 Tunnel 消費資料時,可能會出現以下情況:

    • 當 Tunnel 處於全量階段時,如果全量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,將會觸發 OTSTunnelExpired 錯誤,從而導致無法繼續消費後續資料。

      如果您預計全量資料無法在指定時間內完成消費,請及時聯絡Table Store支援人員進行諮詢。

    • 當 Tunnel 處於增量階段時,如果增量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,Tunnel 將可能從最近可消費的資料處開始消費,因此存在漏消費資料的風險。

  • Tunnel 到期後,Table Store可能會禁用該 Tunnel。如果禁用狀態持續超過 30 天,則該 Tunnel 將被徹底刪除,刪除後將無法恢複。

介面

介面

說明

CreateTunnel

建立一個通道。

ListTunnel

列舉某個資料表內通道的具體資訊。

DescribeTunnel

描述某個通道裡的具體 Channel 資訊。

DeleteTunnel

刪除一個通道。

使用

您可以使用如下語言的SDK實現通道服務。

前提條件

體驗通道服務

使用 Java SDK 最小化的體驗通道服務。

  1. 初始化 Tunnel Client。

    說明

    在運行本程式碼範例之前,請確保已設定環境變數TABLESTORE_ACCESS_KEY_IDTABLESTORE_ACCESS_KEY_SECRET,這兩個變數分別對應阿里雲帳號或 RAM 使用者的 AccessKey ID 和 AccessKey Secret。

    //instanceName為執行個體名稱。
    //endPoint為Table Store執行個體的endPoint,例如https://instance.cn-hangzhou.ots.aliyuncs.com。
    //accessKeyId和accessKeySecret分別為訪問Table Store服務的AccessKey的Id和Secret。
    final String instanceName = "yourInstanceName";
    final String endPoint = "yourEndpoint";
    final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. 建立新通道。

    請提前建立一張測試表或者使用已有的一張資料表。如果需要建立測試表,可以使用 SyncClient 中的 createTable 方法或者使用官網控制台等方式建立。

    重要

    建立增量或者全量加增量類型的通道時,時間戳記的配置規則如下:

    • 如果不指定增量資料的起始時間戳記,則起始時間戳記為建立通道的時間。

    • 如果指定增量資料的起始時間戳記(startTime)和結束時間戳記(endTime),其取值範圍為[當前系統時間-Stream到期時間+5分鐘 , 當前系統時間],單位為毫秒。

      • Stream 到期時間為增量日誌到期時間長度的毫秒單位時間戳記,最大值為 7 天。您可以在為資料表開啟 Stream 功能時設定,到期時間長度一經設定不能修改。

      • 結束時間戳記的取值必須大於起始時間戳記。

    //支援建立TunnelType.BaseData(全量)、TunnelType.Stream(增量)、TunnelType.BaseAndStream(全量加增量)三種類型的Tunnel。
    //如下樣本為建立全量加增量類型的Tunnel,如果需建立其它類型的Tunnel,則將CreateTunnelRequest中的TunnelType設定為相應的類型。
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    //tunnelId用於後續TunnelWorker的初始化,該值也可以通過ListTunnel或者DescribeTunnel擷取。
    String tunnelId = resp.getTunnelId(); 
    System.out.println("Create Tunnel, Id: " + tunnelId);
  3. 使用者自訂資料消費 Callback,開始自動化的資料消費。

    //使用者自訂資料消費Callback,即實現IChannelProcessor介面(process和shutdown)。
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            //ProcessRecordsInput中包含有拉取到的資料。
            System.out.println("Default record processor, would print records count");
            System.out.println(
                //NextToken用於Tunnel Client的翻頁。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                //類比消費處理。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    //TunnelWorkerConfig預設會啟動讀資料和處理資料的線程池。
    //如果使用的是單台機器,當需要啟動多個TunnelWorker時,建議共用一個TunnelWorkerConfig。TunnelWorkerConfig中包括更多的進階參數。
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    //配置TunnelWorker,並啟動自動化的資料處理任務。
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

配置 TunnelWorkerConfig

TunnelWorkerConfig 提供 Tunnel Client 的自訂配置,可根據實際需要配置參數,Java SDK 中的參數說明請參見下表。

配置

參數

說明

Heartbeat 的間隔和逾時時間

heartbeatTimeoutInSec

Heartbeat 的逾時間隔。預設值為 300 s。

當 Heartbeat 發生逾時,Tunnel 服務端會認為當前 TunnelClient 不可用(失活),用戶端需要重新進行 ConnectTunnel。

heartbeatIntervalInSec

進行 Heartbeat 的間隔。 預設值為 30 s,最小支援配置到 5 s,單位為 s。

Heartbeat 用於活躍 Channel 的探測、Channel 狀態的更新、(自動化)資料拉取任務的初始化等。

記錄消費位點的時間間隔

checkpointIntervalInMillis

使用者消費完資料後,向 Tunnel 服務端進行記錄消費位點操作(checkpoint)的時間間隔。

預設值為 5000 ms,單位為 ms。

說明
  • 因為讀取任務所在機器不同,進程可能會遇到各種類型的錯誤。例如因為環境因素重啟,需要定期對處理完的資料做記錄(checkpoint)。當任務重啟後,會接著上次的 checkpoint 繼續往後做。在極端情況下,通道服務不保證傳給您的記錄只有一次,只保證資料至少傳一次,且記錄的順序不變。如果出現局部資料重複發送的情況,需要您注意業務的處理邏輯。

  • 如果希望減少在出錯情況下資料的重複處理,可以增加做 checkpoint 的頻率。但是過於頻繁的 checkpoint 會降低系統的輸送量,請根據自身業務特點決定 checkpoint 的操作頻率。

用戶端的自訂標識

clientTag

用戶端的自訂標識,可以產生 Tunnel Client ID,用於區分 TunnelWorker。

資料處理的自訂 Callback

channelProcessor

使用者註冊的處理資料的 Callback,包括 process 和 shutdown 方法。

資料讀取和資料處理的線程池資源配置

readRecordsExecutor

用於資料讀取的線程池資源。無特殊需求,建議使用預設的配置。

processRecordsExecutor

用於處理資料的線程池資源。無特殊需求,建議使用預設的配置。

說明
  • 自訂上述線程池時,線程池中的線程數要和 Tunnel 中的 Channel 數儘可能一致,此時可以保障每個 Channel 都能很快地分配到計算資源(CPU)。

  • 在預設線程池配置中,為了保證輸送量,Table Store進行了如下操作:

    • 預設預先分配 32 個核心線程,以保障資料較小時(Channel數較少時)的即時輸送量。

    • 工作隊列的大小適當調小,當使用者資料量比較大(Channel數較多)時,可以更快觸發線程池建立線程的策略,及時彈出更多的計算資源。

    • 設定了預設的線程保活時間(預設為 60 s),當資料量下降後,可以及時回收線程資源。

記憶體控制

maxChannelParallel

讀取和處理資料的最大 Channel 並行度,可用於記憶體控制。

預設值為 -1,表示不限制最大並行度。

說明

僅 Java SDK 5.10.0 及以上版本支援此功能。

最大退避時間配置

maxRetryIntervalInMillis

Tunnel 的最大退避時間基準值,最大退避時間在此基準值附近隨機變化,具體範圍為 0.75*maxRetryIntervalInMillis~1.25*maxRetryIntervalInMillis

預設值為 2000 ms,最小值為 200 ms。

說明
  • 僅 Java SDK 5.4.0 及以上版本支援此功能。

  • Tunnel 對於資料量較小的情況(單次拉取小於 900 KB 或 500 條)會進行一定時間的指數退避,直至達到最大退避時間。

CLOSING 分區狀態檢測

enableClosingChannelDetect

是否開啟 CLOSING 分區即時檢測。預設值為 false,表示不開啟 CLOSING 分區即時檢測。

說明
  • 僅 Java SDK 5.13.13 及以上版本支援此功能。

  • 未開啟此功能時,在某些極端情境(包括但不限於通道分區數較多但用戶端資源較低等)下,會出現分區卡住不消費的情況。

  • CLOSING 分區指調度中的分區,表示該分區正在切換 Tunnel Client,會調度到其他 Tunnel Client。

附錄:完整代碼

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            System.out.println("Default record processor, would print records count");
            System.out.println(
                //NextToken用於Tunnel Client的翻頁。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                //類比消費處理。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1.初始化Tunnel Client。
        // yourInstanceName 填寫您的執行個體名稱
        final String instanceName = "yourInstanceName";
        // yourEndpoint 填寫您的執行個體訪問地址
        final String endPoint = "yourEndpoint";
         // 擷取環境變數裡的 AccessKey ID 和 AccessKey Secret
        final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
        final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2.建立新通道(此步驟需要提前建立一張測試表,可以使用SyncClient的createTable或者使用官網控制台等方式建立)。
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        //tunnelId用於後續TunnelWorker的初始化,該值也可以通過ListTunnel或者DescribeTunnel擷取。
        String tunnelId = resp.getTunnelId();
        System.out.println("Create Tunnel, Id: " + tunnelId);
        //3.使用者自訂資料消費Callback,開始自動化的資料消費。
        //TunnelWorkerConfig中有更多的進階參數。
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}