After you configure a change tracking task, you can use the SDK demo code that is provided by Data Transmission Service (DTS) to track and consume data. This topic describes how to use the SDK demo code to consume tracked data.

Before you begin

Create an AccessKey pair, which consists of an AccessKey ID and AccessKey secret. For more information, see Create an AccessKey pair.

Notice If you track and consume data as a RAM user, the RAM user must have the AliyunDTSFullAccess permission and the permissions to access the source objects. For more information, see Use a system policy to authorize a RAM user to manage DTS instances and Grant permissions to a RAM user.

Procedure

IntelliJ IDEA (Community Edition 2020.1 Windows) is used in this example.

  1. Create a change tracking task. For more information, see Track data changes from an ApsaraDB RDS for MySQL instance (new), Track data changes from a PolarDB for MySQL cluster, or Track data changes from a self-managed Oracle database.
  2. Create one or more consumer groups. For more information, see Create consumer groups.
  3. Download the SDK demo code package and decompress the package.
  4. Go to the directory where the package is decompressed, open the pom.xml file by using a text editor, and then change the SDK version to the latest.
    Set the SDK version
    Notice You can obtain the latest version of the change tracking SDK from the Maven website. For more information, visit the Maven page of the change tracking SDK.
  5. Open IntelliJ IDEA. In the window that appears, click Open or Import.
    Open a project
  6. In the dialog box that appears, go to the directory where the package is decompressed, and expand the folders to find the pom.xml file.
    Find the pom.xml file
  7. In the dialog box that appears, select Open as Project.
  8. On the IntelliJ IDEA page, expand the folders to find the Java files. Then, double-click a Java file based on the mode in which you use the SDK client. The following Java files are available: DTSConsumerAssignDemo.java and DTSConsumerSubscribeDemo.java.
    Java files of the client
    Note DTS supports the following modes for using the SDK client:
    • ASSIGN mode: To ensure the global order of messages, DTS assigns only one partition (Partition 0) to each tracked topic. If you use the SDK client in ASSIGN mode, we recommend that you start only one SDK client.
    • SUBSCRIBE mode: To ensure the global order of messages, DTS assigns only one partition (Partition 0) to each tracked topic. In SUBSCRIBE mode, you can start multiple SDK clients in a consumer group at the same time to implement disaster recovery. If the SDK client that is consuming data fails, Partition 0 is automatically assigned to one of the remaining SDK clients, chosen at random, and that client resumes data consumption.
  9. Set the required parameters in the code of the Java file.
    Table 1. Required parameters
    Parameter Description Method to obtain
    brokerUrl The network address and port number of the change tracking instance.
    Note If you track data changes over internal networks, the network latency is minimal. This is applicable if the ECS instance where you deploy the SDK client belongs to the classic network or the same VPC as the change tracking instance.
    In the DTS console, click the instance ID. On the View Task Settings page, you can obtain the tracked topic, network address, and port number. (Figure: View Task Settings)
    topic The topic of the change tracking instance.
    sid The ID of the consumer group. In the DTS console, click the instance ID, and then click Data Consume. You can obtain the consumer group ID and the corresponding username.
    Note When you create a consumer group, the password of the consumer group is automatically specified.
    Data Consume
    userName The username of the consumer group.
    Warning If you are not using the SDK client that is described in this topic, you must specify this parameter in the following format: <Username>-<Consumer group ID>, for example, dtstest-dtsae******bpv. Otherwise, the connection fails.
    password The password of the consumer group.
    initCheckpoint The consumer offset, which is the timestamp of the first data record that the SDK client consumes. The value is a UNIX timestamp in seconds, for example, 1620962769.
    Note The consumer offset is useful in the following scenarios:
    • If the consumption process is interrupted, you can specify the consumer offset on the change tracking client to resume data consumption. This prevents data loss.
    • When you start the change tracking client, you can specify the consumer offset to consume data on demand.
    The consumer offset must be within the data range of the change tracking instance, as shown in the following figure. The consumer offset must be converted to a UNIX timestamp. (Figure: Data range)
    Note You can use a search engine to obtain a UNIX timestamp converter.
    ConsumerContext.ConsumerSubscribeMode subscribeMode The mode in which you use the SDK client. Valid values:
    • ConsumerContext.ConsumerSubscribeMode.ASSIGN: In ASSIGN mode, only one SDK client in a consumer group can consume tracked data. For more information, see SDK usage modes.
    • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: In SUBSCRIBE mode, you can start multiple SDK clients in a consumer group at the same time to implement disaster recovery. For more information, see SDK usage modes.
    None
  10. In the top menu bar of IntelliJ IDEA, choose Run > Run to run the client.
    Note When you run IntelliJ IDEA for the first time, it requires some time to load and install the relevant dependencies.
    • The following figure shows that the SDK client can track data changes from the source database. (Figure: Consume data)
    • The SDK client calculates and displays information about the consumed data at regular intervals. The information includes the total number of data records that are sent and received, the total amount of data, and the number of requests per second (RPS). (Figure: Statistics)
      Table 2. Statistics of consumed data
      Parameter Description
      outCounts The total number of data records that are consumed by the SDK client.
      outBytes The total amount of data that is consumed by the SDK client. Unit: bytes.
      outRps The number of requests per second when the SDK client consumes data.
      outBps The number of bits transmitted per second when the SDK client consumes data.
      inBytes The total amount of data that is sent by the DTS server. Unit: bytes.
      DStoreRecordQueue The size of the current data cache queue when the DTS server sends data.
      inCounts The total number of data records that are sent by the DTS server.
      inRps The number of requests per second when the DTS server sends data.
      __dt The current timestamp when the SDK client receives data. Unit: milliseconds.
      DefaultUserRecordQueue The size of the current data cache queue after serialization.
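
The parameter values described in Table 1 can be prepared and checked before you paste them into the demo file. The following is a minimal sketch, not part of the DTS SDK: the class DtsDemoParams and all of its methods are hypothetical helpers, and the data range bounds used in main are made-up values. It converts a date and time to a UNIX timestamp in seconds for initCheckpoint, checks the timestamp against a data range, checks that brokerUrl has a host:port shape, and builds the <Username>-<Consumer group ID> form that other clients require.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical helper, not part of the DTS SDK: prepares values for the demo parameters. */
class DtsDemoParams {

    /** Converts a date and time in the given time zone to a UNIX timestamp in seconds. */
    static long toUnixSeconds(LocalDateTime dateTime, ZoneId zone) {
        return dateTime.atZone(zone).toEpochSecond();
    }

    /** Returns true if the checkpoint lies within [rangeStart, rangeEnd], all in UNIX seconds. */
    static boolean isWithinDataRange(long checkpoint, long rangeStart, long rangeEnd) {
        return checkpoint >= rangeStart && checkpoint <= rangeEnd;
    }

    /** Checks that brokerUrl looks like host:port with a port between 1 and 65535. */
    static boolean isValidBrokerUrl(String brokerUrl) {
        if (brokerUrl == null) {
            return false;
        }
        int idx = brokerUrl.lastIndexOf(':');
        if (idx <= 0 || idx == brokerUrl.length() - 1) {
            return false;
        }
        try {
            int port = Integer.parseInt(brokerUrl.substring(idx + 1));
            return port >= 1 && port <= 65535;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /** Builds the <Username>-<Consumer group ID> form needed by clients other than this SDK. */
    static String qualifiedUserName(String userName, String sid) {
        return userName + "-" + sid;
    }

    public static void main(String[] args) {
        // 2021-05-14 03:26:09 UTC corresponds to the example value 1620962769.
        long checkpoint = toUnixSeconds(LocalDateTime.of(2021, 5, 14, 3, 26, 9), ZoneOffset.UTC);
        System.out.println("initCheckpoint = " + checkpoint);

        // Made-up data range bounds; read the real range from the DTS console.
        long rangeStart = 1620900000L;
        long rangeEnd = 1621000000L;
        System.out.println("within range   = " + isWithinDataRange(checkpoint, rangeStart, rangeEnd));
        System.out.println("brokerUrl ok   = " + isValidBrokerUrl("dts-cn-hangzhou.aliyuncs.com:18009"));
    }
}
```

The time zone is passed explicitly because the same wall-clock time maps to different UNIX timestamps in different zones. Use the zone in which you read the data range.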

Save and query the consumer offset

When the SDK client is started for the first time or restarted, or when an internal retry occurs, you must query and specify the consumer offset to start or resume data consumption. On-demand data consumption requires that no data is lost or duplicated. The following table describes how to manage and query the consumer offset in different situations.
Scenario SDK usage mode Query method
Query the consumer offset ASSIGN and SUBSCRIBE
  • The SDK client saves the consumer offset every 5 seconds and submits it to the DTS server. To query the last consumer offset, you can use the following methods:
    • Find the localCheckpointStore file of the server where the SDK client resides.
    • Go to the Data Consume page of the change tracking instance.
  • If you configured an external persistent shared storage medium (such as a database) in setUserRegisteredStore(newUserMetaStore()) in the consumerContext.java file, the storage medium saves the consumer offset every 5 seconds. You can query the consumer offset by using the storage medium.
When you start the SDK client for the first time, you must specify the consumer offset to consume data. ASSIGN and SUBSCRIBE Select the DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java file based on the mode in which you use the SDK client. Then, specify the initCheckpoint parameter. For more information, see Step 8 and Step 9.
If an internal retry occurs, you must specify the consumer offset of the last data record to resume data consumption. ASSIGN Perform the following steps to find the consumer offset of the last data record:
  1. Find the localCheckpointStore file of the server where the SDK client resides.
  2. Find the consumer offset of the latest data record in IntelliJ IDEA.
  3. Find the external storage medium that you configured in setUserRegisteredStore(newUserMetaStore()) in the consumerContext.java file.
SUBSCRIBE Perform the following steps to find the consumer offset of the last data record:
  1. Go to the Data Consume page of the change tracking instance and obtain the consumer offset.
  2. Find the external storage medium that you configured in setUserRegisteredStore(newUserMetaStore()) in the consumerContext.java file.
After the SDK client is restarted, you must specify the consumer offset of the last data record to resume data consumption. ASSIGN Check the setting of the setForceUseCheckpoint parameter in the consumerContext.java file and query the consumer offset.
  • If the parameter is set to true, the specified initCheckpoint is used as the consumer offset each time the SDK client is restarted.
  • If the parameter is set to false or is not specified, perform the following steps to find the consumer offset of the last data record:
    1. Go to the Data Consume page of the change tracking instance and obtain the consumer offset.
    2. Find the external storage medium that you configured in setUserRegisteredStore(newUserMetaStore()) in the consumerContext.java file.
    3. Find the consumer offset of the latest data record in IntelliJ IDEA.
SUBSCRIBE In this mode, the setting of the setForceUseCheckpoint parameter in the consumerContext.java file does not take effect. Perform the following steps to find the consumer offset of the last data record:
  1. Go to the Data Consume page of the change tracking instance and obtain the consumer offset.
  2. Find the external storage medium that you configured in setUserRegisteredStore(newUserMetaStore()) in the consumerContext.java file.
  3. Find the consumer offset of the latest data record in IntelliJ IDEA.
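
The restart rules in the table above can be summarized as a small decision function. The following is a sketch under stated assumptions, not the SDK's implementation: OffsetResolver, its Mode enum, and InMemoryOffsetStore are hypothetical names, and the in-memory map only stands in for the external storage medium that you would register in setUserRegisteredStore. Falling back to initCheckpoint when no offset has been saved is an assumption that models the first start.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch, not the DTS SDK API: picks the offset at which consumption starts. */
class OffsetResolver {

    enum Mode { ASSIGN, SUBSCRIBE }

    /** Hypothetical stand-in for an external store, keyed by consumer group ID (sid). */
    static class InMemoryOffsetStore {
        private final Map<String, Long> offsets = new ConcurrentHashMap<>();

        /** Saves the latest consumer offset (UNIX seconds) for a consumer group. */
        void save(String sid, long unixSeconds) {
            offsets.put(sid, unixSeconds);
        }

        /** Loads the last saved offset, or an empty value if none was saved. */
        OptionalLong load(String sid) {
            Long value = offsets.get(sid);
            return value == null ? OptionalLong.empty() : OptionalLong.of(value);
        }
    }

    /**
     * ASSIGN mode with forceUseCheckpoint=true: always start from initCheckpoint.
     * Otherwise (including SUBSCRIBE mode, where the flag has no effect): resume
     * from the saved offset, and use initCheckpoint only if nothing was saved.
     */
    static long resolveStartOffset(Mode mode, boolean forceUseCheckpoint,
                                   long initCheckpoint, OptionalLong savedOffset) {
        if (mode == Mode.ASSIGN && forceUseCheckpoint) {
            return initCheckpoint;
        }
        return savedOffset.orElse(initCheckpoint);
    }

    public static void main(String[] args) {
        InMemoryOffsetStore store = new InMemoryOffsetStore();
        long initCheckpoint = 1620962769L;

        // First start: nothing saved yet, so initCheckpoint is used.
        System.out.println(resolveStartOffset(Mode.SUBSCRIBE, false, initCheckpoint, store.load("group-1")));

        // After some consumption, a later offset has been saved.
        store.save("group-1", 1620963000L);
        System.out.println(resolveStartOffset(Mode.SUBSCRIBE, true, initCheckpoint, store.load("group-1")));
        System.out.println(resolveStartOffset(Mode.ASSIGN, true, initCheckpoint, store.load("group-1")));
    }
}
```

Keeping the decision in one pure function makes it easy to check each row of the table independently of the storage medium.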

Troubleshooting

Issue Error message Cause Solution
Connection failed
ERROR CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009 failed, please check the network and if the brokerUrl is correct'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)
The specified brokerUrl is invalid. Enter a valid brokerUrl, userName, and password. For more information, see Table 1.
telnet real node *** failed, please check the network
The broker address cannot be redirected to the real IP address.
ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)
The specified username and password are invalid.
com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed
The setForceUseCheckpoint parameter in the consumerContext.java file is set to true, but the consumer offset is not within the data range of the change tracking instance. Specify a consumer offset that is within the data range of the change tracking instance. For more information, see Table 1.
The response time of data consumption is increased. None You can analyze the cause by checking the values of the DStoreRecordQueue and DefaultUserRecordQueue parameters. For more information, see Table 2.
  • If the value of DStoreRecordQueue stays at 0, the DTS server is pulling data at a slower rate.
  • If the value of DefaultUserRecordQueue stays at the default value 512, the SDK client is consuming data at a slower rate.
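
Two of the checks in this table can be reproduced outside the SDK. The following is a minimal sketch, assuming hypothetical names (DtsDiagnostics, isReachable, diagnose) that are not part of the DTS SDK. It probes whether a TCP connection to the broker host and port can be opened, which approximates the telnet check in the error messages, and it interprets the two queue sizes as described above. The value 512 is taken from the table as the default size of DefaultUserRecordQueue.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical diagnostics helper, not part of the DTS SDK. */
class DtsDiagnostics {

    enum Bottleneck { SERVER_PULLS_SLOWLY, CLIENT_CONSUMES_SLOWLY, NONE_INDICATED }

    /** Default size of DefaultUserRecordQueue, as stated in the troubleshooting table. */
    static final int USER_QUEUE_DEFAULT_SIZE = 512;

    /** Returns true if a TCP connection to host:port can be opened within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers unknown hosts, refused connections, and timeouts.
            return false;
        }
    }

    /** Interprets the queue sizes reported in the client statistics. */
    static Bottleneck diagnose(int dStoreRecordQueue, int defaultUserRecordQueue) {
        if (defaultUserRecordQueue >= USER_QUEUE_DEFAULT_SIZE) {
            // The client-side queue is full: the SDK client is consuming slowly.
            return Bottleneck.CLIENT_CONSUMES_SLOWLY;
        }
        if (dStoreRecordQueue == 0) {
            // Nothing is waiting from the server: the DTS server is pulling slowly.
            return Bottleneck.SERVER_PULLS_SLOWLY;
        }
        return Bottleneck.NONE_INDICATED;
    }

    public static void main(String[] args) {
        // Optional probe: pass the host and port from brokerUrl as arguments.
        if (args.length == 2) {
            boolean ok = isReachable(args[0], Integer.parseInt(args[1]), 2000);
            System.out.println("broker reachable: " + ok);
        }
        System.out.println(diagnose(0, 10));
        System.out.println(diagnose(300, 512));
    }
}
```

A single sample of the queue sizes can be misleading. Compare several consecutive statistics lines before you draw a conclusion.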