This topic describes how to get started with Tunnel Service by using Tablestore SDK for Java. The complete sample code is provided in the References section at the end of this topic.

  1. Initialize a TunnelClient instance.
    // Set endPoint to the endpoint of the Tablestore instance. Example: https://instance.cn-hangzhou.ots.aliyuncs.com.
    // Set accessKeyId and accessKeySecret to the AccessKey ID and AccessKey secret used to access Tablestore.
    // Set instanceName to the name of the target instance.
    final String endPoint = "";
    final String accessKeyId = "";
    final String accessKeySecret = "";
    final String instanceName = "";
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. Create a tunnel.

    Create a table for testing or prepare an existing table before you create the tunnel. To create a table for testing, you can use the createTable method in the SyncClient class or go to the Tablestore console.
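    If you need a table for testing, the following sketch creates one by using the createTable method in SyncClient. The single STRING primary key column named "pk" and the TableOptions values are assumptions for illustration, not requirements of Tunnel Service.

    ```java
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.CreateTableRequest;
    import com.alicloud.openservices.tablestore.model.PrimaryKeySchema;
    import com.alicloud.openservices.tablestore.model.PrimaryKeyType;
    import com.alicloud.openservices.tablestore.model.TableMeta;
    import com.alicloud.openservices.tablestore.model.TableOptions;

    public class CreateTestTable {
        public static void main(String[] args) {
            // Replace the placeholders with your own connection information.
            SyncClient syncClient = new SyncClient("<yourEndpoint>", "<yourAccessKeyId>",
                    "<yourAccessKeySecret>", "<yourInstanceName>");
            try {
                // Schema assumed for this example: one STRING primary key column named "pk".
                TableMeta tableMeta = new TableMeta("testTable");
                tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("pk", PrimaryKeyType.STRING));
                // -1: data never expires. 1: keep only the latest version of each column.
                TableOptions tableOptions = new TableOptions(-1, 1);
                syncClient.createTable(new CreateTableRequest(tableMeta, tableOptions));
            } finally {
                syncClient.shutdown();
            }
        }
    }
    ```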

    // You can create three types of tunnels: TunnelType.BaseData, TunnelType.Stream, and TunnelType.BaseAndStream.
    // The following code provides an example on how to create a BaseAndStream (differential) tunnel. To create a tunnel of another type, set TunnelType in CreateTunnelRequest as needed.
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    // Use tunnelId to initialize TunnelWorker. You can also call the ListTunnel or DescribeTunnel operation to obtain the tunnel ID.
    String tunnelId = resp.getTunnelId();
    System.out.println("Create Tunnel, Id: " + tunnelId);
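    If you did not record the tunnel ID when the tunnel was created, the following sketch retrieves it by calling the DescribeTunnel operation. The table and tunnel names follow the example above; the endpoint and credential placeholders are assumptions.

    ```java
    import com.alicloud.openservices.tablestore.TunnelClient;
    import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelRequest;
    import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelResponse;

    public class GetTunnelId {
        public static void main(String[] args) {
            // Replace the placeholders with your own connection information.
            TunnelClient tunnelClient = new TunnelClient("<yourEndpoint>", "<yourAccessKeyId>",
                    "<yourAccessKeySecret>", "<yourInstanceName>");
            try {
                DescribeTunnelRequest request = new DescribeTunnelRequest("testTable", "testTunnel");
                DescribeTunnelResponse response = tunnelClient.describeTunnel(request);
                // TunnelInfo also exposes details such as the tunnel type and stage.
                System.out.println("Tunnel Id: " + response.getTunnelInfo().getTunnelId());
            } finally {
                tunnelClient.shutdown();
            }
        }
    }
    ```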
  3. Customize the data consumption callback to start automatic data consumption.
    // Implement the IChannelProcessor interface to customize the data consumption callback. You must specify the process and shutdown methods.
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            // ProcessRecordsInput includes the data that you have obtained.
            System.out.println("Default record processor, would print records count");
            System.out.println(
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // Mock Record Process.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    // By default, TunnelWorkerConfig starts a thread pool to read and process data. If you start multiple TunnelWorkers on a single server,
    // we recommend that they share the same TunnelWorkerConfig. TunnelWorkerConfig provides more advanced parameters.
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    // Configure TunnelWorker and start automatic data processing.
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        worker.shutdown();
        tunnelClient.shutdown();
    }
  4. Usage notes
    • By default, TunnelWorkerConfig starts a thread pool to read and process data. If you start multiple TunnelWorkers on a single server, we recommend that they share the same TunnelWorkerConfig instance.
    • When you create a differential (BaseAndStream) tunnel to consume both full and incremental data, the tunnel retains incremental logs for a maximum of seven days. The specific expiration time of incremental logs is the same as that of incremental data for the table. If the tunnel does not finish consuming the full data within seven days, the OTSTunnelExpired error occurs when the tunnel starts to consume incremental data, and the tunnel cannot consume incremental data. If you estimate that the tunnel cannot finish consuming the full data within seven days, submit a ticket or join DingTalk group 11789671 to request technical support.
    • TunnelWorker requires warm-up time during initialization, which is determined by the heartbeatIntervalInSec parameter in TunnelWorkerConfig. You can use the setHeartbeatIntervalInSec method of TunnelWorkerConfig to set this parameter. The default value is 30s and the minimum value is 5s. For more information, see Description of the data consumption framework and Configure the data consumption framework.
    • When consumption switches from the full channel to the incremental channel, the full channel is closed and the incremental channel is started. This switchover also requires initialization time, which is determined by the heartbeatIntervalInSec parameter.
    • When the TunnelWorker client is shut down, whether due to an abnormal exit or manual termination, TunnelWorker automatically recycles resources in the following order: 1. Release the thread pool. 2. Call the shutdown method that you registered for the Channel class. 3. Shut down the tunnel.
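    The notes above can be sketched as follows. Sharing one TunnelWorkerConfig across workers, the heartbeat interval bounds, and the shutdown order come from this topic; the two tunnel IDs are placeholder assumptions, and whether your SDK version provides TunnelWorkerConfig.shutdown() should be verified against the SDK reference.

    ```java
    // A sketch, assuming tunnelId1 and tunnelId2 were obtained as described in step 2,
    // and that the SDK version in use provides TunnelWorkerConfig.shutdown().
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    // Warm-up/heartbeat interval: default 30s, minimum 5s.
    config.setHeartbeatIntervalInSec(10);

    // Two workers on the same server share one TunnelWorkerConfig (one thread pool).
    TunnelWorker worker1 = new TunnelWorker(tunnelId1, tunnelClient, config);
    TunnelWorker worker2 = new TunnelWorker(tunnelId2, tunnelClient, config);
    try {
        worker1.connectAndWorking();
        worker2.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        // Shut down workers first, then the shared config's thread pool, then the client.
        worker1.shutdown();
        worker2.shutdown();
        config.shutdown();
        tunnelClient.shutdown();
    }
    ```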
  5. References
    import com.alicloud.openservices.tablestore.TunnelClient;
    import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
    import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
    import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
    import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
    import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
    import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
    import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
    public class TunnelQuickStart {
        private static class SimpleProcessor implements IChannelProcessor {
            @Override
            public void process(ProcessRecordsInput input) {
                System.out.println("Default record processor, would print records count");
                System.out.println(
                    String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
                try {
                    // Mock Record Process.
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void shutdown() {
                System.out.println("Mock shutdown");
            }
        }
        public static void main(String[] args) throws Exception {
            // 1. Initialize a TunnelClient instance.
            final String endPoint = "";
            final String accessKeyId = "";
            final String accessKeySecret = "";
            final String instanceName = "";
            TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
            // 2. Create a tunnel. You must use the createTable method in SyncClient or the Tablestore console to create a table for testing in advance.
            final String tableName = "testTable";
            final String tunnelName = "testTunnel";
            CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
            CreateTunnelResponse resp = tunnelClient.createTunnel(request);
            // Use tunnelId to initialize TunnelWorker. You can also call the ListTunnel or DescribeTunnel operation to obtain the tunnel ID.
            String tunnelId = resp.getTunnelId();
            System.out.println("Create Tunnel, Id: " + tunnelId);
            // 3. Customize the data consumption callback to start automatic data consumption.
            // TunnelWorkerConfig provides more advanced parameters. For more information, see the related topic.
            TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
            TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
            try {
                worker.connectAndWorking();
            } catch (Exception e) {
                e.printStackTrace();
                worker.shutdown();
                tunnelClient.shutdown();
            }
        }
    }