Table Store支援多種方式實現資料表間的資料移轉或同步。可通過通道服務、DataWorks、DataX或命令列工具等方式完成資料表到資料表的同步操作。
前提條件
擷取來源資料表和目標資料表的執行個體名稱、執行個體訪問地址、地區ID等資訊。
為阿里雲帳號或具有Table Store許可權的RAM使用者建立AccessKey。
使用SDK同步資料
基於通道服務實現資料表間的資料同步,支援同地區、跨地區以及跨帳號的資料同步情境。通過通道服務擷取資料變更記錄,並將變更即時同步至目標表。以下以Java SDK為例示範同步實現。
運行代碼前,請替換代碼中源表和目標表的資料表名稱、執行個體名稱和執行個體訪問地址,並將AccessKey ID和AccessKey Secret配置為環境變數。
import com.alicloud.openservices.tablestore.*;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.model.tunnel.*;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class TableSynchronization {
// 源表配置項:表名稱、執行個體名稱、執行個體訪問地址、AccessKey ID、AccessKey Secret
final static String sourceTableName = "sourceTableName";
final static String sourceInstanceName = "sourceInstanceName";
final static String sourceEndpoint = "sourceEndpoint";
final static String sourceAccessKeyId = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_ID");
final static String sourceKeySecret = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_SECRET");
// 目標表配置項:表名稱、執行個體名稱、執行個體訪問地址、AccessKey ID、AccessKey Secret
final static String targetTableName = "targetTableName";
final static String targetInstanceName = "targetInstanceName";
final static String targetEndpoint = "targetEndpoint";
final static String targetAccessKeyId = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_ID");
final static String targetKeySecret = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_SECRET");
// 通道名稱
static String tunnelName = "source_table_tunnel";
// TablestoreWriter:高並發資料寫入工具
static TableStoreWriter tableStoreWriter;
// 成功和失敗行數統計
static AtomicLong succeedRows = new AtomicLong();
static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) {
// 建立目標表
createTargetTable();
System.out.println("Create target table Done.");
// 初始化 TunnelClient
TunnelClient tunnelClient = new TunnelClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName);
// 建立資料通道
String tunnelId = createTunnel(tunnelClient);
System.out.println("Create tunnel Done.");
// 初始化 TablestoreWriter
tableStoreWriter = createTablesStoreWriter();
// 通過資料通道同步資料
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
System.out.println("Connect tunnel and working ...");
worker.connectAndWorking();
// 監聽通道狀態,通道狀態從全量同步變成增量同步處理時,表示資料同步完成
while (true) {
if (tunnelClient.describeTunnel(new DescribeTunnelRequest(sourceTableName, tunnelName)).getTunnelInfo().getStage().equals(TunnelStage.ProcessStream)) {
break;
}
Thread.sleep(5000);
}
// 同步結果
System.out.println("Data Synchronization Completed.");
System.out.println("* Succeed Rows Count: " + succeedRows.get());
System.out.println("* Failed Rows Count: " + failedRows.get());
// 刪除通道
tunnelClient.deleteTunnel(new DeleteTunnelRequest(sourceTableName, tunnelName));
// 關閉資源
worker.shutdown();
config.shutdown();
tunnelClient.shutdown();
tableStoreWriter.close();
}catch(Exception e){
e.printStackTrace();
worker.shutdown();
config.shutdown();
tunnelClient.shutdown();
tableStoreWriter.close();
}
}
private static void createTargetTable() throws ClientException {
// 查詢源表資訊
SyncClient sourceClient = new SyncClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName);
DescribeTableResponse response = sourceClient.describeTable(new DescribeTableRequest(sourceTableName));
// 建立目標表
SyncClient targetClient = new SyncClient(targetEndpoint, targetAccessKeyId, targetKeySecret, targetInstanceName);
TableMeta tableMeta = new TableMeta(targetTableName);
response.getTableMeta().getPrimaryKeyList().forEach(
item -> tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema(item.getName(), item.getType()))
);
TableOptions tableOptions = new TableOptions(-1, 1);
CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
targetClient.createTable(request);
// 關閉資源
sourceClient.shutdown();
targetClient.shutdown();
}
private static String createTunnel(TunnelClient client) {
// 建立資料通道,返回通道 ID
CreateTunnelRequest request = new CreateTunnelRequest(sourceTableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse response = client.createTunnel(request);
return response.getTunnelId();
}
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
if(input.getRecords().isEmpty())
return;
System.out.print("* Begin consume " + input.getRecords().size() + " records ... ");
for (StreamRecord record : input.getRecords()) {
switch (record.getRecordType()) {
// 寫入行資料
case PUT:
RowPutChange putChange = new RowPutChange(targetTableName, record.getPrimaryKey());
putChange.addColumns(getColumnsFromRecord(record));
tableStoreWriter.addRowChange(putChange);
break;
// 更新行資料
case UPDATE:
RowUpdateChange updateChange = new RowUpdateChange(targetTableName, record.getPrimaryKey());
for (RecordColumn column : record.getColumns()) {
switch (column.getColumnType()) {
// 新增屬性列
case PUT:
updateChange.put(column.getColumn().getName(), column.getColumn().getValue(), System.currentTimeMillis());
break;
// 刪除屬性列版本
case DELETE_ONE_VERSION:
updateChange.deleteColumn(column.getColumn().getName(),
column.getColumn().getTimestamp());
break;
// 刪除屬性列
case DELETE_ALL_VERSION:
updateChange.deleteColumns(column.getColumn().getName());
break;
default:
break;
}
}
tableStoreWriter.addRowChange(updateChange);
break;
// 刪除行資料
case DELETE:
RowDeleteChange deleteChange = new RowDeleteChange(targetTableName, record.getPrimaryKey());
tableStoreWriter.addRowChange(deleteChange);
break;
}
}
// 發送緩衝區資料
tableStoreWriter.flush();
System.out.println("Done");
}
@Override
public void shutdown() {
}
}
public static List<Column> getColumnsFromRecord(StreamRecord record) {
List<Column> retColumns = new ArrayList<>();
for (RecordColumn recordColumn : record.getColumns()) {
// 將資料版本號碼替換為目前時間戳,防止超過最大版本偏差
Column column = new Column(recordColumn.getColumn().getName(), recordColumn.getColumn().getValue(), System.currentTimeMillis());
retColumns.add(column);
}
return retColumns;
}
private static TableStoreWriter createTablesStoreWriter() {
WriterConfig config = new WriterConfig();
// 行層級回調,統計成功和失敗行數,列印同步失敗的資料行
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult rowWriteResult) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception exception) {
failedRows.incrementAndGet();
System.out.println("* Failed Rows: " + rowChange.getTableName() + " | " + rowChange.getPrimaryKey() + " | " + exception.getMessage());
}
};
ServiceCredentials credentials = new DefaultCredentials(targetAccessKeyId, targetKeySecret);
return new DefaultTableStoreWriter(targetEndpoint, credentials, targetInstanceName,
targetTableName, config, resultCallback);
}
}使用DataWorks同步資料
DataWorks提供可視化的Data Integration服務,支援通過圖形介面配置Table Store資料表間的同步任務。除DataWorks外,還可使用DataX等工具實現Table Store資料表間的資料同步。
步驟一:準備工作
建立目標資料表,確保其主鍵結構(包括資料類型及順序)與源表完全一致。
開通DataWorks服務,並在源表或目標表所在地區建立工作空間。
建立Serverless資源群組並綁定到工作空間。有關計費資訊,請參見Serverless資源群組計費。
如果源表和目標表位於不同地區,需建立VPC對等串連以實現跨地區網路連通。
步驟二:新增Table Store資料來源
分別為來源資料表和目標資料表所在的執行個體添加Table Store資料來源。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入Data Integration。
在左側導覽列,單擊資料來源。
在資料來源列表頁面,單擊新增資料來源。
在新增資料來源對話方塊,搜尋並選擇資料來源類型為Tablestore。
在新增OTS資料來源對話方塊,根據下表配置資料來源參數。
參數
說明
資料來源名稱
資料來源名稱必須以字母、數字、底線(_)組合,且不能以數字和底線(_)開頭。
資料來源描述
對資料來源進行簡單描述,不得超過80個字元。
地區
選擇Tablestore執行個體所屬地區。
Table Store執行個體名稱
Tablestore執行個體的名稱。
Endpoint
Tablestore執行個體的服務地址,推薦使用VPC地址。
AccessKey ID
阿里雲帳號或者RAM使用者的AccessKey ID和AccessKey Secret。
AccessKey Secret
測試資源群組連通性。
建立資料來源時,需要測試資源群組的連通性,確保同步任務使用的資源群組能夠與資料來源正常連通,否則將無法正常執行資料同步任務。
在串連配置地區,單擊相應資源群組連通狀態列的測試連通性。
測試連通性通過後,連通狀態顯示可連通,單擊完成。可在資料來源列表中查看建立的資料來源。
如果測試連通性結果為無法通過,可使用連通性診斷工具自助解決。
步驟三:配置和運行同步任務
建立任務節點
進入資料開發頁面。
登入DataWorks控制台。
在頁面上方,選擇資源群組和地區。
在左側導覽列,單擊。
選擇對應工作空間後單擊進入Data Studio。
在Data Studio控制台的資料開發頁面,單擊專案目錄右側的
表徵圖,然後選擇。在建立節點對話方塊,選擇路徑,資料來源和資料去向都選擇Tablestore,填寫名稱,然後單擊確認。
配置同步任務
在專案目錄下,單擊開啟建立的離線同步任務節點,通過嚮導模式或指令碼模式配置同步任務。
嚮導模式(預設)
配置以下內容:
資料來源:選擇來來源資料源和去向資料來源。
運行資源:選擇資源群組,選擇後會自動檢測資料來源連通性。
資料來源:
表:下拉選擇來來源資料表。
主鍵區間分布(起始):資料讀取的起始主鍵,格式為JSON數組,
inf_min表示無限小。當主鍵包含1個
int類型的主鍵列id和1個string類型的主鍵列name時,樣本配置如下:指定主鍵範圍
全量資料
[ { "type": "int", "value": "000" }, { "type": "string", "value": "aaa" } ][ { "type": "inf_min" }, { "type": "inf_min" } ]主鍵區間分布(結束):資料讀取的結束主鍵,格式為JSON數組,
inf_max表示無限大。當主鍵包含1個
int類型的主鍵列id和1個string類型的主鍵列name時,樣本配置如下:指定主鍵範圍
全量資料
[ { "type": "int", "value": "999" }, { "type": "string", "value": "zzz" } ][ { "type": "inf_max" }, { "type": "inf_max" } ]切分配置資訊:自訂切分配置資訊,格式為JSON數組,普通情況下不建議配置(設定為
[])。當Tablestore資料存放區發生熱點,且使用Tablestore Reader自動切分的策略不能生效時,建議使用自訂的切分規則。切分指定的是在主鍵起始和結束區間內的切分點,僅配置切分鍵,無需指定全部的主鍵。
資料去向:
表:下拉選擇去向資料表。
主鍵資訊:去向資料表的主鍵資訊,格式為JSON數組。
當主鍵包含1個
int類型的主鍵列id和1個string類型的主鍵列name時,樣本配置如下:[ { "name": "id", "type": "int" }, { "name": "name", "type": "string" } ]寫入模式:資料寫入Table Store的模式,支援以下兩種模式:
PutRow:寫入行資料。如果目標行資料不存在,則新增一行。如果目標行資料已存在,則覆蓋原有行。
UpdateRow:更新行資料。如果該行不存在,則新增一行。如果該行存在,則根據請求的內容在這一行中新增、修改或者刪除指定列的值。
去向欄位對應:配置來來源資料表到去向資料表的欄位對應,一行表示一個欄位,格式為JSON。
來源欄位:需包含來來源資料表的主鍵資訊。
當主鍵包含1個
int類型的主鍵列id和1個string類型的主鍵列name,屬性列包含1個int類型的欄位age時,樣本配置如下:{"name":"id","type":"int"} {"name":"name","type":"string"} {"name":"age","type":"int"}目標欄位:無需包含去向資料表的主鍵資訊。
當主鍵包含1個
int類型的主鍵列id和1個string類型的主鍵列name,屬性列包含1個int類型的欄位age時,樣本配置如下:{"name":"age","type":"int"}
配置完成後,單擊頁面上方的儲存。
指令碼模式
單擊頁面上方的指令碼模式,在切換後的頁面中編輯指令碼。
以下以主鍵包含1個int類型的主鍵列id和1個string類型的主鍵列name,屬性列包含1個int類型的欄位age時的配置為例。配置時請替換樣本指令碼內的資料來源datasource和表名稱table。
全量資料
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "ots",
"parameter": {
"datasource": "source_data",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"range": {
"begin": [
{
"type": "inf_min"
},
{
"type": "inf_min"
}
],
"end": [
{
"type": "inf_max"
},
{
"type": "inf_max"
}
],
"split": []
},
"table": "source_table",
"newVersion": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "ots",
"parameter": {
"datasource": "target_data",
"column": [
{
"name": "age",
"type": "int"
}
],
"writeMode": "UpdateRow",
"table": "target_table",
"newVersion": "true",
"primaryKey": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
]
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}指定主鍵範圍
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "ots",
"parameter": {
"datasource": "source_data",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"range": {
"begin": [
{
"type": "int",
"value": "000"
},
{
"type": "string",
"value": "aaa"
}
],
"end": [
{
"type": "int",
"value": "999"
},
{
"type": "string",
"value": "zzz"
}
],
"split": []
},
"table": "source_table",
"newVersion": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "ots",
"parameter": {
"datasource": "target_data",
"column": [
{
"name": "age",
"type": "int"
}
],
"writeMode": "UpdateRow",
"table": "target_table",
"newVersion": "true",
"primaryKey": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
]
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}指令碼編輯完成後,單擊頁面上方的儲存。
運行同步任務
單擊頁面上方的運行,開始同步任務,首次運行時需確認調試配置。
步驟四:查看同步結果
運行同步任務後,可通過日誌查看任務的執行狀態,並在Table Store控制台查看目標資料表的同步結果。
在頁面下方查看任務運行狀態和結果,出現以下資訊時表示同步任務運行成功。
2025-11-18 11:16:23 INFO Shell run successfully! 2025-11-18 11:16:23 INFO Current task status: FINISH 2025-11-18 11:16:23 INFO Cost time is: 77.208s查看目標資料表的資料。
前往Table Store控制台,在頁面上方,選擇資源群組和地區。
單擊執行個體別名,在資料表列表單擊目標資料表。
單擊資料管理,查看目標資料表的資料。
使用命令列工具同步資料
使用命令列工具進行資料同步時,需要手動將源表的資料匯出為本地JSON檔案,隨後再將其匯入到目標表。此方法僅適用於少量資料的遷移情境,大規模資料移轉不建議採用此方法。
步驟一:準備工作
步驟二:匯出源表資料
啟動命令列工具,通過config命令配置源表所在執行個體的接入資訊。更多資訊,請參見啟動並配置接入資訊。
執行前請使用源表所在的執行個體訪問地址、執行個體名稱、AccessKey ID、AccessKey Secret替換命令中的endpoint、instance、id、key。
config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************匯出資料。
執行
use命令以使用源表。以source_table為例。use --wc -t source_table匯出源表中的資料到本地JSON檔案中。具體操作,請參見匯出資料。
scan -o /tmp/sourceData.json
步驟三:匯入目標表資料
通過config命令配置目標表所在執行個體的接入資訊。
執行前請使用目標表所在的執行個體訪問地址、執行個體名稱、AccessKey ID、AccessKey Secret替換命令中的endpoint、instance、id、key。
config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************匯入資料。
執行
use命令以使用目標表。以target_table為例。use --wc -t target_table匯入本地JSON檔案中的資料到目標表中。具體操作,請參見匯入資料。
import -i /tmp/sourceData.json
