全部产品
Search
文档中心

Tablestore:Mulai menggunakan

更新时间:Jul 06, 2025

Tunnel Service memungkinkan Anda mengonsumsi data dalam tabel. Topik ini menjelaskan cara mulai menggunakan Tunnel Service dengan Tablestore SDK for Java.

Catatan penggunaan

  • Secara default, sistem memulai thread pool untuk membaca dan memproses data berdasarkan TunnelWorkerConfig. Jika Anda ingin menjalankan beberapa TunnelWorkers pada satu server, disarankan menggunakan TunnelWorkerConfig yang sama untuk mengonfigurasi semua TunnelWorkers.

  • TunnelWorker membutuhkan periode pemanasan untuk inisialisasi, yang ditentukan oleh parameter heartbeatIntervalInSec di TunnelWorkerConfig. Anda dapat menggunakan metode setHeartbeatIntervalInSec di TunnelWorkerConfig untuk mengonfigurasi parameter ini. Nilai default adalah 30 detik.

  • Ketika klien TunnelWorker dimatikan karena keluar tak terduga atau terminasi manual, TunnelWorker secara otomatis mendaur ulang sumber daya melalui salah satu dari metode berikut: melepaskan thread pool, memanggil metode shutdown yang telah didaftarkan untuk kelas Channel, atau menutup tunnel.

  • Periode retensi log inkremental di tunnel sama dengan periode retensi log Stream. Log Stream dapat disimpan hingga tujuh hari, sehingga log inkremental di tunnel juga dapat disimpan hingga tujuh hari.

  • Jika Anda membuat tunnel untuk mengonsumsi data diferensial atau inkremental, perhatikan hal-hal berikut:

    • Selama konsumsi data penuh, jika tunnel gagal menyelesaikan konsumsi data penuh dalam periode retensi log inkremental (maksimal tujuh hari), kesalahan OTSTunnelExpired akan terjadi ketika tunnel mulai mengonsumsi log inkremental. Akibatnya, tunnel tidak dapat mengonsumsi log inkremental.

      Jika Anda memperkirakan bahwa tunnel tidak dapat menyelesaikan konsumsi data penuh dalam jangka waktu tertentu, hubungi dukungan teknis Tablestore.

    • Selama konsumsi data inkremental, jika tunnel gagal menyelesaikan konsumsi log inkremental dalam periode retensi log inkremental (maksimal tujuh hari), tunnel mungkin mengonsumsi data dari data terbaru yang tersedia. Dalam kasus ini, data tertentu mungkin tidak dikonsumsi.

  • Setelah tunnel kedaluwarsa, Tablestore dapat menonaktifkan tunnel. Jika tunnel tetap dinonaktifkan selama lebih dari 30 hari, tunnel tersebut akan dihapus. Anda tidak dapat memulihkan tunnel yang telah dihapus.

Prasyarat

  • Operasi berikut dilakukan di Konsol Resource Access Management (RAM):

    • Pengguna RAM dibuat dan kebijakan AliyunOTSFullAccess dilekatkan pada pengguna RAM untuk memberikan izin kepada pengguna RAM mengelola Tablestore. Untuk informasi lebih lanjut, lihat Buat pengguna RAM dan Berikan izin kepada pengguna RAM.

      Catatan

      Dalam lingkungan bisnis nyata, disarankan hanya memberikan izin yang diperlukan kepada pengguna RAM berdasarkan prinsip hak istimewa minimal. Ini membantu mencegah risiko keamanan yang disebabkan oleh izin pengguna yang berlebihan.

    • Pair AccessKey dibuat untuk pengguna RAM. Untuk informasi lebih lanjut, lihat Buat pair AccessKey.

      Peringatan

      Jika pair AccessKey akun Alibaba Cloud Anda bocor, semua sumber daya dalam akun tersebut terpapar pada risiko potensial. Disarankan menggunakan pair AccessKey pengguna RAM untuk melakukan operasi guna mencegah kebocoran pair AccessKey akun Alibaba Cloud Anda.

Mulai menggunakan Tunnel Service

