全部產品
Search
文件中心

Tablestore:資料表同步到資料表

更新時間:Nov 19, 2025

Table Store支援多種方式實現資料表間的資料移轉或同步。可通過通道服務、DataWorks、DataX或命令列工具等方式完成資料表到資料表的同步操作。

前提條件

  • 擷取來源資料表和目標資料表的執行個體名稱、執行個體訪問地址、地區ID等資訊。

  • 為阿里雲帳號或具有Table Store許可權的RAM使用者建立AccessKey

使用SDK同步資料

基於通道服務實現資料表間的資料同步,支援同地區、跨地區以及跨帳號的資料同步情境。通過通道服務擷取資料變更記錄,並將變更即時同步至目標表。以下以Java SDK為例示範同步實現。

運行代碼前,請替換代碼中源表和目標表的資料表名稱、執行個體名稱和執行個體訪問地址,並將AccessKey ID和AccessKey Secret配置為環境變數。
import com.alicloud.openservices.tablestore.*;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.model.tunnel.*;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class TableSynchronization {
    // 源表配置項:表名稱、執行個體名稱、執行個體訪問地址、AccessKey ID、AccessKey Secret
    final static String sourceTableName = "sourceTableName";
    final static String sourceInstanceName = "sourceInstanceName";
    final static String sourceEndpoint = "sourceEndpoint";
    final static String sourceAccessKeyId =  System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_ID");
    final static String sourceKeySecret = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_SECRET");

    // 目標表配置項:表名稱、執行個體名稱、執行個體訪問地址、AccessKey ID、AccessKey Secret
    final static String targetTableName = "targetTableName";
    final static String targetInstanceName = "targetInstanceName";
    final static String targetEndpoint = "targetEndpoint";
    final static String targetAccessKeyId = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_ID");
    final static String targetKeySecret = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_SECRET");

    // 通道名稱
    static String tunnelName = "source_table_tunnel";
    // TablestoreWriter:高並發資料寫入工具
    static TableStoreWriter tableStoreWriter;
    // 成功和失敗行數統計
    static AtomicLong succeedRows = new AtomicLong();
    static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) {
        // 建立目標表
        createTargetTable();
        System.out.println("Create target table Done.");

        // 初始化 TunnelClient
        TunnelClient tunnelClient = new TunnelClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName);
        // 建立資料通道
        String tunnelId = createTunnel(tunnelClient);
        System.out.println("Create tunnel Done.");

        // 初始化 TablestoreWriter
        tableStoreWriter = createTablesStoreWriter();

        // 通過資料通道同步資料
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            System.out.println("Connect tunnel and working ...");
            worker.connectAndWorking();

            // 監聽通道狀態,通道狀態從全量同步變成增量同步處理時,表示資料同步完成
            while (true) {
                if (tunnelClient.describeTunnel(new DescribeTunnelRequest(sourceTableName, tunnelName)).getTunnelInfo().getStage().equals(TunnelStage.ProcessStream)) {
                    break;
                }
                Thread.sleep(5000);
            }

            // 同步結果
            System.out.println("Data Synchronization Completed.");
            System.out.println("* Succeed Rows Count: " + succeedRows.get());
            System.out.println("* Failed Rows Count: " + failedRows.get());
            // 刪除通道
            tunnelClient.deleteTunnel(new DeleteTunnelRequest(sourceTableName, tunnelName));
            // 關閉資源
            worker.shutdown();
            config.shutdown();
            tunnelClient.shutdown();
            tableStoreWriter.close();
        }catch(Exception e){
            e.printStackTrace();
            worker.shutdown();
            config.shutdown();
            tunnelClient.shutdown();
            tableStoreWriter.close();
        }
    }

    private static void createTargetTable() throws ClientException {
        // 查詢源表資訊
        SyncClient sourceClient = new SyncClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName);
        DescribeTableResponse response = sourceClient.describeTable(new DescribeTableRequest(sourceTableName));

        // 建立目標表
        SyncClient targetClient = new SyncClient(targetEndpoint, targetAccessKeyId, targetKeySecret, targetInstanceName);
        TableMeta tableMeta = new TableMeta(targetTableName);
        response.getTableMeta().getPrimaryKeyList().forEach(
                item -> tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema(item.getName(), item.getType()))
        );
        TableOptions tableOptions = new TableOptions(-1, 1);
        CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
        targetClient.createTable(request);

        // 關閉資源
        sourceClient.shutdown();
        targetClient.shutdown();
    }

    private static String createTunnel(TunnelClient client) {
        // 建立資料通道,返回通道 ID
        CreateTunnelRequest request = new CreateTunnelRequest(sourceTableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse response = client.createTunnel(request);
        return response.getTunnelId();
    }

    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            if(input.getRecords().isEmpty())
                return;

            System.out.print("* Begin consume " + input.getRecords().size() + " records ... ");
            for (StreamRecord record : input.getRecords()) {
                switch (record.getRecordType()) {
                    // 寫入行資料
                    case PUT:
                        RowPutChange putChange = new RowPutChange(targetTableName, record.getPrimaryKey());
                        putChange.addColumns(getColumnsFromRecord(record));
                        tableStoreWriter.addRowChange(putChange);
                        break;
                    // 更新行資料
                    case UPDATE:
                        RowUpdateChange updateChange = new RowUpdateChange(targetTableName, record.getPrimaryKey());
                        for (RecordColumn column : record.getColumns()) {
                            switch (column.getColumnType()) {
                                // 新增屬性列
                                case PUT:
                                    updateChange.put(column.getColumn().getName(), column.getColumn().getValue(), System.currentTimeMillis());
                                    break;
                                // 刪除屬性列版本
                                case DELETE_ONE_VERSION:
                                    updateChange.deleteColumn(column.getColumn().getName(),
                                            column.getColumn().getTimestamp());
                                    break;
                                // 刪除屬性列
                                case DELETE_ALL_VERSION:
                                    updateChange.deleteColumns(column.getColumn().getName());
                                    break;
                                default:
                                    break;
                            }
                        }
                        tableStoreWriter.addRowChange(updateChange);
                        break;
                    // 刪除行資料
                    case DELETE:
                        RowDeleteChange deleteChange = new RowDeleteChange(targetTableName, record.getPrimaryKey());
                        tableStoreWriter.addRowChange(deleteChange);
                        break;
                }
            }

            // 發送緩衝區資料
            tableStoreWriter.flush();
            System.out.println("Done");
        }

        @Override
        public void shutdown() {
        }
    }

    public static List<Column> getColumnsFromRecord(StreamRecord record) {
        List<Column> retColumns = new ArrayList<>();
        for (RecordColumn recordColumn : record.getColumns()) {
            // 將資料版本號碼替換為目前時間戳,防止超過最大版本偏差
            Column column = new Column(recordColumn.getColumn().getName(), recordColumn.getColumn().getValue(), System.currentTimeMillis());
            retColumns.add(column);
        }
        return retColumns;
    }

    private static TableStoreWriter createTablesStoreWriter() {
        WriterConfig config = new WriterConfig();

        // 行層級回調,統計成功和失敗行數,列印同步失敗的資料行
        TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
            @Override
            public void onCompleted(RowChange rowChange, RowWriteResult rowWriteResult) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(RowChange rowChange, Exception exception) {
                failedRows.incrementAndGet();
                System.out.println("* Failed Rows: " + rowChange.getTableName() + " | " + rowChange.getPrimaryKey() + " | " + exception.getMessage());
            }
        };

        ServiceCredentials credentials = new DefaultCredentials(targetAccessKeyId, targetKeySecret);
        return new DefaultTableStoreWriter(targetEndpoint, credentials, targetInstanceName,
                targetTableName, config, resultCallback);
    }
}

