全部产品
Search
文档中心

Tablestore:Gunakan Layanan Tunnel dengan menggunakan SDK Tablestore

更新时间:Jul 02, 2025

Topik ini menjelaskan cara memulai menggunakan Layanan Tunnel dengan SDK Tablestore. Sebelum menggunakan Layanan Tunnel, pastikan Anda memahami catatan penggunaan dan operasi API.

Catatan Penggunaan

  • Secara default, sistem memulai thread pool untuk membaca dan memproses data berdasarkan TunnelWorkerConfig. Jika Anda ingin menjalankan beberapa TunnelWorker pada satu server, disarankan untuk menggunakan TunnelWorkerConfig yang sama untuk mengonfigurasi semua TunnelWorker.

  • TunnelWorker memerlukan periode pemanasan untuk inisialisasi, yang ditentukan oleh parameter heartbeatIntervalInSec dalam TunnelWorkerConfig. Anda dapat menggunakan metode setHeartbeatIntervalInSec dalam TunnelWorkerConfig untuk mengonfigurasi parameter ini. Nilai default adalah 30 detik.

  • Jika klien TunnelWorker dimatikan karena keluar tak terduga atau terminasi manual, TunnelWorker secara otomatis mendaur ulang sumber daya dengan salah satu dari metode berikut: melepaskan thread pool, memanggil metode shutdown yang didaftarkan untuk kelas Channel, atau menutup tunnel.

  • Masa retensi log inkremen dalam tunnel sama dengan masa retensi log Stream. Log Stream dapat disimpan hingga tujuh hari, sehingga log inkremen dalam tunnel juga dapat disimpan hingga tujuh hari.

  • Jika Anda membuat tunnel untuk mengonsumsi data diferensial atau inkremen, perhatikan hal-hal berikut:

    • Selama konsumsi data penuh, jika tunnel gagal menyelesaikan konsumsi data penuh dalam periode retensi log inkremen (maksimal tujuh hari), kesalahan OTSTunnelExpired akan muncul ketika tunnel mulai mengonsumsi log inkremen. Akibatnya, tunnel tidak dapat mengonsumsi log inkremen.

      Jika Anda memperkirakan bahwa tunnel tidak dapat menyelesaikan konsumsi data penuh dalam jendela waktu yang ditentukan, hubungi dukungan teknis Tablestore.

    • Selama konsumsi data inkremen, jika tunnel gagal menyelesaikan konsumsi log inkremen dalam periode retensi log inkremen (maksimal tujuh hari), tunnel mungkin mengonsumsi data dari data terbaru yang tersedia. Dalam hal ini, data tertentu mungkin tidak dikonsumsi.

  • Setelah tunnel kedaluwarsa, Tablestore mungkin menonaktifkan tunnel. Jika tunnel tetap dinonaktifkan selama lebih dari 30 hari, tunnel akan dihapus dan tidak dapat dipulihkan.

Operasi API

Operasi

Deskripsi

CreateTunnel

Membuat tunnel.

ListTunnel

Mengquery informasi tentang tunnel yang dibuat untuk tabel data.

DescribeTunnel

Mengquery informasi tentang saluran dalam tunnel.

DeleteTunnel

Menghapus tunnel.

Gunakan SDK Tablestore

Anda dapat menggunakan SDK Tablestore untuk bahasa pemrograman berikut untuk mengimplementasikan Layanan Tunnel:

Prasyarat

Gunakan Layanan Tunnel

