Tablestore menyediakan berbagai metode untuk memigrasikan atau menyinkronkan data antar tabel. Anda dapat menggunakan Tunnel Service, DataWorks, DataX, atau antarmuka baris perintah untuk menyinkronkan data dari satu tabel ke tabel lainnya.
Prasyarat
Dapatkan nama instans, titik akhir, dan ID wilayah untuk tabel sumber dan target.
Buat AccessKey untuk Akun Alibaba Cloud Anda atau Pengguna RAM dengan izin Tablestore.
Sinkronisasi data menggunakan SDK
Anda dapat menyinkronkan data antar tabel menggunakan Tunnel Service. Metode ini mendukung sinkronisasi data dalam wilayah yang sama, lintas wilayah berbeda, dan lintas akun berbeda. Tunnel Service menangkap perubahan data dan menyinkronkannya ke tabel target secara real time. Contoh berikut menunjukkan cara menggunakan Java SDK untuk mengimplementasikan sinkronisasi ini.
Sebelum menjalankan kode, ganti nama tabel, nama instans, dan titik akhir dengan nilai aktual untuk tabel sumber dan target. Kemudian, konfigurasikan ID AccessKey dan Rahasia AccessKey sebagai variabel lingkungan.
import com.alicloud.openservices.tablestore.*;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.model.tunnel.*;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class TableSynchronization {
// Konfigurasi tabel sumber: nama tabel, nama instans, titik akhir, ID AccessKey, Rahasia AccessKey
final static String sourceTableName = "sourceTableName";
final static String sourceInstanceName = "sourceInstanceName";
final static String sourceEndpoint = "sourceEndpoint";
final static String sourceAccessKeyId = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_ID");
final static String sourceKeySecret = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_SECRET");
// Konfigurasi tabel target: nama tabel, nama instans, titik akhir, ID AccessKey, Rahasia AccessKey
final static String targetTableName = "targetTableName";
final static String targetInstanceName = "targetInstanceName";
final static String targetEndpoint = "targetEndpoint";
final static String targetAccessKeyId = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_ID");
final static String targetKeySecret = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_SECRET");
// Nama tunnel
static String tunnelName = "source_table_tunnel";
// TablestoreWriter: Alat untuk penulisan data dengan konkurensi tinggi.
static TableStoreWriter tableStoreWriter;
// Statistik untuk baris yang berhasil dan gagal.
static AtomicLong succeedRows = new AtomicLong();
static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) {
// Buat tabel target.
createTargetTable();
System.out.println("Buat tabel target: Selesai.");
// Inisialisasi TunnelClient.
TunnelClient tunnelClient = new TunnelClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName);
// Buat tunnel.
String tunnelId = createTunnel(tunnelClient);
System.out.println("Buat tunnel: Selesai.");
// Inisialisasi TablestoreWriter.
tableStoreWriter = createTablesStoreWriter();
// Sinkronisasi data melalui tunnel.
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
System.out.println("Menghubungkan ke tunnel dan bekerja...");
worker.connectAndWorking();
// Pantau status tunnel. Ketika status berubah dari sinkronisasi data penuh ke sinkronisasi inkremental, sinkronisasi data selesai.
while (true) {
if (tunnelClient.describeTunnel(new DescribeTunnelRequest(sourceTableName, tunnelName)).getTunnelInfo().getStage().equals(TunnelStage.ProcessStream)) {
break;
}
Thread.sleep(5000);
}
// Hasil sinkronisasi.
System.out.println("Sinkronisasi data selesai.");
System.out.println("* Baris berhasil: " + succeedRows.get());
System.out.println("* Baris gagal: " + failedRows.get());
// Hapus tunnel.
tunnelClient.deleteTunnel(new DeleteTunnelRequest(sourceTableName, tunnelName));
// Matikan sumber daya.
worker.shutdown();
config.shutdown();
tunnelClient.shutdown();
tableStoreWriter.close();
}catch(Exception e){
e.printStackTrace();
worker.shutdown();
config.shutdown();
tunnelClient.shutdown();
tableStoreWriter.close();
}
}
private static void createTargetTable() throws ClientException {
// Kueri informasi tabel sumber.
SyncClient sourceClient = new SyncClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName);
DescribeTableResponse response = sourceClient.describeTable(new DescribeTableRequest(sourceTableName));
// Buat tabel target.
SyncClient targetClient = new SyncClient(targetEndpoint, targetAccessKeyId, targetKeySecret, targetInstanceName);
TableMeta tableMeta = new TableMeta(targetTableName);
response.getTableMeta().getPrimaryKeyList().forEach(
item -> tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema(item.getName(), item.getType()))
);
TableOptions tableOptions = new TableOptions(-1, 1);
CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
targetClient.createTable(request);
// Matikan sumber daya.
sourceClient.shutdown();
targetClient.shutdown();
}
private static String createTunnel(TunnelClient client) {
// Buat tunnel dan kembalikan ID tunnel.
CreateTunnelRequest request = new CreateTunnelRequest(sourceTableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse response = client.createTunnel(request);
return response.getTunnelId();
}
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
if(input.getRecords().isEmpty())
return;
System.out.print("* Mulai mengonsumsi " + input.getRecords().size() + " catatan... ");
for (StreamRecord record : input.getRecords()) {
switch (record.getRecordType()) {
// Tulis data baris.
case PUT:
RowPutChange putChange = new RowPutChange(targetTableName, record.getPrimaryKey());
putChange.addColumns(getColumnsFromRecord(record));
tableStoreWriter.addRowChange(putChange);
break;
// Perbarui data baris.
case UPDATE:
RowUpdateChange updateChange = new RowUpdateChange(targetTableName, record.getPrimaryKey());
for (RecordColumn column : record.getColumns()) {
switch (column.getColumnType()) {
// Tambahkan kolom atribut.
case PUT:
updateChange.put(column.getColumn().getName(), column.getColumn().getValue(), System.currentTimeMillis());
break;
// Hapus satu versi kolom atribut.
case DELETE_ONE_VERSION:
updateChange.deleteColumn(column.getColumn().getName(),
column.getColumn().getTimestamp());
break;
// Hapus kolom atribut.
case DELETE_ALL_VERSION:
updateChange.deleteColumns(column.getColumn().getName());
break;
default:
break;
}
}
tableStoreWriter.addRowChange(updateChange);
break;
// Hapus data baris.
case DELETE:
RowDeleteChange deleteChange = new RowDeleteChange(targetTableName, record.getPrimaryKey());
tableStoreWriter.addRowChange(deleteChange);
break;
}
}
// Flush buffer.
tableStoreWriter.flush();
System.out.println("Selesai.");
}
@Override
public void shutdown() {
}
}
public static List<Column> getColumnsFromRecord(StreamRecord record) {
List<Column> retColumns = new ArrayList<>();
for (RecordColumn recordColumn : record.getColumns()) {
// Ganti nomor versi data dengan timestamp saat ini untuk mencegah melebihi deviasi versi maksimum.
Column column = new Column(recordColumn.getColumn().getName(), recordColumn.getColumn().getValue(), System.currentTimeMillis());
retColumns.add(column);
}
return retColumns;
}
private static TableStoreWriter createTablesStoreWriter() {
WriterConfig config = new WriterConfig();
// Callback tingkat baris untuk menghitung baris yang berhasil dan gagal serta mencetak informasi tentang baris yang gagal.
TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
@Override
public void onCompleted(RowChange rowChange, RowWriteResult rowWriteResult) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(RowChange rowChange, Exception exception) {
failedRows.incrementAndGet();
System.out.println("* Baris Gagal: " + rowChange.getTableName() + " | " + rowChange.getPrimaryKey() + " | " + exception.getMessage());
}
};
ServiceCredentials credentials = new DefaultCredentials(targetAccessKeyId, targetKeySecret);
return new DefaultTableStoreWriter(targetEndpoint, credentials, targetInstanceName,
targetTableName, config, resultCallback);
}
}Sinkronisasi data menggunakan DataWorks
DataWorks menyediakan layanan integrasi data visual yang memungkinkan Anda mengonfigurasi tugas sinkronisasi antar tabel Tablestore melalui antarmuka grafis. Anda juga dapat menggunakan alat lain, seperti DataX, untuk menyinkronkan data antar tabel Tablestore.
Langkah 1: Persiapan
Buat tabel data target. Pastikan struktur kunci primer tabel target, termasuk tipe data dan urutan kolom kunci primer, identik dengan tabel sumber.
Aktifkan DataWorks dan buat ruang kerja di wilayah tempat tabel sumber atau target berada.
Buat kelompok sumber daya arsitektur tanpa server dan sambungkan ke ruang kerja. Untuk informasi lebih lanjut tentang penagihan, lihat Penagihan kelompok sumber daya arsitektur tanpa server.
Jika tabel sumber dan target berada di wilayah berbeda, Anda harus membuat koneksi peering VPC untuk membangun konektivitas jaringan lintas wilayah.
Langkah 2: Tambahkan sumber data Tablestore
Tambahkan sumber data Tablestore untuk instans tabel sumber maupun instans tabel target.
Masuk ke Konsol DataWorks. Alihkan ke wilayah tujuan. Di panel navigasi kiri, pilih . Dari daftar drop-down, pilih ruang kerja dan klik Go To Data Integration.
Di panel navigasi kiri, klik Data Source.
Di halaman Data Source List, klik Add Data Source.
Di kotak dialog Add Data Source, cari dan pilih Tablestore sebagai tipe sumber data.
Di kotak dialog Add OTS Data Source, konfigurasikan parameter sumber data seperti dijelaskan dalam tabel berikut.
Parameter
Deskripsi
Data Source Name
Nama sumber data harus merupakan kombinasi huruf, angka, dan garis bawah (_). Tidak boleh dimulai dengan angka atau garis bawah (_).
Data Source Description
Deskripsi singkat tentang sumber data. Panjang deskripsi tidak boleh melebihi 80 karakter.
Region
Pilih wilayah tempat instans Tablestore berada.
Tablestore Instance Name
Nama instans Tablestore.
Endpoint
Titik akhir instans Tablestore. Gunakan titik akhir VPC.
AccessKey ID
ID AccessKey dan Rahasia AccessKey dari Akun Alibaba Cloud atau Pengguna RAM.
AccessKey Secret
Uji konektivitas kelompok sumber daya.
Saat membuat sumber data, Anda harus menguji konektivitas kelompok sumber daya untuk memastikan bahwa kelompok sumber daya untuk tugas sinkronisasi dapat terhubung ke sumber data. Jika tidak, tugas sinkronisasi data tidak dapat dijalankan.
Di bagian Connection Configuration, klik Test Connectivity di kolom Connectivity Status untuk kelompok sumber daya.
Setelah pengujian konektivitas berhasil, Connectivity Status berubah menjadi Connected. Klik Selesai. Sumber data baru muncul di daftar sumber data.
Jika pengujian konektivitas Fails, gunakan Connectivity Diagnostic Tool untuk memecahkan masalah.
Langkah 3: Konfigurasi dan jalankan tugas sinkronisasi
Buat node tugas
Buka halaman Data Development.
Masuk ke Konsol DataWorks.
Di bilah navigasi atas, pilih kelompok sumber daya dan wilayah.
Di panel navigasi kiri, pilih .
Pilih ruang kerja dan klik Go To DataStudio.
Di halaman Data Development Konsol DataStudio, klik ikon
di sebelah kanan Project Folder, lalu pilih .Di kotak dialog Create Node, pilih Path, atur baik Sumber Data maupun Tujuan Data ke Tablestore, masukkan nama, lalu klik Confirm.
Konfigurasi tugas sinkronisasi
Di Project Folder, klik node tugas sinkronisasi offline yang baru dibuat. Anda dapat mengonfigurasi tugas sinkronisasi di Antarmuka Tanpa Kode atau editor kode.
Antarmuka Tanpa Kode (default)
Konfigurasikan item berikut:
Data Source: Pilih sumber data untuk sumber dan tujuan.
Runtime Resource: Pilih kelompok sumber daya. Konektivitas sumber data diuji secara otomatis.
Data Source:
Table: Dari daftar drop-down, pilih tabel data sumber.
Primary Key Range (start): Kunci primer awal untuk operasi pembacaan data. Formatnya adalah larik JSON.
inf_minmenunjukkan tak hingga negatif.Jika kunci primer terdiri dari kolom kunci primer
intbernamaiddan kolom kunci primerstringbernamaname, konfigurasi berikut adalah contohnya:Rentang kunci primer tertentu
Data lengkap
[ { "type": "int", "value": "000" }, { "type": "string", "value": "aaa" } ][ { "type": "inf_min" }, { "type": "inf_min" } ]Primary Key Range (end): Kunci primer akhir untuk operasi pembacaan data. Formatnya adalah larik JSON.
inf_maxmenunjukkan tak hingga positif.Jika kunci primer terdiri dari kolom kunci primer
intbernamaiddan kolom kunci primerstringbernamaname, konfigurasi berikut adalah contohnya:Rentang kunci primer tertentu
Data lengkap
[ { "type": "int", "value": "999" }, { "type": "string", "value": "zzz" } ][ { "type": "inf_max" }, { "type": "inf_max" } ]Shard Configuration: Konfigurasi shard kustom. Formatnya adalah larik JSON. Dalam kondisi normal, Anda tidak perlu mengonfigurasi parameter ini. Anda dapat mengaturnya ke
[].Jika terjadi hot spot dalam penyimpanan data Tablestore dan kebijakan sharding otomatis Tablestore Reader tidak efektif, Anda dapat menggunakan aturan sharding kustom. Sharding menentukan titik shard dalam rentang kunci primer awal dan akhir. Anda hanya perlu mengonfigurasi kunci shard, bukan semua kunci primer.
Data Destination:
Table: Dari daftar drop-down, pilih tabel data tujuan.
Primary Key Information: Informasi kunci primer tabel data tujuan. Formatnya adalah larik JSON.
Jika kunci primer terdiri dari kolom kunci primer
intbernamaiddan kolom kunci primerstringbernamaname, konfigurasi berikut adalah contohnya:[ { "name": "id", "type": "int" }, { "name": "name", "type": "string" } ]Write Mode: Mode untuk menulis data ke Tablestore. Mode berikut didukung:
PutRow: Menulis data baris. Jika baris target tidak ada, baris baru ditambahkan. Jika baris target ada, baris asli ditimpa.
UpdateRow: Memperbarui data baris. Jika baris tidak ada, baris baru ditambahkan. Jika baris ada, nilai kolom tertentu dalam baris ditambahkan, dimodifikasi, atau dihapus berdasarkan permintaan.
Destination Field Mapping: Konfigurasikan pemetaan bidang dari tabel data sumber ke tabel data tujuan. Setiap baris merepresentasikan satu bidang dalam format JSON.
Source Field: Harus mencakup informasi kunci primer tabel data sumber.
Jika kunci primer terdiri dari kolom kunci primer
intbernamaiddan kolom kunci primerstringbernamaname, dan kolom atribut mencakup bidangintbernamaage, konfigurasi berikut adalah contohnya:{"name":"id","type":"int"} {"name":"name","type":"string"} {"name":"age","type":"int"}Target Field: Tidak perlu mencakup informasi kunci primer tabel data tujuan.
Jika kunci primer terdiri dari kolom kunci primer
intbernamaiddan kolom kunci primerstringbernamaname, dan kolom atribut mencakup bidangintbernamaage, konfigurasi berikut adalah contohnya:{"name":"age","type":"int"}
Setelah menyelesaikan konfigurasi, klik Save di bagian atas halaman.
Editor kode
Klik Code Editor di bagian atas halaman. Edit skrip di halaman yang muncul.
Contoh berikut menunjukkan konfigurasi untuk tabel di mana kunci primer terdiri dari kolom kunci primerintbernamaiddan kolom kunci primerstringbernamaname, dan kolom atribut mencakup bidangintbernamaage. Saat mengonfigurasi tugas, ganti namadatasourcedantabledalam contoh skrip dengan nilai aktual Anda.
Data lengkap
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "ots",
"parameter": {
"datasource": "source_data",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"range": {
"begin": [
{
"type": "inf_min"
},
{
"type": "inf_min"
}
],
"end": [
{
"type": "inf_max"
},
{
"type": "inf_max"
}
],
"split": []
},
"table": "source_table",
"newVersion": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "ots",
"parameter": {
"datasource": "target_data",
"column": [
{
"name": "age",
"type": "int"
}
],
"writeMode": "UpdateRow",
"table": "target_table",
"newVersion": "true",
"primaryKey": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
]
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Rentang kunci primer tertentu
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "ots",
"parameter": {
"datasource": "source_data",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"range": {
"begin": [
{
"type": "int",
"value": "000"
},
{
"type": "string",
"value": "aaa"
}
],
"end": [
{
"type": "int",
"value": "999"
},
{
"type": "string",
"value": "zzz"
}
],
"split": []
},
"table": "source_table",
"newVersion": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "ots",
"parameter": {
"datasource": "target_data",
"column": [
{
"name": "age",
"type": "int"
}
],
"writeMode": "UpdateRow",
"table": "target_table",
"newVersion": "true",
"primaryKey": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
]
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Setelah selesai mengedit skrip, klik Save di bagian atas halaman.
Jalankan tugas sinkronisasi
Klik Run di bagian atas halaman untuk memulai tugas sinkronisasi. Pertama kali menjalankan tugas, Anda harus mengonfirmasi Debug Configuration.
Langkah 4: Lihat hasil sinkronisasi
Setelah tugas sinkronisasi dijalankan, Anda dapat melihat status eksekusi di log dan memeriksa hasil sinkronisasi di Konsol Tablestore.
Lihat status dan hasil tugas di bagian bawah halaman. Informasi log berikut menunjukkan bahwa tugas sinkronisasi berhasil dijalankan.
2025-11-18 11:16:23 INFO Shell run successfully! 2025-11-18 11:16:23 INFO Current task status: FINISH 2025-11-18 11:16:23 INFO Cost time is: 77.208sLihat data di tabel target.
Buka Konsol Tablestore. Di bilah navigasi atas, pilih kelompok sumber daya dan wilayah.
Klik alias instans. Di Data Table List, klik tabel data target.
Klik Data Management untuk melihat data di tabel data target.
Sinkronisasi data menggunakan antarmuka baris perintah
Metode ini mengharuskan Anda mengekspor data dari tabel sumber ke file JSON lokal secara manual, lalu mengimpor file tersebut ke tabel target. Metode ini hanya cocok untuk memigrasikan data dalam jumlah kecil dan tidak disarankan untuk migrasi data skala besar.
Langkah 1: Persiapan
Buat tabel data target. Pastikan struktur kunci primernya, termasuk nama, tipe data, dan urutan kolomnya, identik dengan tabel sumber.
Langkah 2: Ekspor data tabel sumber
Jalankan antarmuka baris perintah dan jalankan perintah `config` untuk mengonfigurasi informasi akses instans tempat tabel sumber berada. Untuk informasi lebih lanjut, lihat Mulai dan konfigurasikan informasi akses.
Sebelum menjalankan perintah, ganti `endpoint`, `instance`, `id`, dan `key` dengan titik akhir, nama instans, ID AccessKey, dan Rahasia AccessKey instans tempat tabel sumber berada.
config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************Ekspor data.
Jalankan perintah
useuntuk memilih tabel sumber. Contoh berikut menggunakansource_table.use --wc -t source_tableEkspor data dari tabel sumber ke file JSON lokal. Untuk informasi lebih lanjut, lihat Ekspor data.
scan -o /tmp/sourceData.json
Langkah 3: Impor data ke tabel target
Jalankan perintah `config` untuk mengonfigurasi informasi akses instans tempat tabel target berada.
Sebelum menjalankan perintah, ganti `endpoint`, `instance`, `id`, dan `key` dengan titik akhir, nama instans, ID AccessKey, dan Rahasia AccessKey instans tempat tabel target berada.
config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************Impor data.
Jalankan perintah
useuntuk memilih tabel target. Contoh berikut menggunakantarget_table.use --wc -t target_tableImpor data dari file JSON lokal ke tabel target. Untuk informasi lebih lanjut, lihat Impor data.
import -i /tmp/sourceData.json
