
Data Transmission Service: Consume tracked data using an SDK

Last Updated: Dec 31, 2025

After you configure a change tracking channel by creating a tracking task and a consumer group, you can use the software development kit (SDK) provided by DTS to consume the tracked data. This topic describes how to use the sample code.


Prerequisites

  • A change tracking task is created.

  • A consumer group is created.

Notes

  • When you consume tracked data, call the `commit` method of `DefaultUserRecord` to commit the offset information, as shown in the sketch after this list. If you do not commit the offset, data may be consumed repeatedly.

  • Different consumption processes are independent of each other.

  • In the console, Current Offset indicates the offset that the tracking task has subscribed to, not the offset committed by the client.
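
  For reference, the following is a minimal sketch of a record listener that commits the offset after each record is processed. The RecordListener and DefaultUserRecord types and the commit argument follow the sample project in this topic; verify them against your SDK version.

    ******
    // Sketch only: types come from the dts-new-subscribe-sdk sample project.
    RecordListener listener = new RecordListener() {
        @Override
        public void consume(DefaultUserRecord record) {
            // ... process the record, for example write it to a downstream system ...

            // Commit the offset after processing. Without the commit, the same
            // data may be delivered again after a restart or retry.
            record.commit(String.valueOf(record.getSourceTimestamp()));
        }
    };
    ******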

Procedure

  1. Download the SDK sample code file and decompress it.

  2. Check the SDK code version.

    1. Go to the directory where you decompressed the SDK sample code.

    2. Use a text editor to open the pom.xml file in the directory.

    3. Update the version of the change tracking SDK to the latest version.

      Note

      You can view the latest Maven dependency on the dts-new-subscribe-sdk page.

      Location of the SDK version parameter:

      <name>dts-new-subscribe-sdk</name>
      <url>https://www.aliyun.com/product/dts</url>
      <description>The Aliyun new Subscribe SDK for Java used for accessing Data Transmission Service</description>
      <packaging>jar</packaging>
      <groupId>com.aliyun.dts</groupId>
      <artifactId>dts-new-subscribe-sdk</artifactId>
      <version>2.1.4</version>
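
      If you reference the SDK from your own project instead of building the sample, the coordinates above translate to a Maven dependency such as the following (2.1.4 is the version from the sample pom.xml; replace it with the latest release):

      <dependency>
          <groupId>com.aliyun.dts</groupId>
          <artifactId>dts-new-subscribe-sdk</artifactId>
          <version>2.1.4</version>
      </dependency>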
  3. Edit the SDK code.

    1. Use an IDE to open the decompressed project.

    2. Open the Java file for the mode that the SDK client uses.

      Note

      The path of the Java file is aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/.

      ASSIGN mode (DTSConsumerAssignDemo.java)

        Description: To ensure that messages are globally ordered, DTS assigns only one partition, partition 0, to each tracking topic. If the SDK client uses the ASSIGN mode, start only one SDK client.

        Scenario: Only one SDK client in a consumer group consumes tracked data.

      SUBSCRIBE mode (DTSConsumerSubscribeDemo.java)

        Description: To ensure that messages are globally ordered, DTS assigns only one partition, partition 0, to each tracking topic. If the SDK client uses the SUBSCRIBE mode, you can start multiple SDK clients in the same consumer group to implement disaster recovery: if the client that is currently consuming data fails, another SDK client in the group is automatically and randomly assigned to partition 0 to continue data consumption.

        Scenario: Multiple SDK clients in the same consumer group consume tracked data at the same time. This applies to disaster recovery scenarios.

    3. Set the parameters in the Java code.

      Sample code

      ******
          public static void main(String[] args) {
              // The Kafka broker URL of the change tracking channel.
              String brokerUrl = "dts-cn-***.com:18001";
              // The topic to consume. The partition is always 0.
              String topic = "cn_***_version2";
              // The consumer group ID (sid), username, and password used for authentication.
              String sid = "dts***";
              String userName = "dts***";
              String password = "DTS***";
              // The initial checkpoint for the first seek: a seconds-level UNIX timestamp,
              // for example 1566180200 for Mon Aug 19 10:03:20 CST 2019.
              String initCheckpoint = "1740472***";
              // In SUBSCRIBE mode, the group configuration is required and the Kafka
              // consumer group is enabled.
              ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE;

              DTSConsumerSubscribeDemo consumerDemo = new DTSConsumerSubscribeDemo(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
              consumerDemo.start();
          }
      ******

      brokerUrl

        Description: The network address and port of the change tracking channel.

        Note
        • If the server where you deploy the SDK client, such as an ECS instance, is in the same virtual private cloud (VPC) as the change tracking instance, consume data over the VPC to reduce network latency.
        • We do not recommend a public endpoint because of potential network instability.

        How to obtain: In the DTS console, click the ID of the target subscription instance. On the Basic Information page, obtain the network address and port from the Network section.

      topic

        Description: The topic that the change tracking channel subscribes to.

        How to obtain: In the DTS console, click the ID of the target subscription instance. On the Basic Information page, obtain the Topic from the Basic Information section.

      sid

        Description: The ID of the consumer group.

        How to obtain: In the DTS console, click the ID of the target subscription instance. On the Consume Data page, obtain the Consumer Group ID/Name and the consumer group's Account information.

      userName

        Description: The username of the consumer group.

        Warning: If you do not use the client provided in this topic, set the username in the <Username of the consumer group>-<Consumer group ID> format, for example, dtstest-dtsae******bpv. Otherwise, the connection fails.

      password

        Description: The password of the consumer group account.

        How to obtain: This is the password that you set for the consumer group username when you created the consumer group.

      initCheckpoint

        Description: The consumer offset: the timestamp of the first data record that the SDK client consumes, as a seconds-level UNIX timestamp, for example, 1620962769. For a conversion example, see the sketch after this list.

        The consumer offset can be used for the following purposes:
        • If the application is interrupted, pass the last consumed offset to resume consumption and prevent data loss.
        • When the tracking client starts, pass the required consumer offset to adjust the tracking offset and consume data as needed.

        How to obtain: The consumer offset must be within the timestamp range of the change tracking instance and must be converted to a UNIX timestamp. You can view the timestamp range of the target subscription instance in the Data Range column of the tracking task list.

      subscribeMode

        Description: The usage mode of the SDK client. You do not need to modify this parameter.
        • ConsumerContext.ConsumerSubscribeMode.ASSIGN: ASSIGN mode.
        • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: SUBSCRIBE mode.

        How to obtain: None.
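
      Because initCheckpoint must be a seconds-level UNIX timestamp, you may need to convert a wall-clock time first. The following is a minimal, SDK-independent sketch that uses only the JDK; the example time and time zone are illustrative:

      ******
      import java.time.ZoneId;
      import java.time.ZonedDateTime;

      public class CheckpointUtil {
          public static void main(String[] args) {
              // The desired consumption start time, in the source database's time zone.
              ZonedDateTime startTime =
                      ZonedDateTime.of(2021, 5, 14, 11, 26, 9, 0, ZoneId.of("Asia/Shanghai"));
              // DTS expects the offset as a seconds-level UNIX timestamp string.
              String initCheckpoint = String.valueOf(startTime.toEpochSecond());
              System.out.println(initCheckpoint); // prints 1620962769
          }
      }
      ******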

  4. Open the project structure in your IDE and make sure that the OpenJDK version for the project is 1.8.

  5. Run the client code.

    Note

    When you run the code for the first time, the IDE takes some time to automatically load the required plug-ins and dependencies.

    Examples of runtime results

    Normal runtime result

    If the following result is returned, the client is running as expected and can track data changes in the source database.

    ******
    [2025-02-25 18:47:22.991] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.consumer.KafkaConsumer:1587] - [Consumer clientId=consumer-dtsl5vy2ao5250****-1, groupId=dtsl5vy2ao5250****] Seeking to offset 8200 for partition cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0
    [2025-02-25 18:47:22.993] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap:116] - RecordFetcher consumer:  subscribe for [cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0] with checkpoint [Checkpoint[ topicPartition: cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0timestamp: 174048****, offset: 8200, info: 174048****]] start
    [2025-02-25 18:47:23.011] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174048044****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8200]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8201]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    ******

    Normal subscription results

    If the following result is returned, the client has successfully tracked a data change (an UPDATE operation) in the source database.

    ******
    [2025-02-25 18:48:24.905] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8413]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [UPDATE]
    Schema info [{, 
    recordFields= [{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}, {fieldName='name', rawDataTypeNum=253, isPrimaryKey=false, isUniqueKey=false, fieldPosition=1}], 
    databaseName='dtsdb', 
    tableName='person', 
    primaryIndexInfo [[indexType=PrimaryKey, indexFields=[{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}], cardinality=0, nullable=true, isFirstUniqueIndex=false, name=null]], 
    uniqueIndexInfo [[]], 
    partitionFields = null}]
    Before image {[Field [id] [3]
    Field [name] [test1]
    ]}
    After image {[Field [id] [3]
    Field [name] [test2]
    ]}
    ******

    Abnormal runtime result

    If the following result is returned, the client cannot connect to the change tracking channel (the Kafka broker).

    ******
    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:22.002] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****-1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:22.509] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    [2025-02-25 18:22:23.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":1740478943160,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:27.192] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:27.618] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    ******

    The SDK client periodically collects and displays statistics about data consumption, including the total number of sent and received data records, the total data volume, and the requests per second (RPS).

    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}

    Parameter descriptions:

    • outCounts: The total number of data records consumed by the SDK client.

    • outBytes: The total volume of data consumed by the SDK client, in bytes.

    • outRps: The number of requests per second for data consumption by the SDK client.

    • outBps: The number of bits transmitted per second during data consumption by the SDK client.

    • count: The number of parameters in this metrics entry, excluding count itself.

    • inBytes: The total volume of data sent by the DTS server, in bytes.

    • DStoreRecordQueue: The current size of the data cache queue when the DTS server sends data.

    • inCounts: The total number of data records sent by the DTS server.

    • inBps: The number of bits transmitted per second when the DTS server sends data.

    • inRps: The number of requests per second when the DTS server sends data.

    • __dt: The timestamp at which the SDK client received the data, in milliseconds.

    • DefaultUserRecordQueue: The current size of the data cache queue after data is serialized.

  6. Edit the code to consume the tracked data as needed.

    When you consume tracked data, you must manage consumer offsets to prevent data loss, minimize repeated consumption, and consume data as needed. A sketch of a consumption callback follows.
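
    As an illustration, the following sketch extends the demo's listener to read the before and after images of an UPDATE. The OperationType values and the image accessors are inferred from the demo output shown above ("RecordType", "Before image", "After image"); confirm the exact names against DefaultRecordPrintListener in your SDK version.

    ******
    // Sketch only: verify type and method names against your SDK version.
    RecordListener listener = new RecordListener() {
        @Override
        public void consume(DefaultUserRecord record) {
            if (record.getOperationType() == OperationType.UPDATE) {
                // The row images of the UPDATE, as printed by the demo's
                // DefaultRecordPrintListener ("Before image" / "After image").
                RowImage before = record.getBeforeImage();
                RowImage after = record.getAfterImage();
                System.out.println("before: " + before + ", after: " + after);
            }
            // HEARTBEAT and other records are only committed, not processed.
            record.commit(String.valueOf(record.getSourceTimestamp()));
        }
    };
    ******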

FAQ

  • What should I do if I cannot connect to a subscription instance?

    Troubleshoot the issue based on the error message. For more information, see Troubleshooting.

  • What is the data format of a persisted consumer offset?

    After a consumer offset is persisted, it is returned in JSON format. The persisted consumer offset is a UNIX timestamp that you can pass directly to the SDK. In the following returned data, the value 1700709977 of the "timestamp" field is the persisted consumer offset.

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}
  • Can multiple clients consume data in parallel for a tracking task?

    No. The SUBSCRIBE mode allows multiple clients to run in parallel, but only one client can consume data at a time.

  • Which version of the Kafka client is encapsulated in the SDK code?

    dts-new-subscribe-sdk versions 2.0.0 and later encapsulate Kafka client 2.7.0 (kafka-clients). Versions earlier than 2.0.0 encapsulate Kafka client 1.0.0.

Appendix

Manage consumer offsets

When the SDK client starts for the first time, restarts, or performs an internal retry, you must query and enter a consumer offset to start or resume data consumption. The consumer offset is the UNIX timestamp when the SDK client consumes the first data record.

To reset the consumer offset of the client, you can query and modify the consumer offset based on the SDK usage mode. The following table describes how to query and modify the consumer offset.

Scenario: Query a consumer offset

Applicable SDK usage modes: ASSIGN mode, SUBSCRIBE mode

Offset management method:

  • The SDK client saves the message offset every 5 seconds and commits it to the DTS server. To query the latest consumer offset, check one of the following locations:

    • The localCheckpointStore file on the server where the SDK client is located.

    • The Data Consumption interface of the subscription channel.

  • If you configure an external persistent shared storage medium, such as a database, in the setUserRegisteredStore(new UserMetaStore()) method of the consumerContext.java file, the storage medium saves the message offset every 5 seconds for you to query.

Scenario: The SDK client starts for the first time and requires a consumer offset to consume data.

Applicable SDK usage modes: ASSIGN mode, SUBSCRIBE mode

Offset management method: Based on the usage mode of the SDK client, open the DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java file and configure the consumer offset (initCheckpoint) to start consumption.

Scenario: The SDK client needs to re-enter the last recorded consumer offset to continue data consumption after an internal retry.

ASSIGN mode: Search for the last recorded consumer offset in the following order. The first offset that is found is used:

  1. The external storage medium that you configured in the setUserRegisteredStore(new UserMetaStore()) method of the consumerContext.java file.

  2. The localCheckpointStore file on the server where the SDK client is located.

  3. The start timestamp that you passed in for initCheckpoint in the DTSConsumerAssignDemo.java file.

SUBSCRIBE mode: Search for the last recorded consumer offset in the following order. The first offset that is found is used:

  1. The external storage medium that you configured in the setUserRegisteredStore(new UserMetaStore()) method of the consumerContext.java file.

  2. The offset saved on the DTS Server (incremental data collection module).

    Note

    This offset is updated only after the SDK client calls the commit method to update the consumer offset.

  3. The start timestamp that you passed in for initCheckpoint in the DTSConsumerSubscribeDemo.java file.

  4. The start offset of the DTS Server (new incremental data collection module).

    Important

    If the incremental data collection module is switched, the new incremental data collection module cannot save the last consumer offset of the client. This may cause data to be consumed from an earlier offset. We recommend that you persistently store consumer offsets on the client.

Scenario: The SDK client is restarted and needs to re-enter the last recorded consumer offset to continue data consumption.

ASSIGN mode: Query the consumer offset based on the setForceUseCheckpoint configuration in the consumerContext.java file:

  • If the parameter is set to true, the passed `initCheckpoint` is forcibly used as the consumer offset each time the SDK client is restarted.

  • If the parameter is set to false or is not configured, search for the last recorded consumer offset in the following order:

    1. The localCheckpointStore file on the server where the SDK client is located.

    2. The offset saved on the DTS Server (incremental data collection module).

      Note

      This offset is updated only after the SDK client calls the commit method to update the consumer offset.

    3. The external storage medium that you configured in the setUserRegisteredStore(new UserMetaStore()) method of the consumerContext.java file.

SUBSCRIBE mode: In this mode, the setForceUseCheckpoint configuration in the consumerContext.java file does not take effect. Search for the last recorded consumer offset in the following order. The first offset that is found is used:

  1. The external storage medium that you configured in the setUserRegisteredStore(new UserMetaStore()) method of the consumerContext.java file.

  2. The offset saved on the DTS Server (incremental data collection module).

    Note

    This offset is updated only after the SDK client calls the commit method to update the consumer offset.

  3. The start timestamp that you passed in for initCheckpoint in the DTSConsumerSubscribeDemo.java file.

  4. The start offset of the DTS Server (new incremental data collection module).

Persistently store consumer offsets

If the disaster recovery mechanism is triggered for the incremental data collection module, especially in SUBSCRIBE mode, the new incremental data collection module cannot save the last consumer offset of the client. This may cause the client to start consuming tracked data from an earlier offset, resulting in repeated consumption of historical data.

For example, before the incremental data service is switched, the offset range of the old incremental data collection module is from 08:00:00 on November 11, 2023, to 08:00:00 on November 12, 2023, and the consumer offset of the client is 08:00:00 on November 12, 2023. After the switchover, the offset range of the new incremental data collection module is from 10:00:00 on November 8, 2023, to 08:01:00 on November 12, 2023. The client then starts consumption from the start offset of the new incremental data collection module (10:00:00 on November 8, 2023), which causes repeated consumption of historical data.

To prevent repeated consumption of historical data in such a switchover scenario, you can configure a persistent storage method for consumer offsets on the client. The following sample method shows how to do this. You can modify it as needed.

  1. Create a UserMetaStore class that extends AbstractUserMetaStore and implements its abstract methods.

    The following sample Java code shows how to use a MySQL database to store offset information:

    public class UserMetaStore extends AbstractUserMetaStore {

        @Override
        protected void saveData(String groupID, String toStoreJson) {
            // getConnection() and close() are user-provided JDBC helper methods.
            Connection con = getConnection();
            String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";

            PreparedStatement pres = null;

            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(null, pres, con);
            }
        }

        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
            String sql = "select checkpoint from dts_checkpoint where group_id = ?";

            PreparedStatement pres = null;
            ResultSet rs = null;

            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                rs = pres.executeQuery();

                // Return the stored checkpoint JSON, or null if none exists yet.
                if (rs.next()) {
                    return rs.getString("checkpoint");
                }
                return null;
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            } finally {
                close(rs, pres, con);
            }
        }
    }
    
  2. In the consumerContext.java file, configure an external storage medium in the setUserRegisteredStore(new UserMetaStore()) method.
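
    For orientation, the registration in the demo's client setup looks roughly like the following; the ConsumerContext constructor arguments mirror the sample code earlier in this topic, so treat the exact signature as an assumption to verify:

    ******
    // Sketch only: constructor arguments follow the sample code in this topic.
    ConsumerContext consumerContext = new ConsumerContext(
            brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
    // Register the external store so offsets survive a switchover of the
    // incremental data collection module.
    consumerContext.setUserRegisteredStore(new UserMetaStore());
    ******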

Troubleshooting

Connection failures

  • Error message:

    ERROR CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009 failed, please check the network and if the brokerUrl is correct'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

    Cause: The brokerUrl is incorrect.

  • Error message:

    telnet real node *** failed, please check the network

    Cause: The real IP address cannot be reached through the broker address.

  • Error message:

    ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

    Cause: The username or password is incorrect.

  Solution for the preceding errors: Enter the correct brokerUrl, userName, and password. For more information about how to obtain these parameters, see the parameter descriptions in this topic.

  • Error message:

    com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

    Cause: In the consumerContext.java file, setUseCheckpoint is set to true, but the consumer offset is not within the timestamp range of the change tracking instance.

    Solution: Enter a consumer offset that is within the timestamp range of the change tracking instance. You can view the range in the Data Range column of the tracking task list.

The subscription consumption rate decreases

  • Analyze the cause of slow consumption by checking the sizes of the DStoreRecordQueue and DefaultUserRecordQueue queues in the statistics:

    • If DStoreRecordQueue remains 0, the DTS server is pulling data from the source slowly.

    • If DefaultUserRecordQueue remains at its default size of 512, the SDK client is consuming data slowly.

  • Modify the consumer offset (initCheckpoint) in the code to reset the offset as needed.