
Data Transmission Service:Consume subscribed data using an SDK

Last Updated: Jan 23, 2026

After you configure a change tracking channel by creating a tracking task and a consumer group, you can use the software development kit (SDK) provided by DTS to consume the subscribed data. This topic describes how to use the sample code.

Prerequisites

  • A change tracking task is created.

  • A consumer group is created.

Precautions

  • When you consume subscribed data, you must call the commit method of DefaultUserRecord to commit the offset information. Otherwise, data might be consumed repeatedly. A minimal listener sketch follows this list.

  • Different consumption processes are independent of each other.

  • In the console, Current Offset indicates the offset that the tracking task has subscribed to, not the offset committed by the client.
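
The following is a minimal sketch of such a committing listener, based on the sample demo code. The RecordListener and DefaultUserRecord types come from the SDK; check the demo files in the package for the exact imports and commit signature of your SDK version.

    // Minimal sketch of a committing listener. Process the record first, then
    // commit its offset so that consumption resumes here instead of repeating.
    public class CommittingListener implements RecordListener {
        @Override
        public void consume(DefaultUserRecord record) {
            // Apply the record to your downstream system here.

            // Commit only after the record is fully processed; committing
            // earlier risks data loss if processing fails.
            record.commit("");
        }
    }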

Procedure

  1. Download the sample SDK code files and decompress the package.

  2. Verify the SDK code version.

    1. Go to the directory where you decompressed the sample SDK code.

    2. Use a text editor to open the pom.xml file in the directory.

    3. Update the change tracking SDK to the latest version.

      Note

      You can find the latest Maven dependency on the dts-new-subscribe-sdk page.

      Location of the SDK version parameter:

      <groupId>com.aliyun.dts</groupId>
      <artifactId>dts-new-subscribe-sdk</artifactId>
      <version>2.1.4</version>
      <packaging>jar</packaging>
      <name>dts-new-subscribe-sdk</name>
      <url>https://www.aliyun.com/product/dts</url>
      <description>The Aliyun new Subscribe SDK for Java used for accessing Data Transmission Service</description>
  3. Edit the SDK code.

    1. Use an integrated development environment (IDE) to open the decompressed files.

    2. Open the Java file that corresponds to the mode in which you want to use the SDK client.

      Note

      The path of the Java file is aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/.

      ASSIGN mode (DTSConsumerAssignDemo.java)

        Description: To ensure global message order, DTS assigns only one partition (partition 0) to each tracking topic. If you use the SDK client in ASSIGN mode, start only one SDK client.

        Scenario: Only one SDK client in a consumer group consumes subscribed data.

      SUBSCRIBE mode (DTSConsumerSubscribeDemo.java)

        Description: To ensure global message order, DTS assigns only one partition (partition 0) to each tracking topic. If you use the SDK client in SUBSCRIBE mode, you can start multiple SDK clients in the same consumer group for disaster recovery. If the client that is consuming data fails, another SDK client is randomly and automatically assigned to partition 0 to continue consumption.

        Scenario: Multiple SDK clients in the same consumer group consume subscribed data, for disaster recovery.

    3. Set the parameters in the Java code.

      Sample code

      ******        
          public static void main(String[] args) {
              // The Kafka broker URL.
              String brokerUrl = "dts-cn-***.com:18001";
              // The topic from which to consume data. The partition is 0.
              String topic = "cn_***_version2";
              // The username, password, and SID for authentication.
              String sid = "dts***";
              String userName = "dts***";
              String password = "DTS***";
              // The initial checkpoint for the first seek, as a UNIX timestamp in seconds. For example, set this parameter to 1566180200 to start consumption from 10:03:20 (UTC+8) on August 19, 2019.
              String initCheckpoint = "1740472***";
              // The SDK usage mode. In SUBSCRIBE mode, the Kafka consumer group feature is used, so the consumer group must be configured.
              ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE;
        
              DTSConsumerSubscribeDemo consumerDemo = new DTSConsumerSubscribeDemo(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
              consumerDemo.start();
          }
      ******
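
      If you use the ASSIGN mode instead, only the mode constant and the demo class change. The following is a sketch based on the sample above, assuming that DTSConsumerAssignDemo takes the same constructor arguments as the demo file provided in the package:

          // ASSIGN mode: start only one SDK client, as described above.
          ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
          DTSConsumerAssignDemo consumerDemo = new DTSConsumerAssignDemo(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
          consumerDemo.start();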

      Parameter descriptions:

      brokerUrl

        Description: The network address and port number of the change tracking channel.

        Note
        • If the server where you deploy the SDK client, such as an ECS instance, is in the same virtual private cloud (VPC) as the change tracking instance, you can consume data over the VPC to reduce network latency.
        • We do not recommend a public endpoint because of potential network instability.

        How to obtain: In the DTS console, click the ID of the target subscription instance. On the Basic Information page, obtain the network address and port number from the Network section.

      topic

        Description: The topic of the change tracking channel.

        How to obtain: In the DTS console, click the target subscription instance ID. On the Basic Information page, obtain the Topic from the Basic Information section.

      sid

        Description: The ID of the consumer group.

        How to obtain: In the DTS console, click the target subscription instance ID. On the Consume Data page, obtain the Consumer Group ID/Name and Account.

      userName

        Description: The username of the consumer group.

        Warning: If you do not use the client provided in this topic, you must set the username in the <consumer group username>-<consumer group ID> format (for example, dtstest-dtsae******bpv). Otherwise, the connection fails.

        How to obtain: In the DTS console, click the target subscription instance ID. On the Consume Data page, obtain the Consumer Group ID/Name and Account.

      password

        Description: The password of the consumer group account.

        How to obtain: The password that you set for the consumer group username when you created the consumer group.

      initCheckpoint

        Description: The consumer offset, that is, the UNIX timestamp of the first data record for the SDK client to consume, such as 1620962769. You can use the consumer offset to:
        • Resume data consumption from a specific offset after your application is interrupted, to prevent data loss.
        • Adjust the starting offset for data consumption as needed.

        How to obtain: The consumer offset must be within the timestamp range of the tracking instance and must be converted to a UNIX timestamp (see the conversion sketch after this table). The Data Range column in the tracking task list shows the timestamp range of the target subscription instance.

      subscribeMode

        Description: The mode in which the SDK client is used. You do not need to modify this parameter.
        • ConsumerContext.ConsumerSubscribeMode.ASSIGN: ASSIGN mode.
        • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: SUBSCRIBE mode.

        How to obtain: N/A
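
      For reference, the following JDK-only sketch converts a wall-clock time to the UNIX timestamp expected by initCheckpoint. The date and time zone are illustrative:

          import java.time.ZoneId;
          import java.time.ZonedDateTime;

          public class CheckpointHelper {
              public static void main(String[] args) {
                  // Convert 10:03:20 (UTC+8) on August 19, 2019 to epoch seconds.
                  ZonedDateTime start = ZonedDateTime.of(
                          2019, 8, 19, 10, 3, 20, 0, ZoneId.of("Asia/Shanghai"));
                  String initCheckpoint = String.valueOf(start.toEpochSecond());
                  System.out.println(initCheckpoint); // 1566180200
              }
          }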

  4. Open the project structure in your IDE and make sure that the OpenJDK version for the project is 1.8.

  5. Run the client code.

    Note

    When you run the code for the first time, the IDE needs some time to automatically load the required plugins and dependencies.

    Sample results

    Normal running results

    If the following result is returned, the client is running correctly and is ready to subscribe to data changes from the source database.

    ******
    [2025-02-25 18:47:22.991] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.consumer.KafkaConsumer:1587] - [Consumer clientId=consumer-dtsl5vy2ao5250****-1, groupId=dtsl5vy2ao5250****] Seeking to offset 8200 for partition cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0
    [2025-02-25 18:47:22.993] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap:116] - RecordFetcher consumer:  subscribe for [cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0] with checkpoint [Checkpoint[ topicPartition: cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0timestamp: 174048****, offset: 8200, info: 174048****]] start
    [2025-02-25 18:47:23.011] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174048044****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8200]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8201]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    ******

    Normal subscription results

    If the following result is returned, the client has successfully subscribed to data changes (an UPDATE operation) from the source database.

    ******
    [2025-02-25 18:48:24.905] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8413]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [UPDATE]
    Schema info [{, 
    recordFields= [{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}, {fieldName='name', rawDataTypeNum=253, isPrimaryKey=false, isUniqueKey=false, fieldPosition=1}], 
    databaseName='dtsdb', 
    tableName='person', 
    primaryIndexInfo [[indexType=PrimaryKey, indexFields=[{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}], cardinality=0, nullable=true, isFirstUniqueIndex=false, name=null]], 
    uniqueIndexInfo [[]], 
    partitionFields = null}]
    Before image {[Field [id] [3]
    Field [name] [test1]
    ]}
    After image {[Field [id] [3]
    Field [name] [test2]
    ]}
    ******

    Abnormal running results

    If the following result is returned, the client cannot connect to the source database.

    ******
    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:22.002] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****-1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:22.509] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    [2025-02-25 18:22:23.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":1740478943160,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:27.192] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:27.618] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    ******

    The SDK client periodically collects and displays statistics about data consumption. These statistics include the total number of data records sent and received, the total data volume, and the number of records per second (RPS).

    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}

    The parameters in the statistics are described as follows:

    outCounts: The total number of data records consumed by the SDK client.

    outBytes: The total volume of data consumed by the SDK client, in bytes.

    outRps: The number of requests per second for data consumption by the SDK client.

    outBps: The number of bits transmitted per second for data consumption by the SDK client.

    count: The total number of parameters in the data consumption statistics, excluding count itself.

    inBytes: The total volume of data sent by the DTS server, in bytes.

    DStoreRecordQueue: The current size of the data cache queue when the DTS server sends data.

    inCounts: The total number of data records sent by the DTS server.

    inBps: The number of bits transmitted per second when the DTS server sends data.

    inRps: The number of requests per second when the DTS server sends data.

    __dt: The timestamp when the SDK client receives the data, in milliseconds.

    DefaultUserRecordQueue: The size of the data cache queue after serialization.

  6. Edit the code to consume subscribed data as needed.

    When you consume subscribed data, you need to manage consumer offsets to prevent data loss, minimize data duplication, and enable on-demand consumption. For more information, see Manage consumer offsets in the Appendix.

FAQ

  • What can I do if I cannot connect to a subscription instance?

    Troubleshoot the issue based on the error message. For more information, see Troubleshooting.

  • What is the data format of a consumer offset after persistence?

    After a consumer offset is persisted, the data is returned in JSON format. The persisted consumer offset is a Unix timestamp that you can pass directly to the SDK. For example, in the returned data, the value 1700709977 for the "timestamp" key is the persisted consumer offset.

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}
  • Can a tracking task be consumed by multiple clients in parallel?

    No. Although the SUBSCRIBE mode allows multiple clients to run in parallel, only one client can consume data at a time.

  • Which version of the Kafka client is encapsulated in the SDK code?

    Version 2.0.0 and later of dts-new-subscribe-sdk encapsulate Kafka client (kafka-clients) 2.7.0. Versions earlier than 2.0.0 encapsulate Kafka client 1.0.0.

    Note

    If you use a dependency package vulnerability detection tool in your application development process and find that the Kafka client (kafka-clients) encapsulated by dts-new-subscribe-sdk has a security vulnerability, you can resolve this vulnerability by replacing the client with the 2.1.4-shaded version.

    <dependency>
        <groupId>com.aliyun.dts</groupId>
        <artifactId>dts-new-subscribe-sdk</artifactId>
        <version>2.1.4-shaded</version>
    </dependency>

Appendix

Manage consumer offsets

When the SDK client starts for the first time, restarts, or retries internally, you must query and pass a consumer offset to start or resume data consumption. The consumer offset is the UNIX timestamp of the first data record that the SDK client will consume.

To reset the consumer offset of the client, you can query and modify the consumer offset based on the consumption mode (SDK usage mode) as described in the following table.

Scenario: Query a consumer offset

Applicable modes: ASSIGN mode, SUBSCRIBE mode

Offset management method:

  • Because the SDK client saves the message offset every 5 seconds and commits it to the DTS server, you can query the last consumer offset from one of the following locations:

    • The localCheckpointStore file on the server where the SDK client is located.

    • The Data Consumption page of the change tracking channel.

  • If you have configured an external persistent shared storage medium, such as a database, in the consumerContext.java file using setUserRegisteredStore(new UserMetaStore()), this storage medium saves the message offset every 5 seconds for you to query.

Scenario: The SDK client starts for the first time. You must pass a consumer offset to consume data.

Applicable modes: ASSIGN mode, SUBSCRIBE mode

Offset management method: Depending on the usage mode of the SDK client, select the Java file DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java, and configure the consumer offset (initCheckpoint) to consume data.

Scenario: The SDK client retries internally. You must pass the last recorded consumer offset to resume data consumption.

ASSIGN mode:

Search for the last recorded consumer offset in the following order. If an offset is found, the offset information is returned:

  1. The external storage medium that you configured using setUserRegisteredStore(new UserMetaStore()) in the consumerContext.java file.

  2. The localCheckpointStore file on the server where the SDK client is located.

  3. The start timestamp that you pass to initCheckpoint in the DTSConsumerAssignDemo.java file.

SUBSCRIBE mode:

Search for the last recorded consumer offset in the following order. If an offset is found, the offset information is returned:

  1. The external storage medium that you configured in the consumerContext.java file using setUserRegisteredStore(new UserMetaStore()).

  2. The offset saved on the DTS Server (incremental data ingestion module).

    Note

    This offset is updated only after the SDK client calls the commit method to update the consumer offset.

  3. The start timestamp that you pass to initCheckpoint in the DTSConsumerSubscribeDemo.java file.

  4. The start offset of the DTS Server (new incremental data ingestion module).

    Important

    If the incremental data ingestion module is switched, the new module cannot save the client's last consumer offset. This may cause data consumption to start from an older offset. We recommend that you persistently store the consumer offset on the client.

Scenario: The SDK client is restarted. You must pass the last recorded consumer offset to resume data consumption.

ASSIGN mode:

The consumer offset is queried based on the setForceUseCheckpoint setting in the consumerContext.java file. If an offset is found, the offset information is returned:

  • If set to true, the SDK client uses the passed-in initCheckpoint as the consumer offset every time it restarts.

  • If set to false or not set, the last recorded consumer offset is searched for in the following order:

    1. The localCheckpointStore file on the server where the SDK client is located.

    2. The offset saved on the DTS Server (incremental data ingestion module).

      Note

      This offset is updated only after the SDK client calls the commit method to update the consumer offset.

    3. The external storage medium that you configured in the consumerContext.java file using setUserRegisteredStore(new UserMetaStore()).

SUBSCRIBE mode:

In this mode, the setForceUseCheckpoint setting in the consumerContext.java file does not take effect. The last recorded consumer offset is searched for in the following order:

  1. The external storage medium configured in the consumerContext.java file with setUserRegisteredStore(new UserMetaStore()).

  2. The offset saved on the DTS Server (incremental data ingestion module).

    Note

    This offset is updated only after the SDK client calls the commit method to update the consumer offset.

  3. The start timestamp that you pass to initCheckpoint in the DTSConsumerSubscribeDemo.java file.

  4. The start offset of the DTS Server (new incremental data ingestion module).
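
For example, to force an ASSIGN-mode client to start from the passed-in initCheckpoint on every restart, set the option on the consumer context (a sketch; the construction of consumerContext follows the demo code):

    // Always use the passed-in initCheckpoint as the consumer offset after a
    // restart. As described above, this setting takes effect in ASSIGN mode only.
    consumerContext.setForceUseCheckpoint(true);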

Persist the consumer offset

If a disaster recovery switchover occurs for the incremental data ingestion module, the new module cannot save the client's last consumer offset. This issue mainly affects the SUBSCRIBE mode. As a result, the client may start consuming data from an earlier offset, which leads to repeated consumption of historical data. For example, assume that before a service switchover, the offset range of the old module is from 08:00:00 on November 11, 2023 to 08:00:00 on November 12, 2023, and the client's consumer offset is 08:00:00 on November 12, 2023. If the offset range of the new module after the switchover is from 10:00:00 on November 8, 2023 to 08:01:00 on November 12, 2023, the client starts consumption from the start offset of the new module (10:00:00 on November 8, 2023), which results in repeated data consumption.

To prevent repeated consumption of historical data in this switchover scenario, we recommend that you configure a persistent storage method for the consumer offset on the client. The following sample method is provided for reference. You can modify it as needed.

  1. Create a UserMetaStore class that extends AbstractUserMetaStore and implements its abstract methods.

    For example, you can use a MySQL database to store offset information. The following sample Java code shows how to do this:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class UserMetaStore extends AbstractUserMetaStore {

        // Saves the checkpoint JSON for a consumer group. The upsert assumes that
        // group_id is the primary key of the dts_checkpoint table, so repeated
        // saves overwrite the previous checkpoint instead of piling up rows.
        @Override
        protected void saveData(String groupID, String toStoreJson) {
            String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?) "
                    + "on duplicate key update checkpoint = values(checkpoint)";

            try (Connection con = getConnection();
                 PreparedStatement pres = con.prepareStatement(sql)) {
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        // Returns the checkpoint JSON stored for a consumer group, or null if no
        // checkpoint has been stored yet.
        @Override
        protected String getData(String groupID) {
            String sql = "select checkpoint from dts_checkpoint where group_id = ?";

            try (Connection con = getConnection();
                 PreparedStatement pres = con.prepareStatement(sql)) {
                pres.setString(1, groupID);
                try (ResultSet rs = pres.executeQuery()) {
                    if (rs.next()) {
                        return rs.getString("checkpoint");
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return null;
        }

        // Obtains a JDBC connection. The URL and credentials are placeholders;
        // replace them with your own values, or use a connection pool.
        private Connection getConnection() throws SQLException {
            return DriverManager.getConnection(
                    "jdbc:mysql://<host>:3306/<database>", "<user>", "<password>");
        }
    }
    
  2. In the consumerContext.java file, call the setUserRegisteredStore(new UserMetaStore()) method to configure the external storage medium.
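
    For example (a sketch; the construction of consumerContext follows the demo code for your usage mode):

        // Register the external offset store so that the SDK also persists the
        // consumer offset to it (about every 5 seconds).
        consumerContext.setUserRegisteredStore(new UserMetaStore());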

Troubleshooting

Connection failure

  • Error message:

    ERROR CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009 failed, please check the network and if the brokerUrl is correct'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

    Cause: The brokerUrl is incorrect.

  • Error message:

    telnet real node *** failed, please check the network

    Cause: The real IP address cannot be reached through the broker address.

  • Error message:

    ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

    Cause: The username or password is incorrect.

  Solution for the preceding errors: Enter the correct brokerUrl, userName, and password. For more information, see the parameter descriptions in this topic.

  • Error message:

    com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

    Cause: In the consumerContext.java file, setForceUseCheckpoint is set to true, but the consumer offset is not within the timestamp range of the subscription instance.

    Solution: Enter a consumer offset within the timestamp range of the subscription instance. For more information about how to query the range, see the parameter descriptions in this topic.

Slowdown in subscription consumption

  Error message: N/A

  Solution:

  • Analyze the slowdown by checking the sizes of the DStoreRecordQueue and DefaultUserRecordQueue queues in the periodically reported statistics:

    • If the value of DStoreRecordQueue remains 0, the DTS server pulls data slowly.

    • If the value of DefaultUserRecordQueue remains at the default value of 512, the SDK client consumes data slowly.

  • Reset the offset by modifying the consumer offset (initCheckpoint) in the code as needed.