すべてのプロダクト
Search
ドキュメントセンター

Tablestore:Tablestore SDK を使用したトンネルサービスの利用

最終更新日:Mar 18, 2025

このトピックでは、Tablestore SDK を使用してトンネルサービスを開始する方法について説明します。トンネルサービスを使用する前に、使用上の注意と API 操作をよく理解しておいてください。

使用上の注意

  • デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを起動します。単一のサーバーで複数の TunnelWorker を起動する場合は、すべての TunnelWorker を構成するために同じ TunnelWorkerConfig を使用することをお勧めします。

  • TunnelWorker は初期化のためにウォームアップ期間を必要とします。これは、TunnelWorkerConfig の heartbeatIntervalInSec パラメーターで指定されます。このパラメーターを構成するには、TunnelWorkerConfig の setHeartbeatIntervalInSec メソッドを使用します。デフォルト値:30。単位:秒。

  • 予期しない終了または手動による終了が原因で TunnelWorker クライアントがシャットダウンすると、TunnelWorker は次のいずれかの方法を使用してリソースを自動的にリサイクルします。スレッドプールを解放し、Channel クラスに登録した shutdown メソッドを自動的に呼び出し、トンネルをシャットダウンします。

  • トンネル内の増分ログの保存期間は、Stream ログの保存期間と同じです。Stream ログは最大 7 日間保存できます。したがって、トンネル内の増分ログは最大 7 日間保存できます。

  • 差分データまたは増分データを消費するためにトンネルを作成する場合は、次の点に注意してください。

    • 完全データ消費中に、トンネルが増分ログの保存期間(最大 7 日間)内に完全データの消費を完了できない場合、トンネルが増分ログの消費を開始するときに OTSTunnelExpired エラーが発生します。その結果、トンネルは増分ログを消費できません。

      指定されたタイムウィンドウ内にトンネルが完全データ消費を完了できないと推定される場合は、Tablestore テクニカルサポートにご連絡ください。

    • 増分データ消費中に、トンネルが増分ログの保存期間(最大 7 日間)内に増分ログの消費を完了できない場合、トンネルは利用可能な最新のデータからデータを消費する可能性があります。この場合、特定のデータが消費されない可能性があります。

  • トンネルの有効期限が切れると、Tablestore はトンネルを無効にする場合があります。トンネルが無効状態のままで 30 日以上経過すると、トンネルは削除されます。削除されたトンネルは復元できません。

API 操作

操作

説明

CreateTunnel

トンネルを作成します。

ListTunnel

データテーブルに作成されたトンネルに関する情報をクエリします。

DescribeTunnel

トンネル内のチャネルに関する情報をクエリします。

DeleteTunnel

トンネルを削除します。

Tablestore SDK の使用

以下のプログラミング言語の Tablestore SDK を使用して、トンネルサービスを実装できます。

前提条件

トンネルサービスの使用

