Data Transmission Service: Consume subscribed data using an SDK

Last Updated: Mar 28, 2026

Use the DTS SDK for Java to consume change data captured by a change tracking channel. Download the SDK, configure the client, and run it to start receiving database change events.

For Python and Go examples, see dts-subscribe-demo. For PolarDB-X 1.0 or DMS LogicDB data sources, see Use an SDK to consume subscribed data from PolarDB-X 1.0.

Prerequisites

Before you begin, ensure that you have:

  • Created a change tracking task in the DTS console.

  • Created a consumer group for the task and recorded its ID, username, and password.

  • Installed OpenJDK 1.8, which the SDK project requires.

Usage notes

  • Call DefaultUserRecord.commit() after processing each record to commit the offset, as shown in the sketch after this list. Skipping this step causes repeated data consumption when the client restarts.

  • Different consumer groups consume data independently of each other.

  • Current Offset in the DTS console shows the offset that the change tracking task has subscribed to, not the offset committed by your client.
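
A minimal sketch of a record listener that commits after each record, following the listener pattern in the demo classes (RecordListener, DefaultUserRecord, and the empty-string commit argument match the demo project; import them as the demo classes do):

// Build the listener map that the demo registers on the consumer.
// The handling logic below is a placeholder for your own code.
public static Map<String, RecordListener> buildRecordListener() {
    return Collections.singletonMap("myListener", new RecordListener() {
        @Override
        public void consume(DefaultUserRecord record) {
            // Apply your own handling logic here, then commit so the offset
            // is saved; skipping commit() causes repeated consumption after
            // a client restart. The demo commits with an empty string.
            record.commit("");
        }
    });
}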

Choose a consumption mode

Before setting up the SDK client, decide which mode fits your use case:

| Mode | Java file | Behavior | When to use |
| --- | --- | --- | --- |
| ASSIGN | DTSConsumerAssignDemo.java | Assigns partition 0 to a single client, guaranteeing global message order | One SDK client per consumer group |
| SUBSCRIBE | DTSConsumerSubscribeDemo.java | Allows multiple clients in the same consumer group. If the active client fails, another is automatically assigned to partition 0 | Multiple clients for disaster recovery (only one consumes at a time) |

DTS assigns only one partition (partition 0) to each change tracking topic to ensure global message order. In SUBSCRIBE mode, multiple clients can run in parallel for failover, but only one actively consumes data.

Set up and run the SDK client

Step 1: Download and update the SDK

  1. Download the aliyun-dts-subscribe-sdk-java repository and decompress the package.

  2. Open pom.xml in the decompressed directory and update dts-new-subscribe-sdk to the latest version. Find the latest version on the dts-new-subscribe-sdk Maven page. The version entry in pom.xml looks like this:

    <groupId>com.aliyun.dts</groupId>
    <artifactId>dts-new-subscribe-sdk</artifactId>
    <version>2.1.4</version>

Step 2: Configure the SDK client

  1. Open the decompressed project in your integrated development environment (IDE).

  2. Navigate to aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/ and open the Java file for your chosen mode: DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java.

  3. Set the connection parameters in the main method:

    | Parameter | Description | Where to find it |
    | --- | --- | --- |
    | brokerUrl | Network address and port of the change tracking channel. Use the virtual private cloud (VPC) address when the SDK client and the change tracking instance are in the same VPC to reduce latency. Avoid public endpoints due to potential network instability. | DTS console → subscription instance ID → Basic Information page → Network section |
    | topic | Topic of the change tracking channel | DTS console → subscription instance ID → Basic Information page → Basic Information section → Topic |
    | sid | Consumer group ID | DTS console → subscription instance ID → Consume Data page → Consumer Group ID/Name |
    | userName | Consumer group username. If you are not using the provided SDK client, format the username as <consumer group username>-<consumer group ID> (for example, dtstest-dtsae******bpv). | DTS console → subscription instance ID → Consume Data page → Account |
    | password | Consumer group password | The password set when you created the consumer group |
    | initCheckpoint | UNIX timestamp of the first record to consume. Must be within the Data Range shown in the tracking task list. Use this to resume from a specific point after an interruption or to adjust the starting offset (see the conversion sketch after these steps). | Convert the target time to a UNIX timestamp |
    | subscribeMode | SDK client mode. Set to ASSIGN or SUBSCRIBE to match the Java file you opened. Do not change this after initial configuration. | N/A |
    public static void main(String[] args) {
        // Kafka broker address and port of the change tracking channel
        String brokerUrl = "dts-cn-***.com:18001";
        // Topic of the change tracking channel (partition 0)
        String topic = "cn_***_version2";
        // Consumer group credentials
        String sid = "dts***";
        String userName = "dts***";
        String password = "DTS***";
        // Starting offset: UNIX timestamp of the first record to consume
        // Example: 1566180200 starts consumption from 10:03:20 CST (UTC+8) on August 19, 2019
        String initCheckpoint = "1740472***";
        // Consumption mode — matches the file you opened
        ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE;
    
        DTSConsumerSubscribeDemo consumerDemo = new DTSConsumerSubscribeDemo(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
        consumerDemo.start();
    }
  4. In your IDE, open the project structure settings and confirm that the project uses OpenJDK 1.8.
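
A minimal sketch for computing initCheckpoint from a target date and time, using standard java.time (the zone and date below are placeholders):

import java.time.ZoneId;
import java.time.ZonedDateTime;

public class CheckpointTimestamp {
    public static void main(String[] args) {
        // Placeholder target time; replace with the point to resume from.
        ZonedDateTime target = ZonedDateTime.of(2019, 8, 19, 10, 3, 20, 0,
                ZoneId.of("Asia/Shanghai"));
        // Seconds since the UNIX epoch, the format initCheckpoint expects.
        System.out.println(target.toEpochSecond()); // 1566180200
    }
}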

Step 3: Run the client

Run the project. The IDE downloads required plugins and dependencies on the first run, which may take a moment.

Normal running results

When the client is running correctly and ready to consume change data, you should see output like this:

[2025-02-25 18:47:22.991] [INFO ] [...KafkaRecordFetcher] - [Consumer clientId=consumer-dtsl5vy2ao5250****-1, groupId=dtsl5vy2ao5250****] Seeking to offset 8200 for partition cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0
[2025-02-25 18:47:23.226] [INFO ] [...EtlRecordProcessor] [...DefaultRecordPrintListener:49] -
RecordID [8200]
RecordTimestamp [174048****]
Source [{"sourceType": "MySQL", "version": "8.0.36"}]
RecordType [HEARTBEAT]

Normal subscription results

When the client successfully receives a data change (for example, an UPDATE), the output includes the before and after images:

[2025-02-25 18:48:24.905] [INFO ] [...EtlRecordProcessor] [...DefaultRecordPrintListener:49] -
RecordID [8413]
RecordTimestamp [174048****]
Source [{"sourceType": "MySQL", "version": "8.0.36"}]
RecordType [UPDATE]
Schema info [{, recordFields= [{fieldName='id', ...}, {fieldName='name', ...}], databaseName='dtsdb', tableName='person', ...}]
Before image {[Field [id] [3] Field [name] [test1]]}
After image {[Field [id] [3] Field [name] [test2]]}

Abnormal running results

If the following result is returned, the client cannot reach the change tracking channel (the broker address set in brokerUrl).

******
[2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}
[2025-02-25 18:22:22.002] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****-1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
[2025-02-25 18:22:22.509] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
[2025-02-25 18:22:23.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":1740478943160,"DefaultUserRecordQueue":0.0}
[2025-02-25 18:22:27.192] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
[2025-02-25 18:22:27.618] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
******

The client also logs consumption metrics every few seconds:

{"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174048044****,"DefaultUserRecordQueue":0.0}
| Metric | Description |
| --- | --- |
| outCounts | Total records consumed by the SDK client |
| outBytes | Total data volume consumed, in bytes |
| outRps | Records consumed per second |
| outBps | Bits consumed per second |
| count | Total number of metric fields (excludes count itself) |
| inBytes | Total data volume sent by the DTS server, in bytes |
| DStoreRecordQueue | Current size of the data cache queue on the DTS server |
| inCounts | Total records sent by the DTS server |
| inBps | Bits per second sent by the DTS server |
| inRps | Records per second sent by the DTS server |
| __dt | Timestamp when the SDK client received the data, in milliseconds |
| DefaultUserRecordQueue | Size of the post-serialization data cache queue (default maximum: 512) |

Step 4: Process subscribed data

Edit the client code to handle the change records according to your application logic. Manage consumer offsets carefully to prevent data loss, minimize duplication, and support on-demand consumption starting points.

Manage consumer offsets

The consumer offset is the UNIX timestamp of the record that the SDK client most recently consumed; when the client starts, it determines where consumption begins. The SDK client saves the offset every 5 seconds and commits it to the DTS server.

Query and reset offsets

Query the current offset from either of these locations:

  • The localCheckpointStore file on the server running the SDK client

  • The Data Consumption page of the change tracking channel in the DTS console

If you configured external persistent storage by calling setUserRegisteredStore(new UserMetaStore()) on the ConsumerContext, query the offset from that storage instead.

Offset lookup order on restart or retry

The SDK client searches for a saved offset in a specific order when it starts, retries internally, or restarts. The order depends on the mode:

ASSIGN mode — internal retry:

  1. External storage configured with setUserRegisteredStore(new UserMetaStore())

  2. localCheckpointStore file on the SDK client server

  3. initCheckpoint passed in DTSConsumerAssignDemo.java

ASSIGN mode — client restart:

The behavior depends on the setForceUseCheckpoint setting on the ConsumerContext (see the configuration sketch after this list):

  • true: Always uses the initCheckpoint value on every restart.

  • false (default): Searches in this order:

    1. localCheckpointStore file on the SDK client server

    2. Offset saved on the DTS server (updated only after commit() is called)

    3. External storage configured with setUserRegisteredStore(new UserMetaStore())
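
A minimal configuration sketch, assuming the ConsumerContext constructor and setters used in the demo classes (arguments as in the main method above):

// Offset-related settings on the ConsumerContext (ASSIGN mode).
ConsumerContext consumerContext = new ConsumerContext(
        brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);

// true: every restart begins at initCheckpoint.
// false (default): the saved-offset locations are searched in the order above.
consumerContext.setForceUseCheckpoint(false);

// Optional external persistent storage, checked first on internal retries.
consumerContext.setUserRegisteredStore(new UserMetaStore());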

SUBSCRIBE mode — internal retry or restart:

setForceUseCheckpoint has no effect in SUBSCRIBE mode. The client searches in this order:

  1. External storage configured with setUserRegisteredStore(new UserMetaStore())

  2. Offset saved on the DTS server (updated only after commit() is called)

  3. initCheckpoint passed in DTSConsumerSubscribeDemo.java

  4. Start offset of the DTS server's incremental data ingestion module

Important

If the incremental data ingestion module switches over (for example, during disaster recovery), the new module does not retain the client's last offset. The client may start consuming from an earlier offset, causing duplicate consumption. Store offsets persistently on the client to prevent this.

Store offsets persistently

Persistent offset storage prevents duplicate consumption after a module switchover. The following example stores offsets in a MySQL database.

  1. Create a UserMetaStore class that extends AbstractUserMetaStore:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    // AbstractUserMetaStore is provided by the SDK. getConnection() and
    // close(...) are your own JDBC helpers (not shown) for opening a
    // connection to the offset database and releasing resources.
    public class UserMetaStore extends AbstractUserMetaStore {

        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
            // saveData runs repeatedly (about every 5 seconds), so upsert a
            // single row per consumer group. Assumes group_id is the table's
            // primary key; see the setup sketch after these steps.
            String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?) "
                    + "on duplicate key update checkpoint = ?";

            PreparedStatement pres = null;

            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.setString(3, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(null, pres, con);
            }
        }

        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
            String sql = "select checkpoint from dts_checkpoint where group_id = ?";

            PreparedStatement pres = null;
            ResultSet rs = null;

            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                rs = pres.executeQuery();

                if (rs.next()) {
                    return rs.getString("checkpoint");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
            return null;
        }
    }
  2. In the Java file for your mode, register the store on the ConsumerContext:

    consumerContext.setUserRegisteredStore(new UserMetaStore());
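
The example above assumes a dts_checkpoint table in which group_id is the primary key, so each consumer group keeps exactly one checkpoint row. A hypothetical one-time setup sketch (the JDBC URL and credentials are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateCheckpointTable {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details for the offset database.
        String jdbcUrl = "jdbc:mysql://localhost:3306/offsets";
        try (Connection con = DriverManager.getConnection(jdbcUrl, "user", "password");
             Statement stmt = con.createStatement()) {
            // group_id as the primary key lets saveData upsert per group.
            stmt.execute("CREATE TABLE IF NOT EXISTS dts_checkpoint ("
                    + "group_id VARCHAR(64) PRIMARY KEY, "
                    + "checkpoint VARCHAR(1024) NOT NULL)");
        }
    }
}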

The persisted offset is stored as JSON. The timestamp value (for example, 1700709977) is the UNIX timestamp you pass directly to the SDK as initCheckpoint:

{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}

Troubleshooting

Connection failures

| Error message | Cause | Solution |
| --- | --- | --- |
| telnet dts-cn-hangzhou.aliyuncs.com:18009 failed, please check the network and if the brokerUrl is correct | Incorrect brokerUrl | Enter the correct brokerUrl, userName, and password. See the parameter table in Step 2. |
| telnet real node *** failed, please check the network | The broker's real IP address cannot be reached | Enter the correct brokerUrl, userName, and password. See the parameter table in Step 2. |
| build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong | Incorrect username or password | Verify userName and password. If you are not using the provided SDK client, use the format <consumer group username>-<consumer group ID>. |
| TimestampSeekException: seek timestamp for topic [...] failed | setForceUseCheckpoint(true) is set, but initCheckpoint is outside the subscription instance's timestamp range | Set initCheckpoint to a value within the Data Range shown in the tracking task list. |

Slow consumption

If data consumption is slower than expected, check the DStoreRecordQueue and DefaultUserRecordQueue values in the metrics output:

  • `DStoreRecordQueue` stays at 0: The DTS server is pulling data slowly. Check server-side performance.

  • `DefaultUserRecordQueue` stays at 512: The SDK client is processing data slowly. Optimize the record-handling logic in your listener; if the client has fallen far behind, you can also skip ahead by resetting initCheckpoint in the code.

FAQ

What is the data format of a persisted consumer offset?

The offset is stored as JSON with the timestamp field containing the UNIX timestamp. Pass this value directly to the SDK as initCheckpoint. For example: {"groupID":"dtsglg11d48230*","streamCheckpoint":[{"partition":0,"offset":577989,...,"timestamp":170070**,...}]}.

Can multiple clients consume from the same change tracking task in parallel?

Not at the same time. SUBSCRIBE mode allows multiple clients to run simultaneously for failover, but only one client actively consumes data. If the active client fails, another client is automatically assigned.

Which Kafka client version does the SDK use?

dts-new-subscribe-sdk 2.0.0 and later encapsulate kafka-clients 2.7.0. Earlier versions encapsulate kafka-clients 1.0.0.

If a vulnerability scanner flags the bundled kafka-clients, switch to the 2.1.4-shaded version, which shades the Kafka dependency:

<dependency>
    <groupId>com.aliyun.dts</groupId>
    <artifactId>dts-new-subscribe-sdk</artifactId>
    <version>2.1.4-shaded</version>
</dependency>