This topic describes how to run the SDK demo code that is provided by DTS.

Download the SDK demo code

Download and decompress the DtsSubscribeDemo package. Open the pom.xml file in a text editor and change the SDK version to the latest version.

Note You can obtain the latest version of the change tracking SDK from the Maven website. For more information, visit the Maven page of the change tracking SDK.

Initialize a RegionContext object

A RegionContext object stores the authentication credentials and the network access mode. The following code shows how to initialize a RegionContext object.

import java.util.List;
import com.aliyun.drc.clusterclient.RegionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MainClass
{
    public static void main(String[] args) throws Exception {
        // Create a RegionContext object.
        RegionContext context = new RegionContext();
        // Specify the AccessKey ID and AccessKey secret of the Alibaba Cloud account.
        context.setAccessKey("<AccessKey>");
        context.setSecret("<AccessKeySecret>");
        // Specify whether to track data changes over the Internet.
        context.setUsePublicIp(true);
        // Specify whether to transfer data by using the binary format.
        context.setUseBinary(true);
        // Enable the network optimization feature.
        context.setDrcNet(true);

        ...
    }
}
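
Hard-coding the AccessKey pair in source code makes it easy to leak. As a minimal sketch, the pair could instead be read from environment variables and then passed to setAccessKey() and setSecret(). The CredentialLoader class and the variable names DTS_ACCESS_KEY_ID and DTS_ACCESS_KEY_SECRET are assumptions made for illustration. They are not part of the DTS SDK.

```java
import java.util.Map;

// Hypothetical helper for illustration; not part of the DTS SDK.
class CredentialLoader {
    // Returns the trimmed value of a required variable, or fails with a clear message.
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        try {
            Map<String, String> env = System.getenv();
            // The variable names below are assumptions, not SDK conventions.
            String accessKey = require(env, "DTS_ACCESS_KEY_ID");
            String secret = require(env, "DTS_ACCESS_KEY_SECRET");
            // The values would then be passed to the RegionContext object:
            // context.setAccessKey(accessKey);
            // context.setSecret(secret);
            System.out.println("Loaded an AccessKey pair from the environment.");
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

The lookup takes a Map so that it can be exercised without touching the real process environment.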

Initialize a ClusterClient object

A ClusterClient object connects to a change tracking channel and receives incremental data. The following code shows how to initialize a ClusterClient object.

import java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;

public class MainClass
{
    public static void main(String[] args) throws Exception {
        // Create a RegionContext object.
        RegionContext context = new RegionContext();
        context.setAccessKey("<AccessKey>");
        context.setSecret("<AccessKeySecret>");
        context.setUsePublicIp(true);

        // Create a ClusterClient object.
        final ClusterClient client = new DefaultClusterClient(context);

        ...
    }
}

Initialize a Listener object

A Listener object receives and consumes the tracked data in its notify() method. The following code shows how to print the tracked data to the console.

import java.util.List;

import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;

public class MainClass
{
    public static void main(String[] args) throws Exception {
        // Initialize the RegionContext object.
        ...
        // Initialize the ClusterClient object.
        ...
        ClusterListener listener = new ClusterListener() {
            @Override
            public void notify(List<ClusterMessage> messages) throws Exception {
                for (ClusterMessage message : messages) {
                    // Print the record, its table name, and its operation type.
                    System.out.println(message.getRecord() + ":" + message.getRecord().getTablename() + ":"
                        + message.getRecord().getOpt());
                    // Send an ACK packet to the DTS server to mark the message as consumed.
                    message.ackAsConsumed();
                }
            }
        };
    }
}

Note The ackAsConsumed() method sends the checkpoint and timestamp of the latest data record consumed by the SDK to the DTS server. If the SDK restarts due to an error, it obtains the consumption checkpoint from the DTS server and resumes data consumption from that checkpoint. This ensures that the SDK does not consume duplicate data.
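
The resume behavior described in the note can be illustrated with a small in-memory model. This is only a sketch of the idea, not the SDK's implementation: the CheckpointDemo class, the serverCheckpoint field, and the numeric positions are all hypothetical, and assigning to serverCheckpoint merely plays the role that ackAsConsumed() plays in the real SDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of checkpoint-based resumption; not the DTS SDK.
class CheckpointDemo {
    // Stands in for the consumption checkpoint stored on the DTS server.
    static long serverCheckpoint = -1;

    // Consumes records located after the stored checkpoint and stops once
    // `limit` records have been consumed, which simulates a crash.
    // Returns the positions consumed during this run.
    static List<Long> consume(long[] positions, int limit) {
        List<Long> consumed = new ArrayList<>();
        for (long position : positions) {
            if (position <= serverCheckpoint) {
                continue; // Acknowledged before the restart, so it is skipped.
            }
            if (consumed.size() == limit) {
                break; // Simulated crash.
            }
            consumed.add(position);
            serverCheckpoint = position; // Plays the role of ackAsConsumed().
        }
        return consumed;
    }

    public static void main(String[] args) {
        long[] positions = {100, 101, 102, 103, 104};
        System.out.println(consume(positions, 2));  // prints [100, 101]
        System.out.println(consume(positions, 10)); // prints [102, 103, 104]
    }
}
```

In this model, the second run starts after position 101 because that was the last position acknowledged before the simulated crash.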

Start the ClusterClient object

import java.util.List;

import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MainClass
{
    public static void main(String[] args) throws Exception {
        // Initialize the RegionContext object.
        ...
        // Initialize the ClusterClient object.
        ...
        // Initialize the ClusterListener object.
        ...
        // Add the listener to the ClusterClient object.
        client.addConcurrentListener(listener);
        // Specify the ID of the change tracking channel.
        client.askForGUID("dts_rdsr******_DSF");
        // Start a background thread. The main thread cannot exit.
        client.start();
    }
}

Before you start the ClusterClient object, add a listener to it. When the ClusterClient object pulls incremental data from the change tracking channel, it calls the notify() method of the listener to consume the data.
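
The sample code notes that the main thread cannot exit after client.start() is called. One common way to keep a main thread waiting is a CountDownLatch, sketched below under the assumption that the background thread does not keep the JVM alive by itself. The KeepAliveDemo class and its worker thread are hypothetical stand-ins. The sketch does not call the DTS SDK, and the five-second timeout exists only so that the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; the worker thread stands in for the thread started by client.start().
class KeepAliveDemo {
    // Blocks the calling thread until the latch is released or the timeout elapses.
    // Returns true if a stop was requested, false on timeout or interruption.
    static boolean awaitStop(CountDownLatch stop, long timeoutMillis) {
        try {
            return stop.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt status.
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch stop = new CountDownLatch(1);

        // Stand-in for the background consumer thread.
        Thread worker = new Thread(() -> {
            // A real program would consume data here; this stand-in only requests a stop.
            stop.countDown();
        });
        worker.setDaemon(true);
        worker.start();

        // A long-running service could release the latch from a shutdown hook instead.
        boolean stopped = awaitStop(stop, 5000);
        System.out.println(stopped ? "Stop requested, exiting." : "Timed out, exiting.");
    }
}
```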

References

Use SDK samples to consume tracked data