使用DataWorks同步資料

DataWorks提供可視化的Data Integration服務,支援通過圖形介面配置Table Store資料表間的同步任務。除DataWorks外,還可使用DataX等工具實現Table Store資料表間的資料同步。

步驟一:準備工作

說明

如果源表和目標表位於不同地區,需建立VPC對等串連以實現跨地區網路連通。

建立VPC對等串連實現跨地區網路連通

以下以DataWorks工作空間和源表執行個體均位於華東1(杭州)地區,目標表位於華東2(上海)地區的情境為例進行說明。

  1. 為Tablestore執行個體綁定VPC。

    1. 登入Table Store控制台,在頁面上方選擇目標表所在地區。

    2. 單擊執行個體別名進入執行個體管理頁面。

    3. 切換到網路管理頁簽,單擊綁定VPC,選擇VPC和交換器並填寫VPC名稱,然後單擊確定

    4. 等待VPC綁定完成,頁面將自動重新整理,可在VPC列表查看綁定的VPC IDVPC訪問地址

      說明

      在DataWorks控制台添加Tablestore資料來源時,需使用該VPC訪問地址。

      image

  2. 擷取DataWorks工作空間資源群組的VPC資訊。

    1. 登入DataWorks控制台,在頁面上方選擇工作空間所在地區,然後單擊左側工作空間菜單,進入工作空間列表頁面。

    2. 單擊工作空間名稱進入空間詳情頁面,單擊左側資源群組菜單,查看工作空間綁定的資源群組列表。

    3. 在目標資源群組右側單擊網路設定,在資源調度 & Data Integration地區查看綁定的專用網路,即VPC ID

  3. 建立VPC對等串連並配置路由。

    1. 登入Virtual Private Cloud控制台,在頁面左側單擊VPC對等串連菜單,然後單擊建立對等串連

    2. 建立對等串連頁面,輸入對等串連名稱,選擇發起端VPC執行個體、接收端帳號類型、接收端地區和接收端VPC執行個體,單擊確定

    3. VPC對等串連頁面,找到已建立的VPC對等串連,分別在發起端接收端列單擊配置路由條目

      目標網段需填寫對端VPC的網段地址。即在發起端配置路由條目時,填寫接收端的網段地址;在接收端配置路由條目時,填寫發起端的網段地址。

