Use the DTS SDK for Java to consume change data captured by a change tracking channel. Download the SDK, configure the client, and run it to start receiving database change events.
For Python and Go examples, see dts-subscribe-demo. For PolarDB-X 1.0 or DMS LogicDB data sources, see Use an SDK to consume subscribed data from PolarDB-X 1.0.
Prerequisites
Before you begin, ensure that you have:
- A subscription instance in the Normal state. To create one, see Subscription plan overview.
- A consumer group created for the subscription instance. See Create consumer groups.
- (If using a RAM user) The AliyunDTSFullAccess permission and access permissions to the subscribed objects. See Grant permissions to a RAM user to manage DTS using a system policy and Manage the permissions of a RAM user.
Usage notes
- Call `DefaultUserRecord.commit()` after processing each record to commit the offset. Skipping this step causes repeated data consumption on restart.
- Different consumption processes are independent of each other.
- The Current Offset shown in the DTS console is the offset the change tracking task has subscribed to, not the offset committed by your client.
Choose a consumption mode
Before setting up the SDK client, decide which mode fits your use case:
| Mode | Java file | Behavior | When to use |
|---|---|---|---|
| ASSIGN | DTSConsumerAssignDemo.java | Assigns partition 0 to a single client, guaranteeing global message order | One SDK client per consumer group |
| SUBSCRIBE | DTSConsumerSubscribeDemo.java | Allows multiple clients in the same consumer group. If the active client fails, another is automatically assigned to partition 0 | Multiple clients for disaster recovery (only one consumes at a time) |
DTS assigns only one partition (partition 0) to each change tracking topic to ensure global message order. In SUBSCRIBE mode, multiple clients can run in parallel for failover, but only one actively consumes data.
Set up and run the SDK client
Step 1: Download and update the SDK
Download the aliyun-dts-subscribe-sdk-java repository and decompress the package.
Open `pom.xml` in the decompressed directory and update `dts-new-subscribe-sdk` to the latest version. Find the latest version on the dts-new-subscribe-sdk Maven page. The version entry in `pom.xml` looks like this:

```xml
<groupId>com.aliyun.dts</groupId>
<artifactId>dts-new-subscribe-sdk</artifactId>
<version>2.1.4</version>
```
Step 2: Configure the SDK client
Open the decompressed project in your integrated development environment (IDE).
Navigate to `aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/` and open the Java file for your chosen mode: `DTSConsumerAssignDemo.java` or `DTSConsumerSubscribeDemo.java`.

Set the connection parameters in the `main` method:

| Parameter | Description | Where to find it |
|---|---|---|
| `brokerUrl` | Network address and port of the change tracking channel. Use the virtual private cloud (VPC) address when the SDK client and change tracking instance are in the same VPC to reduce latency. Avoid public endpoints due to potential network instability. | DTS console → subscription instance ID → Basic Information page → Network section |
| `topic` | Topic of the change tracking channel | DTS console → subscription instance ID → Basic Information page → Basic Information section → Topic |
| `sid` | Consumer group ID | DTS console → subscription instance ID → Consume Data page → Consumer Group ID/Name |
| `userName` | Consumer group username. If you are not using the provided SDK client, format the username as `<consumer group username>-<consumer group ID>` (for example, `dtstest-dtsae******bpv`). | DTS console → subscription instance ID → Consume Data page → Account |
| `password` | Consumer group password | The password set when you created the consumer group |
| `initCheckpoint` | UNIX timestamp of the first record to consume. Must be within the Data Range shown in the tracking task list. Use this to resume from a specific point after an interruption or to adjust the starting offset. | Convert the target time to a UNIX timestamp |
| `subscribeMode` | SDK client mode. Set to `ASSIGN` or `SUBSCRIBE` to match the Java file you opened. Do not change this after initial configuration. | N/A |

```java
public static void main(String[] args) {
    // Kafka broker address and port of the change tracking channel
    String brokerUrl = "dts-cn-***.com:18001";
    // Topic of the change tracking channel (partition 0)
    String topic = "cn_***_version2";
    // Consumer group credentials
    String sid = "dts***";
    String userName = "dts***";
    String password = "DTS***";
    // Starting offset: UNIX timestamp of the first record to consume
    // Example: 1566180200 starts consumption from 10:03:20 (UTC+8) on August 19, 2019
    String initCheckpoint = "1740472***";
    // Consumption mode; must match the demo file you opened
    ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE;

    DTSConsumerSubscribeDemo consumerDemo = new DTSConsumerSubscribeDemo(
            brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
    consumerDemo.start();
}
```

In your IDE, open the project structure settings and confirm that the project uses OpenJDK 1.8.
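`initCheckpoint` is a UNIX timestamp in seconds. If you need to compute one for a specific wall-clock time, here is a minimal sketch using `java.time`; the date, time zone, and class name are illustrative:

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class CheckpointHelper {
    public static void main(String[] args) {
        // UNIX timestamp (seconds) for 10:03:20 (UTC+8) on August 19, 2019
        long checkpoint = ZonedDateTime
                .of(2019, 8, 19, 10, 3, 20, 0, ZoneId.of("Asia/Shanghai"))
                .toEpochSecond();
        System.out.println(checkpoint); // prints 1566180200
        // Pass String.valueOf(checkpoint) to the demo as initCheckpoint
    }
}
```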
Step 3: Run the client
Run the project. The IDE downloads required plugins and dependencies on the first run, which may take a moment.
Normal running results
When the client is running correctly and ready to consume change data, you should see output like this:
```
[2025-02-25 18:47:22.991] [INFO ] [...KafkaRecordFetcher] - [Consumer clientId=consumer-dtsl5vy2ao5250****-1, groupId=dtsl5vy2ao5250****] Seeking to offset 8200 for partition cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0
[2025-02-25 18:47:23.226] [INFO ] [...EtlRecordProcessor] [...DefaultRecordPrintListener:49] -
RecordID [8200]
RecordTimestamp [174048****]
Source [{"sourceType": "MySQL", "version": "8.0.36"}]
RecordType [HEARTBEAT]
```

Normal subscription results
When the client successfully receives a data change (for example, an UPDATE), the output includes the before and after images:
```
[2025-02-25 18:48:24.905] [INFO ] [...EtlRecordProcessor] [...DefaultRecordPrintListener:49] -
RecordID [8413]
RecordTimestamp [174048****]
Source [{"sourceType": "MySQL", "version": "8.0.36"}]
RecordType [UPDATE]
Schema info [{, recordFields= [{fieldName='id', ...}, {fieldName='name', ...}], databaseName='dtsdb', tableName='person', ...}]
Before image {[Field [id] [3] Field [name] [test1]]}
After image {[Field [id] [3] Field [name] [test2]]}
```

Abnormal running results
If the following output is returned, the client cannot connect to the change tracking channel (the broker is unreachable):
```
******
[2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}
[2025-02-25 18:22:22.002] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****-1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
[2025-02-25 18:22:22.509] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
[2025-02-25 18:22:23.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":1740478943160,"DefaultUserRecordQueue":0.0}
[2025-02-25 18:22:27.192] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
[2025-02-25 18:22:27.618] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
******
```
The client also logs consumption metrics every few seconds:
{"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174048044****,"DefaultUserRecordQueue":0.0}| Metric | Description |
|---|---|
| `outCounts` | Total records consumed by the SDK client |
| `outBytes` | Total data volume consumed, in bytes |
| `outRps` | Records consumed per second |
| `outBps` | Bits consumed per second |
| `count` | Total number of metric fields (excludes `count` itself) |
| `inBytes` | Total data volume sent by the DTS server, in bytes |
| `DStoreRecordQueue` | Current size of the data cache queue on the DTS server |
| `inCounts` | Total records sent by the DTS server |
| `inBps` | Bits per second sent by the DTS server |
| `inRps` | Records per second sent by the DTS server |
| `__dt` | Timestamp when the SDK client received the data, in milliseconds |
| `DefaultUserRecordQueue` | Size of the post-serialization data cache queue (default maximum: 512) |
Step 4: Process subscribed data
Edit the client code to handle the change records according to your application logic. Manage consumer offsets carefully to prevent data loss, minimize duplication, and support on-demand consumption starting points.
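To see where that logic plugs in: the demo classes register a `RecordListener` whose `consume` method receives each `DefaultUserRecord`. The following is a minimal sketch modeled on the demo's print listener, written as it would appear inside a demo class so the demo's existing imports apply; the operation-type filter is an illustrative assumption:

```java
RecordListener recordListener = new RecordListener() {
    @Override
    public void consume(DefaultUserRecord record) {
        OperationType op = record.getOperationType();
        // Handle only the record types your application cares about
        if (op == OperationType.INSERT
                || op == OperationType.UPDATE
                || op == OperationType.DELETE
                || op == OperationType.DDL) {
            // Replace this print listener with your own application logic
            new DefaultRecordPrintListener(DbType.MySQL).consume(record);
        }
        // Commit after processing so the offset is saved; skipping this
        // causes repeated consumption on restart (see Usage notes)
        record.commit(String.valueOf(record.getSourceTimestamp()));
    }
};
```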
Manage consumer offsets
The consumer offset is the UNIX timestamp of the first record the SDK client will consume. The SDK saves offsets every 5 seconds and commits them to the DTS server.
Query and reset offsets
Query the current offset from either of these locations:
- The `localCheckpointStore` file on the server running the SDK client
- The Consume Data page of the change tracking channel in the DTS console

If you configured external persistent storage using `setUserRegisteredStore(new UserMetaStore())` in `consumerContext.java`, query the offset from that storage instead.
Offset lookup order on restart or retry
The SDK client searches for a saved offset in a specific order when it starts, retries internally, or restarts. The order depends on the mode:
ASSIGN mode — internal retry:
1. External storage configured with `setUserRegisteredStore(new UserMetaStore())`
2. The `localCheckpointStore` file on the SDK client server
3. The `initCheckpoint` value passed to the demo class
ASSIGN mode — client restart:
The behavior depends on `setForceUseCheckpoint` in `consumerContext.java`, as shown in the sketch after this list:

- `true`: Always uses the `initCheckpoint` value on every restart.
- `false` (default): Searches in this order:
  1. The `localCheckpointStore` file on the SDK client server
  2. The offset saved on the DTS server (updated only after `commit()` is called)
  3. External storage configured with `setUserRegisteredStore(new UserMetaStore())`
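For example, to force the client to start from `initCheckpoint` on every restart in ASSIGN mode, a minimal sketch (the `ConsumerContext` construction follows the demo classes; verify the constructor against your SDK version):

```java
// Build the consumer context as the demo classes do
ConsumerContext consumerContext = new ConsumerContext(brokerUrl, topic, sid,
        userName, password, initCheckpoint, ConsumerContext.ConsumerSubscribeMode.ASSIGN);
// Force initCheckpoint to take precedence over any locally saved offset
consumerContext.setForceUseCheckpoint(true);
```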
SUBSCRIBE mode — internal retry or restart:
`setForceUseCheckpoint` has no effect in SUBSCRIBE mode. The client searches in this order:

1. External storage configured with `setUserRegisteredStore(new UserMetaStore())`
2. The offset saved on the DTS server (updated only after `commit()` is called)
3. The `initCheckpoint` value passed in `DTSConsumerSubscribeDemo.java`
4. The start offset of the DTS server's incremental data ingestion module
If the incremental data ingestion module switches over (for example, during disaster recovery), the new module does not retain the client's last offset. The client may start consuming from an earlier offset, causing duplicate consumption. Store offsets persistently on the client to prevent this.
Store offsets persistently
Persistent offset storage prevents duplicate consumption after a module switchover. The following example stores offsets in a MySQL database.
Create a `UserMetaStore` class that extends `AbstractUserMetaStore`. The `getConnection()` and `close()` calls below are your own JDBC helper methods, and the `dts_checkpoint` table is assumed to have `group_id` as its primary key:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class UserMetaStore extends AbstractUserMetaStore {

    @Override
    protected void saveData(String groupID, String toStoreJson) {
        Connection con = getConnection();
        // Upsert: the SDK saves offsets every 5 seconds, so overwrite the
        // existing checkpoint instead of inserting a new row each time
        String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"
                + " on duplicate key update checkpoint = ?";
        PreparedStatement pres = null;
        try {
            pres = con.prepareStatement(sql);
            pres.setString(1, groupID);
            pres.setString(2, toStoreJson);
            pres.setString(3, toStoreJson);
            pres.execute();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(null, pres, con);
        }
    }

    @Override
    protected String getData(String groupID) {
        Connection con = getConnection();
        String sql = "select checkpoint from dts_checkpoint where group_id = ?";
        PreparedStatement pres = null;
        ResultSet rs = null;
        try {
            pres = con.prepareStatement(sql);
            pres.setString(1, groupID);
            rs = pres.executeQuery();
            if (rs.next()) {
                return rs.getString("checkpoint");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(rs, pres, con);
        }
        return null;
    }
}
```

In `consumerContext.java`, register the store:

```java
consumerContext.setUserRegisteredStore(new UserMetaStore());
```
The persisted offset is stored as JSON. The `timestamp` value (for example, `1700709977`) is the UNIX timestamp you pass directly to the SDK as `initCheckpoint`:

```
{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}
```

Troubleshooting
Connection failures
| Error message | Cause | Solution |
|---|---|---|
| `telnet dts-cn-hangzhou.aliyuncs.com:18009 failed, please check the network and if the brokerUrl is correct` | Incorrect `brokerUrl` | Enter the correct `brokerUrl`, `userName`, and `password`. For more information, see Parameter descriptions. |
| `telnet real node *** failed, please check the network` | Cannot connect to the broker's real IP address | — |
| `build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong` | Incorrect username or password | Verify `userName` and `password`. If not using the provided SDK client, use the format `<consumer group username>-<consumer group ID>`. |
| `TimestampSeekException: seek timestamp for topic [...] failed` | `setForceUseCheckpoint` is set to `true` but `initCheckpoint` is outside the subscription instance's timestamp range | Set `initCheckpoint` to a value within the Data Range shown in the tracking task list. For more information, see Parameter description. |
Slow consumption
If data consumption is slower than expected, check the DStoreRecordQueue and DefaultUserRecordQueue values in the metrics output:
- `DStoreRecordQueue` stays at 0: The DTS server is pulling data slowly. Check server-side performance.
- `DefaultUserRecordQueue` stays at 512: The SDK client is processing data slowly. Reset the offset by adjusting `initCheckpoint` in the code as needed.
FAQ
What is the data format of a persisted consumer offset?
The offset is stored as JSON with the `timestamp` field containing the UNIX timestamp. Pass this value directly to the SDK as `initCheckpoint`. For example: `{"groupID":"dtsglg11d48230*","streamCheckpoint":[{"partition":0,"offset":577989,...,"timestamp":170070**,...}]}`.
Can multiple clients consume from the same change tracking task in parallel?
Not at the same time. SUBSCRIBE mode allows multiple clients to run simultaneously for failover, but only one client actively consumes data. If the active client fails, another client is automatically assigned.
Which Kafka client version does the SDK use?
dts-new-subscribe-sdk 2.0.0 and later encapsulate kafka-clients 2.7.0. Earlier versions encapsulate kafka-clients 1.0.0.
If a vulnerability scanner flags the bundled kafka-clients, switch to the 2.1.4-shaded version, which shades the Kafka dependency:
```xml
<dependency>
    <groupId>com.aliyun.dts</groupId>
    <artifactId>dts-new-subscribe-sdk</artifactId>
    <version>2.1.4-shaded</version>
</dependency>
```