Tablestore は、テーブル間でデータを移行または同期するための複数のメソッドを提供します。トンネルサービス、DataWorks、DataX、またはコマンドラインインターフェイスを使用して、あるテーブルから別のテーブルにデータを同期できます。
前提条件
ソーステーブルとターゲットテーブルのインスタンス名、エンドポイント、および リージョン ID を取得します。
Alibaba Cloud アカウントまたは Tablestore の権限を持つ RAM ユーザーの AccessKey を作成します。
SDK を使用したデータの同期
トンネルサービス を使用して、テーブル間でデータを同期できます。このメソッドは、同一リージョン内、異なるリージョン間、および異なるアカウント間でのデータ同期をサポートします。トンネルサービスはデータの変更をキャプチャし、リアルタイムでターゲットテーブルに同期します。次の例は、Java SDK を使用してこの同期を実装する方法を示しています。
コードを実行する前に、テーブル名、インスタンス名、エンドポイントをソーステーブルとターゲットテーブルの実際の値に置き換えてください。次に、AccessKey ID と AccessKey Secret を環境変数として設定します。
import com.alicloud.openservices.tablestore.*;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.model.tunnel.*;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class TableSynchronization {
// ソーステーブルの設定: テーブル名、インスタンス名、エンドポイント、AccessKey ID、AccessKey Secret
final static String sourceTableName = "sourceTableName";
final static String sourceInstanceName = "sourceInstanceName";
final static String sourceEndpoint = "sourceEndpoint";
final static String sourceAccessKeyId = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_ID");
final static String sourceKeySecret = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_SECRET");
// ターゲットテーブルの設定: テーブル名、インスタンス名、エンドポイント、AccessKey ID、AccessKey Secret
final static String targetTableName = "targetTableName";
final static String targetInstanceName = "targetInstanceName";
final static String targetEndpoint = "targetEndpoint";
final static String targetAccessKeyId = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_ID");
final static String targetKeySecret = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_SECRET");
// トンネル名
static String tunnelName = "source_table_tunnel";
// TablestoreWriter: 高同時実行データ書き込み用のツール。
static TableStoreWriter tableStoreWriter;
// 成功した行と失敗した行の統計。
static AtomicLong succeedRows = new AtomicLong();
static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) {
// ターゲットテーブルを作成します。
createTargetTable();
System.out.println("Create target table: Done.");
// TunnelClient を初期化します。
TunnelClient tunnelClient = new TunnelClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName);
// トンネルを作成します。
String tunnelId = createTunnel(tunnelClient);
System.out.println("Create tunnel: Done.");
// TablestoreWriter を初期化します。
tableStoreWriter = createTablesStoreWriter();
// トンネルを介してデータを同期します。
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
System.out.println("Connecting to tunnel and working...");
worker.connectAndWorking();
// トンネルのステータスを監視します。ステータスが完全データ同期から増分同期に変わると、データ同期は完了です。
while (true) {
if (tunnelClient.describeTunnel(new DescribeTunnelRequest(sourceTableName, tunnelName)).getTunnelInfo().getStage().equals(TunnelStage.ProcessStream)) {
break;
}
Thread.sleep(5000);
}
// 同期結果。
System.out.println("Data synchronization completed.");
System.out.println("* Succeeded rows: " + succeedRows.get());
System.out.println("* Failed rows: " + failedRows.get());
// トンネルを削除します。
tunnelClient.deleteTunnel(new DeleteTunnelRequest(sourceTableName, tunnelName));
// リソースをシャットダウンします。
worker.shutdown();
config.shutdown();
tunnelClient.shutdown();
tableStoreWriter.close();
}catch(Exception e){
e.printStackTrace();
worker.shutdown();
config.shutdown();
tunnelClient.shutdown();
tableStoreWriter.close();
}
}
private static void createTargetTable() throws ClientException {
// ソーステーブルの情報をクエリします。
SyncClient sourceClient = new SyncClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName);
DescribeTableResponse response = sourceClient.describeTable(new DescribeTableRequest(sourceTableName));
// ターゲットテーブルを作成します。
SyncClient targetClient = new SyncClient(targetEndpoint, targetAccessKeyId, targetKeySecret, targetInstanceName);
TableMeta tableMeta = new TableMeta(targetTableName);
response.getTableMeta().getPrimaryKeyList().forEach(
item -> tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema(item.getName(), item.getType()))
);
TableOptions tableOptions = new TableOptions(-1, 1);
CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
targetClient.createTable(request);
// リソースをシャットダウンします。
sourceClient.shutdown();
targetClient.shutdown();
}
private static String createTunnel(TunnelClient client) {
// トンネルを作成し、トンネル ID を返します。
CreateTunnelRequest request = new CreateTunnelRequest(sourceTableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse response = client.createTunnel(request);
return response.getTunnelId();
}
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
if(input.getRecords().isEmpty())
return;
System.out.print("* Start to consume " + input.getRecords().size() + " records... ");
for (StreamRecord record : input.getRecords()) {
switch (record.getRecordType()) {
// 行データを書き込みます。
case PUT:
RowPutChange putChange = new RowPutChange(targetTableName, record.getPrimaryKey());
putChange.addColumns(getColumnsFromRecord(record));
tableStoreWriter.addRowChange(putChange);
break;
// 行データを更新します。
case UPDATE:
RowUpdateChange updateChange = new RowUpdateChange(targetTableName, record.getPrimaryKey());
for (RecordColumn column : record.getColumns()) {
switch (column.getColumnType()) {
// 属性列を追加します。
case PUT:
updateChange.put(column.getColumn().getName(), column.getColumn().getValue(), System.currentTimeMillis());
break;
// 属性列のバージョンを削除します。
case DELETE_ONE_VERSION:
updateChange.deleteColumn(column.getColumn().getName(),
column.getColumn().getTimestamp());
break;
// 属性列を削除します。
case DELETE_ALL_VERSION:
updateChange.deleteColumns(column.getColumn().getName());
break;
default:
break;
}
}
tableStoreWriter.addRowChange(updateChange);
break;
// 行データを削除します。
case DELETE:
RowDeleteChange deleteChange = new RowDeleteChange(targetTableName, record.getPrimaryKey());
tableStoreWriter.addRowChange(deleteChange);
break;
}
}
// バッファーをフラッシュします。
tableStoreWriter.flush();
System.out.println("Done.");
}
@Override
public void shutdown() {
}
}
public static List<Column> getColumnsFromRecord(StreamRecord record) {
List<Column> retColumns = new ArrayList<>();
for (RecordColumn recordColumn : record.getColumns()) {
// 最大バージョン ドリフトを超えないように、データ バージョン番号を現在のタイムスタンプに置き換えます。
Column column = new Column(recordColumn.getColumn().getName(), recordColumn.getColumn().getValue(), System.currentTimeMillis());
retColumns.add(column);
}
return retColumns;
}
private static TableStoreWriter createTablesStoreWriter() {
WriterConfig config = new WriterConfig();
// 行レベルのコールバックで、成功した行と失敗した行をカウントし、失敗した行に関する情報を出力します。
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult rowWriteResult) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception exception) {
failedRows.incrementAndGet();
System.out.println("* Failed Row: " + rowChange.getTableName() + " | " + rowChange.getPrimaryKey() + " | " + exception.getMessage());
}
};
ServiceCredentials credentials = new DefaultCredentials(targetAccessKeyId, targetKeySecret);
return new DefaultTableStoreWriter(targetEndpoint, credentials, targetInstanceName,
targetTableName, config, resultCallback);
}
}DataWorks を使用したデータの同期
DataWorks は、グラフィカルインターフェイスを使用して Tablestore テーブル間の同期タスクを設定できる、視覚的なデータ統合サービスを提供します。DataX などの他のツールを使用して、Tablestore テーブル間でデータを同期することもできます。
ステップ 1: 準備
ターゲットデータテーブルを作成します。ターゲットテーブルのプライマリキー構造 (プライマリキー列のデータ型と順序を含む) がソーステーブルのプライマリキー構造と同一であることを確認してください。
DataWorks をアクティブ化し、ソーステーブルまたはターゲットテーブルが配置されているリージョンに ワークスペースを作成します。
サーバーレスリソースグループを作成し、ワークスペースにアタッチします。課金の詳細については、「サーバーレスリソースグループの課金」をご参照ください。
ソーステーブルとターゲットテーブルが異なるリージョンにある場合は、VPC ピアリング接続を作成して、リージョン間のネットワーク接続を確立する必要があります。
ステップ 2: Tablestore データソースの追加
ソーステーブルインスタンスとターゲットテーブルインスタンスの両方に Tablestore データソースを追加します。
DataWorks コンソールにログインします。ターゲットリージョンに切り替えます。左側のナビゲーションウィンドウで、 を選択します。ドロップダウンリストからワークスペースを選択し、[データ統合へ移動] をクリックします。
左側のナビゲーションウィンドウで、[データソース] をクリックします。
[データソースリスト] ページで、[データソースの追加] をクリックします。
[データソースの追加] ダイアログボックスで、データソースタイプとして [Tablestore] を検索して選択します。
[OTS データソースの追加] ダイアログボックスで、次の表の説明に従ってデータソースパラメーターを設定します。
パラメーター
説明
データソース名
データソース名は、文字、数字、アンダースコア (_) の組み合わせである必要があります。数字またはアンダースコア (_) で始めることはできません。
データソースの説明
データソースの簡単な説明。説明の長さは 80 文字を超えることはできません。
リージョン
Tablestore インスタンスが存在するリージョンを選択します。
Tablestore インスタンス名
Tablestore インスタンスの名前。
エンドポイント
Tablestore インスタンスのエンドポイント。VPC エンドポイントを使用します。
AccessKey ID
Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey Secret。
AccessKey Secret
リソースグループの接続性をテストします。
データソースを作成するときは、リソースグループの接続性をテストして、同期タスクのリソースグループがデータソースに接続できることを確認する必要があります。そうしないと、データ同期タスクを実行できません。
[接続設定] セクションで、リソースグループの [接続ステータス] 列にある [接続性のテスト] をクリックします。
接続性テストに合格すると、[接続ステータス] が [接続済み] に変わります。[完了] をクリックします。新しいデータソースがデータソースリストに表示されます。
接続性テストが [失敗] した場合は、[接続診断ツール] を使用して問題をトラブルシューティングします。
ステップ 3: 同期タスクの設定と実行
タスクノードの作成
[データ開発] ページに移動します。
DataWorks コンソールにログインします。
上部のナビゲーションバーで、リソースグループとリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
ワークスペースを選択し、[DataStudio へ移動] をクリックします。
DataStudio コンソールの[データ開発] ページで、[プロジェクトフォルダー] の右側にある
アイコンをクリックし、次に を選択します。[ノードの作成] ダイアログボックスで、[パス] を選択し、データソースとデータ宛先の両方を Tablestore に設定し、名前を入力して [確認] をクリックします。
同期タスクの設定
[プロジェクトフォルダ] で、新しく作成されたオフライン同期タスクノードをクリックします。コードレス UI またはコードエディタで同期タスクを設定できます。
コードレス UI (デフォルト)
次の項目を設定します:
[データソース]: ソースと宛先のデータソースを選択します。
[ランタイムリソース]: リソースグループを選択します。データソースの接続性は自動的にテストされます。
データソース:
[テーブル]: ドロップダウンリストから、ソースデータテーブルを選択します。
[プライマリキー範囲 (開始)]: データ読み取り操作の開始プライマリキー。フォーマットは JSON 配列です。
inf_minは負の無限大を示します。プライマリキーが
idという名前のintプライマリキー列とnameという名前のstringプライマリキー列で構成されている場合、次の設定が例です:特定のプライマリキー範囲
完全データ
[ { "type": "int", "value": "000" }, { "type": "string", "value": "aaa" } ][ { "type": "inf_min" }, { "type": "inf_min" } ][プライマリキー範囲 (終了)]: データ読み取り操作の終了プライマリキー。フォーマットは JSON 配列です。
inf_maxは正の無限大を示します。プライマリキーが
idという名前のintプライマリキー列とnameという名前のstringプライマリキー列で構成されている場合、次の設定が例です:特定のプライマリキー範囲
完全データ
[ { "type": "int", "value": "999" }, { "type": "string", "value": "zzz" } ][ { "type": "inf_max" }, { "type": "inf_max" } ][シャード設定]: カスタムシャード設定。フォーマットは JSON 配列です。通常の状況では、このパラメーターを設定する必要はありません。
[]に設定できます。Tablestore データストレージでホットスポットが発生し、Tablestore Reader の自動シャーディングポリシーが効果的でない場合は、カスタムシャーディングルールを使用できます。シャーディングは、開始と終了のプライマリキー範囲内のシャードポイントを指定します。すべてのプライマリキーではなく、シャードキーのみを設定する必要があります。
データ送信先:
[テーブル]: ドロップダウンリストから、宛先データテーブルを選択します。
[プライマリキー情報]: 宛先データテーブルのプライマリキー情報。フォーマットは JSON 配列です。
プライマリキーが
idという名前のintプライマリキー列とnameという名前のstringプライマリキー列で構成されている場合、次の設定が例です:[ { "name": "id", "type": "int" }, { "name": "name", "type": "string" } ][書き込みモード]: Tablestore にデータを書き込むモード。次のモードがサポートされています:
PutRow: 行データを書き込みます。ターゲット行が存在しない場合は、新しい行が追加されます。ターゲット行が存在する場合は、元の行が上書きされます。
UpdateRow: 行データを更新します。行が存在しない場合は、新しい行が追加されます。行が存在する場合は、リクエストに基づいて行の指定された列の値が追加、変更、または削除されます。
[宛先フィールドマッピング]: ソースデータテーブルから宛先データテーブルへのフィールドマッピングを設定します。各行は JSON フォーマットのフィールドを表します。
[ソースフィールド]: これには、ソースデータテーブルのプライマリキー情報を含める必要があります。
プライマリキーが
idという名前のintプライマリキー列とnameという名前のstringプライマリキー列で構成され、属性列にageという名前のintフィールドが含まれる場合、次の設定が例です:{"name":"id","type":"int"} {"name":"name","type":"string"} {"name":"age","type":"int"}[ターゲットフィールド]: これには、宛先データテーブルのプライマリキー情報を含める必要はありません。
プライマリキーが
idという名前のintプライマリキー列とnameという名前のstringプライマリキー列で構成され、属性列にageという名前のintフィールドが含まれる場合、次の設定が例です:{"name":"age","type":"int"}
設定が完了したら、ページ上部の [保存] をクリックします。
コードエディタ
ページ上部の [コードエディタ] をクリックします。表示されるページでスクリプトを編集します。
次の例は、プライマリキーがidという名前のintプライマリキー列とnameという名前のstringプライマリキー列で構成され、属性列にageという名前のintフィールドが含まれるテーブルの設定を示しています。タスクを設定する際は、サンプルスクリプト内のdatasourceとtableの名前を実際の値に置き換えてください。
完全データ
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "ots",
"parameter": {
"datasource": "source_data",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"range": {
"begin": [
{
"type": "inf_min"
},
{
"type": "inf_min"
}
],
"end": [
{
"type": "inf_max"
},
{
"type": "inf_max"
}
],
"split": []
},
"table": "source_table",
"newVersion": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "ots",
"parameter": {
"datasource": "target_data",
"column": [
{
"name": "age",
"type": "int"
}
],
"writeMode": "UpdateRow",
"table": "target_table",
"newVersion": "true",
"primaryKey": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
]
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}特定のプライマリキー範囲
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "ots",
"parameter": {
"datasource": "source_data",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"range": {
"begin": [
{
"type": "int",
"value": "000"
},
{
"type": "string",
"value": "aaa"
}
],
"end": [
{
"type": "int",
"value": "999"
},
{
"type": "string",
"value": "zzz"
}
],
"split": []
},
"table": "source_table",
"newVersion": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "ots",
"parameter": {
"datasource": "target_data",
"column": [
{
"name": "age",
"type": "int"
}
],
"writeMode": "UpdateRow",
"table": "target_table",
"newVersion": "true",
"primaryKey": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
]
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}スクリプトの編集が完了したら、ページ上部の [保存] をクリックします。
同期タスクの実行
ページ上部の [実行] をクリックして同期タスクを開始します。タスクを初めて実行するときは、[デバッグ設定] を確認する必要があります。
ステップ 4: 同期結果の表示
同期タスクの実行後、ログで実行ステータスを表示し、Tablestore コンソールで同期結果を確認できます。
ページ下部でタスクの実行ステータスと結果を表示します。次のログ情報は、同期タスクが正常に実行されたことを示しています。
2025-11-18 11:16:23 INFO Shell run successfully! 2025-11-18 11:16:23 INFO Current task status: FINISH 2025-11-18 11:16:23 INFO Cost time is: 77.208sターゲットテーブルのデータを表示します。
Tablestore コンソールに移動します。上部のナビゲーションバーで、リソースグループとリージョンを選択します。
インスタンスのエイリアスをクリックします。[データテーブルリスト] で、ターゲットデータテーブルをクリックします。
[データ管理] をクリックして、ターゲットデータテーブルのデータを表示します。
コマンドラインインターフェイスを使用したデータの同期
このメソッドでは、ソーステーブルからローカルの JSON ファイルに手動でデータをエクスポートし、そのファイルをターゲットテーブルにインポートする必要があります。このメソッドは少量のデータの移行にのみ適しており、大規模なデータ移行には推奨されません。
ステップ 1: 準備
ターゲットデータテーブルを作成します。そのプライマリキー構造 (列の名前、データ型、順序を含む) がソーステーブルのプライマリキー構造と同一であることを確認してください。
ステップ 2: ソーステーブルデータのエクスポート
コマンドラインインターフェイスを起動し、`config` コマンドを実行して、ソーステーブルが配置されているインスタンスのアクセス情報を設定します。詳細については、「起動とアクセス情報の設定」をご参照ください。
コマンドを実行する前に、`endpoint`、`instance`、`id`、および `key` を、ソーステーブルが配置されているインスタンスのエンドポイント、インスタンス名、AccessKey ID、および AccessKey Secret に置き換えてください。
config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************データをエクスポートします。
useコマンドを実行して、ソーステーブルを選択します。次の例ではsource_tableを使用します。use --wc -t source_tableソーステーブルからローカルの JSON ファイルにデータをエクスポートします。詳細については、「データのエクスポート」をご参照ください。
scan -o /tmp/sourceData.json
ステップ 3: ターゲットテーブルへのデータのインポート
`config` コマンドを実行して、ターゲットテーブルが配置されているインスタンスのアクセス情報を設定します。
コマンドを実行する前に、`endpoint`、`instance`、`id`、および `key` を、ターゲットテーブルが配置されているインスタンスのエンドポイント、インスタンス名、AccessKey ID、および AccessKey Secret に置き換えてください。
config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************データをインポートします。
useコマンドを実行して、ターゲットテーブルを選択します。次の例ではtarget_tableを使用します。use --wc -t target_tableローカルの JSON ファイルからターゲットテーブルにデータをインポートします。詳細については、「データのインポート」をご参照ください。
import -i /tmp/sourceData.json
