
Data Transmission Service:Use an SDK to consume tracked data

Last Updated:Aug 22, 2025

After you create a tracking task and a consumer group for a change tracking channel, you can use the Data Transmission Service (DTS) software development kit (SDK) to consume the tracked data. This topic describes how to use the provided sample code.

Prerequisites

A change tracking task is created, and a consumer group is created for the change tracking channel.

Usage notes

  • When you consume tracked data, you must call the commit method of DefaultUserRecord to commit the consumer offset (see the sketch after this list). Otherwise, data will be consumed repeatedly.

  • Different consumption processes are independent of each other.

  • The Current Offset in the console shows the current offset of the tracking task, not the offset submitted by the client.
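
  The following minimal sketch shows where the commit call belongs in a record listener. It is based on the DefaultUserRecord commit method referenced above; the listener structure and helper names (RecordListener, OperationType, getSourceTimestamp) follow the downloaded demo code and may differ slightly between SDK versions.

  ******
  // A record listener that commits the consumer offset after each record is processed.
  // Types come from the com.aliyun.dts.subscribe.clients packages in the sample project;
  // exact package paths may vary with the SDK version.
  RecordListener recordListener = new RecordListener() {
      @Override
      public void consume(DefaultUserRecord record) {
          OperationType operationType = record.getOperationType();

          // Process the record as needed, for example print it or write it to a sink.
          // ...

          // Commit the offset after the record is processed; otherwise the same
          // data is consumed again after a restart or retry.
          record.commit(String.valueOf(record.getSourceTimestamp()));
      }
  };
  ******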

Procedure

  1. Download the SDK sample code file, and then decompress the file.

  2. Verify the version of the SDK code.

    1. Go to the directory where you decompressed the file.

    2. Use a text editor to open the pom.xml file in the directory.

    3. Change the change tracking SDK version (version) to the latest version.

      Note

      You can view the latest Maven dependency on the dts-new-subscribe-sdk page.

      Location of the SDK version parameter

      <name>dts-new-subscribe-sdk</name>
      <url>https://www.aliyun.com/product/dts</url>
      <description>The Aliyun new Subscribe SDK for Java used for accessing Data Transmission Service</description>
      <packaging>jar</packaging>
      <groupId>com.aliyun.dts</groupId>
      <artifactId>dts-new-subscribe-sdk</artifactId>
      <version>2.1.4</version>
  3. Edit the SDK code.

    1. Use a code editor to open the decompressed file.

    2. Open the Java file that corresponds to the consumption mode of the SDK client.

      Note

      The path of the Java file is aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/.

      The following describes the two consumption modes and when to use each.

      ASSIGN mode

      Java file: DTSConsumerAssignDemo.java

      Description: To ensure the global order of messages, DTS assigns only one partition, partition 0, to each tracking topic. When the SDK client runs in ASSIGN mode, we recommend that you start only one SDK client.

      Scenario: Only one SDK client consumes tracked data in a consumer group.

      SUBSCRIBE mode

      Java file: DTSConsumerSubscribeDemo.java

      Description: To ensure the global order of messages, DTS assigns only one partition, partition 0, to each tracking topic. When the SDK client runs in SUBSCRIBE mode, you can start multiple SDK clients in one consumer group at the same time to implement disaster recovery: if the client that is consuming data fails, another SDK client in the consumer group is randomly and automatically assigned to partition 0 to continue consumption.

      Scenario: Multiple SDK clients consume tracked data in the same consumer group. This is applicable to data disaster recovery scenarios.

    3. Set the parameters in the Java code.

      Sample code

      ******        
          public static void main(String[] args) {
              // kafka broker url
              String brokerUrl = "dts-cn-***.com:18001";
              // topic to consume, partition is 0
              String topic = "cn_***_version2";
              // user password and sid for auth
              String sid = "dts***";
              String userName = "dts***";
              String password = "DTS***";
            // initial checkpoint for the first seek: a UNIX timestamp in seconds,
            // for example 1566180200 for Mon Aug 19 10:03:21 CST 2019
            String initCheckpoint = "1740472***";
            // in SUBSCRIBE mode, the Kafka consumer group is enabled and group configuration is required
              ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE;
        
              DTSConsumerSubscribeDemo consumerDemo = new DTSConsumerSubscribeDemo(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
              consumerDemo.start();
          }
      ******

      brokerUrl

      Description: The network address and port number of the change tracking channel.

      Note
      • If the server on which you deploy the SDK client, such as an ECS instance, and the change tracking instance are in the same VPC, we recommend that you use the VPC endpoint for data consumption to reduce network latency.
      • Because of potential network instability, we do not recommend using a public endpoint.

      How to obtain: In the DTS console, click the ID of the target subscription instance. On the Basic Information page, obtain the endpoint and port number from the Network section.

      topic

      Description: The tracking topic of the change tracking channel.

      How to obtain: In the DTS console, click the ID of the target subscription instance. On the Basic Information page, find Topic in the Basic Information section.

      sid

      Description: The consumer group ID.

      How to obtain: In the DTS console, click the ID of the target subscription instance. On the Consume Data page, obtain the Consumer Group ID/Name and the Account of the consumer group.

      userName

      Description: The username of the consumer group account.

      Warning
      If you are not using the client provided in this topic, specify the username in the <Username of the consumer group>-<Consumer group ID> format. Example: dtstest-dtsae******bpv. Otherwise, the connection fails.

      How to obtain: See the sid parameter. The account is displayed on the same Consume Data page.

      password

      Description: The password of the consumer group account.

      How to obtain: The password that you set for the consumer group account when you created the consumer group.

      initCheckpoint

      Description: The consumer offset, which is the timestamp of the first data record to be consumed by the SDK client. The value must be a UNIX timestamp, such as 1620962769.

      Note
      The consumer offset can be used for the following purposes:
      • Continue data consumption from the last consumed offset after an application interruption. This prevents data loss.
      • Start data consumption from a specific offset when the client starts. This lets you consume data from a specific point in time.

      How to obtain: The consumer offset must be within the data range of the change tracking instance and must be converted to a UNIX timestamp. For a conversion example, see the snippet after this table.

      Note
      You can view the data range of the target change tracking instance in the Data Range column of the tracking task list.

      subscribeMode

      Description: The consumption mode of the SDK client. You do not need to modify this parameter.
      • ConsumerContext.ConsumerSubscribeMode.ASSIGN: ASSIGN mode.
      • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: SUBSCRIBE mode.

      How to obtain: None.
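
      The following standalone snippet shows one way to produce the UNIX timestamp that initCheckpoint expects. It uses only the Java standard library (java.time); the date and time zone are placeholders that you replace with a point in time inside the data range of your change tracking instance.

      ******
      import java.time.LocalDateTime;
      import java.time.ZoneId;

      public class InitCheckpointDemo {
          public static void main(String[] args) {
              // Replace with a point in time that falls within the data range of
              // the change tracking instance. The time zone here is a placeholder.
              long initCheckpoint = LocalDateTime.of(2021, 5, 14, 10, 0, 0)
                      .atZone(ZoneId.of("Asia/Shanghai"))
                      .toEpochSecond();

              // Pass the value to the SDK as a string, for example "1620957600".
              System.out.println(String.valueOf(initCheckpoint));
          }
      }
      ******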

  4. Open the project structure in the code editor and ensure that the OpenJDK version for this project is 1.8.

  5. Run the client code.

    Note

    When you run the code for the first time, the code editor may take some time to automatically load the required plug-ins and dependencies.

    Sample results

    Normal run results

    If the following result is returned, the client is running as expected and can track data changes from the source database.

    ******
    [2025-02-25 18:47:22.991] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.consumer.KafkaConsumer:1587] - [Consumer clientId=consumer-dtsl5vy2ao5250****-1, groupId=dtsl5vy2ao5250****] Seeking to offset 8200 for partition cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0
    [2025-02-25 18:47:22.993] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap:116] - RecordFetcher consumer:  subscribe for [cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0] with checkpoint [Checkpoint[ topicPartition: cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0timestamp: 174048****, offset: 8200, info: 174048****]] start
    [2025-02-25 18:47:23.011] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174048044****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8200]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8201]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    ******

    Normal tracking results

    The following result indicates that the client successfully tracked a data change (an UPDATE operation) from the source database.

    ******
    [2025-02-25 18:48:24.905] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8413]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [UPDATE]
    Schema info [{, 
    recordFields= [{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}, {fieldName='name', rawDataTypeNum=253, isPrimaryKey=false, isUniqueKey=false, fieldPosition=1}], 
    databaseName='dtsdb', 
    tableName='person', 
    primaryIndexInfo [[indexType=PrimaryKey, indexFields=[{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}], cardinality=0, nullable=true, isFirstUniqueIndex=false, name=null]], 
    uniqueIndexInfo [[]], 
    partitionFields = null}]
    Before image {[Field [id] [3]
    Field [name] [test1]
    ]}
    After image {[Field [id] [3]
    Field [name] [test2]
    ]}
    ******

    Abnormal run results

    If the following result is returned, the client failed to connect to the change tracking channel (the Kafka broker endpoint).

    ******
    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:22.002] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****-1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:22.509] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    [2025-02-25 18:22:23.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":1740478943160,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:27.192] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:27.618] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    ******

    The SDK client periodically collects and displays statistics information about data consumption. This information includes the total number of data records sent and received, the total data volume, and the number of records per second (RPS) received.

    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}

    The metrics include the following parameters:

    • outCounts: The total number of data records consumed by the SDK client.

    • outBytes: The total amount of data consumed by the SDK client, in bytes.

    • outRps: The number of records per second (RPS) consumed by the SDK client.

    • outBps: The number of bits transmitted per second when the SDK client consumes data.

    • count: The total number of parameters in the data consumption metrics of the SDK client, excluding count itself.

    • inBytes: The total amount of data sent by the DTS server, in bytes.

    • DStoreRecordQueue: The current size of the data cache queue when the DTS server sends data.

    • inCounts: The total number of data records sent by the DTS server.

    • inBps: The number of bits transmitted per second when the DTS server sends data.

    • inRps: The number of records per second (RPS) sent by the DTS server.

    • __dt: The current timestamp when the SDK client receives data, in milliseconds.

    • DefaultUserRecordQueue: The current size of the data cache queue after serialization.

  6. Edit the code to consume tracked data as needed.

    When you consume subscribed data, you must manage consumer offsets. This practice prevents data loss and duplication and allows for on-demand consumption.

FAQ

  • What do I do if I cannot connect to the change tracking instance?

    Troubleshoot the issue based on the error message. For more information, see Troubleshooting.

  • What is the data format of a consumer offset after persistence?

    After a consumer offset is persisted, it is stored in JSON format. The value of the "timestamp" field (170070**** in the following example) is the persisted consumer offset, which is a UNIX timestamp that you can pass directly to the SDK. A small parsing sketch follows the example.

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}
  • Does a tracking task support parallel consumption by multiple clients?

    No. Although the SUBSCRIBE mode allows multiple clients to run in parallel, only one client can actively consume data at any given time.

  • Which version of the Kafka client is encapsulated in the SDK code?

    Version 2.0.0 and later of dts-new-subscribe-sdk encapsulate version 2.7.0 of the Kafka client (kafka-clients). Versions earlier than 2.0.0 encapsulate version 1.0.0 of the Kafka client.

Appendix

Manage consumer offsets

When the SDK client starts for the first time, restarts, or performs an internal retry, you must query and pass the consumer offset to start or resume data consumption. The consumer offset is the UNIX timestamp of the first data record that the SDK client consumes.

To reset the consumer offset of the client, you can query and modify the consumer offset based on the consumption mode (SDK usage mode) as described in the following table.

The following scenarios describe how consumer offsets are queried and managed in each SDK usage mode.

Scenario: Query a consumer offset

SDK usage mode: ASSIGN mode, SUBSCRIBE mode

Offset management method:

  • The SDK client saves the message offset every 5 seconds and submits it to the DTS server. To query the most recent consumer offset, you can check the following locations:

    • The localCheckpointStore file on the server where the SDK client is located.

    • The Data Consumption interface of the subscription channel.

  • If you configure an external persistent shared storage medium, such as a database, using setUserRegisteredStore(new UserMetaStore()) in the consumerContext.java file, the storage medium saves the message offset every 5 seconds for you to query.

Scenario: Start the SDK client for the first time and pass a consumer offset to consume data.

SDK usage mode: ASSIGN mode, SUBSCRIBE mode

Offset management method:

Based on the usage mode of your SDK client, select the DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java file. Then, configure the consumer offset (initCheckpoint) to consume data.

Scenario: The SDK client must re-pass the last recorded consumer offset to resume data consumption after an internal retry.

SDK usage mode: ASSIGN mode

Offset management method:

Search for the last recorded consumer offset in the following order. Once found, the offset information is returned:

  1. The external storage medium that you configured using setUserRegisteredStore(new UserMetaStore()) in the consumerContext.java file.

  2. The localCheckpointStore file on the server where the SDK client is located.

  3. The start timestamp that you passed using initCheckpoint in the DTSConsumerAssignDemo.java file.

SDK usage mode: SUBSCRIBE mode

Offset management method:

Search for the last recorded consumer offset in the following order. Once found, the offset information is returned:

  1. The external storage medium that you configured using setUserRegisteredStore(new UserMetaStore()) in the consumerContext.java file.

  2. The offset saved by the DTS Server (incremental data collection module).

    Note

    The offset is updated only after the SDK client calls the commit method.

  3. The start timestamp that you passed in the DTSConsumerSubscribeDemo.java file using initCheckpoint.

  4. The starting offset of the DTS server (new incremental data collection module).

    Important

    If a switchover of the incremental data collection module occurs, the new module cannot save the client's last consumer offset, which may cause you to start consuming subscribed data from an older offset. We recommend that you persistently store consumer offsets on the client.

Scenario: After the SDK client restarts, re-pass the last recorded consumer offset to resume data consumption.

SDK usage mode: ASSIGN mode

Offset management method:

The consumer offset that is used depends on the setForceUseCheckpoint configuration in the consumerContext.java file:

  • If set to true, each time the SDK client is restarted, the passed initCheckpoint is forcibly used as the consumer offset.

  • If set to false or not configured, search for the last recorded consumer offset in the following order:

    1. The localCheckpointStore file on the server where the SDK client is located.

    2. The offset saved by the DTS Server (incremental data collection module).

      Note

      The offset is updated only after the SDK client calls the commit method.

    3. The external storage medium that you configured using setUserRegisteredStore(new UserMetaStore()) in the consumerContext.java file.

SDK usage mode: SUBSCRIBE mode

Offset management method:

In this mode, the setForceUseCheckpoint configuration in the consumerContext.java file does not take effect. Search for the last recorded consumer offset in the following order:

  1. The external storage medium that you configured using setUserRegisteredStore(new UserMetaStore()) in the consumerContext.java file.

  2. The offset saved by the DTS Server (incremental data collection module).

    Note

    The offset is updated only after the SDK client calls the commit method.

  3. The start timestamp that you passed in the DTSConsumerSubscribeDemo.java file using initCheckpoint.

  4. The starting offset of the DTS server (new incremental data collection module).

Use persistent storage for consumer offsets

If a disaster recovery mechanism is triggered for the incremental data collection module, especially in SUBSCRIBE mode, the new module cannot retrieve the client's last consumer offset. This may cause the client to start consuming data from an older offset, which results in repeated consumption of historical data. For example, assume that before a service switchover, the offset range of the old module is from 08:00:00 on November 11, 2023, to 08:00:00 on November 12, 2023. The client's consumer offset is at the end of this range: 08:00:00 on November 12, 2023. After the switchover, the offset range of the new module is from 10:00:00 on November 8, 2023, to 08:01:00 on November 12, 2023. In this scenario, the client starts consuming data from the new module's starting offset (10:00:00 on November 8, 2023), which causes historical data to be consumed again.

To avoid repeated consumption of historical data in this switchover scenario, we recommend that you configure a persistent storage method for consumer offsets on the client. The following section provides a sample method that you can modify based on your requirements.

  1. Create a UserMetaStore class that extends the abstract class AbstractUserMetaStore and implements its methods for saving and reading offset data.

    For example, to use a MySQL database to store offset information, the Java sample code is as follows:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class UserMetaStore extends AbstractUserMetaStore {

        @Override
        protected void saveData(String groupID, String toStoreJson) {
            // getConnection() and close() are user-implemented helpers that manage
            // the connection to the MySQL database that stores the offsets.
            Connection con = getConnection();
            String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";

            PreparedStatement pres = null;
            ResultSet rs = null;

            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }

        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
            String sql = "select checkpoint from dts_checkpoint where group_id = ?";

            PreparedStatement pres = null;
            ResultSet rs = null;

            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                rs = pres.executeQuery();

                // Return the stored checkpoint JSON, or null if no offset has been saved yet.
                if (rs.next()) {
                    return rs.getString("checkpoint");
                }
                return null;
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            } finally {
                close(rs, pres, con);
            }
        }
    }
    
  2. In the consumerContext.java file, configure the external storage medium using the setUserRegisteredStore(new UserMetaStore()) method.
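
    A minimal sketch of this configuration follows. Only the two setter calls named in this topic are shown; the surrounding ConsumerContext construction stays as provided in the downloaded sample code.

    ******
    // In the sample project, after the ConsumerContext object (consumerContext) is created,
    // add the following calls.

    // Register the external persistent storage medium implemented in step 1.
    consumerContext.setUserRegisteredStore(new UserMetaStore());

    // Optional, takes effect in ASSIGN mode only: set to true to force the passed
    // initCheckpoint to be used as the consumer offset on every restart.
    consumerContext.setForceUseCheckpoint(false);
    ******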

Troubleshooting

Connection failed

• Error message: ERROR CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009 failed, please check the network and if the brokerUrl is correct'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

  Cause: The brokerUrl is incorrect.

• Error message: telnet real node *** failed, please check the network

  Cause: The real IP address cannot be reached through the broker address.

• Error message: ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

  Cause: The username or password is incorrect.

Solution for the preceding errors: Enter valid values for the brokerUrl, userName, and password parameters. For more information, see Parameter description.

• Error message: com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

  Cause: In the consumerContext.java file, setUseCheckpoint is set to true, but the consumer offset is not within the timestamp range of the change tracking instance.

  Solution: Specify a consumer offset within the data range of the change tracking instance. For more information, see the parameter description table in this topic.

Data consumption slows down

Error message: None

Cause analysis and solution:

  • Analyze the cause of slow data consumption by checking the sizes of the DStoreRecordQueue and DefaultUserRecordQueue queues in the statistics information.

    • If DStoreRecordQueue stays at 0, the DTS server is pulling data from the source database at a low speed.

    • If DefaultUserRecordQueue stays at the default value of 512, the SDK client is consuming data at a low speed.

  • As needed, modify the consumer offset (initCheckpoint) in the code to reset the offset.