すべてのプロダクト
Search
ドキュメントセンター

Tablestore:あるテーブルから別のテーブルにデータを同期する

最終更新日:Nov 19, 2025

Tablestore は、テーブル間でデータを移行または同期するための複数のメソッドを提供します。トンネルサービス、DataWorks、DataX、またはコマンドラインインターフェイスを使用して、あるテーブルから別のテーブルにデータを同期できます。

前提条件

  • ソーステーブルとターゲットテーブルのインスタンス名、エンドポイント、および リージョン ID を取得します。

  • Alibaba Cloud アカウントまたは Tablestore の権限を持つ RAM ユーザーの AccessKey を作成します。

SDK を使用したデータの同期

トンネルサービス を使用して、テーブル間でデータを同期できます。このメソッドは、同一リージョン内、異なるリージョン間、および異なるアカウント間でのデータ同期をサポートします。トンネルサービスはデータの変更をキャプチャし、リアルタイムでターゲットテーブルに同期します。次の例は、Java SDK を使用してこの同期を実装する方法を示しています。

コードを実行する前に、テーブル名、インスタンス名、エンドポイントをソーステーブルとターゲットテーブルの実際の値に置き換えてください。次に、AccessKey ID と AccessKey Secret を環境変数として設定します。
import com.alicloud.openservices.tablestore.*;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.model.tunnel.*;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class TableSynchronization {
    // ソーステーブルの設定: テーブル名、インスタンス名、エンドポイント、AccessKey ID、AccessKey Secret
    final static String sourceTableName = "sourceTableName";
    final static String sourceInstanceName = "sourceInstanceName";
    final static String sourceEndpoint = "sourceEndpoint";
    final static String sourceAccessKeyId =  System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_ID");
    final static String sourceKeySecret = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_SECRET");

    // ターゲットテーブルの設定: テーブル名、インスタンス名、エンドポイント、AccessKey ID、AccessKey Secret
    final static String targetTableName = "targetTableName";
    final static String targetInstanceName = "targetInstanceName";
    final static String targetEndpoint = "targetEndpoint";
    final static String targetAccessKeyId = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_ID");
    final static String targetKeySecret = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_SECRET");

    // トンネル名
    static String tunnelName = "source_table_tunnel";
    // TablestoreWriter: 高同時実行データ書き込み用のツール。
    static TableStoreWriter tableStoreWriter;
    // 成功した行と失敗した行の統計。
    static AtomicLong succeedRows = new AtomicLong();
    static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) {
        // ターゲットテーブルを作成します。
        createTargetTable();
        System.out.println("Create target table: Done.");

        // TunnelClient を初期化します。
        TunnelClient tunnelClient = new TunnelClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName);
        // トンネルを作成します。
        String tunnelId = createTunnel(tunnelClient);
        System.out.println("Create tunnel: Done.");

        // TablestoreWriter を初期化します。
        tableStoreWriter = createTablesStoreWriter();

        // トンネルを介してデータを同期します。
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            System.out.println("Connecting to tunnel and working...");
            worker.connectAndWorking();

            // トンネルのステータスを監視します。ステータスが完全データ同期から増分同期に変わると、データ同期は完了です。
            while (true) {
                if (tunnelClient.describeTunnel(new DescribeTunnelRequest(sourceTableName, tunnelName)).getTunnelInfo().getStage().equals(TunnelStage.ProcessStream)) {
                    break;
                }
                Thread.sleep(5000);
            }

            // 同期結果。
            System.out.println("Data synchronization completed.");
            System.out.println("* Succeeded rows: " + succeedRows.get());
            System.out.println("* Failed rows: " + failedRows.get());
            // トンネルを削除します。
            tunnelClient.deleteTunnel(new DeleteTunnelRequest(sourceTableName, tunnelName));
            // リソースをシャットダウンします。
            worker.shutdown();
            config.shutdown();
            tunnelClient.shutdown();
            tableStoreWriter.close();
        }catch(Exception e){
            e.printStackTrace();
            worker.shutdown();
            config.shutdown();
            tunnelClient.shutdown();
            tableStoreWriter.close();
        }
    }

    private static void createTargetTable() throws ClientException {
        // ソーステーブルの情報をクエリします。
        SyncClient sourceClient = new SyncClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName);
        DescribeTableResponse response = sourceClient.describeTable(new DescribeTableRequest(sourceTableName));

        // ターゲットテーブルを作成します。
        SyncClient targetClient = new SyncClient(targetEndpoint, targetAccessKeyId, targetKeySecret, targetInstanceName);
        TableMeta tableMeta = new TableMeta(targetTableName);
        response.getTableMeta().getPrimaryKeyList().forEach(
                item -> tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema(item.getName(), item.getType()))
        );
        TableOptions tableOptions = new TableOptions(-1, 1);
        CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
        targetClient.createTable(request);

        // リソースをシャットダウンします。
        sourceClient.shutdown();
        targetClient.shutdown();
    }

    private static String createTunnel(TunnelClient client) {
        // トンネルを作成し、トンネル ID を返します。
        CreateTunnelRequest request = new CreateTunnelRequest(sourceTableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse response = client.createTunnel(request);
        return response.getTunnelId();
    }

    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            if(input.getRecords().isEmpty())
                return;

            System.out.print("* Start to consume " + input.getRecords().size() + " records... ");
            for (StreamRecord record : input.getRecords()) {
                switch (record.getRecordType()) {
                    // 行データを書き込みます。
                    case PUT:
                        RowPutChange putChange = new RowPutChange(targetTableName, record.getPrimaryKey());
                        putChange.addColumns(getColumnsFromRecord(record));
                        tableStoreWriter.addRowChange(putChange);
                        break;
                    // 行データを更新します。
                    case UPDATE:
                        RowUpdateChange updateChange = new RowUpdateChange(targetTableName, record.getPrimaryKey());
                        for (RecordColumn column : record.getColumns()) {
                            switch (column.getColumnType()) {
                                // 属性列を追加します。
                                case PUT:
                                    updateChange.put(column.getColumn().getName(), column.getColumn().getValue(), System.currentTimeMillis());
                                    break;
                                // 属性列のバージョンを削除します。
                                case DELETE_ONE_VERSION:
                                    updateChange.deleteColumn(column.getColumn().getName(),
                                            column.getColumn().getTimestamp());
                                    break;
                                // 属性列を削除します。
                                case DELETE_ALL_VERSION:
                                    updateChange.deleteColumns(column.getColumn().getName());
                                    break;
                                default:
                                    break;
                            }
                        }
                        tableStoreWriter.addRowChange(updateChange);
                        break;
                    // 行データを削除します。
                    case DELETE:
                        RowDeleteChange deleteChange = new RowDeleteChange(targetTableName, record.getPrimaryKey());
                        tableStoreWriter.addRowChange(deleteChange);
                        break;
                }
            }

            // バッファーをフラッシュします。
            tableStoreWriter.flush();
            System.out.println("Done.");
        }

        @Override
        public void shutdown() {
        }
    }

    public static List<Column> getColumnsFromRecord(StreamRecord record) {
        List<Column> retColumns = new ArrayList<>();
        for (RecordColumn recordColumn : record.getColumns()) {
            // 最大バージョン ドリフトを超えないように、データ バージョン番号を現在のタイムスタンプに置き換えます。
            Column column = new Column(recordColumn.getColumn().getName(), recordColumn.getColumn().getValue(), System.currentTimeMillis());
            retColumns.add(column);
        }
        return retColumns;
    }

    private static TableStoreWriter createTablesStoreWriter() {
        WriterConfig config = new WriterConfig();

        // 行レベルのコールバックで、成功した行と失敗した行をカウントし、失敗した行に関する情報を出力します。
        TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
            @Override
            public void onCompleted(RowChange rowChange, RowWriteResult rowWriteResult) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(RowChange rowChange, Exception exception) {
                failedRows.incrementAndGet();
                System.out.println("* Failed Row: " + rowChange.getTableName() + " | " + rowChange.getPrimaryKey() + " | " + exception.getMessage());
            }
        };

        ServiceCredentials credentials = new DefaultCredentials(targetAccessKeyId, targetKeySecret);
        return new DefaultTableStoreWriter(targetEndpoint, credentials, targetInstanceName,
                targetTableName, config, resultCallback);
    }
}