步驟二:新增Table Store資料來源

分別為來源資料表和目標資料表所在的執行個體添加Table Store資料來源。

  1. 登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的Data Integration > Data Integration,在下拉框中選擇對應工作空間後單擊進入Data Integration

  2. 在左側導覽列,單擊資料來源

  3. 資料來源列表頁面,單擊新增資料來源

  4. 新增資料來源對話方塊,搜尋並選擇資料來源類型為Tablestore

  5. 新增OTS資料來源對話方塊,根據下表配置資料來源參數。

    參數

    說明

    資料來源名稱

    資料來源名稱必須以字母、數字、底線(_)組合,且不能以數字和底線(_)開頭。

    資料來源描述

    對資料來源進行簡單描述,不得超過80個字元。

    地區

    選擇Tablestore執行個體所屬地區。

    Table Store執行個體名稱

    Tablestore執行個體的名稱。

    Endpoint

    Tablestore執行個體的服務地址,推薦使用VPC地址

    AccessKey ID

    阿里雲帳號或者RAM使用者的AccessKey ID和AccessKey Secret。

    AccessKey Secret

  6. 測試資源群組連通性。

    建立資料來源時,需要測試資源群組的連通性,確保同步任務使用的資源群組能夠與資料來源正常連通,否則將無法正常執行資料同步任務。

    1. 串連配置地區,單擊相應資源群組連通狀態列的測試連通性

    2. 測試連通性通過後,連通狀態顯示可連通,單擊完成。可在資料來源列表中查看建立的資料來源。

      如果測試連通性結果為無法通過,可使用連通性診斷工具自助解決。

步驟三:配置和運行同步任務

建立任務節點

  1. 進入資料開發頁面。

    1. 登入DataWorks控制台

    2. 在頁面上方,選擇資源群組和地區。

    3. 在左側導覽列,單擊資料開發與營運 > 資料開發

    4. 選擇對應工作空間後單擊進入Data Studio

  2. 在Data Studio控制台的資料開發頁面,單擊專案目錄右側的image表徵圖,然後選擇建立節點 > Data Integration > 離線同步

  3. 建立節點對話方塊,選擇路徑,資料來源和資料去向都選擇Tablestore,填寫名稱,然後單擊確認

配置同步任務

