After you create a tracking task and a consumer group for a change tracking channel, you can use the Data Transmission Service (DTS) software development kit (SDK) to consume the tracked data. This topic describes how to use the provided sample code.
For information about how to consume data tracked from a PolarDB-X 1.0 instance or a Data Management (DMS) logical database, see Use the SDK demo to consume the data tracked from a PolarDB-X 1.0 instance.
This topic uses a Java SDK client as an example. For sample code in Python and Go, see dts-subscribe-demo.
Prerequisites
You have created a subscription instance that is in the Normal status.
Note: For more information about how to create a subscription instance, see Overview of subscription scenarios.
A consumer group is created for the change tracking instance. For more information, see Create consumer groups.
To consume subscribed data, a RAM user must have the AliyunDTSFullAccess permission and permissions to access source objects. For more information about how to grant these permissions, see Use a system policy to authorize a RAM user to manage DTS instances and Grant permissions to a RAM user.
Usage notes
When you consume tracked data, you must call the commit method of DefaultUserRecord to commit the offset. Otherwise, data will be consumed repeatedly.
Different consumption processes are independent of each other.
The Current Offset shown in the console is the current offset of the tracking task, not the offset committed by the client.
Procedure
Download the SDK sample code file, and then decompress the file.
Verify the version of the SDK code.
Go to the directory where you decompressed the file.
Use a text editor to open the pom.xml file in the directory.
Change the change tracking SDK version (version) to the latest version.
Note: You can view the latest Maven dependency on the dts-new-subscribe-sdk page.
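For orientation, the dependency block in pom.xml has the following shape. The group and artifact coordinates are taken from the dts-new-subscribe-sdk Maven artifact; the version shown is a placeholder that you should replace with the latest version from that page:

```xml
<dependency>
    <groupId>com.aliyun.dts</groupId>
    <artifactId>dts-new-subscribe-sdk</artifactId>
    <!-- Placeholder: replace with the latest released version -->
    <version>x.y.z</version>
</dependency>
```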
Edit the SDK code.
Use a code editor to open the decompressed file.
Open the Java file that corresponds to the consumption mode of the SDK client.
Note: The path of the Java file is aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/.

| Usage mode | Java file | Description | Scenarios |
| --- | --- | --- | --- |
| ASSIGN mode | DTSConsumerAssignDemo.java | To ensure the global order of messages, DTS assigns only one partition (partition 0) to each tracking topic. When the SDK client runs in ASSIGN mode, we recommend that you start only one SDK client. | A single SDK client consumes tracked data in a consumer group. |
| SUBSCRIBE mode | DTSConsumerSubscribeDemo.java | To ensure the global order of messages, DTS assigns only one partition (partition 0) to each tracking topic. In SUBSCRIBE mode, you can start multiple SDK clients in one consumer group at the same time to implement disaster recovery: if the client that is consuming data fails, another SDK client in the consumer group is randomly and automatically assigned to partition 0 and continues consumption. | Multiple SDK clients consume tracked data in the same consumer group. This is applicable to disaster recovery scenarios. |
Set the parameters in the Java code.
| Parameter | Description | How to obtain |
| --- | --- | --- |
| brokerUrl | The endpoint and port number of the change tracking channel. Note: If the server on which you deploy the SDK client (such as an ECS instance) resides in the same VPC as the change tracking instance, we recommend that you use the VPC endpoint to reduce network latency. Because public networks can be unstable, we do not recommend a public endpoint. | In the DTS console, click the ID of the target subscription instance to open the Basic Information page. Obtain the endpoint and port number from the Network section. |
| topic | The tracking topic of the change tracking channel. | In the DTS console, click the ID of the target subscription instance. On the Basic Information page, find Topic in the Basic Information section. |
| sid | The consumer group ID. | In the DTS console, click the ID of the target subscription instance. On the Consume Data page, obtain the Consumer Group ID/Name and the Account of the consumer group. |
| userName | The username of the consumer group account. Warning: If you are not using the client provided in this topic, set the username in the format `<Username of the consumer group>-<Consumer group ID>`, for example, `dtstest-dtsae******bpv`. Otherwise, the connection fails. | Obtain the Account of the consumer group on the Consume Data page. |
| password | The password of the consumer group account. | The password that you set when you created the consumer group. |
| initCheckpoint | The consumer offset: the timestamp of the first data record to be consumed by the SDK client, as a UNIX timestamp, for example, 1620962769. Note: The consumer offset can be used to resume consumption from the last consumed offset after an application interruption, which prevents data loss, or to start consumption from a specific point in time when the client starts. | The consumer offset must be within the timestamp range of the change tracking instance and must be converted to a UNIX timestamp. Note: You can view the data range of the target change tracking instance in the Data Range column of the tracking task list. |
| subscribeMode | The consumption mode of the SDK client. You do not need to modify this parameter. Valid values: ConsumerContext.ConsumerSubscribeMode.ASSIGN (ASSIGN mode) and ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE (SUBSCRIBE mode). | None |
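Because initCheckpoint must be a UNIX timestamp in seconds, it is convenient to compute it from a human-readable date-time with java.time. The following is a small sketch; the class name CheckpointUtil and the chosen time zone are illustrative, not part of the SDK:

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

public class CheckpointUtil {
    // Convert a human-readable local date-time in the given time zone to the
    // UNIX timestamp (in seconds) expected by the initCheckpoint parameter.
    public static long toUnixTimestamp(LocalDateTime dateTime, ZoneId zone) {
        return dateTime.atZone(zone).toEpochSecond();
    }

    public static void main(String[] args) {
        // 2021-05-14 11:26:09 in UTC+8 corresponds to the sample value above.
        long checkpoint = toUnixTimestamp(
                LocalDateTime.of(2021, 5, 14, 11, 26, 9),
                ZoneId.of("Asia/Shanghai"));
        System.out.println(checkpoint); // prints 1620962769
    }
}
```

Remember that the resulting value must fall within the data range of the change tracking instance.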
Open the project structure in the code editor and ensure that the OpenJDK version for this project is 1.8.
Run the client code.
Note: When you run the code for the first time, the code editor may take some time to load the required plug-ins and dependencies.
The SDK client periodically collects and displays statistics information about data consumption. This information includes the total number of data records sent and received, the total data volume, and the number of records per second (RPS) received.
```
[2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}
```

| Parameter | Description |
| --- | --- |
| outCounts | The total number of data records consumed by the SDK client. |
| outBytes | The total amount of data consumed by the SDK client, in bytes. |
| outRps | The number of records per second (RPS) consumed by the SDK client. |
| outBps | The number of bits transmitted per second when the SDK client consumes data. |
| count | The total number of parameters in the data consumption metrics of the SDK client. Note: This does not include the count parameter itself. |
| inBytes | The total amount of data sent by the DTS server, in bytes. |
| DStoreRecordQueue | The current size of the data cache queue when the DTS server sends data. |
| inCounts | The total number of data records sent by the DTS server. |
| inBps | The number of bits transmitted per second when the DTS server sends data. |
| inRps | The RPS at which the DTS server sends data. |
| __dt | The timestamp at which the SDK client receives data, in milliseconds. |
| DefaultUserRecordQueue | The current size of the data cache queue after serialization. |
Edit the code to consume tracked data as needed.
When you consume subscribed data, you must manage consumer offsets. This practice prevents data loss and duplication and allows for on-demand consumption.
FAQ
What do I do if I cannot connect to the change tracking instance?
Troubleshoot the issue based on the error message. For more information, see Troubleshooting.
What is the data format of a consumer offset after persistence?
After a consumer offset is persisted, it is returned in JSON format. The persisted consumer offset is a UNIX timestamp that you can pass directly to the SDK. In the following example, the value 1700709977 of the "timestamp" field is the persisted consumer offset.

```json
{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}
```

Does a tracking task support parallel consumption by multiple clients?
No. Although the SUBSCRIBE mode allows multiple clients to run in parallel, only one client can actively consume data at any given time.
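If you store checkpoints in this JSON form, the timestamp can be pulled back out with a small helper before passing it to the SDK. This is a sketch under the assumption that the persisted string follows the format shown above; CheckpointParser is a hypothetical name, not an SDK class:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CheckpointParser {
    // Extract the "timestamp" value from a persisted checkpoint JSON string.
    // Returns -1 if no timestamp field is found.
    public static long extractTimestamp(String checkpointJson) {
        Matcher m = Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)")
                .matcher(checkpointJson);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    public static void main(String[] args) {
        String json = "{\"groupID\":\"dtsexample\",\"streamCheckpoint\":"
                + "[{\"partition\":0,\"offset\":577989,\"topic\":\"example_topic\","
                + "\"timestamp\":1700709977,\"info\":\"\"}]}";
        System.out.println(extractTimestamp(json)); // prints 1700709977
    }
}
```

In production code, a proper JSON library is preferable to a regular expression; the sketch avoids one only to stay dependency-free.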
Which version of the Kafka client is encapsulated in the SDK code?
Version 2.0.0 and later of dts-new-subscribe-sdk encapsulate version 2.7.0 of the Kafka client (kafka-clients). Versions earlier than 2.0.0 encapsulate version 1.0.0 of the Kafka client.
Appendix
Manage consumer offsets
When the SDK client starts for the first time, restarts, or performs an internal retry, you must query and pass the consumer offset to start or resume data consumption. The consumer offset is the UNIX timestamp of the first data record that the SDK client consumes.
To reset the consumer offset of the client, you can query and modify the consumer offset based on the consumption mode (SDK usage mode) as described in the following table.
| Scenario | SDK usage mode | Offset management method |
| --- | --- | --- |
| Query a consumer offset | ASSIGN mode, SUBSCRIBE mode | |
| Pass a consumer offset when the SDK client starts for the first time | ASSIGN mode, SUBSCRIBE mode | Based on the usage mode of your SDK client, select the DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java file, and then configure the consumer offset (initCheckpoint). |
| Re-pass the last recorded consumer offset to continue data consumption after an internal retry | ASSIGN mode | Search for the last recorded consumer offset in the following order. Once found, the offset information is returned. |
| | SUBSCRIBE mode | Search for the last recorded consumer offset in the following order. Once found, the offset information is returned. |
| Re-pass the last recorded consumer offset to continue data consumption after the SDK client restarts | ASSIGN mode | Query the consumer offset based on the … |
| | SUBSCRIBE mode | In this mode, the … |
Use persistent storage for consumer offsets
If a disaster recovery mechanism is triggered for the incremental data collection module, especially in SUBSCRIBE mode, the new module cannot retrieve the client's last consumer offset. This may cause the client to start consuming data from an older offset, which results in repeated consumption of historical data. For example, assume that before a service switchover, the offset range of the old module is from 08:00:00 on November 11, 2023, to 08:00:00 on November 12, 2023. The client's consumer offset is at the end of this range: 08:00:00 on November 12, 2023. After the switchover, the offset range of the new module is from 10:00:00 on November 8, 2023, to 08:01:00 on November 12, 2023. In this scenario, the client starts consuming data from the new module's starting offset (10:00:00 on November 8, 2023), which causes historical data to be consumed again.
To avoid repeated consumption of historical data in this switchover scenario, we recommend that you configure a persistent storage method for consumer offsets on the client. The following section provides a sample method that you can modify based on your requirements.
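The decision described above reduces to a simple rule: after a switchover, resume from the client's persisted offset if it falls inside the new module's offset range, and only fall back to the range start otherwise. A minimal sketch of that rule follows; CheckpointResume and the sample epoch values are illustrative, not part of the SDK:

```java
public class CheckpointResume {
    // Choose the offset to resume from after a switchover: prefer the client's
    // persisted offset when it lies inside the new module's offset range,
    // otherwise fall back to the start of the range.
    public static long resumeFrom(long persisted, long rangeStart, long rangeEnd) {
        if (persisted >= rangeStart && persisted <= rangeEnd) {
            return persisted;
        }
        return rangeStart;
    }

    public static void main(String[] args) {
        // Example epochs (UTC+8): persisted = Nov 12, 2023 08:00:00, new range
        // = Nov 8, 2023 10:00:00 through Nov 12, 2023 08:01:00. The persisted
        // offset lies inside the range, so the client resumes there instead of
        // re-consuming from the range start.
        long resumed = resumeFrom(1699747200L, 1699408800L, 1699747260L);
        System.out.println(resumed); // prints 1699747200
    }
}
```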
Create a UserMetaStore class that extends the AbstractUserMetaStore class and implements its abstract methods. For example, to use a MySQL database to store offset information, the Java sample code is as follows:

```java
public class UserMetaStore extends AbstractUserMetaStore {

    @Override
    protected void saveData(String groupID, String toStoreJson) {
        Connection con = getConnection();
        String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
        PreparedStatement pres = null;
        ResultSet rs = null;
        try {
            pres = con.prepareStatement(sql);
            pres.setString(1, groupID);
            pres.setString(2, toStoreJson);
            pres.execute();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(rs, pres, con);
        }
    }

    @Override
    protected String getData(String groupID) {
        Connection con = getConnection();
        String sql = "select checkpoint from dts_checkpoint where group_id = ?";
        PreparedStatement pres = null;
        ResultSet rs = null;
        try {
            pres = con.prepareStatement(sql);
            pres.setString(1, groupID);
            rs = pres.executeQuery();
            if (rs.next()) {
                return rs.getString("checkpoint");
            }
            return null;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            close(rs, pres, con);
        }
    }
}
```

In the consumerContext.java file, configure the external storage medium by calling the setUserRegisteredStore(new UserMetaStore()) method.
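To see the save/get contract in isolation before wiring up a database, the same behavior can be exercised with an in-memory stand-in. InMemoryMetaStore is a hypothetical class for illustration only; it mimics the shape of the MySQL-backed store but does not extend the SDK's AbstractUserMetaStore:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the MySQL-backed store: the same
// save/get contract keyed by consumer group ID, with no database required.
public class InMemoryMetaStore {
    private final Map<String, String> checkpoints = new HashMap<>();

    // Persist the checkpoint JSON for a consumer group (overwrites any
    // previously stored value for the same group).
    public void saveData(String groupID, String toStoreJson) {
        checkpoints.put(groupID, toStoreJson);
    }

    // Return the stored checkpoint JSON, or null if none was saved.
    public String getData(String groupID) {
        return checkpoints.get(groupID);
    }

    public static void main(String[] args) {
        InMemoryMetaStore store = new InMemoryMetaStore();
        store.saveData("dtsexamplegroup", "{\"timestamp\":1700709977}");
        System.out.println(store.getData("dtsexamplegroup"));
    }
}
```

An in-memory store loses offsets on restart, so it is only useful for testing; production deployments should use durable storage as in the MySQL example.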
Troubleshooting
| Fault | Error message | Cause | Solution |
| --- | --- | --- | --- |
| Connection failed | | The … | Enter valid values for the … |
| | | Cannot connect to the real IP address through the broker address. | |
| | | The username or password is incorrect. | |
| | | In the consumerContext.java file, … | Specify a consumer offset within the data range of the change tracking instance. For more information, see the parameter description table in this topic. |
| Data consumption slows down | None | | |