DataWorks を使用したデータの同期

DataWorks は、グラフィカルインターフェイスを使用して Tablestore テーブル間の同期タスクを設定できる、視覚的なデータ統合サービスを提供します。DataX などの他のツールを使用して、Tablestore テーブル間でデータを同期することもできます。

ステップ 1: 準備

説明

ソーステーブルとターゲットテーブルが異なるリージョンにある場合は、VPC ピアリング接続を作成して、リージョン間のネットワーク接続を確立する必要があります。

リージョン間のネットワーク接続のための VPC ピアリング接続の作成

次の例では、DataWorks ワークスペースとソーステーブルインスタンスが中国 (杭州) リージョンにあり、ターゲットテーブルが中国 (上海) リージョンにあるシナリオについて説明します。

  1. VPC を Tablestore インスタンスにアタッチします。

    1. Tablestore コンソールにログインします。上部のナビゲーションバーで、ターゲットテーブルが配置されているリージョンを選択します。

    2. インスタンスのエイリアスをクリックして、[インスタンス管理] ページに移動します。

    3. [ネットワーク管理] タブで、[VPC のアタッチ] をクリックします。VPC と vSwitch を選択し、VPC 名を入力して、[OK] をクリックします。

    4. VPC がアタッチされるのを待ちます。ページが自動的に更新され、VPC リストに [VPC ID][VPC エンドポイント] が表示されます。

      説明

      DataWorks コンソールで Tablestore データソースを追加するときは、この VPC エンドポイントを使用する必要があります。

      image

  2. DataWorks ワークスペースリソースグループの VPC 情報を取得します。

    1. DataWorks コンソールにログインします。上部のナビゲーションバーで、ワークスペースが配置されているリージョンを選択します。左側のナビゲーションウィンドウで、[ワークスペース] をクリックして [ワークスペースリスト] ページに移動します。

    2. ワークスペース名をクリックして [ワークスペース詳細] ページに移動します。左側のナビゲーションウィンドウで、[リソースグループ] をクリックして、ワークスペースにアタッチされているリソースグループを表示します。

    3. ターゲットリソースグループの右側にある [ネットワーク設定] をクリックします。[リソーススケジューリング & データ統合] セクションで、アタッチされている仮想プライベートクラウドの VPC ID を表示します。

  3. VPC ピアリング接続を作成し、ルートを設定します。

    1. VPC コンソールにログインします。左側のナビゲーションウィンドウで、[VPC ピアリング接続] をクリックし、[ピアリング接続の作成] をクリックします。

    2. [ピアリング接続の作成] ページで、ピアリング接続の名前を入力し、リクエスト元 VPC インスタンス、リクエスト先アカウントタイプ、リクエスト先リージョン、およびリクエスト先 VPC インスタンスを選択します。次に、[OK] をクリックします。

    3. [VPC ピアリング接続] ページで、VPC ピアリング接続を見つけ、[リクエスト元 VPC] および [リクエスト先 VPC] 列の [ルートエントリの設定] をクリックします。

      宛先 CIDR ブロックには、ピア VPC の CIDR ブロックを入力します。たとえば、リクエスト元 VPC のルートエントリを設定する場合は、リクエスト先 VPC の CIDR ブロックを入力します。リクエスト先 VPC のルートエントリを設定する場合は、リクエスト元 VPC の CIDR ブロックを入力します。

