Topik ini menjelaskan cara memulai menggunakan Layanan Tunnel dengan SDK Tablestore. Sebelum menggunakan Layanan Tunnel, pastikan Anda memahami catatan penggunaan dan operasi API.
Catatan Penggunaan
Secara default, sistem memulai thread pool untuk membaca dan memproses data berdasarkan TunnelWorkerConfig. Jika Anda ingin menjalankan beberapa TunnelWorker pada satu server, disarankan untuk menggunakan TunnelWorkerConfig yang sama untuk mengonfigurasi semua TunnelWorker.
TunnelWorker memerlukan periode pemanasan untuk inisialisasi, yang ditentukan oleh parameter heartbeatIntervalInSec dalam TunnelWorkerConfig. Anda dapat menggunakan metode setHeartbeatIntervalInSec dalam TunnelWorkerConfig untuk mengonfigurasi parameter ini. Nilai default adalah 30 detik.
Jika klien TunnelWorker dimatikan karena keluar tak terduga atau terminasi manual, TunnelWorker secara otomatis mendaur ulang sumber daya dengan salah satu dari metode berikut: melepaskan thread pool, memanggil metode shutdown yang didaftarkan untuk kelas Channel, atau menutup tunnel.
Masa retensi log inkremen dalam tunnel sama dengan masa retensi log Stream. Log Stream dapat disimpan hingga tujuh hari, sehingga log inkremen dalam tunnel juga dapat disimpan hingga tujuh hari.
Jika Anda membuat tunnel untuk mengonsumsi data diferensial atau inkremen, perhatikan hal-hal berikut:
Selama konsumsi data penuh, jika tunnel gagal menyelesaikan konsumsi data penuh dalam periode retensi log inkremen (maksimal tujuh hari), kesalahan
OTSTunnelExpiredakan muncul ketika tunnel mulai mengonsumsi log inkremen. Akibatnya, tunnel tidak dapat mengonsumsi log inkremen.Jika Anda memperkirakan bahwa tunnel tidak dapat menyelesaikan konsumsi data penuh dalam jendela waktu yang ditentukan, hubungi dukungan teknis Tablestore.
Selama konsumsi data inkremen, jika tunnel gagal menyelesaikan konsumsi log inkremen dalam periode retensi log inkremen (maksimal tujuh hari), tunnel mungkin mengonsumsi data dari data terbaru yang tersedia. Dalam hal ini, data tertentu mungkin tidak dikonsumsi.
Setelah tunnel kedaluwarsa, Tablestore mungkin menonaktifkan tunnel. Jika tunnel tetap dinonaktifkan selama lebih dari 30 hari, tunnel akan dihapus dan tidak dapat dipulihkan.
Operasi API
Operasi | Deskripsi |
CreateTunnel | Membuat tunnel. |
ListTunnel | Mengquery informasi tentang tunnel yang dibuat untuk tabel data. |
DescribeTunnel | Mengquery informasi tentang saluran dalam tunnel. |
DeleteTunnel | Menghapus tunnel. |
Gunakan SDK Tablestore
Anda dapat menggunakan SDK Tablestore untuk bahasa pemrograman berikut untuk mengimplementasikan Layanan Tunnel:
Prasyarat
Pengguna RAM dibuat dan kebijakan
AliyunOTSFullAccessdilampirkan ke pengguna RAM untuk memberikan izin kepada pengguna RAM mengelola Tablestore. Pasangan AccessKey dibuat untuk pengguna RAM. Untuk informasi lebih lanjut, lihat Gunakan pasangan AccessKey pengguna RAM untuk mengakses Tablestore.Tabel data dibuat. Untuk informasi lebih lanjut, lihat Operasi pada tabel data.
Gunakan Layanan Tunnel
Dalam contoh ini, SDK Tablestore untuk Java digunakan untuk memulai dengan Layanan Tunnel.
Inisialisasi klien tunnel.
CatatanPastikan variabel lingkungan
TABLESTORE_ACCESS_KEY_IDdanTABLESTORE_ACCESS_KEY_SECRETtelah dikonfigurasi. Variabel lingkungan TABLESTORE_ACCESS_KEY_ID menentukan ID AccessKey akun Alibaba Cloud atau pengguna RAM Anda. Variabel lingkungan TABLESTORE_ACCESS_KEY_SECRET menentukan rahasia AccessKey akun Alibaba Cloud atau pengguna RAM Anda.// Tentukan nama instance Tablestore. // Tentukan titik akhir instance Tablestore. Contoh: https://instance.cn-hangzhou.ots.aliyuncs.com. // Tentukan ID AccessKey dan rahasia AccessKey akun Alibaba Cloud atau pengguna RAM Anda. final String instanceName = "yourInstanceName"; final String endPoint = "yourEndpoint"; final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID"); final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET"); TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);Buat tunnel.
Sebelum membuat tunnel, buat tabel data untuk pengujian atau siapkan tabel yang sudah ada. Anda dapat membuat tabel di konsol Tablestore atau dengan menggunakan metode createTable dari SyncClient.
PentingKetika membuat tunnel inkremen atau diferensial, patuhi aturan berikut untuk menentukan timestamp:
Jika Anda tidak menentukan timestamp awal untuk data inkremen, waktu saat tunnel dibuat digunakan sebagai timestamp awal.
Jika Anda menentukan timestamp awal dan akhir untuk data inkremen, nilai valid harus berada dalam rentang
[Waktu sistem saat ini - Periode validitas Stream + 5 menit, Waktu sistem saat ini]. Satuan: milidetik.Periode validitas Stream merujuk pada periode validitas log inkremen dalam milidetik. Periode validitas Stream maksimum adalah tujuh hari. Anda dapat menentukan periode validitas Stream saat mengaktifkan Stream untuk tabel data. Setelah menentukan periode validitas Stream, Anda tidak dapat memodifikasi periode tersebut.
Timestamp akhir harus lebih lambat dari timestamp awal.
// Jenis tunnel berikut didukung: TunnelType.BaseData, TunnelType.Stream, dan TunnelType.BaseAndStream. // Kode sampel berikut memberikan contoh tentang cara membuat tunnel BaseAndStream. Untuk membuat jenis tunnel lainnya, konfigurasikan parameter TunnelType dalam CreateTunnelRequest sesuai dengan kebutuhan bisnis Anda. final String tableName = "testTable"; final String tunnelName = "testTunnel"; CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream); CreateTunnelResponse resp = tunnelClient.createTunnel(request); // Gunakan parameter tunnelId untuk menginisialisasi TunnelWorker. Anda dapat memanggil operasi ListTunnel atau DescribeTunnel untuk mendapatkan ID tunnel. String tunnelId = resp.getTunnelId(); System.out.println("Buat Tunnel, Id: " + tunnelId);Tentukan panggilan balik konsumsi data kustom untuk memulai konsumsi data otomatis.
// Tentukan panggilan balik untuk konsumsi data untuk memanggil operasi IChannelProcessor, yang menentukan metode process dan shutdown. private static class SimpleProcessor implements IChannelProcessor { @Override public void process(ProcessRecordsInput input) { // Parameter ProcessRecordsInput berisi data yang Anda peroleh. System.out.println("Prosesor rekaman default, akan mencetak jumlah rekaman"); System.out.println( // Parameter NextToken digunakan oleh klien tunnel untuk paginasi data. String.format("Proses %d rekaman, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { // Simulasikan konsumsi dan pemrosesan data. Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); } } // Secara default, sistem memulai thread pool untuk membaca dan memproses data berdasarkan TunnelWorkerConfig. // Jika Anda ingin memulai beberapa TunnelWorker pada satu server, kami sarankan Anda menggunakan TunnelWorkerConfig yang sama untuk mengonfigurasi semua TunnelWorker. TunnelWorkerConfig menyediakan parameter tingkat lanjut. TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); // Konfigurasikan TunnelWorker dan mulai pemrosesan data otomatis. TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); config.shutdown(); worker.shutdown(); tunnelClient.shutdown(); }
Konfigurasikan TunnelWorkerConfig
TunnelWorkerConfig memungkinkan Anda mengonfigurasi parameter kustom untuk klien tunnel sesuai kebutuhan bisnis Anda. Tabel berikut menjelaskan parameter dalam SDK Tablestore untuk Java.
Konfigurasi | Parameter | Deskripsi |
Interval dan periode timeout untuk heartbeat | heartbeatTimeoutInSec | Periode timeout untuk heartbeat. Nilai default: 300. Satuan: detik. Ketika terjadi timeout heartbeat, server tunnel menganggap instance TunnelClient saat ini tidak tersedia. Dalam hal ini, klien tunnel harus menyambungkan ulang ke server tunnel. |
heartbeatIntervalInSec | Interval untuk heartbeat. Nilai default: 30. Nilai minimum: 5. Satuan: detik. Anda dapat mendeteksi heartbeat untuk memantau saluran aktif, memperbarui status saluran, dan secara otomatis menginisialisasi tugas pemrosesan data. | |
Interval antar checkpoint | checkpointIntervalInMillis | Interval antar checkpoint saat data dikonsumsi. Interval dicatat di server tunnel. Nilai default: 5000. Satuan: milidetik. Catatan
|
Tag klien kustom | clientTag | Tag klien kustom yang digunakan untuk menghasilkan ID klien tunnel. Anda dapat mengonfigurasi parameter ini untuk membedakan TunnelWorker. |
Tentukan panggilan balik kustom untuk pemrosesan data | channelProcessor | Panggilan balik yang didaftarkan oleh pengguna untuk memproses data, termasuk metode process dan shutdown. |
Konfigurasi thread pool untuk membaca dan memproses data | readRecordsExecutor | Thread pool yang ingin Anda gunakan untuk membaca data. Jika Anda tidak memiliki persyaratan khusus, gunakan konfigurasi default. |
processRecordsExecutor | Thread pool yang ingin Anda gunakan untuk memproses data. Jika Anda tidak memiliki persyaratan khusus, gunakan konfigurasi default. Catatan
| |
Kontrol memori | maxChannelParallel | Tingkat konkurensi maksimum saluran untuk membaca dan memproses data untuk kontrol memori. Nilai default adalah -1, yang menentukan bahwa tingkat konkurensi tidak terbatas. Catatan SDK Tablestore untuk Java V5.10.0 dan versi lebih baru mendukung fitur ini. |
Waktu backoff maksimum | maxRetryIntervalInMillis | Nilai dasar untuk menghitung waktu backoff maksimum untuk tunnel. Waktu backoff maksimum adalah angka acak yang berkisar antara 0.75 × maxRetryIntervalInMillis hingga 1.25 × maxRetryIntervalInMillis. Nilai default: 2000. Nilai minimum: 200. Satuan: milidetik. Catatan
|
Pendeteksian saluran CLOSING | enableClosingChannelDetect | Menentukan apakah akan mengaktifkan pendeteksian real-time untuk saluran CLOSING. Nilai default: false, yang menentukan bahwa pendeteksian real-time untuk saluran CLOSING dinonaktifkan. Catatan
|
Lampiran: Kode sampel lengkap
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
System.out.println("Prosesor rekaman default, akan mencetak jumlah rekaman");
System.out.println(
// Parameter NextToken digunakan oleh klien tunnel untuk paginasi data.
String.format("Proses %d rekaman, NextToken: %s", input.getRecords().size(), input.getNextToken()));
try {
// Simulasikan konsumsi dan pemrosesan data.
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
System.out.println("Mock shutdown");
}
}
public static void main(String[] args) throws Exception {
//1. Inisialisasi klien tunnel.
// Tentukan nama instance.
final String instanceName = "yourInstanceName";
// Tentukan titik akhir instance.
final String endPoint = "yourEndpoint";
// Dapatkan ID AccessKey dan rahasia AccessKey dari variabel lingkungan.
final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
//2. Buat tunnel. Sebelum Anda melakukan langkah ini, Anda harus membuat tabel untuk pengujian. Anda dapat membuat tabel di konsol Tablestore atau dengan menggunakan metode createTable dari SyncClient.
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
// Gunakan parameter tunnelId untuk menginisialisasi TunnelWorker. Anda dapat memanggil operasi ListTunnel atau DescribeTunnel untuk mendapatkan ID tunnel.
String tunnelId = resp.getTunnelId();
System.out.println("Buat Tunnel, Id: " + tunnelId);
//3. Tentukan panggilan balik konsumsi data kustom untuk memulai konsumsi data otomatis.
// TunnelWorkerConfig menyediakan parameter tingkat lanjut.
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
config.shutdown();
worker.shutdown();
tunnelClient.shutdown();
}
}
}