This topic describes how to run the SDK demo code provided by DTS and gives an example for each step.

Download SDK demo code

Download the SDK demo code and decompress it. Open pom.xml in a text editor and change the SDK version to the latest version.

Note You can obtain the latest version of the data subscription SDK from Maven. For more information, see the Maven page of the change tracking SDK.

Initialize RegionContext

RegionContext is mainly used to store security authentication credentials and network access mode settings. The following code provides an example of how to initialize RegionContext.

import java.util.List;
import com.aliyun.drc.clusterclient.RegionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MainClass
{
    public static void main(String[] args) throws Exception {
        // Create RegionContext
        RegionContext context = new RegionContext();
        // Set the AccessKey ID and AccessKey secret of the Alibaba Cloud account
        context.setAccessKey("<AccessKey>");
        context.setSecret("<AccessKeySecret>");
        // Connect to the data subscription channel over the Internet
        context.setUsePublicIp(true);
        // Transmit data in binary format
        context.setUseBinary(true);
        // Enable network optimization mode
        context.setDrcNet(true);

        ...
    }
}

            

Initialize ClusterClient

ClusterClient is mainly used to connect to a change tracking channel and receive incremental data. The following code shows how to initialize ClusterClient.

import java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;

public class MainClass
{
    public static void main(String[] args) throws Exception {
        // Create RegionContext
        RegionContext context = new RegionContext();
        context.setAccessKey("<AccessKey>");
        context.setSecret("<AccessKeySecret>");
        context.setUsePublicIp(true);

        // Create ClusterClient
        final ClusterClient client = new DefaultClusterClient(context);

        ...
    }
}

            

Initialize Listeners

A Listener receives the subscribed data and consumes it in its notify function. The following code provides an example of simple consumption logic that prints the subscribed incremental data to the screen.

import java.util.List;

import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;

public class MainClass
{
    public static void main(String[] args) throws Exception {
        // Initialize RegionContext
        ...
        // Initialize ClusterClient
        ...
        ClusterListener listener = new ClusterListener() {
            @Override
            public void notify(List<ClusterMessage> messages) throws Exception {
                for (ClusterMessage message : messages) {
                    // Print the subscribed change data to the screen
                    System.out.println(message.getRecord() + ":" + message.getRecord().getTablename() + ":"
                            + message.getRecord().getOpt());
                    // Send an ACK to the DTS server after consumption. This method must be called.
                    message.ackAsConsumed();
                }
            }
        };
    }
}

            
Note ackAsConsumed() sends the consumption checkpoint and timestamp of the latest data record consumed by the SDK to the DTS server. If the SDK fails, the system automatically obtains the consumption checkpoint from the DTS server and resumes consumption from it, which avoids consuming duplicate data.
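
The checkpoint behavior described in this note can be illustrated with a minimal sketch that does not use the DTS SDK. The class CheckpointSketch, its consume method, and the in-memory serverCheckpoint field are hypothetical stand-ins for the checkpoint that the DTS server stores. The real protocol is not shown here and may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in, not part of the DTS SDK: models a server that
// remembers the last acknowledged position so that a restarted consumer
// can resume from it.
public class CheckpointSketch {
    // Index of the next record to deliver; advanced only by an ACK.
    private int serverCheckpoint = 0;
    private final List<String> records;

    public CheckpointSketch(List<String> records) {
        this.records = records;
    }

    // Consumes up to maxRecords records starting at the stored checkpoint
    // and acknowledges each record after it has been processed.
    public List<String> consume(int maxRecords) {
        List<String> consumed = new ArrayList<>();
        int end = Math.min(records.size(), serverCheckpoint + maxRecords);
        for (int i = serverCheckpoint; i < end; i++) {
            consumed.add(records.get(i)); // process the record
            serverCheckpoint = i + 1;     // ACK, analogous to ackAsConsumed()
        }
        return consumed;
    }

    public static void main(String[] args) {
        CheckpointSketch server =
                new CheckpointSketch(Arrays.asList("r1", "r2", "r3", "r4"));
        // First run: the consumer processes two records and then "fails".
        System.out.println(server.consume(2));  // prints [r1, r2]
        // Restart: consumption resumes after the last acknowledged record.
        System.out.println(server.consume(10)); // prints [r3, r4]
    }
}
```

In this sketch a record that is processed but never acknowledged would be delivered again after a restart, which is why the ACK call must not be skipped.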

Start ClusterClient

import java.util.List;

import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MainClass
{
    public static void main(String[] args) throws Exception {
        // Initialize RegionContext
        ...
        // Initialize ClusterClient
        ...
        // Initialize ClusterListener
        ...
        // Add the listener
        client.addConcurrentListener(listener);
        // Set the subscription channel ID
        client.askForGUID("dts_rdsr******_DSF");
        // Start the background thread (the main thread must not exit)
        client.start();
    }
}
            

Before you start ClusterClient, you must add the Listener to it. When ClusterClient obtains incremental data from the change tracking channel, it calls the notify function of the Listener to consume the data.
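
The ordering above (register the listener, start the client, keep the main thread alive) can be sketched without the SDK. Everything in the following example is a hypothetical stand-in: ListenerDispatchSketch, BatchListener, addListener, and awaitCompletion are illustrative names, and the use of a daemon worker thread is an assumption made to model why the main thread must not exit. It is not a description of how DefaultClusterClient is implemented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in, not part of the DTS SDK: a tiny client that
// delivers batches to registered listeners from a background thread.
public class ListenerDispatchSketch {
    // Illustrative callback shape, similar in spirit to a listener's notify.
    public interface BatchListener {
        void notify(List<String> messages) throws Exception;
    }

    private final List<BatchListener> listeners = new CopyOnWriteArrayList<>();
    private final List<List<String>> batches;
    private final CountDownLatch done = new CountDownLatch(1);

    public ListenerDispatchSketch(List<List<String>> batches) {
        this.batches = batches;
    }

    public void addListener(BatchListener listener) {
        listeners.add(listener);
    }

    // Starts a background thread and returns immediately.
    public void start() {
        Thread worker = new Thread(() -> {
            try {
                for (List<String> batch : batches) {
                    for (BatchListener listener : listeners) {
                        listener.notify(batch); // hand the batch to the consumer
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                done.countDown();
            }
        });
        // Assumption for this sketch: a daemon thread stops when main exits.
        worker.setDaemon(true);
        worker.start();
    }

    // Blocks the calling thread until the worker has delivered all batches.
    public void awaitCompletion() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // Runs the whole flow and returns what the listener received.
    public static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        ListenerDispatchSketch client = new ListenerDispatchSketch(Arrays.asList(
                Arrays.asList("insert:t1"),
                Arrays.asList("update:t1", "delete:t2")));
        client.addListener(messages -> received.addAll(messages)); // before start()
        client.start();
        client.awaitCompletion(); // keep the calling thread alive
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The sketch uses a finite list of batches so that it terminates. A real subscription is an unbounded stream, so the main thread would typically block until the application is shut down.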

References

Use SDK to consume data