Dalam contoh ini, SDK Tablestore untuk Java digunakan untuk memulai dengan Layanan Tunnel.

  1. Inisialisasi klien tunnel.

    Catatan

    Pastikan variabel lingkungan TABLESTORE_ACCESS_KEY_ID dan TABLESTORE_ACCESS_KEY_SECRET telah dikonfigurasi. Variabel lingkungan TABLESTORE_ACCESS_KEY_ID menentukan ID AccessKey akun Alibaba Cloud atau pengguna RAM Anda. Variabel lingkungan TABLESTORE_ACCESS_KEY_SECRET menentukan rahasia AccessKey akun Alibaba Cloud atau pengguna RAM Anda.

    // Tentukan nama instance Tablestore.
    // Tentukan titik akhir instance Tablestore. Contoh: https://instance.cn-hangzhou.ots.aliyuncs.com.
    // Tentukan ID AccessKey dan rahasia AccessKey akun Alibaba Cloud atau pengguna RAM Anda.
    final String instanceName = "yourInstanceName";
    final String endPoint = "yourEndpoint";
    final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. Buat tunnel.

    Sebelum membuat tunnel, buat tabel data untuk pengujian atau siapkan tabel yang sudah ada. Anda dapat membuat tabel di konsol Tablestore atau dengan menggunakan metode createTable dari SyncClient.

    Penting

    Ketika membuat tunnel inkremen atau diferensial, patuhi aturan berikut untuk menentukan timestamp:

    • Jika Anda tidak menentukan timestamp awal untuk data inkremen, waktu saat tunnel dibuat digunakan sebagai timestamp awal.

    • Jika Anda menentukan timestamp awal dan akhir untuk data inkremen, nilai valid harus berada dalam rentang [Waktu sistem saat ini - Periode validitas Stream + 5 menit, Waktu sistem saat ini]. Satuan: milidetik.

      • Periode validitas Stream merujuk pada periode validitas log inkremen dalam milidetik. Periode validitas Stream maksimum adalah tujuh hari. Anda dapat menentukan periode validitas Stream saat mengaktifkan Stream untuk tabel data. Setelah menentukan periode validitas Stream, Anda tidak dapat memodifikasi periode tersebut.

      • Timestamp akhir harus lebih lambat dari timestamp awal.

    // Jenis tunnel berikut didukung: TunnelType.BaseData, TunnelType.Stream, dan TunnelType.BaseAndStream.
    // Kode sampel berikut memberikan contoh tentang cara membuat tunnel BaseAndStream. Untuk membuat jenis tunnel lainnya, konfigurasikan parameter TunnelType dalam CreateTunnelRequest sesuai dengan kebutuhan bisnis Anda.
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    // Gunakan parameter tunnelId untuk menginisialisasi TunnelWorker. Anda dapat memanggil operasi ListTunnel atau DescribeTunnel untuk mendapatkan ID tunnel.
    String tunnelId = resp.getTunnelId();
    System.out.println("Buat Tunnel, Id: " + tunnelId);
  3. Tentukan panggilan balik konsumsi data kustom untuk memulai konsumsi data otomatis.

    // Tentukan panggilan balik untuk konsumsi data untuk memanggil operasi IChannelProcessor, yang menentukan metode process dan shutdown.
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            // Parameter ProcessRecordsInput berisi data yang Anda peroleh.
            System.out.println("Prosesor rekaman default, akan mencetak jumlah rekaman");
            System.out.println(
                // Parameter NextToken digunakan oleh klien tunnel untuk paginasi data.
                String.format("Proses %d rekaman, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // Simulasikan konsumsi dan pemrosesan data.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    // Secara default, sistem memulai thread pool untuk membaca dan memproses data berdasarkan TunnelWorkerConfig.
    // Jika Anda ingin memulai beberapa TunnelWorker pada satu server, kami sarankan Anda menggunakan TunnelWorkerConfig yang sama untuk mengonfigurasi semua TunnelWorker. TunnelWorkerConfig menyediakan parameter tingkat lanjut.
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    // Konfigurasikan TunnelWorker dan mulai pemrosesan data otomatis.
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

Konfigurasikan TunnelWorkerConfig

TunnelWorkerConfig memungkinkan Anda mengonfigurasi parameter kustom untuk klien tunnel sesuai kebutuhan bisnis Anda. Tabel berikut menjelaskan parameter dalam SDK Tablestore untuk Java.

Konfigurasi

Parameter

Deskripsi

Interval dan periode timeout untuk heartbeat

heartbeatTimeoutInSec

Periode timeout untuk heartbeat. Nilai default: 300. Satuan: detik.

Ketika terjadi timeout heartbeat, server tunnel menganggap instance TunnelClient saat ini tidak tersedia. Dalam hal ini, klien tunnel harus menyambungkan ulang ke server tunnel.

heartbeatIntervalInSec

Interval untuk heartbeat. Nilai default: 30. Nilai minimum: 5. Satuan: detik.

Anda dapat mendeteksi heartbeat untuk memantau saluran aktif, memperbarui status saluran, dan secara otomatis menginisialisasi tugas pemrosesan data.

Interval antar checkpoint

checkpointIntervalInMillis

Interval antar checkpoint saat data dikonsumsi. Interval dicatat di server tunnel.

Nilai default: 5000. Satuan: milidetik.

Catatan
  • Jika data yang ingin Anda baca disimpan di server yang berbeda, berbagai kesalahan mungkin terjadi saat Anda menjalankan proses. Misalnya, server mungkin restart karena faktor lingkungan. Oleh karena itu, server tunnel secara berkala mencatat checkpoint setelah data diproses. Tugas memproses data dari checkpoint terakhir setelah tugas di-restart. Dalam kasus tertentu, Layanan Tunnel mungkin mensinkronkan data secara berurutan sekali atau beberapa kali. Jika data tertentu diproses ulang, periksa logika pemrosesan bisnis.

  • Untuk mencegah data diproses ulang saat terjadi kesalahan, catat checkpoint tambahan. Perhatikan bahwa jumlah checkpoint yang berlebihan dapat mengurangi throughput sistem. Kami sarankan Anda mencatat checkpoint berdasarkan kebutuhan bisnis Anda.

Tag klien kustom

clientTag

Tag klien kustom yang digunakan untuk menghasilkan ID klien tunnel. Anda dapat mengonfigurasi parameter ini untuk membedakan TunnelWorker.

Tentukan panggilan balik kustom untuk pemrosesan data

channelProcessor

Panggilan balik yang didaftarkan oleh pengguna untuk memproses data, termasuk metode process dan shutdown.

Konfigurasi thread pool untuk membaca dan memproses data

readRecordsExecutor

Thread pool yang ingin Anda gunakan untuk membaca data. Jika Anda tidak memiliki persyaratan khusus, gunakan konfigurasi default.

processRecordsExecutor

Thread pool yang ingin Anda gunakan untuk memproses data. Jika Anda tidak memiliki persyaratan khusus, gunakan konfigurasi default.

Catatan
  • Saat Anda menentukan konfigurasi untuk thread pool, kami sarankan Anda menyetel jumlah thread ke jumlah saluran dalam tunnel. Dengan cara ini, sumber daya komputasi, seperti CPU, dapat dialokasikan dengan cepat ke setiap saluran.

  • Tablestore melakukan operasi berikut pada konfigurasi default pool untuk memastikan throughput:

    • Alokasikan 32 thread inti sebelumnya untuk memastikan throughput real-time saat jumlah data kecil atau jumlah saluran sedikit.

    • Kurangi panjang antrian saat jumlah data besar harus diproses atau saat jumlah saluran banyak. Dengan cara ini, kebijakan dipicu untuk membuat thread dalam pool dan mengalokasikan lebih banyak sumber daya komputasi.

    • Kami sarankan Anda menyetel waktu keep-alive thread menjadi 60 detik. Jika jumlah data yang ingin Anda proses berkurang, Anda dapat mendaur ulang sumber daya thread.

Kontrol memori

maxChannelParallel

Tingkat konkurensi maksimum saluran untuk membaca dan memproses data untuk kontrol memori.

Nilai default adalah -1, yang menentukan bahwa tingkat konkurensi tidak terbatas.

Catatan

SDK Tablestore untuk Java V5.10.0 dan versi lebih baru mendukung fitur ini.

Waktu backoff maksimum

maxRetryIntervalInMillis

Nilai dasar untuk menghitung waktu backoff maksimum untuk tunnel. Waktu backoff maksimum adalah angka acak yang berkisar antara 0.75 × maxRetryIntervalInMillis hingga 1.25 × maxRetryIntervalInMillis.

Nilai default: 2000. Nilai minimum: 200. Satuan: milidetik.

Catatan
  • SDK Tablestore untuk Java V5.4.0 dan versi lebih baru mendukung fitur ini.

  • Jika jumlah data yang ingin Anda proses kurang dari 900 KB atau 500 potongan per ekspor, klien tunnel menggunakan backoff eksponensial hingga waktu backoff maksimum tercapai.

Pendeteksian saluran CLOSING

enableClosingChannelDetect

Menentukan apakah akan mengaktifkan pendeteksian real-time untuk saluran CLOSING. Nilai default: false, yang menentukan bahwa pendeteksian real-time untuk saluran CLOSING dinonaktifkan.

Catatan
  • SDK Tablestore untuk Java V5.13.13 dan versi lebih baru mendukung fitur ini.

  • Jika Anda tidak mengaktifkan fitur ini, saluran mungkin ditangguhkan dan konsumsi dapat terganggu dalam skenario tertentu, seperti ketika banyak saluran ada tetapi sumber daya klien tidak mencukupi.

  • Saluran CLOSING: saluran yang sedang beralih dari satu klien tunnel ke klien lainnya.

Lampiran: Kode sampel lengkap

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            System.out.println("Prosesor rekaman default, akan mencetak jumlah rekaman");
            System.out.println(
                // Parameter NextToken digunakan oleh klien tunnel untuk paginasi data.
                String.format("Proses %d rekaman, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // Simulasikan konsumsi dan pemrosesan data.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1. Inisialisasi klien tunnel.
        // Tentukan nama instance.
        final String instanceName = "yourInstanceName";
        // Tentukan titik akhir instance.
        final String endPoint = "yourEndpoint";
         // Dapatkan ID AccessKey dan rahasia AccessKey dari variabel lingkungan.
        final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
        final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2. Buat tunnel. Sebelum Anda melakukan langkah ini, Anda harus membuat tabel untuk pengujian. Anda dapat membuat tabel di konsol Tablestore atau dengan menggunakan metode createTable dari SyncClient.
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        // Gunakan parameter tunnelId untuk menginisialisasi TunnelWorker. Anda dapat memanggil operasi ListTunnel atau DescribeTunnel untuk mendapatkan ID tunnel.
        String tunnelId = resp.getTunnelId();
        System.out.println("Buat Tunnel, Id: " + tunnelId);
        //3. Tentukan panggilan balik konsumsi data kustom untuk memulai konsumsi data otomatis.
        // TunnelWorkerConfig menyediakan parameter tingkat lanjut.
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}