After you configure a change tracking channel by creating a tracking task and a consumer group, you can use the software development kit (SDK) provided by DTS to consume the subscribed data. This topic describes how to use the sample code.
To consume subscribed data from a PolarDB-X 1.0 or DMS LogicDB data source, see Use an SDK to consume subscribed data from PolarDB-X 1.0.
This topic provides sample code for an SDK client in Java. For sample code in Python and Go, see dts-subscribe-demo.
Prerequisites
A subscription instance is created and is in the Normal state.
Note: For instructions on creating a subscription instance, see Subscription Plan Overview.
You have created a consumer group for your subscription instance.
If you use a RAM user to consume subscribed data, the RAM user must have the AliyunDTSFullAccess permission and access permissions to the subscribed objects. For more information, see Grant permissions to a RAM user to manage DTS using a system policy and Manage the permissions of a RAM user.
Precautions
When you consume subscribed data, you must call the commit method of DefaultUserRecord to commit the offset information. Otherwise, data might be consumed repeatedly.
Different consumption processes are independent of each other.
In the console, Current Offset indicates the offset that the tracking task has subscribed to, not the offset committed by the client.
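The first precaution above, committing each record's offset after it is processed, can be sketched with a minimal stand-in. The `DemoRecord` class below is hypothetical; only its `commit()` call mirrors the `commit` method of the SDK's `DefaultUserRecord`, which reports the consumer offset back to DTS:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical local stand-in for a subscribed record. Only commit() mirrors
// the real SDK's DefaultUserRecord.commit(), which persists the offset.
class DemoRecord {
    private final String payload;
    private boolean committed = false;

    DemoRecord(String payload) { this.payload = payload; }

    String payload() { return payload; }

    // In the real SDK, commit() reports this record's offset back to DTS.
    void commit() { committed = true; }

    boolean isCommitted() { return committed; }
}

public class CommitPattern {

    // Commit each record only after it has been fully processed. Records left
    // uncommitted are re-delivered after a restart, causing repeated consumption.
    public static int consumeAll(List<DemoRecord> records) {
        int handled = 0;
        for (DemoRecord record : records) {
            // ... apply record.payload() to the downstream system here ...
            handled++;
            record.commit();
        }
        return handled;
    }

    public static void main(String[] args) {
        List<DemoRecord> batch = new ArrayList<>();
        batch.add(new DemoRecord("INSERT ..."));
        batch.add(new DemoRecord("UPDATE ..."));
        System.out.println(consumeAll(batch)); // prints 2
    }
}
```

The key design point is ordering: process first, commit second, so that a crash between the two steps results in re-delivery (duplicate consumption) rather than data loss.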
Procedure
Download the sample SDK code files and decompress the package.
Verify the SDK code version.
Go to the directory where you decompressed the sample SDK code.
Use a text editor to open the pom.xml file in the directory.
Update the change tracking SDK to the latest version.
Note: You can find the latest Maven dependency on the dts-new-subscribe-sdk page.
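For reference, the dependency goes in the `<dependencies>` section of pom.xml. The group and artifact IDs below match the ones used elsewhere in this topic; the version string is a placeholder, so replace it with the latest version from the dts-new-subscribe-sdk page:

```xml
<dependency>
    <groupId>com.aliyun.dts</groupId>
    <artifactId>dts-new-subscribe-sdk</artifactId>
    <!-- placeholder: use the latest released version -->
    <version>LATEST_VERSION</version>
</dependency>
```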
Edit the SDK code.
Use an integrated development environment (IDE) to open the decompressed files.
Open the Java file that corresponds to the mode in which you want to use the SDK client.
Note: The path of the Java file is aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/.

| Usage mode | Java file | Description | Scenarios |
| --- | --- | --- | --- |
| ASSIGN mode | DTSConsumerAssignDemo.java | To ensure global message order, DTS assigns only one partition (partition 0) to each tracking topic. If you use the SDK client in ASSIGN mode, start only one SDK client. | Only one SDK client in a consumer group consumes subscribed data. |
| SUBSCRIBE mode | DTSConsumerSubscribeDemo.java | To ensure global message order, DTS assigns only one partition (partition 0) to each tracking topic. If you use the SDK client in SUBSCRIBE mode, you can start multiple SDK clients in the same consumer group for disaster recovery. If the client that is consuming data fails, another SDK client is randomly and automatically assigned to partition 0 to continue consumption. | Multiple SDK clients in the same consumer group consume subscribed data. This is a data disaster recovery scenario. |
Set the parameters in the Java code.
| Parameter | Description | How to obtain |
| --- | --- | --- |
| brokerUrl | The network address and port number of the change tracking channel. Note: If the server where you deploy the SDK client (such as an ECS instance) and the change tracking instance are in the same virtual private cloud (VPC), you can consume data over the VPC to reduce network latency. We do not recommend a public endpoint because of potential network instability. | In the DTS console, click the ID of the target subscription instance. On the Basic Information page, obtain the network address and port number from the Network section. |
| topic | The topic of the change tracking channel. | In the DTS console, click the target subscription instance ID. On the Basic Information page, obtain the Topic from the Basic Information section. |
| sid | The ID of the consumer group. | In the DTS console, click the target subscription instance ID. On the Consume Data page, obtain the Consumer Group ID/Name and Account. |
| userName | The username of the consumer group. Warning: If you do not use the client provided in this topic, set the username in the `<consumer group username>-<consumer group ID>` format (for example, `dtstest-dtsae******bpv`). Otherwise, the connection fails. | In the DTS console, click the target subscription instance ID. On the Consume Data page, obtain the Consumer Group ID/Name and Account. |
| password | The password of the consumer group account. | The password that you set for the consumer group username when you created the consumer group. |
| initCheckpoint | The consumer offset: the UNIX timestamp of the first data record for the SDK client to consume, such as 1620962769. Note: You can use the consumer offset to resume data consumption from a specific offset after your application is interrupted to prevent data loss, or to adjust the starting offset for data consumption as needed. | The consumer offset must be within the timestamp range of the tracking instance and must be converted to a UNIX timestamp. Note: The Data Range column in the tracking task list shows the timestamp range of the target subscription instance. |
| subscribeMode | The mode in which the SDK client is used. You do not need to modify this parameter. ConsumerContext.ConsumerSubscribeMode.ASSIGN: ASSIGN mode. ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: SUBSCRIBE mode. | N/A |
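As a worked example of the initCheckpoint conversion described above, the following self-contained sketch (java.time only) turns a wall-clock time at a given UTC offset into the UNIX timestamp the parameter expects. The date, time, and UTC+8 offset below are illustrative and chosen to match the sample value 1620962769:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Convert a human-readable start time to the UNIX timestamp (in seconds)
// expected by initCheckpoint.
public class CheckpointTime {

    // dt: the wall-clock time of the first record to consume
    // zone: the UTC offset of that wall-clock time (e.g. +08:00)
    public static long toUnixSeconds(LocalDateTime dt, ZoneOffset zone) {
        return dt.toEpochSecond(zone);
    }

    public static void main(String[] args) {
        long checkpoint = toUnixSeconds(
                LocalDateTime.of(2021, 5, 14, 11, 26, 9), ZoneOffset.ofHours(8));
        System.out.println(checkpoint); // prints 1620962769
    }
}
```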
Open the project structure in your IDE and make sure that the OpenJDK version for the project is 1.8.
Run the client code.
Note: When you run the code for the first time, the IDE needs some time to automatically load the required plugins and dependencies.
The SDK client periodically collects and displays statistics about data consumption. These statistics include the total number of data records sent and received, the total data volume, and the number of records per second (RPS).
```
[2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}
```

| Parameter | Description |
| --- | --- |
| outCounts | The total number of data records consumed by the SDK client. |
| outBytes | The total volume of data consumed by the SDK client, in bytes. |
| outRps | The number of requests per second for data consumption by the SDK client. |
| outBps | The number of bits transmitted per second for data consumption by the SDK client. |
| count | The total number of parameters (metrics) in the data consumption information. Note: This does not include count itself. |
| inBytes | The total volume of data sent by the DTS server, in bytes. |
| DStoreRecordQueue | The current size of the data cache queue when the DTS server sends data. |
| inCounts | The total number of data records sent by the DTS server. |
| inBps | The number of bits transmitted per second when the DTS server sends data. |
| inRps | The number of requests per second when the DTS server sends data. |
| __dt | The timestamp when the SDK client receives the data, in milliseconds. |
| DefaultUserRecordQueue | The size of the data cache queue after serialization. |
Edit the code to consume subscribed data as needed.
When you consume subscribed data, you need to manage consumer offsets to prevent data loss, minimize data duplication, and enable on-demand consumption.
FAQ
What can I do if I cannot connect to a subscription instance?
Troubleshoot the issue based on the error message. For more information, see Troubleshooting.
What is the data format of a consumer offset after persistence?
After a consumer offset is persisted, the data is returned in JSON format. The persisted consumer offset is a UNIX timestamp that you can pass directly to the SDK. For example, in the following returned data, the value 1700709977 of the "timestamp" key is the persisted consumer offset.

```
{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}
```

Can a tracking task be consumed by multiple clients in parallel?
No. Although the SUBSCRIBE mode allows multiple clients to run in parallel, only one client can consume data at a time.
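Returning to the persisted checkpoint format shown above: to resume from a persisted offset, you can extract the "timestamp" field and pass it back as initCheckpoint. The following dependency-free sketch uses a regular expression for illustration; a real application would use a JSON library instead, and the JSON literal in the example is made up in the same shape as the document's sample:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Extract the persisted "timestamp" value from a checkpoint JSON string so it
// can be passed back to the SDK as initCheckpoint.
public class CheckpointParser {

    private static final Pattern TIMESTAMP = Pattern.compile("\"timestamp\":(\\d+)");

    public static long extractTimestamp(String checkpointJson) {
        Matcher m = TIMESTAMP.matcher(checkpointJson);
        if (!m.find()) {
            throw new IllegalArgumentException("no timestamp field in checkpoint JSON");
        }
        return Long.parseLong(m.group(1));
    }
}
```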
Which version of the Kafka client is encapsulated in the SDK code?
Version 2.0.0 and later of dts-new-subscribe-sdk encapsulate Kafka client (kafka-clients) 2.7.0. Versions earlier than 2.0.0 encapsulate Kafka client 1.0.0.
Note: If you use a dependency package vulnerability detection tool in your application development process and find that the Kafka client (kafka-clients) encapsulated by dts-new-subscribe-sdk has a security vulnerability, you can resolve the vulnerability by replacing the client with the 2.1.4-shaded version.

```xml
<dependency>
    <groupId>com.aliyun.dts</groupId>
    <artifactId>dts-new-subscribe-sdk</artifactId>
    <version>2.1.4-shaded</version>
</dependency>
```
Appendix
Manage consumer offsets
When the SDK client starts for the first time, restarts, or retries internally, you must query and pass a consumer offset to start or resume data consumption. The consumer offset is the UNIX timestamp of the first data record that the SDK client will consume.
To reset the consumer offset of the client, you can query and modify the consumer offset based on the consumption mode (SDK usage mode) as described in the following table.
| Scenario | SDK usage mode | Offset management method |
| --- | --- | --- |
| Query a consumer offset | ASSIGN mode, SUBSCRIBE mode | |
| The SDK client starts for the first time. You must pass a consumer offset to consume data. | ASSIGN mode, SUBSCRIBE mode | Depending on the usage pattern of the SDK client, select the Java file DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java, and configure the consumer offset ( |
| The SDK client retries internally. You must pass the last recorded consumer offset to resume data consumption. | ASSIGN mode | Search for the last recorded consumer offset in the following order. If an offset is found, the offset information is returned: |
| | SUBSCRIBE mode | Search for the last recorded consumer offset in the following order. If an offset is found, the offset information is returned: |
| The SDK client is restarted. You must pass the last recorded consumer offset to resume data consumption. | ASSIGN mode | Based on the |
| | SUBSCRIBE mode | In this mode, in the consumerContext.java file, the |
Persist the consumer offset
If a disaster recovery switchover occurs for the incremental data ingestion module, the new module cannot save the client's last consumer offset. This is especially true in SUBSCRIBE mode. As a result, the client may start consuming data from an earlier offset, leading to repeated consumption of historical data. For example, assume that before a service switchover, the offset range of the old module is from 08:00:00 on November 11, 2023, to 08:00:00 on November 12, 2023, and the client's consumer offset is 08:00:00 on November 12, 2023. After the switchover, the offset range of the new module is from 10:00:00 on November 08, 2023, to 08:01:00 on November 12, 2023. In this scenario, the client starts consumption from the start offset of the new module (10:00:00 on November 08, 2023), which results in repeated data consumption.
To prevent repeated consumption of historical data in this switchover scenario, we recommend that you configure a persistent storage method for the consumer offset on the client. The following sample method is provided for reference. You can modify it as needed.
Create a `UserMetaStore()` class that inherits from and implements the `AbstractUserMetaStore()` class.

For example, you can use a MySQL database to store offset information. The following sample Java code shows how to do this:

```java
public class UserMetaStore extends AbstractUserMetaStore {

    @Override
    protected void saveData(String groupID, String toStoreJson) {
        Connection con = getConnection();
        String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
        PreparedStatement pres = null;
        ResultSet rs = null;
        try {
            pres = con.prepareStatement(sql);
            pres.setString(1, groupID);
            pres.setString(2, toStoreJson);
            pres.execute();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(rs, pres, con);
        }
    }

    @Override
    protected String getData(String groupID) {
        Connection con = getConnection();
        String sql = "select checkpoint from dts_checkpoint where group_id = ?";
        PreparedStatement pres = null;
        ResultSet rs = null;
        try {
            pres = con.prepareStatement(sql);
            pres.setString(1, groupID);
            rs = pres.executeQuery();
            if (rs.next()) {
                return rs.getString("checkpoint");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(rs, pres, con);
        }
        return null;
    }
}
```

In the consumerContext.java file, call the `setUserRegisteredStore(new UserMetaStore())` method to configure the external storage medium.
Troubleshooting
| Exception | Error message | Cause | Solution |
| --- | --- | --- | --- |
| Connection failure | | The | Enter the correct |
| | | You cannot connect to the real IP address using the broker address. | |
| | | The username or password is incorrect. | |
| | | In the consumerContext.java file, | Enter a consumer offset within the timestamp range of the subscription instance. For more information about the query method, see Parameter description. |
| Slowdown in subscription consumption | N/A | | |