使用SDK快速體驗通道服務功能。使用前,您需要瞭解使用通道服務的注意事項、介面等資訊。
注意事項
- TunnelWorkerConfig中預設會啟動讀資料和處理資料的線程池。如果使用的是單台機器,當需要啟動多個TunnelWorker時,建議共用一個TunnelWorkerConfig。
- 在建立全量加增量類型的Tunnel時,由於Tunnel的增量日誌最多會保留7天(具體的值和資料表的Stream的日誌到期時間一致),全量資料如果在7天內沒有消費完成,則此Tunnel進入增量階段會出現OTSTunnelExpired錯誤,導致增量資料無法消費。如果您預估全量資料無法在7天內消費完成,請及時聯絡Tablestore支援人員進行諮詢。
- TunnelWorker的初始化需要預熱時間,該值受TunnelWorkerConfig中的heartbeatIntervalInSec參數影響,可以通過TunnelWorkerConfig中的setHeartbeatIntervalInSec方法配置,預設為30s,最小值為5s。
- 當Tunnel從全量切換至增量階段時,全量的Channel會結束,增量的Channel會啟動,此階段會有初始化時間,該值也受TunnelWorkerConfig中的heartbeatIntervalInSec參數影響。
- 當用戶端(TunnelWorker)沒有被正常shutdown時(例如異常退出或者手動結束),TunnelWorker會自動進行資源的回收,包括釋放線程池,自動調用使用者在Channel上註冊的shutdown方法,關閉Tunnel串連等。
介面
介面 | 說明 |
---|---|
CreateTunnel | 建立一個通道。 |
ListTunnel | 列舉某個資料表內通道的具體資訊。 |
DescribeTunnel | 描述某個通道裡的具體Channel資訊。 |
DeleteTunnel | 刪除一個通道。 |
使用
您可以使用如下語言的SDK實現通道服務。
體驗通道服務
使用Java SDK最小化的體驗通道服務。
- 初始化Tunnel Client。
//endPoint為Tablestore執行個體的endPoint,例如https://instance.cn-hangzhou.ots.aliyuncs.com。 //accessKeyId和accessKeySecret分別為訪問Tablestore服務的AccessKey的Id和Secret。 //instanceName為執行個體名稱。 final String endPoint = ""; final String accessKeyId = ""; final String accessKeySecret = ""; final String instanceName = ""; TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
- 建立新通道。
請提前建立一張測試表或者使用已有的一張資料表。如果需要建立測試表,可以使用SyncClient中的createTable方法或者使用官網控制台等方式建立。
//支援建立TunnelType.BaseData(全量)、TunnelType.Stream(增量)、TunnelType.BaseAndStream(全量加增量)三種類型的Tunnel。 //如下樣本為建立全量加增量類型的Tunnel,如果需建立其它類型的Tunnel,則將CreateTunnelRequest中的TunnelType設定為相應的類型。 final String tableName = "testTable"; final String tunnelName = "testTunnel"; CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream); CreateTunnelResponse resp = tunnelClient.createTunnel(request); //tunnelId用於後續TunnelWorker的初始化,該值也可以通過ListTunnel或者DescribeTunnel擷取。 String tunnelId = resp.getTunnelId(); System.out.println("Create Tunnel, Id: " + tunnelId);
- 使用者自訂資料消費Callback,開始自動化的資料消費。
//使用者自訂資料消費Callback,即實現IChannelProcessor介面(process和shutdown)。 private static class SimpleProcessor implements IChannelProcessor { @Override public void process(ProcessRecordsInput input) { //ProcessRecordsInput中包含有拉取到的資料。 System.out.println("Default record processor, would print records count"); System.out.println( //NextToken用於Tunnel Client的翻頁。 String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { //類比消費處理。 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); } } //TunnelWorkerConfig預設會啟動讀資料和處理資料的線程池。 //如果使用的是單台機器,當需要啟動多個TunnelWorker時,建議共用一個TunnelWorkerConfig。TunnelWorkerConfig中包括更多的進階參數。 TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); //配置TunnelWorker,並啟動自動化的資料處理任務。 TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); config.shutdown(); worker.shutdown(); tunnelClient.shutdown(); }
配置TunnelWorkerConfig
TunnelWorkerConfig提供Tunnel Client的自訂配置,可根據實際需要配置參數,參數說明請參見下表。
配置 | 參數 | 說明 |
---|---|---|
Heartbeat的間隔和逾時時間 | heartbeatTimeoutInSec | Heartbeat的逾時間隔。 預設值為300s。 當Heartbeat發生逾時,Tunnel服務端會認為當前TunnelClient不可用(失活),用戶端需要重新進行ConnectTunnel。 |
heartbeatIntervalInSec | 進行Heartbeat的間隔。 Heartbeat用於活躍Channel的探測、Channel狀態的更新、(自動化)資料拉取任務的初始化等。 預設值為30s,最小支援配置到5s,單位為s。 | |
記錄消費位點的時間間隔 | checkpointIntervalInMillis | 使用者消費完資料後,向Tunnel服務端進行記錄消費位點操作(checkpoint)的時間間隔。 預設值為5000ms,單位為ms。 说明
|
用戶端的自訂標識 | clientTag | 用戶端的自訂標識,可以產生Tunnel Client ID,用於區分TunnelWorker。 |
資料處理的自訂Callback | channelProcessor | 使用者註冊的處理資料的Callback,包括process和shutdown方法。 |
資料讀取和資料處理的線程池資源配置 | readRecordsExecutor | 用於資料讀取的線程池資源。無特殊需求,建議使用預設的配置。 |
processRecordsExecutor | 用於處理資料的線程池資源。無特殊需求,建議使用預設的配置。 说明
| |
記憶體控制 | maxChannelParallel | 讀取和處理資料的最大Channel並行度,可用於記憶體控制。 預設值為-1,表示不限制最大並行度。 说明 僅Java SDK 5.10.0及以上版本支援此功能。 |
最大退避時間配置 | maxRetryIntervalInMillis | Tunnel的最大退避時間基準值,最大退避時間在此基準值附近隨機變化,具體範圍為0.75*maxRetryIntervalInMillis~1.25*maxRetryIntervalInMillis。 預設值為2000ms,最小值為200ms。 说明
|
CLOSING分區狀態檢測 | enableClosingChannelDetect | 是否開啟CLOSING分區即時檢測。類型為Boolean,預設值為false,表示不開啟CLOSING分區即時檢測。 说明
|
附錄:完整代碼
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
System.out.println("Default record processor, would print records count");
System.out.println(
//NextToken用於Tunnel Client的翻頁。
String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
try {
//類比消費處理。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
System.out.println("Mock shutdown");
}
}
public static void main(String[] args) throws Exception {
//1.初始化Tunnel Client。
final String endPoint = "";
final String accessKeyId = "";
final String accessKeySecret = "";
final String instanceName = "";
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
//2.建立新通道(此步驟需要提前建立一張測試表,可以使用SyncClient的createTable或者使用官網控制台等方式建立)。
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
//tunnelId用於後續TunnelWorker的初始化,該值也可以通過ListTunnel或者DescribeTunnel擷取。
String tunnelId = resp.getTunnelId();
System.out.println("Create Tunnel, Id: " + tunnelId);
//3.使用者自訂資料消費Callback,開始自動化的資料消費。
//TunnelWorkerConfig中有更多的進階參數。
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
config.shutdown();
worker.shutdown();
tunnelClient.shutdown();
}
}
}