このトピックでは、Tablestore SDK を使用してトンネルサービスを開始する方法について説明します。トンネルサービスを使用する前に、使用上の注意と API 操作をよく理解しておいてください。
使用上の注意
デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを起動します。単一のサーバーで複数の TunnelWorker を起動する場合は、すべての TunnelWorker を構成するために同じ TunnelWorkerConfig を使用することをお勧めします。
TunnelWorker は初期化のためにウォームアップ期間を必要とします。これは、TunnelWorkerConfig の heartbeatIntervalInSec パラメーターで指定されます。このパラメーターを構成するには、TunnelWorkerConfig の setHeartbeatIntervalInSec メソッドを使用します。デフォルト値:30。単位:秒。
予期しない終了または手動による終了が原因で TunnelWorker クライアントがシャットダウンすると、TunnelWorker は次のいずれかの方法を使用してリソースを自動的にリサイクルします。スレッドプールを解放し、Channel クラスに登録した shutdown メソッドを自動的に呼び出し、トンネルをシャットダウンします。
トンネル内の増分ログの保存期間は、Stream ログの保存期間と同じです。Stream ログは最大 7 日間保存できます。したがって、トンネル内の増分ログは最大 7 日間保存できます。
差分データまたは増分データを消費するためにトンネルを作成する場合は、次の点に注意してください。
完全データ消費中に、トンネルが増分ログの保存期間(最大 7 日間)内に完全データの消費を完了できない場合、トンネルが増分ログの消費を開始するときに
OTSTunnelExpiredエラーが発生します。その結果、トンネルは増分ログを消費できません。指定されたタイムウィンドウ内にトンネルが完全データ消費を完了できないと推定される場合は、Tablestore テクニカルサポートにご連絡ください。
増分データ消費中に、トンネルが増分ログの保存期間(最大 7 日間)内に増分ログの消費を完了できない場合、トンネルは利用可能な最新のデータからデータを消費する可能性があります。この場合、特定のデータが消費されない可能性があります。
トンネルの有効期限が切れると、Tablestore はトンネルを無効にする場合があります。トンネルが無効状態のままで 30 日以上経過すると、トンネルは削除されます。削除されたトンネルは復元できません。
API 操作
操作 | 説明 |
CreateTunnel | トンネルを作成します。 |
ListTunnel | データテーブルに作成されたトンネルに関する情報をクエリします。 |
DescribeTunnel | トンネル内のチャネルに関する情報をクエリします。 |
DeleteTunnel | トンネルを削除します。 |
Tablestore SDK の使用
以下のプログラミング言語の Tablestore SDK を使用して、トンネルサービスを実装できます。
前提条件
RAM ユーザーが作成され、
AliyunOTSFullAccessポリシーが RAM ユーザーにアタッチされて、Tablestore を管理する権限が付与されています。RAM ユーザーの AccessKey ペアが作成されています。詳細については、「RAM ユーザーの AccessKey ペアを使用して Tablestore にアクセスする」をご参照ください。データテーブルが作成されています。詳細については、「データテーブルの操作」をご参照ください。
トンネルサービスの使用
この例では、Tablestore SDK for Java を使用してトンネルサービスを開始します。
トンネルクライアントを初期化します。
説明TABLESTORE_ACCESS_KEY_IDおよびTABLESTORE_ACCESS_KEY_SECRET環境変数が構成されていることを確認してください。 TABLESTORE_ACCESS_KEY_ID 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。 TABLESTORE_ACCESS_KEY_SECRET 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。// Tablestore インスタンスの名前を指定します。 // Tablestore インスタンスのエンドポイントを指定します。例:https://instance.cn-hangzhou.ots.aliyuncs.com。 // Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレットを指定します。 final String instanceName = "yourInstanceName"; final String endPoint = "yourEndpoint"; final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID"); final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET"); TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);トンネルを作成します。
トンネルを作成する前に、テスト用のデータテーブルを作成するか、既存のテーブルを準備します。Tablestore コンソールで、または SyncClient の createTable メソッドを使用してテーブルを作成できます。
重要増分トンネルまたは差分トンネルを作成する場合は、タイムスタンプを指定するために次のルールに従う必要があります。
増分データの開始タイムスタンプを指定しない場合、トンネルが作成された時刻が開始タイムスタンプとして使用されます。
増分データの開始タイムスタンプと終了タイムスタンプを指定する場合、有効な値は
[現在のシステム時刻 - Stream の有効期間 + 5 分、現在のシステム時刻]の範囲内である必要があります。単位:ミリ秒。Stream の有効期間とは、増分ログの有効期間(ミリ秒単位)を指します。Stream の有効期間の最大値は 7 日間です。データテーブルに対して Stream を有効にするときに、Stream の有効期間を指定できます。Stream の有効期間を指定した後、期間を変更することはできません。
終了タイムスタンプは開始タイムスタンプよりも後である必要があります。
// 次のタイプのトンネルがサポートされています:TunnelType.BaseData、TunnelType.Stream、および TunnelType.BaseAndStream。 // 次のサンプルコードは、BaseAndStream トンネルの作成方法の例を示しています。別のタイプのトンネルを作成するには、ビジネス要件に基づいて CreateTunnelRequest の TunnelType パラメーターを構成します。 final String tableName = "testTable"; final String tunnelName = "testTunnel"; CreateTunnelRequest request = new TunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream); CreateTunnelResponse resp = tunnelClient.createTunnel(request); // tunnelId パラメーターを使用して TunnelWorker を初期化します。ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得できます。 String tunnelId = resp.getTunnelId(); System.out.println("Create Tunnel, Id: " + tunnelId);カスタムデータ消費コールバックを指定して、自動データ消費を開始します。
// データ消費のコールバックを指定して、process メソッドと shutdown メソッドを指定する IChannelProcessor 操作を呼び出します。 private static class SimpleProcessor implements IChannelProcessor { @Override public void process(ProcessRecordsInput input) { // ProcessRecordsInput パラメーターには、取得したデータが含まれています。 System.out.println("Default record processor, would print records count"); System.out.println( // NextToken パラメーターは、トンネルクライアントがデータをページ分割するために使用されます。 String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { // データの消費と処理をシミュレートします。 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); } } // デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを起動します。 // 単一のサーバーで複数の TunnelWorker を起動する場合は、すべての TunnelWorker を構成するために同じ TunnelWorkerConfig を使用することをお勧めします。TunnelWorkerConfig は、より高度なパラメーターを提供します。 TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); // TunnelWorker を構成し、自動データ処理を開始します。 TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); config.shutdown(); worker.shutdown(); tunnelClient.shutdown(); }
TunnelWorkerConfig の構成
TunnelWorkerConfig を使用すると、ビジネス要件に基づいてトンネルクライアントのカスタムパラメーターを構成できます。次の表に、Tablestore SDK for Java のパラメーターを示します。
構成 | パラメーター | 説明 |
ハートビートの間隔とタイムアウト期間 | heartbeatTimeoutInSec | ハートビートのタイムアウト期間。デフォルト値:300。単位:秒。 ハートビートタイムアウトが発生すると、トンネルサーバーは現在の TunnelClient インスタンスが使用不可であると見なします。この場合、トンネルクライアントはトンネルサーバーに再接続する必要があります。 |
heartbeatIntervalInSec | ハートビートの間隔。デフォルト値:30。最小値:5。単位:秒。 ハートビートを検出してアクティブなチャネルを監視し、チャネルのステータスを更新し、データ処理タスクを自動的に初期化できます。 | |
チェックポイント間の間隔 | checkpointIntervalInMillis | データが消費されるときのチェックポイント間の間隔。間隔はトンネルサーバーに記録されます。 デフォルト値:5000。単位:ミリ秒。 説明
|
カスタムクライアントタグ | clientTag | トンネルクライアント ID を生成するために使用されるカスタムクライアントタグ。このパラメーターを構成して、TunnelWorker を区別できます。 |
データ処理のカスタムコールバックの指定 | channelProcessor | process メソッドと shutdown メソッドを含む、データを処理するためにユーザーが登録するコールバック。 |
データを読み取り、処理するためのスレッドプールの構成 | readRecordsExecutor | データの読み取りに使用するスレッドプール。特別な要件がない場合は、デフォルト構成を使用します。 |
processRecordsExecutor | データの処理に使用するスレッドプール。特別な要件がない場合は、デフォルト構成を使用します。 説明
| |
メモリ制御 | maxChannelParallel | メモリ制御のためにデータを読み取り、処理するためのチャネルの最大同時実行レベル。 デフォルト値は -1 で、同時実行レベルが無制限であることを指定します。 説明 Tablestore SDK for Java V5.10.0 以降でこの機能がサポートされています。 |
最大バックオフ時間 | maxRetryIntervalInMillis | トンネルの最大バックオフ時間を計算するためのベース値。最大バックオフ時間は、0.75 × maxRetryIntervalInMillis から 1.25 × maxRetryIntervalInMillis の範囲の乱数です。 デフォルト値:2000。最小値:200。単位:ミリ秒。 説明
|
CLOSING チャネル検出 | enableClosingChannelDetect | CLOSING チャネルのリアルタイム検出を有効にするかどうかを指定します。デフォルト値:false。CLOSING チャネルのリアルタイム検出が無効になっていることを指定します。 説明
|
付録:完全なサンプルコード
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
System.out.println("Default record processor, would print records count");
System.out.println(
// NextToken パラメーターは、トンネルクライアントがデータをページ分割するために使用されます。
String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
try {
// データの消費と処理をシミュレートします。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
System.out.println("Mock shutdown");
}
}
public static void main(String[] args) throws Exception {
//1. トンネルクライアントを初期化します。
// インスタンスの名前を指定します。
final String instanceName = "yourInstanceName";
// インスタンスのエンドポイントを指定します。
final String endPoint = "yourEndpoint";
// 環境変数から AccessKey ID と AccessKey シークレットを取得します。
final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
//2. トンネルを作成します。この手順を実行する前に、テスト用のテーブルを作成する必要があります。 Tablestore コンソールで、または SyncClient の createTable メソッドを使用してテーブルを作成できます。
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
// tunnelId パラメーターを使用して TunnelWorker を初期化します。 ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得できます。
String tunnelId = resp.getTunnelId();
System.out.println("Create Tunnel, Id: " + tunnelId);
//3. カスタムデータ消費コールバックを指定して、自動データ消費を開始します。
// TunnelWorkerConfig は、より高度なパラメーターを提供します。
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
config.shutdown();
worker.shutdown();
tunnelClient.shutdown();
}
}
}