專案目錄下,單擊開啟建立的離線同步任務節點,通過嚮導模式或指令碼模式配置同步任務。

嚮導模式(預設)

配置以下內容:

  • 資料來源:選擇來來源資料源和去向資料來源。

  • 運行資源:選擇資源群組,選擇後會自動檢測資料來源連通性。

  • 資料來源

    • :下拉選擇來來源資料表。

    • 主鍵區間分布(起始):資料讀取的起始主鍵,格式為JSON數組,inf_min表示無限小。

      當主鍵包含1個int類型的主鍵列id和1個string類型的主鍵列name時,樣本配置如下:

      指定主鍵範圍

      全量資料

      [
        {
          "type": "int",
          "value": "000"
        },
        {
          "type": "string",
          "value": "aaa"
        }
      ]
      [
        {
          "type": "inf_min"
        },
        {
          "type": "inf_min"
        }
      ]
    • 主鍵區間分布(結束):資料讀取的結束主鍵,格式為JSON數組,inf_max表示無限大。

      當主鍵包含1個int類型的主鍵列id和1個string類型的主鍵列name時,樣本配置如下:

      指定主鍵範圍

      全量資料

      [
        {
          "type": "int",
          "value": "999"
        },
        {
          "type": "string",
          "value": "zzz"
        }
      ]
      [
        {
          "type": "inf_max"
        },
        {
          "type": "inf_max"
        }
      ]
    • 切分配置資訊:自訂切分配置資訊,格式為JSON數組,普通情況下不建議配置(設定為[])。

      當Tablestore資料存放區發生熱點,且使用Tablestore Reader自動切分的策略不能生效時,建議使用自訂的切分規則。切分指定的是在主鍵起始和結束區間內的切分點,僅配置切分鍵,無需指定全部的主鍵。

  • 資料去向

    • :下拉選擇去向資料表。

    • 主鍵資訊:去向資料表的主鍵資訊,格式為JSON數組。

      當主鍵包含1個int類型的主鍵列id和1個string類型的主鍵列name時,樣本配置如下:

      [
        {
          "name": "id",
          "type": "int"
        },
        {
          "name": "name",
          "type": "string"
        }
      ]
    • 寫入模式:資料寫入Table Store的模式,支援以下兩種模式:

      • PutRow:寫入行資料。如果目標行資料不存在,則新增一行。如果目標行資料已存在,則覆蓋原有行。

      • UpdateRow:更新行資料。如果該行不存在,則新增一行。如果該行存在,則根據請求的內容在這一行中新增、修改或者刪除指定列的值。

  • 去向欄位對應:配置來來源資料表到去向資料表的欄位對應,一行表示一個欄位,格式為JSON。

    • 來源欄位:需包含來來源資料表的主鍵資訊。

      當主鍵包含1個int類型的主鍵列id和1個string類型的主鍵列name,屬性列包含1個int類型的欄位age時,樣本配置如下:

      {"name":"id","type":"int"}
      {"name":"name","type":"string"}
      {"name":"age","type":"int"}
    • 目標欄位:無需包含去向資料表的主鍵資訊。

      當主鍵包含1個int類型的主鍵列id和1個string類型的主鍵列name,屬性列包含1個int類型的欄位age時,樣本配置如下:

      {"name":"age","type":"int"}

配置完成後,單擊頁面上方的儲存

指令碼模式

單擊頁面上方的指令碼模式,在切換後的頁面中編輯指令碼。

以下以主鍵包含1個int類型的主鍵列id和1個string類型的主鍵列name,屬性列包含1個int類型的欄位age時的配置為例。配置時請替換樣本指令碼內的資料來源datasource和表名稱table

全量資料