ステップ 2: Tablestore データソースの追加

ソーステーブルインスタンスとターゲットテーブルインスタンスの両方に Tablestore データソースを追加します。

  1. DataWorks コンソールにログインします。ターゲットリージョンに切り替えます。左側のナビゲーションウィンドウで、[データ統合] > [データ統合] を選択します。ドロップダウンリストからワークスペースを選択し、[データ統合へ移動] をクリックします。

  2. 左側のナビゲーションウィンドウで、[データソース] をクリックします。

  3. [データソースリスト] ページで、[データソースの追加] をクリックします。

  4. [データソースの追加] ダイアログボックスで、データソースタイプとして [Tablestore] を検索して選択します。

  5. [OTS データソースの追加] ダイアログボックスで、次の表の説明に従ってデータソースパラメーターを設定します。

    パラメーター

    説明

    データソース名

    データソース名は、文字、数字、アンダースコア (_) の組み合わせである必要があります。数字またはアンダースコア (_) で始めることはできません。

    データソースの説明

    データソースの簡単な説明。説明の長さは 80 文字を超えることはできません。

    リージョン

    Tablestore インスタンスが存在するリージョンを選択します。

    Tablestore インスタンス名

    Tablestore インスタンスの名前。

    エンドポイント

    Tablestore インスタンスのエンドポイント。VPC エンドポイントを使用します。

    AccessKey ID

    Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey Secret。

    AccessKey Secret

  6. リソースグループの接続性をテストします。

    データソースを作成するときは、リソースグループの接続性をテストして、同期タスクのリソースグループがデータソースに接続できることを確認する必要があります。そうしないと、データ同期タスクを実行できません。

    1. [接続設定] セクションで、リソースグループの [接続ステータス] 列にある [接続性のテスト] をクリックします。

    2. 接続性テストに合格すると、[接続ステータス][接続済み] に変わります。[完了] をクリックします。新しいデータソースがデータソースリストに表示されます。

      接続性テストが [失敗] した場合は、[接続診断ツール] を使用して問題をトラブルシューティングします。

