After you configure a change tracking channel by creating a tracking task and a consumer group, you can use the software development kit (SDK) provided by DTS to consume the tracked data. This topic describes how to use the sample code.
If the data source is PolarDB-X 1.0 or DMS LogicDB, see Consume tracked data from PolarDB-X 1.0 using an SDK.
This topic provides sample code for an SDK client in Java. For sample code in Python and Go, see dts-subscribe-demo.
Prerequisites
A change tracking instance is created and is in the Normal state.
Note: For instructions on how to create a subscription instance, see Subscription Plan Overview.
You have created a consumer group for the subscription instance.
If you use a Resource Access Management (RAM) user to consume the tracked data, the RAM user must have the AliyunDTSFullAccess permission and access privileges on the tracked objects. For more information about how to grant permissions, see Grant permissions to a RAM user to manage DTS using a system policy and Manage the permissions of a RAM user.
Notes
When you consume tracked data, call the `commit` method of `DefaultUserRecord` to commit the offset information. If you do not commit the offset, data may be consumed repeatedly.
Different consumption processes are independent of each other.
In the console, Current Offset indicates the offset that the tracking task has subscribed to, not the offset committed by the client.
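A minimal sketch of the commit pattern, modeled on the SDK's sample record listener (the `RecordListener` interface and `DefaultUserRecord` type come from dts-new-subscribe-sdk; imports are omitted, and the empty-string argument to `commit` follows the demo code — treat this as an illustration, not the definitive implementation):

```java
// Sketch only: assumes RecordListener and DefaultUserRecord from the
// dts-new-subscribe-sdk sample code; this is not a standalone program.
RecordListener listener = new RecordListener() {
    @Override
    public void consume(DefaultUserRecord record) {
        // ... process the record here (for example, inspect the operation type) ...

        // Commit the offset after processing. If commit is never called,
        // DTS cannot record progress and data may be consumed repeatedly.
        record.commit("");
    }
};
```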
Procedure
Download the SDK sample code file and decompress it.
Check the SDK code version.
Go to the directory where you decompressed the SDK sample code.
Use a text editor to open the pom.xml file in the directory.
Update the version of the change tracking SDK to the latest version.
Note: You can view the latest Maven dependency on the dts-new-subscribe-sdk page.
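For example, the dependency section of the pom.xml should look similar to the following (the version number below is a placeholder; use the latest version shown on the Maven page):

```xml
<!-- dts-new-subscribe-sdk Maven dependency; replace the version
     with the latest one published on the Maven page. -->
<dependency>
    <groupId>com.aliyun.dts</groupId>
    <artifactId>dts-new-subscribe-sdk</artifactId>
    <version>2.x.x</version>
</dependency>
```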
Edit the SDK code.
Use an IDE to open the decompressed file.
Open the Java file for the mode that the SDK client uses.
Note: The Java files are located in aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/.

| Usage mode | Java file | Description | Scenarios |
| --- | --- | --- | --- |
| ASSIGN mode | DTSConsumerAssignDemo.java | To ensure that messages are globally ordered, DTS assigns only one partition, partition 0, to each tracking topic. If the SDK client uses the ASSIGN mode, start only one SDK client. | Only one SDK client in a consumer group consumes tracked data. |
| SUBSCRIBE mode | DTSConsumerSubscribeDemo.java | To ensure that messages are globally ordered, DTS assigns only one partition, partition 0, to each tracking topic. If the SDK client uses the SUBSCRIBE mode, you can start multiple SDK clients in the same consumer group for disaster recovery: if the client that is consuming data fails, another SDK client in the consumer group is randomly and automatically assigned to partition 0 to continue data consumption. | Multiple SDK clients in the same consumer group consume tracked data at the same time. This applies to disaster recovery scenarios. |
Set the parameters in the Java code.
| Parameter | Description | How to obtain |
| --- | --- | --- |
| brokerUrl | The network address and port of the change tracking channel. Note: If the server where you deploy the SDK client (such as an ECS instance) and the change tracking instance are in the same virtual private cloud (VPC), consume data over the VPC to reduce network latency. A public endpoint is not recommended because of potential network instability. | In the DTS console, click the target subscription instance ID. On the Basic Information page, obtain the network address and port from the Network section. |
| topic | The topic of the change tracking channel. | In the DTS console, click the target subscription instance ID. On the Basic Information page, obtain the Topic from the Basic Information section. |
| sid | The ID of the consumer group. | In the DTS console, click the target subscription instance ID. On the Consume Data page, obtain the Consumer Group ID/Name and the consumer group's Account information. |
| userName | The username of the consumer group. Warning: If you do not use the client provided in this topic, set the username in the `<Username of the consumer group>-<Consumer group ID>` format, for example, dtstest-dtsae******bpv. Otherwise, the connection fails. | Obtained together with the consumer group ID on the Consume Data page. |
| password | The password of the consumer group account. | The password that you set for the consumer group username when you created the consumer group. |
| initCheckpoint | The consumer offset, that is, the UNIX timestamp of the first data record that the SDK client consumes, for example, 1620962769. Note: The consumer offset can be used to resume consumption after the application is interrupted, which prevents data loss, and to adjust the tracking offset when the client starts so that data is consumed as needed. | The consumer offset must be within the timestamp range of the change tracking instance and must be converted to a UNIX timestamp. Note: You can view the timestamp range of the target subscription instance in the Data Range column of the tracking task list. |
| subscribeMode | The usage mode of the SDK client. You do not need to modify this parameter. ConsumerContext.ConsumerSubscribeMode.ASSIGN: ASSIGN mode. ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: SUBSCRIBE mode. | None |
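The initCheckpoint value must be a UNIX timestamp in seconds. A small sketch for converting a wall-clock time to that format (the date used here is the one behind the sample offset 1620962769; adjust the time zone and date to your own environment):

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class InitCheckpointDemo {
    public static void main(String[] args) {
        // Convert a human-readable start time to the UNIX timestamp
        // (in seconds) that initCheckpoint expects. The sample offset
        // 1620962769 corresponds to 2021-05-14 11:26:09 (UTC+8).
        ZonedDateTime start = ZonedDateTime.of(2021, 5, 14, 11, 26, 9, 0,
                ZoneId.of("Asia/Shanghai"));
        long initCheckpoint = start.toEpochSecond();
        System.out.println(initCheckpoint); // prints 1620962769
    }
}
```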
Open the project structure in your IDE and make sure that the OpenJDK version for the project is 1.8.
Run the client code.
Note: When you run the code for the first time, the IDE takes some time to automatically load the required plug-ins and dependencies.
The SDK client periodically collects and displays statistics information about data consumption, including the total number of sent and received data records, total data volume, and records per second (RPS).
[2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}

| Parameter | Description |
| --- | --- |
| outCounts | The total number of data records consumed by the SDK client. |
| outBytes | The total volume of data consumed by the SDK client, in bytes. |
| outRps | The number of requests per second for data consumption by the SDK client. |
| outBps | The number of bits transmitted per second for data consumption by the SDK client. |
| count | The total number of parameters in the metrics of the SDK client for data consumption. Note: This does not include count itself. |
| inBytes | The total volume of data sent by the DTS server, in bytes. |
| DStoreRecordQueue | The current size of the data cache queue when the DTS server sends data. |
| inCounts | The total number of data records sent by the DTS server. |
| inBps | The number of bits transmitted per second when the DTS server sends data. |
| inRps | The number of requests per second when the DTS server sends data. |
| __dt | The timestamp when the SDK client receives the data, in milliseconds. |
| DefaultUserRecordQueue | The current size of the data cache queue after the data is serialized. |
Edit the code to consume the tracked data as needed.
When you consume tracked data, you must manage consumer offsets to prevent data loss, minimize repeated consumption, and consume data as needed.
FAQ
What should I do if I cannot connect to a subscription instance?
Troubleshoot the issue based on the error message. For more information, see Troubleshooting.
What is the data format of a persisted consumer offset?
After a consumer offset is persisted, the data is returned in JSON format. The persisted consumer offset is a UNIX timestamp that you can pass directly to the SDK. In the following returned data, the value of "timestamp" (1700709977 in this example) is the persisted consumer offset.

{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}

Can multiple clients consume data in parallel for a tracking task?
No. The SUBSCRIBE mode allows multiple clients to run in parallel, but only one client can consume data at a time.
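As an illustration, the persisted consumer offset can be read back from the checkpoint JSON shown above with a short helper (a sketch that uses a regular expression instead of a JSON library; the groupID and topic values below are placeholders, not real identifiers):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CheckpointParser {

    // Extracts the persisted consumer offset (the "timestamp" value)
    // from the checkpoint JSON returned by the SDK.
    static long extractTimestamp(String checkpointJson) {
        Matcher m = Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)")
                .matcher(checkpointJson);
        if (!m.find()) {
            throw new IllegalArgumentException("no timestamp field in checkpoint JSON");
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) {
        // Placeholder values modeled on the sample above.
        String json = "{\"groupID\":\"dts_example_group\",\"streamCheckpoint\":"
                + "[{\"partition\":0,\"offset\":577989,\"topic\":\"example_topic\","
                + "\"timestamp\":1700709977,\"info\":\"\"}]}";
        System.out.println(extractTimestamp(json)); // prints 1700709977
    }
}
```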
Which version of the Kafka client is encapsulated in the SDK code?
dts-new-subscribe-sdk versions 2.0.0 and later encapsulate Kafka client 2.7.0 (kafka-clients). Versions earlier than 2.0.0 encapsulate Kafka client 1.0.0.
Appendix
Manage consumer offsets
When the SDK client starts for the first time, restarts, or performs an internal retry, you must query and enter a consumer offset to start or resume data consumption. The consumer offset is the UNIX timestamp when the SDK client consumes the first data record.
To reset the consumer offset of the client, you can query and modify the consumer offset based on the SDK usage mode. The following table describes how to query and modify the consumer offset.
| Scenario | SDK usage mode | Offset management method |
| --- | --- | --- |
| Query a consumer offset | ASSIGN mode, SUBSCRIBE mode | |
| The SDK client starts for the first time and requires a consumer offset to consume data. | ASSIGN mode, SUBSCRIBE mode | Based on the SDK client usage mode, select the DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java file and configure the consumer offset (initCheckpoint). |
| The SDK client needs to re-enter the last recorded consumer offset to continue data consumption due to an internal retry. | ASSIGN mode | Search for the last recorded consumer offset in the following order. If an offset is found, it is returned: |
| | SUBSCRIBE mode | Search for the last recorded consumer offset in the following order. If an offset is found, it is returned: |
| The SDK client is restarted and needs to re-enter the last recorded consumer offset to continue data consumption. | ASSIGN mode | Query the consumer offset based on the |
| | SUBSCRIBE mode | In this mode, the |
Persistently store consumer offsets
If the disaster recovery mechanism is triggered for the incremental data collection module, especially in SUBSCRIBE mode, the new incremental data collection module cannot save the last consumer offset of the client. This may cause the client to start consuming tracked data from an earlier offset, resulting in repeated consumption of historical data. For example, before the incremental data service is switched, the offset range of the old incremental data collection module is from 08:00:00 on November 11, 2023, to 08:00:00 on November 12, 2023, and the consumer offset of the client is 08:00:00 on November 12, 2023. After the incremental data service is switched, the offset range of the new incremental data collection module is from 10:00:00 on November 08, 2023, to 08:01:00 on November 12, 2023. In this case, the client starts consumption from the start offset of the new incremental data collection module (10:00:00 on November 08, 2023). This causes repeated consumption of historical data.
To prevent repeated consumption of historical data in such a switchover scenario, you can configure a persistent storage method for consumer offsets on the client. The following sample method shows how to do this. You can modify it as needed.
Create a UserMetaStore class that extends and implements AbstractUserMetaStore. The following sample Java code shows how to use a MySQL database to store offset information:

```java
public class UserMetaStore extends AbstractUserMetaStore {

    @Override
    protected void saveData(String groupID, String toStoreJson) {
        Connection con = getConnection();
        String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
        PreparedStatement pres = null;
        ResultSet rs = null;
        try {
            pres = con.prepareStatement(sql);
            pres.setString(1, groupID);
            pres.setString(2, toStoreJson);
            pres.execute();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(rs, pres, con);
        }
    }

    @Override
    protected String getData(String groupID) {
        Connection con = getConnection();
        String sql = "select checkpoint from dts_checkpoint where group_id = ?";
        PreparedStatement pres = null;
        ResultSet rs = null;
        try {
            pres = con.prepareStatement(sql);
            pres.setString(1, groupID);
            rs = pres.executeQuery();
            // Return null if no checkpoint has been stored for this group yet.
            String checkpoint = null;
            if (rs.next()) {
                checkpoint = rs.getString("checkpoint");
            }
            return checkpoint;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            close(rs, pres, con);
        }
    }
}
```

In the consumerContext.java file, configure an external storage medium by calling the setUserRegisteredStore(new UserMetaStore()) method.
Troubleshooting
| Exception | Error message | Cause | Solution |
| --- | --- | --- | --- |
| Connection failure | | The | Enter the correct |
| | | The real IP address cannot be connected through the broker address. | |
| | | The username and password are incorrect. | |
| | | In the consumerContext.java file, | Enter a consumer offset that is within the timestamp range of the change tracking instance. For more information about how to query the range, see Metric description. |
| The subscription consumption rate decreases. | None | | |