Tunnel Service memungkinkan Anda mengonsumsi data dalam tabel. Topik ini menjelaskan cara mulai menggunakan Tunnel Service dengan Tablestore SDK for Java.
Catatan penggunaan
Secara default, sistem memulai thread pool untuk membaca dan memproses data berdasarkan TunnelWorkerConfig. Jika Anda ingin menjalankan beberapa TunnelWorkers pada satu server, disarankan menggunakan TunnelWorkerConfig yang sama untuk mengonfigurasi semua TunnelWorkers.
TunnelWorker membutuhkan periode pemanasan untuk inisialisasi, yang ditentukan oleh parameter heartbeatIntervalInSec di TunnelWorkerConfig. Anda dapat menggunakan metode setHeartbeatIntervalInSec di TunnelWorkerConfig untuk mengonfigurasi parameter ini. Nilai default adalah 30 detik.
Ketika klien TunnelWorker dimatikan karena keluar tak terduga atau terminasi manual, TunnelWorker secara otomatis mendaur ulang sumber daya melalui salah satu dari metode berikut: melepaskan thread pool, memanggil metode shutdown yang telah didaftarkan untuk kelas Channel, atau menutup tunnel.
Periode retensi log inkremental di tunnel sama dengan periode retensi log Stream. Log Stream dapat disimpan hingga tujuh hari, sehingga log inkremental di tunnel juga dapat disimpan hingga tujuh hari.
Jika Anda membuat tunnel untuk mengonsumsi data diferensial atau inkremental, perhatikan hal-hal berikut:
Selama konsumsi data penuh, jika tunnel gagal menyelesaikan konsumsi data penuh dalam periode retensi log inkremental (maksimal tujuh hari), kesalahan
OTSTunnelExpiredakan terjadi ketika tunnel mulai mengonsumsi log inkremental. Akibatnya, tunnel tidak dapat mengonsumsi log inkremental.Jika Anda memperkirakan bahwa tunnel tidak dapat menyelesaikan konsumsi data penuh dalam jangka waktu tertentu, hubungi dukungan teknis Tablestore.
Selama konsumsi data inkremental, jika tunnel gagal menyelesaikan konsumsi log inkremental dalam periode retensi log inkremental (maksimal tujuh hari), tunnel mungkin mengonsumsi data dari data terbaru yang tersedia. Dalam kasus ini, data tertentu mungkin tidak dikonsumsi.
Setelah tunnel kedaluwarsa, Tablestore dapat menonaktifkan tunnel. Jika tunnel tetap dinonaktifkan selama lebih dari 30 hari, tunnel tersebut akan dihapus. Anda tidak dapat memulihkan tunnel yang telah dihapus.
Prasyarat
Operasi berikut dilakukan di Konsol Resource Access Management (RAM):
Pengguna RAM dibuat dan kebijakan
AliyunOTSFullAccessdilekatkan pada pengguna RAM untuk memberikan izin kepada pengguna RAM mengelola Tablestore. Untuk informasi lebih lanjut, lihat Buat pengguna RAM dan Berikan izin kepada pengguna RAM.CatatanDalam lingkungan bisnis nyata, disarankan hanya memberikan izin yang diperlukan kepada pengguna RAM berdasarkan prinsip hak istimewa minimal. Ini membantu mencegah risiko keamanan yang disebabkan oleh izin pengguna yang berlebihan.
Pair AccessKey dibuat untuk pengguna RAM. Untuk informasi lebih lanjut, lihat Buat pair AccessKey.
PeringatanJika pair AccessKey akun Alibaba Cloud Anda bocor, semua sumber daya dalam akun tersebut terpapar pada risiko potensial. Disarankan menggunakan pair AccessKey pengguna RAM untuk melakukan operasi guna mencegah kebocoran pair AccessKey akun Alibaba Cloud Anda.
Operasi berikut dilakukan di Konsol Tablestore:
Tabel data dibuat. Untuk informasi lebih lanjut, lihat Gunakan Konsol Tablestore, Gunakan CLI Tablestore, dan Gunakan SDK Tablestore.
Titik akhir instance tempat tabel data berada diperoleh. Untuk informasi lebih lanjut, lihat Inisialisasi klien Tablestore.
Kredensial akses dikonfigurasi. Untuk informasi lebih lanjut, lihat Konfigurasikan kredensial akses.
Mulai menggunakan Tunnel Service
Gunakan Tablestore SDK for Java untuk mulai menggunakan Tunnel Service.
Inisialisasi klien tunnel.
CatatanPastikan variabel lingkungan
TABLESTORE_ACCESS_KEY_IDdanTABLESTORE_ACCESS_KEY_SECRETdikonfigurasi. Variabel lingkungan TABLESTORE_ACCESS_KEY_ID menentukan ID AccessKey akun Alibaba Cloud atau pengguna RAM Anda. Variabel lingkungan TABLESTORE_ACCESS_KEY_SECRET menentukan Rahasia AccessKey akun Alibaba Cloud atau pengguna RAM Anda.// Tentukan nama instance Tablestore. // Tentukan titik akhir instance Tablestore. Contoh: https://instance.cn-hangzhou.ots.aliyuncs.com. // Tentukan ID AccessKey dan Rahasia AccessKey akun Alibaba Cloud atau pengguna RAM Anda. final String instanceName = "yourInstanceName"; final String endPoint = "yourEndpoint"; final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID"); final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET"); TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);Buat tunnel.
Sebelum membuat tunnel, buat tabel data untuk pengujian atau siapkan tabel yang ada. Anda dapat membuat tabel di Konsol Tablestore atau menggunakan metode createTable dari SyncClient.
PentingKetika membuat tunnel Stream atau BaseAndStream, patuhi aturan berikut untuk menentukan timestamp:
Jika Anda tidak menentukan timestamp awal untuk data inkremental, timestamp awal adalah waktu pembuatan tunnel.
Jika Anda menentukan timestamp awal dan timestamp akhir untuk data inkremental, timestamp awal atau timestamp akhir harus berada dalam rentang
[Waktu sistem saat ini - Periode validitas Stream + 5 menit, Waktu sistem saat ini]dalam milidetik.Periode validitas Stream mengacu pada periode validitas log inkremental dalam milidetik. Periode validitas Stream maksimum adalah tujuh hari. Anda dapat menentukan periode validitas Stream ketika mengaktifkan Stream untuk tabel data. Setelah menentukan periode validitas Stream, Anda tidak dapat memodifikasi periode tersebut.
Timestamp akhir harus lebih besar dari timestamp awal.
// Anda dapat membuat tiga jenis tunnel: TunnelType.BaseData, TunnelType.Stream, dan TunnelType.BaseAndStream. // Kode berikut memberikan contoh cara membuat tunnel BaseAndStream. Untuk membuat tunnel jenis lain, atur TunnelType di CreateTunnelRequest ke jenis yang diinginkan. final String tableName = "testTable"; final String tunnelName = "testTunnel"; CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream); CreateTunnelResponse resp = tunnelClient.createTunnel(request); // Gunakan tunnelId untuk menginisialisasi TunnelWorker. Panggil operasi ListTunnel atau DescribeTunnel untuk mendapatkan ID tunnel. String tunnelId = resp.getTunnelId(); System.out.println("Buat Tunnel, Id: " + tunnelId);Tentukan callback konsumsi data kustom untuk memulai konsumsi data otomatis. Tabel berikut menjelaskan konfigurasi TunnelClient.
// Tentukan callback untuk konsumsi data untuk memanggil operasi IChannelProcessor, yang menentukan metode process dan shutdown. private static class SimpleProcessor implements IChannelProcessor { @Override public void process(ProcessRecordsInput input) { // Parameter ProcessRecordsInput berisi data yang Anda peroleh. System.out.println("Prosesor rekaman default, akan mencetak jumlah rekaman"); System.out.println( // Parameter NextToken digunakan oleh klien tunnel untuk membagi halaman data. String.format("Proses %d rekaman, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { // Simulasikan konsumsi dan pemrosesan data. Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); } } // Secara default, sistem memulai thread pool untuk membaca dan memproses data berdasarkan TunnelWorkerConfig. Jika Anda menggunakan satu server, beberapa TunnelWorkers dimulai. // Kami sarankan Anda mengonfigurasi TunnelWorkers menggunakan TunnelWorkerConfig yang sama. TunnelWorkerConfig menyediakan parameter tingkat lanjut. TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); // Konfigurasikan TunnelWorkers dan mulai pemrosesan data otomatis. TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); config.shutdown(); worker.shutdown(); tunnelClient.shutdown(); }
Konfigurasikan TunnelWorkerConfig
TunnelWorkerConfig memungkinkan Anda mengonfigurasi parameter kustom untuk klien tunnel sesuai kebutuhan bisnis Anda. Tabel berikut menjelaskan parameter di Tablestore SDK for Java.
Konfigurasi | Parameter | Deskripsi |
Interval dan periode timeout untuk heartbeat | heartbeatTimeoutInSec | Periode timeout untuk heartbeat. Nilai default: 300. Satuan: detik. Ketika terjadi timeout heartbeat, server tunnel menganggap instance TunnelClient saat ini tidak tersedia. Dalam kasus ini, klien tunnel harus menyambungkan kembali ke server tunnel. |
heartbeatIntervalInSec | Interval untuk heartbeat. Nilai default: 30. Nilai minimum: 5. Satuan: detik. Anda dapat mendeteksi heartbeat untuk memantau saluran aktif, memperbarui status saluran, dan secara otomatis menginisialisasi tugas pemrosesan data. | |
Interval antara checkpoint | checkpointIntervalInMillis | Interval antara checkpoint ketika data dikonsumsi. Interval dicatat di server tunnel. Nilai default: 5000. Satuan: milidetik. Catatan
|
Tag klien kustom | clientTag | Tag klien kustom yang digunakan untuk menghasilkan ID klien tunnel. Anda dapat mengonfigurasi parameter ini untuk membedakan TunnelWorkers. |
Tentukan callback kustom untuk pemrosesan data | channelProcessor | Callback yang didaftarkan oleh pengguna untuk memproses data, termasuk metode process dan shutdown. |
Konfigurasi thread pool untuk membaca dan memproses data | readRecordsExecutor | Thread pool yang ingin Anda gunakan untuk membaca data. Jika Anda tidak memiliki persyaratan khusus, gunakan konfigurasi default. |
processRecordsExecutor | Thread pool yang ingin Anda gunakan untuk memproses data. Jika Anda tidak memiliki persyaratan khusus, gunakan konfigurasi default. Catatan
| |
Kontrol memori | maxChannelParallel | Tingkat konkurensi maksimum saluran untuk membaca dan memproses data untuk kontrol memori. Nilai default adalah -1, yang menentukan bahwa tingkat konkurensi tidak terbatas. Catatan Tablestore SDK for Java V5.10.0 dan versi lebih baru mendukung fitur ini. |
Waktu backoff maksimum | maxRetryIntervalInMillis | Nilai dasar untuk menghitung waktu backoff maksimum untuk tunnel. Waktu backoff maksimum adalah angka acak yang berkisar antara 0.75 × maxRetryIntervalInMillis hingga 1.25 × maxRetryIntervalInMillis. Nilai default: 2000. Nilai minimum: 200. Satuan: milidetik. Catatan
|
Pendeteksian saluran CLOSING | enableClosingChannelDetect | Menentukan apakah akan mengaktifkan pendeteksian real-time untuk saluran CLOSING. Nilai default: false, yang menentukan bahwa pendeteksian real-time untuk saluran CLOSING dinonaktifkan. Catatan
|
Lampiran: Contoh kode
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
System.out.println("Prosesor rekaman default, akan mencetak jumlah rekaman");
System.out.println(
// Parameter NextToken digunakan oleh klien Tunnel untuk membagi halaman data.
String.format("Proses %d rekaman, NextToken: %s", input.getRecords().size(), input.getNextToken()));
try {
// Simulasikan konsumsi dan pemrosesan data.
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
System.out.println("Mock shutdown");
}
}
public static void main(String[] args) throws Exception {
//1. Inisialisasi klien tunnel.
// Tentukan nama instance.
final String instanceName = "yourInstanceName";
// Tentukan titik akhir instance.
final String endPoint = "yourEndpoint";
// Dapatkan ID AccessKey dan Rahasia AccessKey dari variabel lingkungan.
final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
//2. Buat tunnel. Sebelum Anda melakukan langkah ini, Anda harus membuat tabel untuk pengujian. Anda dapat membuat tabel di Konsol Tablestore atau menggunakan metode createTable dari SyncClient.
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
// Gunakan parameter tunnelId untuk menginisialisasi TunnelWorker. Anda dapat memanggil operasi ListTunnel atau DescribeTunnel untuk mendapatkan ID tunnel.
String tunnelId = resp.getTunnelId();
System.out.println("Buat Tunnel, Id: " + tunnelId);
//3. Tentukan fungsi callback kustom untuk memulai konsumsi data otomatis.
// TunnelWorkerConfig menyediakan parameter tingkat lanjut.
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
config.shutdown();
worker.shutdown();
tunnelClient.shutdown();
}
}
}