After you configure a change tracking task, you can use the SDK demo that is provided by Data Transmission Service (DTS) to track and consume data. This topic describes how to use the SDK demo to consume tracked data.

Procedure

Important In this example, IntelliJ IDEA Community Edition 2020.1 for Windows is used.

  1. Create a change tracking task. For more information, see Track data changes from an ApsaraDB RDS for MySQL instance, Track data changes from a PolarDB for MySQL cluster, or Track data changes from a self-managed Oracle database.
  2. Create one or more consumer groups. For more information, see Create consumer groups.
  3. Use the SDK demo based on your business requirements.
    • (Recommended) Use the packaged new change tracking SDK
      1. Open IntelliJ IDEA and click Create New Project.
      2. In the project that you created, find the pom.xml file.
      3. Add the following dependency to the pom.xml file:
        <dependency>
            <groupId>com.aliyun.dts</groupId>
            <artifactId>dts-new-subscribe-sdk</artifactId>
            <version>{dts_new_sdk_version}</version>
        </dependency>
        Note You can view the latest version of the change tracking SDK on the dts-new-subscribe-sdk page.
      4. Use the new change tracking SDK. For more information about the demo code, see SDK demo code.
    • Use the new change tracking SDK by customizing code
      1. Download the SDK demo package and decompress the package.
        Note To download the package, choose code > Download ZIP.
      2. Go to the directory in which the package is decompressed. Then, open the pom.xml file by using a text editor and change the SDK version to the latest.
        Important You can obtain the latest version of the change tracking SDK from the Maven website. For more information, visit the Maven page of the change tracking SDK.
      3. Open IntelliJ IDEA. In the window that appears, click Open or Import.
      4. In the dialog box that appears, go to the directory in which the package is decompressed, and find the pom.xml file. Then, click OK.
      5. In the dialog box that appears, select Open as Project.
      6. In IntelliJ IDEA, expand the folders to find the Java files. Then, double-click a Java file based on the mode in which you use an SDK client. The DTSConsumerAssignDemo.java and DTSConsumerSubscribeDemo.java files are available.
        Note DTS supports the following modes in which you use the SDK client:
        • ASSIGN mode: To ensure the global order of messages, DTS assigns only a single partition (Partition 0) to each tracked topic. If you use the SDK client in ASSIGN mode, we recommend that you start only a single SDK client.
        • SUBSCRIBE mode: To ensure the global order of messages, DTS assigns only a single partition (Partition 0) to each tracked topic. In SUBSCRIBE mode, you can start multiple SDK clients in a consumer group at the same time to implement disaster recovery. If an SDK client in the consumer group fails, other SDK clients are randomly and automatically allocated to Partition 0 to resume data consumption.
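The failover behavior described for SUBSCRIBE mode can be pictured with a toy model. The following Java sketch is not part of the DTS SDK and does not reflect how the DTS server is implemented. The class `SinglePartitionGroup` and its methods are hypothetical names used only for illustration. It models a single partition (Partition 0) that is consumed by at most one client at a time and is handed to a randomly chosen surviving client when the current owner fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;

// Toy model (hypothetical, not the DTS SDK): one partition, several clients,
// at most one client owns the partition at any time.
class SinglePartitionGroup {
    private final List<String> liveClients = new ArrayList<>();
    private final Random random;
    private String owner; // client that currently consumes Partition 0, or null

    SinglePartitionGroup(Random random) {
        this.random = random;
    }

    // A client joins the consumer group; the first client to join becomes the owner.
    void join(String clientId) {
        liveClients.add(clientId);
        if (owner == null) {
            owner = clientId;
        }
    }

    // A client fails; if it owned Partition 0, a random surviving client takes over.
    void fail(String clientId) {
        liveClients.remove(clientId);
        if (clientId.equals(owner)) {
            owner = liveClients.isEmpty()
                    ? null
                    : liveClients.get(random.nextInt(liveClients.size()));
        }
    }

    // The current owner of Partition 0, if any client is still alive.
    Optional<String> owner() {
        return Optional.ofNullable(owner);
    }

    public static void main(String[] args) {
        SinglePartitionGroup group = new SinglePartitionGroup(new Random());
        group.join("client-a");
        group.join("client-b");
        group.join("client-c");
        group.fail("client-a"); // the current owner fails
        System.out.println("New owner of Partition 0: " + group.owner().orElse("none"));
    }
}
```

In this model, starting a single client corresponds to ASSIGN mode, and starting several clients corresponds to the disaster recovery setup of SUBSCRIBE mode.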
  4. Set the required parameters in the code of the Java file.
    Table 1. The following table describes the required parameters.

    Parameter: brokerUrl
    Description: The endpoint and port number of the change tracking instance.
      Note If you track data changes over internal networks, the network latency is minimal. This is applicable if the Elastic Compute Service (ECS) instance on which you deploy the SDK client belongs to the classic network or the same virtual private cloud (VPC) as the change tracking instance.
    Method to obtain: In the new DTS console, click the instance ID. On the Basic Information page, you can obtain the endpoint and port number in the Network section.

    Parameter: topic
    Description: The name of the topic of the change tracking instance.
    Method to obtain: In the DTS console, click the instance ID. On the Basic Information page, you can obtain the tracked topic in the Basic Information section.

    Parameter: sid
    Description: The ID of the consumer group.
    Method to obtain (applies to sid, userName, and password): In the DTS console, click the instance ID. In the left-side navigation pane, click Consume Data. You can obtain the ID and account of the consumer group.
      Note The password of the consumer group account is automatically specified when you create the consumer group.

    Parameter: userName
    Description: The account of the consumer group.
      Warning If you are not using the SDK client that is described in this topic, you must specify this parameter in the following format: <Username>-<Consumer group ID>. Example: dtstest-dtsae******bpv. Otherwise, the connection fails.

    Parameter: password
    Description: The password of the account.

    Parameter: initCheckpoint
    Description: The consumption checkpoint. It is the timestamp when the SDK client consumes the first data record. The value is a UNIX timestamp. Example: 1620962769.
      Note The consumption checkpoint can be used in the following scenarios:
      • If the consumption process is interrupted, you can specify the consumption checkpoint to resume data consumption. This allows you to prevent data loss.
      • When you start the change tracking client, you can specify the consumption checkpoint to consume data on demand.
    Method to obtain: The consumption checkpoint must be within the data range of the change tracking instance, as shown in the following figure. The consumption checkpoint must be converted to a UNIX timestamp. [Figure: Data range]
      Note You can use a search engine to obtain a UNIX timestamp converter.

    Parameter: ConsumerContext.ConsumerSubscribeMode subscribeMode
    Description: The mode in which you use the SDK client. Valid values:
      • ConsumerContext.ConsumerSubscribeMode.ASSIGN: In ASSIGN mode, only a single SDK client in a consumer group can consume tracked data.
      • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: In SUBSCRIBE mode, you can start multiple SDK clients in a consumer group at the same time to implement disaster recovery.
    Method to obtain: N/A
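The initCheckpoint parameter expects a UNIX timestamp in seconds that lies within the data range of the change tracking instance. As an alternative to an online converter, the following Java sketch derives such a value with the standard java.time API. The class `CheckpointTimestamps` and its methods are hypothetical helpers and are not part of the DTS SDK. Treating the data range as inclusive at both ends is an assumption.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper, not part of the DTS SDK.
class CheckpointTimestamps {
    // Converts a local date and time in the given time zone to a UNIX timestamp in seconds.
    static long toUnixSeconds(LocalDateTime dateTime, ZoneId zone) {
        return dateTime.atZone(zone).toEpochSecond();
    }

    // Returns true if the checkpoint lies within [rangeStart, rangeEnd], all in UNIX seconds.
    // Whether the real data range is inclusive at both ends is an assumption.
    static boolean isWithinDataRange(long checkpoint, long rangeStart, long rangeEnd) {
        return checkpoint >= rangeStart && checkpoint <= rangeEnd;
    }

    public static void main(String[] args) {
        // 2021-05-14 03:26:09 UTC corresponds to the example value used in this topic.
        long checkpoint = toUnixSeconds(LocalDateTime.of(2021, 5, 14, 3, 26, 9), ZoneId.of("UTC"));
        System.out.println(checkpoint); // prints 1620962769
    }
}
```

Passing the time zone explicitly avoids a result that silently depends on the default time zone of the machine that runs the conversion.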
  5. In the top navigation bar of IntelliJ IDEA, choose Run > Run to run the client.
    Note When you run IntelliJ IDEA for the first time, it takes a period of time to load and install relevant dependencies.
    • The following figure shows the result. This result indicates that the SDK client can track data changes from the source database. [Figure: Consume data]
    • The SDK client calculates and displays information about the consumed data at regular intervals. The information includes the total number of data records that are sent and received, the total amount of data, and the number of requests per second (RPS).
      Table 2. The following table describes the parameters in the information.
      Parameter | Description
      outCounts | The total number of data records consumed by the SDK client.
      outBytes | The total amount of data consumed by the SDK client. Unit: bytes.
      outRps | The RPS at which the SDK client consumes data.
      outBps | The number of bits per second at which the SDK client consumes data.
      inBytes | The total amount of data that is sent by the DTS server. Unit: bytes.
      DStoreRecordQueue | The current size of the data cache queue when the DTS server sends data.
      inCounts | The total number of data records that are sent by the DTS server.
      inRps | The RPS at which the DTS server sends data.
      __dt | The timestamp that is generated when the SDK client receives data. Unit: milliseconds.
      DefaultUserRecordQueue | The current size of the data cache queue after serialization.
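This topic does not describe how the SDK client computes rate values such as outRps. If you want to cross-check a reported rate, the following Java sketch shows one generic way to derive an average per-second rate from two readings of a cumulative counter such as outCounts or outBytes. The class `ConsumptionRates` is a hypothetical helper and is not part of the DTS SDK, and the sample readings are made up for illustration.

```java
// Hypothetical helper, not part of the DTS SDK.
class ConsumptionRates {
    // Average per-second rate between two readings of a cumulative counter.
    static double perSecond(long earlierTotal, long laterTotal, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return (laterTotal - earlierTotal) * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // Two made-up outCounts readings taken 5 seconds apart.
        System.out.println(perSecond(1_000, 3_500, 5_000)); // prints 500.0
    }
}
```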

Save and query the consumption checkpoint

When the SDK client is started for the first time or restarted, or an internal retry occurs, you must query and specify the consumption checkpoint to start or resume data consumption. The following table describes how to manage and query the consumption checkpoint in different scenarios. This prevents data loss or duplicate data and allows you to consume data on demand.
Scenario: Query the consumption checkpoint.
Usage mode of the SDK client: ASSIGN and SUBSCRIBE
Query method:
  • The SDK client saves the consumption checkpoint every 5 seconds and submits the consumption checkpoint to the DTS server. To query the last consumption checkpoint, you can use the following methods:
    • Find the localCheckpointStore file of the server on which the SDK client resides.
    • Go to the Consume Data page of the change tracking instance.
  • If you configured an external persistent shared storage medium such as a database in the setUserRegisteredStore(new UserMetaStore()) parameter in the consumerContext.java file, the storage medium saves the consumption checkpoint every 5 seconds. You can query the consumption checkpoint by using the storage medium.

Scenario: When you start the SDK client for the first time, you must specify the consumption checkpoint to consume data.
Usage mode of the SDK client: ASSIGN and SUBSCRIBE
Query method: Select the DTSConsumerAssignDemo.java or DTSConsumerSubscribeDemo.java file based on the mode in which you use the SDK client. Then, specify the initCheckpoint parameter to consume data. For more information, see Use the SDK demo based on your business requirements and Table 1.

Scenario: When an internal retry occurs, you must specify the consumption checkpoint of the previous data record to resume data consumption.
Usage mode of the SDK client: ASSIGN
Query method: Perform the following steps to find the consumption checkpoint of the previous data record:
  1. Find the external storage medium that you configured in the setUserRegisteredStore(new UserMetaStore()) parameter in the consumerContext.java file.
  2. Find the localCheckpointStore file of the server on which the SDK client resides.
  3. Find the start timestamp that you specified in the initCheckpoint parameter in the DTSConsumerAssignDemo.java file.
Usage mode of the SDK client: SUBSCRIBE
Query method: Perform the following steps to find the consumption checkpoint of the previous data record:
  1. Find the external storage medium that you configured in the setUserRegisteredStore(new UserMetaStore()) parameter in the consumerContext.java file.
  2. Find the saved consumption checkpoint of the DTS server (DStore).
  3. Find the start timestamp that you specified in the initCheckpoint parameter in the DTSConsumerSubscribeDemo.java file.
  4. Use the start consumption checkpoint of the DTS server (new DStore).

Scenario: After the SDK client is restarted, you must specify the consumption checkpoint of the last data record to resume data consumption.
Usage mode of the SDK client: ASSIGN
Query method: Check the setting of the setForceUseCheckpoint parameter in the consumerContext.java file and query the consumption checkpoint.
  • If the parameter is set to true, the value of the initCheckpoint parameter is used as the consumption checkpoint each time the SDK client is restarted.
  • If the parameter is set to false or is not specified, perform the following steps to find the consumption checkpoint of the previous data record:
    1. Find the localCheckpointStore file of the server on which the SDK client resides.
    2. Find the saved consumption checkpoint of the DTS server (DStore).
    3. Find the external storage medium that you configured in the setUserRegisteredStore(new UserMetaStore()) parameter in the consumerContext.java file.
Usage mode of the SDK client: SUBSCRIBE
Query method: In this mode, the setting of the setForceUseCheckpoint parameter in the consumerContext.java file does not take effect. Perform the following steps to find the consumption checkpoint of the previous data record:
  1. Find the external storage medium that you configured in the setUserRegisteredStore(new UserMetaStore()) parameter in the consumerContext.java file.
  2. Find the saved consumption checkpoint of the DTS server (DStore).
  3. Find the start timestamp that you specified in the initCheckpoint parameter in the DTSConsumerSubscribeDemo.java file.
  4. Use the start consumption checkpoint of the DTS server (new DStore).
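One way to read the numbered steps for the retry and restart scenarios is as a priority order: check each checkpoint source in turn and use the first one that holds a value. That reading is an assumption, and the following Java sketch only illustrates it. The class `CheckpointResolver`, its methods, and the placeholder sources are hypothetical and are not part of the DTS SDK.

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.function.Supplier;

// Hypothetical helper, not part of the DTS SDK.
class CheckpointResolver {
    // Returns the checkpoint from the first source that has one, in list order.
    static OptionalLong firstAvailable(List<Supplier<OptionalLong>> sourcesInPriorityOrder) {
        for (Supplier<OptionalLong> source : sourcesInPriorityOrder) {
            OptionalLong checkpoint = source.get();
            if (checkpoint.isPresent()) {
                return checkpoint;
            }
        }
        return OptionalLong.empty();
    }

    // ASSIGN-mode restart: a forced initCheckpoint wins, otherwise fall back to the sources.
    static OptionalLong onAssignRestart(boolean forceUseCheckpoint, long initCheckpoint,
                                        List<Supplier<OptionalLong>> sourcesInPriorityOrder) {
        if (forceUseCheckpoint) {
            return OptionalLong.of(initCheckpoint);
        }
        return firstAvailable(sourcesInPriorityOrder);
    }

    public static void main(String[] args) {
        // Placeholder sources in the ASSIGN-mode restart order:
        // local file, DTS server (DStore), external storage medium.
        List<Supplier<OptionalLong>> sources = List.of(
                () -> OptionalLong.empty(),          // no localCheckpointStore file found
                () -> OptionalLong.of(1620962769L),  // checkpoint saved on the DTS server
                () -> OptionalLong.of(1620960000L)); // external storage medium
        System.out.println(onAssignRestart(false, 1620950000L, sources).getAsLong()); // prints 1620962769
    }
}
```

Modeling each source as a supplier means that a later, possibly slower lookup runs only when the earlier sources return nothing.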

Troubleshooting

Issue: The connection failed.

  Error message: ERROR CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009 failed, please check the network and if the brokerUrl is correct'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)
  Cause: The value of the brokerUrl parameter is invalid.

  Error message: telnet real node *** failed, please check the network
  Cause: The broker address cannot be redirected to the real IP address.

  Error message: ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)
  Cause: The specified username or password is invalid.

  Solution for the preceding three errors: Enter valid values for the brokerUrl, userName, and password parameters. For more information, see Table 1.

  Error message: com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed
  Cause: The setForceUseCheckpoint parameter in the consumerContext.java file is set to true, but the consumption checkpoint is not within the data range of the change tracking instance.
  Solution: Specify a consumption checkpoint within the data range of the change tracking instance. For more information, see Table 1.

Issue: The response time of data consumption increased.

  Error message: N/A
  Cause: You can analyze the cause by querying the DStoreRecordQueue and DefaultUserRecordQueue parameters. For more information, see Table 2.
    • If the value of the DStoreRecordQueue parameter is 0, the rate at which the DTS server pulls data has decreased.
    • If the value of the DefaultUserRecordQueue parameter is the default value 512, the rate at which the SDK client consumes data has decreased.
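The two queue checks for slow consumption can be expressed as a small rule. The following Java sketch encodes only the two conditions stated above. The class `QueueDiagnosis`, the `Finding` enum, and the method name are hypothetical and are not part of the DTS SDK, and the values passed in are assumed to be read from the statistics that the SDK client prints.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical helper, not part of the DTS SDK.
class QueueDiagnosis {
    enum Finding { DTS_SERVER_PULL_SLOWED, SDK_CLIENT_CONSUMPTION_SLOWED }

    // Default size of DefaultUserRecordQueue as stated in this topic.
    static final int DEFAULT_USER_QUEUE_SIZE = 512;

    // Applies the two rules from the troubleshooting entry; both findings may apply at once.
    static Set<Finding> diagnose(int dStoreRecordQueue, int defaultUserRecordQueue) {
        Set<Finding> findings = EnumSet.noneOf(Finding.class);
        if (dStoreRecordQueue == 0) {
            // An empty server-side queue suggests that the DTS server pulls data more slowly.
            findings.add(Finding.DTS_SERVER_PULL_SLOWED);
        }
        if (defaultUserRecordQueue == DEFAULT_USER_QUEUE_SIZE) {
            // A full client-side queue suggests that the SDK client consumes data more slowly.
            findings.add(Finding.SDK_CLIENT_CONSUMPTION_SLOWED);
        }
        return findings;
    }

    public static void main(String[] args) {
        System.out.println(diagnose(37, 512));
    }
}
```

An empty result means that neither rule matched, in which case the cause lies outside what these two values can show.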