Gunakan Tablestore SDK for Java untuk mulai menggunakan Tunnel Service.

  1. Inisialisasi klien tunnel.

    Catatan

    Pastikan variabel lingkungan TABLESTORE_ACCESS_KEY_ID dan TABLESTORE_ACCESS_KEY_SECRET dikonfigurasi. Variabel lingkungan TABLESTORE_ACCESS_KEY_ID menentukan ID AccessKey akun Alibaba Cloud atau pengguna RAM Anda. Variabel lingkungan TABLESTORE_ACCESS_KEY_SECRET menentukan Rahasia AccessKey akun Alibaba Cloud atau pengguna RAM Anda.

    // Tentukan nama instance Tablestore.
    // Tentukan titik akhir instance Tablestore. Contoh: https://instance.cn-hangzhou.ots.aliyuncs.com.
    // Tentukan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud atau pengguna RAM Anda.
    final String instanceName = "yourInstanceName";
    final String endPoint = "yourEndpoint";
    final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. Buat tunnel.

    Sebelum membuat tunnel, buat tabel data untuk pengujian atau siapkan tabel yang ada. Anda dapat membuat tabel di Konsol Tablestore atau menggunakan metode createTable dari SyncClient.

    Penting

    Ketika membuat tunnel Stream atau BaseAndStream, patuhi aturan berikut untuk menentukan timestamp:

    • Jika Anda tidak menentukan timestamp awal untuk data inkremental, timestamp awal adalah waktu pembuatan tunnel.

    • Jika Anda menentukan timestamp awal dan timestamp akhir untuk data inkremental, timestamp awal atau timestamp akhir harus berada dalam rentang [Waktu sistem saat ini - Periode validitas Stream + 5 menit, Waktu sistem saat ini] dalam milidetik.

      • Periode validitas Stream mengacu pada periode validitas log inkremental dalam milidetik. Periode validitas Stream maksimum adalah tujuh hari. Anda dapat menentukan periode validitas Stream ketika mengaktifkan Stream untuk tabel data. Setelah menentukan periode validitas Stream, Anda tidak dapat memodifikasi periode tersebut.

      • Timestamp akhir harus lebih besar dari timestamp awal.

    // Anda dapat membuat tiga jenis tunnel: TunnelType.BaseData, TunnelType.Stream, dan TunnelType.BaseAndStream.
    // Kode berikut memberikan contoh cara membuat tunnel BaseAndStream. Untuk membuat tunnel jenis lain, atur TunnelType di CreateTunnelRequest ke jenis yang diinginkan.
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    // Gunakan tunnelId untuk menginisialisasi TunnelWorker. Panggil operasi ListTunnel atau DescribeTunnel untuk mendapatkan ID tunnel.
    String tunnelId = resp.getTunnelId();
    System.out.println("Buat Tunnel, Id: " + tunnelId);
  3. Tentukan callback konsumsi data kustom untuk memulai konsumsi data otomatis. Tabel berikut menjelaskan konfigurasi TunnelClient.

    // Tentukan callback untuk konsumsi data untuk memanggil operasi IChannelProcessor, yang menentukan metode process dan shutdown.
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            // Parameter ProcessRecordsInput berisi data yang Anda peroleh.
            System.out.println("Prosesor rekaman default, akan mencetak jumlah rekaman");
            System.out.println(
                // Parameter NextToken digunakan oleh klien tunnel untuk membagi halaman data.
                String.format("Proses %d rekaman, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // Simulasikan konsumsi dan pemrosesan data.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    // Secara default, sistem memulai thread pool untuk membaca dan memproses data berdasarkan TunnelWorkerConfig. Jika Anda menggunakan satu server, beberapa TunnelWorkers dimulai.
    // Kami sarankan Anda mengonfigurasi TunnelWorkers menggunakan TunnelWorkerConfig yang sama. TunnelWorkerConfig menyediakan parameter tingkat lanjut.
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    // Konfigurasikan TunnelWorkers dan mulai pemrosesan data otomatis.
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

Konfigurasikan TunnelWorkerConfig

TunnelWorkerConfig memungkinkan Anda mengonfigurasi parameter kustom untuk klien tunnel sesuai kebutuhan bisnis Anda. Tabel berikut menjelaskan parameter di Tablestore SDK for Java.

Konfigurasi

Parameter

Deskripsi

Interval dan periode timeout untuk heartbeat

heartbeatTimeoutInSec

Periode timeout untuk heartbeat. Nilai default: 300. Satuan: detik.

Ketika terjadi timeout heartbeat, server tunnel menganggap instance TunnelClient saat ini tidak tersedia. Dalam kasus ini, klien tunnel harus menyambungkan kembali ke server tunnel.

heartbeatIntervalInSec

Interval untuk heartbeat. Nilai default: 30. Nilai minimum: 5. Satuan: detik.

Anda dapat mendeteksi heartbeat untuk memantau saluran aktif, memperbarui status saluran, dan secara otomatis menginisialisasi tugas pemrosesan data.

Interval antara checkpoint

checkpointIntervalInMillis

Interval antara checkpoint ketika data dikonsumsi. Interval dicatat di server tunnel.

Nilai default: 5000. Satuan: milidetik.

Catatan
  • Jika data yang ingin Anda baca disimpan di server yang berbeda, berbagai kesalahan mungkin terjadi ketika Anda menjalankan proses. Misalnya, server mungkin restart karena faktor lingkungan. Oleh karena itu, server tunnel secara berkala mencatat checkpoint setelah data diproses. Tugas memproses data dari checkpoint terakhir setelah tugas dimulai ulang. Dalam kasus tertentu, Tunnel Service mungkin menyinkronkan data secara berurutan sekali atau beberapa kali. Jika data tertentu diproses ulang, periksa logika pemrosesan bisnis.

  • Untuk mencegah data diproses ulang ketika kesalahan terjadi, catat checkpoint tambahan. Perhatikan bahwa jumlah checkpoint yang berlebihan dapat mengurangi throughput sistem. Kami sarankan Anda mencatat checkpoint berdasarkan kebutuhan bisnis Anda.

Tag klien kustom

clientTag

Tag klien kustom yang digunakan untuk menghasilkan ID klien tunnel. Anda dapat mengonfigurasi parameter ini untuk membedakan TunnelWorkers.

Tentukan callback kustom untuk pemrosesan data

channelProcessor

Callback yang didaftarkan oleh pengguna untuk memproses data, termasuk metode process dan shutdown.

Konfigurasi thread pool untuk membaca dan memproses data

readRecordsExecutor

Thread pool yang ingin Anda gunakan untuk membaca data. Jika Anda tidak memiliki persyaratan khusus, gunakan konfigurasi default.

processRecordsExecutor

Thread pool yang ingin Anda gunakan untuk memproses data. Jika Anda tidak memiliki persyaratan khusus, gunakan konfigurasi default.

Catatan
  • Ketika Anda menentukan konfigurasi untuk thread pool, kami sarankan Anda menyetel jumlah thread ke jumlah saluran di tunnel. Dengan cara ini, sumber daya komputasi, seperti CPU, dapat dialokasikan dengan cepat ke setiap saluran.

  • Tablestore melakukan operasi berikut pada konfigurasi default pool untuk memastikan throughput:

    • Alokasikan 32 core thread sebelumnya untuk memastikan throughput real-time ketika jumlah data kecil atau jumlah saluran sedikit.

    • Kurangi panjang antrian ketika jumlah data besar harus diproses atau ketika jumlah saluran banyak. Dengan cara ini, kebijakan dipicu untuk membuat thread di pool dan mengalokasikan lebih banyak sumber daya komputasi.

    • Kami sarankan Anda menyetel waktu keep-alive thread menjadi 60 detik. Jika jumlah data yang ingin Anda proses berkurang, Anda dapat mendaur ulang sumber daya thread.

Kontrol memori

maxChannelParallel

Tingkat konkurensi maksimum saluran untuk membaca dan memproses data untuk kontrol memori.

Nilai default adalah -1, yang menentukan bahwa tingkat konkurensi tidak terbatas.

Catatan

Tablestore SDK for Java V5.10.0 dan versi lebih baru mendukung fitur ini.

Waktu backoff maksimum

maxRetryIntervalInMillis

Nilai dasar untuk menghitung waktu backoff maksimum untuk tunnel. Waktu backoff maksimum adalah angka acak yang berkisar antara 0.75 × maxRetryIntervalInMillis hingga 1.25 × maxRetryIntervalInMillis.

Nilai default: 2000. Nilai minimum: 200. Satuan: milidetik.

Catatan
  • Tablestore SDK for Java V5.4.0 dan versi lebih baru mendukung fitur ini.

  • Jika jumlah data yang ingin Anda proses kurang dari 900 KB atau 500 bagian per ekspor, klien tunnel menggunakan backoff eksponensial hingga waktu backoff maksimum tercapai.

Pendeteksian saluran CLOSING

enableClosingChannelDetect

Menentukan apakah akan mengaktifkan pendeteksian real-time untuk saluran CLOSING. Nilai default: false, yang menentukan bahwa pendeteksian real-time untuk saluran CLOSING dinonaktifkan.

Catatan
  • Tablestore SDK for Java V5.13.13 dan versi lebih baru mendukung fitur ini.

  • Jika Anda tidak mengaktifkan fitur ini, saluran mungkin ditangguhkan dan konsumsi dapat terganggu dalam skenario tertentu, seperti ketika banyak saluran ada tetapi sumber daya klien tidak mencukupi.

  • Saluran CLOSING: saluran yang sedang beralih dari satu klien tunnel ke klien lain.

Lampiran: Contoh kode

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            System.out.println("Prosesor rekaman default, akan mencetak jumlah rekaman");
            System.out.println(
                // Parameter NextToken digunakan oleh klien Tunnel untuk membagi halaman data.
                String.format("Proses %d rekaman, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // Simulasikan konsumsi dan pemrosesan data.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1. Inisialisasi klien tunnel.
        // Tentukan nama instance.
        final String instanceName = "yourInstanceName";
        // Tentukan titik akhir instance.
        final String endPoint = "yourEndpoint";
        // Dapatkan ID AccessKey dan Rahasia AccessKey dari variabel lingkungan.
        final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
        final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2. Buat tunnel. Sebelum Anda melakukan langkah ini, Anda harus membuat tabel untuk pengujian. Anda dapat membuat tabel di Konsol Tablestore atau menggunakan metode createTable dari SyncClient.
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        // Gunakan parameter tunnelId untuk menginisialisasi TunnelWorker. Anda dapat memanggil operasi ListTunnel atau DescribeTunnel untuk mendapatkan ID tunnel.
        String tunnelId = resp.getTunnelId();
        System.out.println("Buat Tunnel, Id: " + tunnelId);
        //3. Tentukan fungsi callback kustom untuk memulai konsumsi data otomatis.
        // TunnelWorkerConfig menyediakan parameter tingkat lanjut.
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}