ステップ 3: 同期タスクの設定と実行

タスクノードの作成

  1. [データ開発] ページに移動します。

    1. DataWorks コンソールにログインします。

    2. 上部のナビゲーションバーで、リソースグループとリージョンを選択します。

    3. 左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。

    4. ワークスペースを選択し、[DataStudio へ移動] をクリックします。

  2. DataStudio コンソールの[データ開発] ページで、[プロジェクトフォルダー] の右側にある image アイコンをクリックし、次に [新規ノード] > [データ統合] > [バッチ同期] を選択します。

  3. [ノードの作成] ダイアログボックスで、[パス] を選択し、データソースとデータ宛先の両方を Tablestore に設定し、名前を入力して [確認] をクリックします。

同期タスクの設定

[プロジェクトフォルダ] で、新しく作成されたオフライン同期タスクノードをクリックします。コードレス UI またはコードエディタで同期タスクを設定できます。

コードレス UI (デフォルト)

次の項目を設定します:

  • [データソース]: ソースと宛先のデータソースを選択します。

  • [ランタイムリソース]: リソースグループを選択します。データソースの接続性は自動的にテストされます。

  • データソース

    • [テーブル]: ドロップダウンリストから、ソースデータテーブルを選択します。

    • [プライマリキー範囲 (開始)]: データ読み取り操作の開始プライマリキー。フォーマットは JSON 配列です。inf_min は負の無限大を示します。

      プライマリキーが id という名前の int プライマリキー列と name という名前の string プライマリキー列で構成されている場合、次の設定が例です:

      特定のプライマリキー範囲

      完全データ

      [
        {
          "type": "int",
          "value": "000"
        },
        {
          "type": "string",
          "value": "aaa"
        }
      ]
      [
        {
          "type": "inf_min"
        },
        {
          "type": "inf_min"
        }
      ]
    • [プライマリキー範囲 (終了)]: データ読み取り操作の終了プライマリキー。フォーマットは JSON 配列です。inf_max は正の無限大を示します。

      プライマリキーが id という名前の int プライマリキー列と name という名前の string プライマリキー列で構成されている場合、次の設定が例です:

      特定のプライマリキー範囲

      完全データ

      [
        {
          "type": "int",
          "value": "999"
        },
        {
          "type": "string",
          "value": "zzz"
        }
      ]
      [
        {
          "type": "inf_max"
        },
        {
          "type": "inf_max"
        }
      ]
    • [シャード設定]: カスタムシャード設定。フォーマットは JSON 配列です。通常の状況では、このパラメーターを設定する必要はありません。[] に設定できます。

      Tablestore データストレージでホットスポットが発生し、Tablestore Reader の自動シャーディングポリシーが効果的でない場合は、カスタムシャーディングルールを使用できます。シャーディングは、開始と終了のプライマリキー範囲内のシャードポイントを指定します。すべてのプライマリキーではなく、シャードキーのみを設定する必要があります。

  • データ送信先

    • [テーブル]: ドロップダウンリストから、宛先データテーブルを選択します。

    • [プライマリキー情報]: 宛先データテーブルのプライマリキー情報。フォーマットは JSON 配列です。

      プライマリキーが id という名前の int プライマリキー列と name という名前の string プライマリキー列で構成されている場合、次の設定が例です:

      [
        {
          "name": "id",
          "type": "int"
        },
        {
          "name": "name",
          "type": "string"
        }
      ]
    • [書き込みモード]: Tablestore にデータを書き込むモード。次のモードがサポートされています:

      • PutRow: 行データを書き込みます。ターゲット行が存在しない場合は、新しい行が追加されます。ターゲット行が存在する場合は、元の行が上書きされます。

      • UpdateRow: 行データを更新します。行が存在しない場合は、新しい行が追加されます。行が存在する場合は、リクエストに基づいて行の指定された列の値が追加、変更、または削除されます。

  • [宛先フィールドマッピング]: ソースデータテーブルから宛先データテーブルへのフィールドマッピングを設定します。各行は JSON フォーマットのフィールドを表します。

    • [ソースフィールド]: これには、ソースデータテーブルのプライマリキー情報を含める必要があります。

      プライマリキーが id という名前の int プライマリキー列と name という名前の string プライマリキー列で構成され、属性列に age という名前の int フィールドが含まれる場合、次の設定が例です:

      {"name":"id","type":"int"}
      {"name":"name","type":"string"}
      {"name":"age","type":"int"}
    • [ターゲットフィールド]: これには、宛先データテーブルのプライマリキー情報を含める必要はありません。

      プライマリキーが id という名前の int プライマリキー列と name という名前の string プライマリキー列で構成され、属性列に age という名前の int フィールドが含まれる場合、次の設定が例です:

      {"name":"age","type":"int"}