{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "ots",
            "parameter": {
                "datasource": "source_data",
                "column": [
                    {
                        "name": "id",
                        "type": "int"
                    },
                    {
                        "name": "name",
                        "type": "string"
                    },
                    {
                        "name": "age",
                        "type": "int"
                    }
                ],
                "range": {
                    "begin": [
                        {
                            "type": "inf_min"
                        },
                        {
                            "type": "inf_min"
                        }
                    ],
                    "end": [
                        {
                            "type": "inf_max"
                        },
                        {
                            "type": "inf_max"
                        }
                    ],
                    "split": []
                },
                "table": "source_table",
                "newVersion": "true"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "ots",
            "parameter": {
                "datasource": "target_data",
                "column": [
                    {
                        "name": "age",
                        "type": "int"
                    }
                ],
                "writeMode": "UpdateRow",
                "table": "target_table",
                "newVersion": "true",
                "primaryKey": [
                    {
                        "name": "id",
                        "type": "int"
                    },
                    {
                        "name": "name",
                        "type": "string"
                    }
                ]
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "concurrent": 2,
            "throttle": false
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

指定主鍵範圍

{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "ots",
            "parameter": {
                "datasource": "source_data",
                "column": [
                    {
                        "name": "id",
                        "type": "int"
                    },
                    {
                        "name": "name",
                        "type": "string"
                    },
                    {
                        "name": "age",
                        "type": "int"
                    }
                ],
                "range": {
                    "begin": [
                        {
                            "type": "int",
                            "value": "000"
                        },
                        {
                            "type": "string",
                            "value": "aaa"
                        }
                    ],
                    "end": [
                        {
                            "type": "int",
                            "value": "999"
                        },
                        {
                            "type": "string",
                            "value": "zzz"
                        }
                    ],
                    "split": []
                },
                "table": "source_table",
                "newVersion": "true"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "ots",
            "parameter": {
                "datasource": "target_data",
                "column": [
                    {
                        "name": "age",
                        "type": "int"
                    }
                ],
                "writeMode": "UpdateRow",
                "table": "target_table",
                "newVersion": "true",
                "primaryKey": [
                    {
                        "name": "id",
                        "type": "int"
                    },
                    {
                        "name": "name",
                        "type": "string"
                    }
                ]
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "concurrent": 2,
            "throttle": false
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

指令碼編輯完成後,單擊頁面上方的儲存

運行同步任務

單擊頁面上方的運行,開始同步任務,首次運行時需確認調試配置

步驟四:查看同步結果

運行同步任務後,可通過日誌查看任務的執行狀態,並在Table Store控制台查看目標資料表的同步結果。

  1. 在頁面下方查看任務運行狀態和結果,出現以下資訊時表示同步任務運行成功。

    2025-11-18 11:16:23 INFO Shell run successfully!
    2025-11-18 11:16:23 INFO Current task status: FINISH
    2025-11-18 11:16:23 INFO Cost time is: 77.208s
  2. 查看目標資料表的資料。

    1. 前往Table Store控制台,在頁面上方,選擇資源群組和地區。

    2. 單擊執行個體別名,在資料表列表單擊目標資料表。

    3. 單擊資料管理,查看目標資料表的資料。

使用命令列工具同步資料

使用命令列工具進行資料同步時,需要手動將源表的資料匯出為本地JSON檔案,隨後再將其匯入到目標表。此方法僅適用於少量資料的遷移情境,大規模資料移轉不建議採用此方法。

步驟一:準備工作

步驟二:匯出源表資料

  1. 啟動命令列工具,通過config命令配置源表所在執行個體的接入資訊。更多資訊,請參見啟動並配置接入資訊

    執行前請使用源表所在的執行個體訪問地址、執行個體名稱、AccessKey ID、AccessKey Secret替換命令中的endpoint、instance、id、key。
    config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************
  2. 匯出資料。

    1. 執行use命令以使用源表。以source_table為例。

      use --wc -t source_table
    2. 匯出源表中的資料到本地JSON檔案中。具體操作,請參見匯出資料

      scan -o /tmp/sourceData.json

步驟三:匯入目標表資料

  1. 通過config命令配置目標表所在執行個體的接入資訊。

    執行前請使用目標表所在的執行個體訪問地址、執行個體名稱、AccessKey ID、AccessKey Secret替換命令中的endpoint、instance、id、key。
    config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************
  2. 匯入資料。

    1. 執行use命令以使用目標表。以target_table為例。

      use --wc -t target_table
    2. 匯入本地JSON檔案中的資料到目標表中。具體操作,請參見匯入資料

      import -i /tmp/sourceData.json 

常見問題