この例では、Tablestore SDK for Java を使用してトンネルサービスを開始します。

  1. トンネルクライアントを初期化します。

    説明

    TABLESTORE_ACCESS_KEY_ID および TABLESTORE_ACCESS_KEY_SECRET 環境変数が構成されていることを確認してください。 TABLESTORE_ACCESS_KEY_ID 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。 TABLESTORE_ACCESS_KEY_SECRET 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。

    // Tablestore インスタンスの名前を指定します。
    // Tablestore インスタンスのエンドポイントを指定します。例:https://instance.cn-hangzhou.ots.aliyuncs.com。
    // Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレットを指定します。
    final String instanceName = "yourInstanceName";
    final String endPoint = "yourEndpoint";
    final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. トンネルを作成します。

    トンネルを作成する前に、テスト用のデータテーブルを作成するか、既存のテーブルを準備します。Tablestore コンソールで、または SyncClient の createTable メソッドを使用してテーブルを作成できます。

    重要

    増分トンネルまたは差分トンネルを作成する場合は、タイムスタンプを指定するために次のルールに従う必要があります。

    • 増分データの開始タイムスタンプを指定しない場合、トンネルが作成された時刻が開始タイムスタンプとして使用されます。

    • 増分データの開始タイムスタンプと終了タイムスタンプを指定する場合、有効な値は [現在のシステム時刻 - Stream の有効期間 + 5 分、現在のシステム時刻] の範囲内である必要があります。単位:ミリ秒。

      • Stream の有効期間とは、増分ログの有効期間(ミリ秒単位)を指します。Stream の有効期間の最大値は 7 日間です。データテーブルに対して Stream を有効にするときに、Stream の有効期間を指定できます。Stream の有効期間を指定した後、期間を変更することはできません。

      • 終了タイムスタンプは開始タイムスタンプよりも後である必要があります。

    // 次のタイプのトンネルがサポートされています:TunnelType.BaseData、TunnelType.Stream、および TunnelType.BaseAndStream。
    // 次のサンプルコードは、BaseAndStream トンネルの作成方法の例を示しています。別のタイプのトンネルを作成するには、ビジネス要件に基づいて CreateTunnelRequest の TunnelType パラメーターを構成します。
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new TunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    // tunnelId パラメーターを使用して TunnelWorker を初期化します。ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得できます。
    String tunnelId = resp.getTunnelId();
    System.out.println("Create Tunnel, Id: " + tunnelId);
  3. カスタムデータ消費コールバックを指定して、自動データ消費を開始します。

    // データ消費のコールバックを指定して、process メソッドと shutdown メソッドを指定する IChannelProcessor 操作を呼び出します。
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            // ProcessRecordsInput パラメーターには、取得したデータが含まれています。
            System.out.println("Default record processor, would print records count");
            System.out.println(
                // NextToken パラメーターは、トンネルクライアントがデータをページ分割するために使用されます。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // データの消費と処理をシミュレートします。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    // デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを起動します。
    // 単一のサーバーで複数の TunnelWorker を起動する場合は、すべての TunnelWorker を構成するために同じ TunnelWorkerConfig を使用することをお勧めします。TunnelWorkerConfig は、より高度なパラメーターを提供します。
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    // TunnelWorker を構成し、自動データ処理を開始します。
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

TunnelWorkerConfig の構成

TunnelWorkerConfig を使用すると、ビジネス要件に基づいてトンネルクライアントのカスタムパラメーターを構成できます。次の表に、Tablestore SDK for Java のパラメーターを示します。

構成

パラメーター

説明

ハートビートの間隔とタイムアウト期間

heartbeatTimeoutInSec

ハートビートのタイムアウト期間。デフォルト値:300。単位:秒。

ハートビートタイムアウトが発生すると、トンネルサーバーは現在の TunnelClient インスタンスが使用不可であると見なします。この場合、トンネルクライアントはトンネルサーバーに再接続する必要があります。

heartbeatIntervalInSec

ハートビートの間隔。デフォルト値:30。最小値:5。単位:秒。

ハートビートを検出してアクティブなチャネルを監視し、チャネルのステータスを更新し、データ処理タスクを自動的に初期化できます。

チェックポイント間の間隔

checkpointIntervalInMillis

データが消費されるときのチェックポイント間の間隔。間隔はトンネルサーバーに記録されます。

デフォルト値:5000。単位:ミリ秒。

説明
  • 読み取るデータが異なるサーバーに保存されている場合、プロセスを実行するときにさまざまなエラーが発生する可能性があります。たとえば、環境要因によりサーバーが再起動する場合があります。したがって、トンネルサーバーはデータが処理された後に定期的にチェックポイントを記録します。タスクは再起動後、最後のチェックポイントからデータを処理します。特定の場合、トンネルサービスはデータを 1 回または複数回順次同期する場合があります。特定のデータが再処理される場合は、ビジネス処理ロジックを確認してください。

  • エラー発生時にデータが再処理されないようにするには、追加のチェックポイントを記録します。チェックポイントが多すぎると、システムスループットが低下する可能性があることに注意してください。ビジネス要件に基づいてチェックポイントを記録することをお勧めします。

カスタムクライアントタグ

clientTag

トンネルクライアント ID を生成するために使用されるカスタムクライアントタグ。このパラメーターを構成して、TunnelWorker を区別できます。

データ処理のカスタムコールバックの指定

channelProcessor

process メソッドと shutdown メソッドを含む、データを処理するためにユーザーが登録するコールバック。

データを読み取り、処理するためのスレッドプールの構成

readRecordsExecutor

データの読み取りに使用するスレッドプール。特別な要件がない場合は、デフォルト構成を使用します。

processRecordsExecutor

データの処理に使用するスレッドプール。特別な要件がない場合は、デフォルト構成を使用します。

説明
  • スレッドプールの構成を指定する場合は、スレッド数をトンネル内のチャネル数に設定することをお勧めします。こうすることで、CPU などの計算リソースを各チャネルにすばやく割り当てることができます。

  • Tablestore は、スループットを確保するために、プールのデフォルト構成で次の操作を実行します。

    • 少量のデータまたは少数のチャネルが存在する場合にリアルタイムスループットを確保するために、事前に 32 個のコアスレッドを割り当てます。

    • 大量のデータを処理する必要がある場合、または多数のチャネルが存在する場合に、キューの長さを短縮します。こうすることで、プールにスレッドを作成し、より多くの計算リソースを割り当てるポリシーがトリガーされます。

    • スレッドのキープアライブ時間を 60 秒に設定することをお勧めします。処理するデータ量が削減された場合は、スレッドリソースをリサイクルできます。

メモリ制御

maxChannelParallel

メモリ制御のためにデータを読み取り、処理するためのチャネルの最大同時実行レベル。

デフォルト値は -1 で、同時実行レベルが無制限であることを指定します。

説明

Tablestore SDK for Java V5.10.0 以降でこの機能がサポートされています。

最大バックオフ時間

maxRetryIntervalInMillis

トンネルの最大バックオフ時間を計算するためのベース値。最大バックオフ時間は、0.75 × maxRetryIntervalInMillis から 1.25 × maxRetryIntervalInMillis の範囲の乱数です。

デフォルト値:2000。最小値:200。単位:ミリ秒。

説明
  • Tablestore SDK for Java V5.4.0 以降でこの機能がサポートされています。

  • 処理するデータ量がエクスポートあたり 900 KB または 500 個未満の場合、トンネルクライアントは最大バックオフ時間に達するまで指数バックオフを使用します。

CLOSING チャネル検出

enableClosingChannelDetect

CLOSING チャネルのリアルタイム検出を有効にするかどうかを指定します。デフォルト値:false。CLOSING チャネルのリアルタイム検出が無効になっていることを指定します。

説明
  • Tablestore SDK for Java V5.13.13 以降でこの機能がサポートされています。

  • この機能を有効にしないと、多数のチャネルが存在するがクライアントリソースが不足している場合など、特定のシナリオでチャネルが一時停止し、消費が中断される可能性があります。

  • CLOSING チャネル:あるトンネルクライアントから別のトンネルクライアントに切り替えられているチャネル。

付録:完全なサンプルコード

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            System.out.println("Default record processor, would print records count");
            System.out.println(
                // NextToken パラメーターは、トンネルクライアントがデータをページ分割するために使用されます。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // データの消費と処理をシミュレートします。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1. トンネルクライアントを初期化します。
        // インスタンスの名前を指定します。
        final String instanceName = "yourInstanceName";
        // インスタンスのエンドポイントを指定します。
        final String endPoint = "yourEndpoint";
        // 環境変数から AccessKey ID と AccessKey シークレットを取得します。
        final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
        final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2. トンネルを作成します。この手順を実行する前に、テスト用のテーブルを作成する必要があります。 Tablestore コンソールで、または SyncClient の createTable メソッドを使用してテーブルを作成できます。
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        // tunnelId パラメーターを使用して TunnelWorker を初期化します。 ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得できます。
        String tunnelId = resp.getTunnelId();
        System.out.println("Create Tunnel, Id: " + tunnelId);
        //3. カスタムデータ消費コールバックを指定して、自動データ消費を開始します。
        // TunnelWorkerConfig は、より高度なパラメーターを提供します。
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}