設定が完了したら、ページ上部の [保存] をクリックします。

コードエディタ

ページ上部の [コードエディタ] をクリックします。表示されるページでスクリプトを編集します。

次の例は、プライマリキーが id という名前の int プライマリキー列と name という名前の string プライマリキー列で構成され、属性列に age という名前の int フィールドが含まれるテーブルの設定を示しています。タスクを設定する際は、サンプルスクリプト内の datasourcetable の名前を実際の値に置き換えてください。

完全データ

{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "ots",
            "parameter": {
                "datasource": "source_data",
                "column": [
                    {
                        "name": "id",
                        "type": "int"
                    },
                    {
                        "name": "name",
                        "type": "string"
                    },
                    {
                        "name": "age",
                        "type": "int"
                    }
                ],
                "range": {
                    "begin": [
                        {
                            "type": "inf_min"
                        },
                        {
                            "type": "inf_min"
                        }
                    ],
                    "end": [
                        {
                            "type": "inf_max"
                        },
                        {
                            "type": "inf_max"
                        }
                    ],
                    "split": []
                },
                "table": "source_table",
                "newVersion": "true"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "ots",
            "parameter": {
                "datasource": "target_data",
                "column": [
                    {
                        "name": "age",
                        "type": "int"
                    }
                ],
                "writeMode": "UpdateRow",
                "table": "target_table",
                "newVersion": "true",
                "primaryKey": [
                    {
                        "name": "id",
                        "type": "int"
                    },
                    {
                        "name": "name",
                        "type": "string"
                    }
                ]
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "concurrent": 2,
            "throttle": false
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

特定のプライマリキー範囲

{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "ots",
            "parameter": {
                "datasource": "source_data",
                "column": [
                    {
                        "name": "id",
                        "type": "int"
                    },
                    {
                        "name": "name",
                        "type": "string"
                    },
                    {
                        "name": "age",
                        "type": "int"
                    }
                ],
                "range": {
                    "begin": [
                        {
                            "type": "int",
                            "value": "000"
                        },
                        {
                            "type": "string",
                            "value": "aaa"
                        }
                    ],
                    "end": [
                        {
                            "type": "int",
                            "value": "999"
                        },
                        {
                            "type": "string",
                            "value": "zzz"
                        }
                    ],
                    "split": []
                },
                "table": "source_table",
                "newVersion": "true"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "ots",
            "parameter": {
                "datasource": "target_data",
                "column": [
                    {
                        "name": "age",
                        "type": "int"
                    }
                ],
                "writeMode": "UpdateRow",
                "table": "target_table",
                "newVersion": "true",
                "primaryKey": [
                    {
                        "name": "id",
                        "type": "int"
                    },
                    {
                        "name": "name",
                        "type": "string"
                    }
                ]
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "concurrent": 2,
            "throttle": false
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

スクリプトの編集が完了したら、ページ上部の [保存] をクリックします。

同期タスクの実行

ページ上部の [実行] をクリックして同期タスクを開始します。タスクを初めて実行するときは、[デバッグ設定] を確認する必要があります。

ステップ 4: 同期結果の表示

同期タスクの実行後、ログで実行ステータスを表示し、Tablestore コンソールで同期結果を確認できます。

  1. ページ下部でタスクの実行ステータスと結果を表示します。次のログ情報は、同期タスクが正常に実行されたことを示しています。

    2025-11-18 11:16:23 INFO Shell run successfully!
    2025-11-18 11:16:23 INFO Current task status: FINISH
    2025-11-18 11:16:23 INFO Cost time is: 77.208s
  2. ターゲットテーブルのデータを表示します。

    1. Tablestore コンソールに移動します。上部のナビゲーションバーで、リソースグループとリージョンを選択します。

    2. インスタンスのエイリアスをクリックします。[データテーブルリスト] で、ターゲットデータテーブルをクリックします。

    3. [データ管理] をクリックして、ターゲットデータテーブルのデータを表示します。

コマンドラインインターフェイスを使用したデータの同期

このメソッドでは、ソーステーブルからローカルの JSON ファイルに手動でデータをエクスポートし、そのファイルをターゲットテーブルにインポートする必要があります。このメソッドは少量のデータの移行にのみ適しており、大規模なデータ移行には推奨されません。

ステップ 1: 準備

ステップ 2: ソーステーブルデータのエクスポート

  1. コマンドラインインターフェイスを起動し、`config` コマンドを実行して、ソーステーブルが配置されているインスタンスのアクセス情報を設定します。詳細については、「起動とアクセス情報の設定」をご参照ください。

    コマンドを実行する前に、`endpoint`、`instance`、`id`、および `key` を、ソーステーブルが配置されているインスタンスのエンドポイント、インスタンス名、AccessKey ID、および AccessKey Secret に置き換えてください。
    config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************
  2. データをエクスポートします。

    1. use コマンドを実行して、ソーステーブルを選択します。次の例では source_table を使用します。

      use --wc -t source_table
    2. ソーステーブルからローカルの JSON ファイルにデータをエクスポートします。詳細については、「データのエクスポート」をご参照ください。

      scan -o /tmp/sourceData.json

ステップ 3: ターゲットテーブルへのデータのインポート

  1. `config` コマンドを実行して、ターゲットテーブルが配置されているインスタンスのアクセス情報を設定します。

    コマンドを実行する前に、`endpoint`、`instance`、`id`、および `key` を、ターゲットテーブルが配置されているインスタンスのエンドポイント、インスタンス名、AccessKey ID、および AccessKey Secret に置き換えてください。
    config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************
  2. データをインポートします。

    1. use コマンドを実行して、ターゲットテーブルを選択します。次の例では target_table を使用します。

      use --wc -t target_table
    2. ローカルの JSON ファイルからターゲットテーブルにデータをインポートします。詳細については、「データのインポート」をご参照ください。

      import -i /tmp/sourceData.json 

よくある質問