After you configure a change tracking task, you can use the SDK demo code that is provided by Data Transmission Service (DTS) to consume the tracked data. This topic describes how to use the SDK demo code to consume tracked data from distributed databases such as PolarDB-X 1.0 and Data Management (DMS) LogicDB.

Prerequisites

  • Java Development Kit (JDK) 1.8 is installed.
  • IntelliJ IDEA is installed.

Precautions

If you use a Resource Access Management (RAM) user to track data, the RAM user must have the AliyunDTSFullAccess permission and permissions to access the source objects. For more information, see Use a system policy to authorize a RAM user to manage DTS instances and Grant permissions to the RAM user.

Procedure

The following steps use the SDK demo code to consume tracked data from a PolarDB-X 1.0 instance. IntelliJ IDEA (Community Edition 2020.1 for Windows) is used in this example.

  1. Create a change tracking instance. For more information, see Track data changes from a PolarDB-X 1.0 instance.
  2. Create one or more consumer groups. For more information, see Create consumer groups.
  3. Download the SDK demo code package and decompress the package.
  4. Open the destination project in IntelliJ IDEA.
    1. Open IntelliJ IDEA. In the window that appears, click Open or Import.
    2. In the dialog box that is displayed, go to the directory where the package is decompressed. Then, open the folders and double-click the pom.xml file.
    3. In the dialog box that appears, select Open as Project.
  5. In the IntelliJ IDEA window, expand the folders to find the Java files. Then, double-click a Java file based on the mode in which you use the SDK client. In this scenario, select DistributedDTSConsumerDemo.
  6. Set the required parameters in the code of the Java file.
    public static void main(String[] args) throws ClientException {
        // Configure change tracking for a distributed database such as a PolarDB-X 1.0 instance.
        // Specify the AccessKey pair, DTS instance, DTS job, and consumer group.
        String accessKeyId = "LTA***********99reZ";
        String accessKeySecret = "****************";
        String regionId = "cn-hangzhou";
        String dtsInstanceId = "dtse5212sed162****";
        String jobId = "l791216x16d****";
        String sid = "dtsip412t13160****";
        String userName = "xftest";
        String password = "******";
        String proxyUrl = "dts-cn-****.com:18001";
        // Initial checkpoint for the first seek: a UNIX timestamp in seconds,
        // for example 1566180200 for Mon Aug 19 10:03:20 CST 2019.
        String checkpoint = "1639620090";

        // Convert physical database and table names to logical database and table names.
        boolean mapping = true;
        // Whether to force the configured checkpoint at startup.
        // Resetting the checkpoint works only in ASSIGN mode.
        boolean isForceUseInitCheckpoint = false;

        ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
        DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId,
                jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl,
                checkpoint, isForceUseInitCheckpoint, mapping);
        demo.start();
    }
    Parameter | Description | Method to obtain
    accessKeyId | The AccessKey ID. | For more information about how to obtain an AccessKey ID, see Create and obtain an AccessKey pair.
    accessKeySecret | The AccessKey secret. | Same as accessKeyId.
    regionId | The ID of the region where the change tracking instance resides. | In the new DTS console, click the instance ID. On the Task Management page, you can obtain the instance region information. For example, if the instance resides in the China (Hangzhou) region, set the parameter to cn-hangzhou. For more information, see List of supported regions.
    dtsInstanceId | The ID of the change tracking instance. | In the new DTS console, click the instance ID. On the Task Management page, you can obtain the instance ID and task ID.
    jobId | The ID of the change tracking task. | Same as dtsInstanceId.
    sid | The ID of the consumer group. | In the new DTS console, click the instance ID. In the left-side navigation pane, click Consume Data. You can obtain the Sid and the corresponding Account. Note: The password of the consumer group account is automatically specified when you create a consumer group.
    userName | The account of the consumer group. | Same as sid.
    password | The password that corresponds to the account of the consumer group. | Same as sid.
    proxyUrl | The endpoint and port number of the change tracking instance. Note: If you track data changes over internal networks, the network latency is minimal. This is applicable if the ECS instance where you deploy the SDK client belongs to the classic network or the same virtual private cloud (VPC) as the change tracking instance. | In the new DTS console, click the instance ID. On the Task Management page, you can obtain the endpoint and port number.
    checkpoint | The consumer offset. It is the timestamp when the SDK client consumes the first data record. The value is a UNIX timestamp in seconds. Note: The consumer offset can be used in the following scenarios: (1) After the consumption process is interrupted, you can specify the consumer offset to resume data consumption. This allows you to prevent data loss. (2) When you start the change tracking client, you can specify the consumer offset to consume data on demand. | The consumer offset must be within the data range of the change tracking instance. The consumer offset must be converted into a UNIX timestamp. Note: You can use a search engine to obtain a UNIX timestamp converter.
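The checkpoint parameter expects a UNIX timestamp in seconds, passed as a string. As an alternative to an online converter, the conversion can be sketched with the standard java.time API. The class and method names below are hypothetical and are not part of the SDK demo, and the Asia/Shanghai time zone is only an assumption for the example.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class CheckpointExample {
    // Converts a local date-time in the given time zone to a UNIX timestamp
    // in seconds, returned as a string like the demo's checkpoint variable.
    public static String toCheckpoint(LocalDateTime dateTime, ZoneId zone) {
        long epochSeconds = dateTime.atZone(zone).toEpochSecond();
        return Long.toString(epochSeconds);
    }

    // Converts a checkpoint string back to an Instant (UTC) so that you can
    // check which point in time it refers to.
    public static Instant fromCheckpoint(String checkpoint) {
        return Instant.ofEpochSecond(Long.parseLong(checkpoint));
    }

    public static void main(String[] args) {
        // Assumption: the desired start time is given in China Standard Time (UTC+8).
        ZoneId zone = ZoneId.of("Asia/Shanghai");
        String checkpoint = toCheckpoint(LocalDateTime.of(2019, 8, 19, 10, 3, 20), zone);
        System.out.println(checkpoint);                   // prints 1566180200
        System.out.println(fromCheckpoint("1639620090")); // prints 2021-12-16T02:01:30Z
    }
}
```

The resulting value is valid only if it falls within the data range of the change tracking instance.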
  7. In the top menu bar of IntelliJ IDEA, choose Run > Run to run the client.
    Note The first time you run the project, IntelliJ IDEA requires some time to load and install the required dependencies.
    • The output shows that the SDK client can track data changes from the source instance.
    • The SDK client collects and displays statistics about the consumed data at regular intervals. The statistics include the total number of data records that are sent and received, the total amount of data, and the number of requests per second (RPS).
      Table 1. Parameters in the statistics
      Parameter | Description
      outCounts | The total number of data records consumed by the SDK client.
      outBytes | The total amount of data consumed by the SDK client. Unit: bytes.
      outRps | The number of requests per second (RPS) when the SDK client consumes data.
      outBps | The number of bits transmitted per second when the SDK client consumes data.
      count | None
      inBytes | The total amount of data that is sent by the DTS server. Unit: bytes.
      DStoreRecordQueue | The size of the current data cache queue when the DTS server sends data.
      inCounts | The total number of data records that are sent by the DTS server.
      inRps | The number of requests per second (RPS) when the DTS server sends data.
      inBps | The number of bits transmitted per second when the DTS server sends data.
      __dt | The current timestamp when the SDK client receives data. Unit: milliseconds.
      DefaultUserRecordQueue | The size of the current data cache queue after serialization.
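To relate the cumulative counters to the per-second rates, the following minimal sketch derives an average rate from two samples of a counter such as outCounts, and estimates how far consumption trails the server by comparing inCounts with outCounts. This is only an illustration of how such figures can be interpreted. It is not how the SDK computes its statistics, and the class name, method names, and sample values are hypothetical.

```java
public class ThroughputSketch {
    // Average number of records per second between two samples of a
    // cumulative counter (for example, outCounts) taken elapsedMillis apart.
    public static double ratePerSecond(long earlierCount, long laterCount, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return (laterCount - earlierCount) * 1000.0 / elapsedMillis;
    }

    // Rough estimate of records sent by the server but not yet consumed.
    // Assumption: both counters cover the same time window.
    public static long backlog(long inCounts, long outCounts) {
        return inCounts - outCounts;
    }

    public static void main(String[] args) {
        // Hypothetical samples: outCounts grew from 1000 to 1500 in 10 seconds.
        System.out.println(ratePerSecond(1000, 1500, 10_000)); // prints 50.0
        // Hypothetical samples: 1500 records sent, 1480 consumed.
        System.out.println(backlog(1500, 1480));               // prints 20
    }
}
```

A backlog that keeps growing across intervals suggests that the client consumes data more slowly than the server sends it.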
  8. Optional: To modify the data type of the data to track, you can modify the code in the buildRecordListener() method or use a custom class.
    public static Map<String, RecordListener> buildRecordListener() {
        // You can implement your own listener here.
        RecordListener mysqlRecordPrintListener = new RecordListener() {
            @Override
            public void consume(DefaultUserRecord record) {

                OperationType operationType = record.getOperationType();

                // Handle only data changes and heartbeats.
                if (operationType.equals(OperationType.INSERT)
                        || operationType.equals(OperationType.UPDATE)
                        || operationType.equals(OperationType.DELETE)
                        || operationType.equals(OperationType.HEARTBEAT)) {

                    // Consume the record by printing it.
                    RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL);

                    recordPrintListener.consume(record);

                    // Commit the record to push the checkpoint update.
                    record.commit("");
                }
            }
        };
        return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener);
    }
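The pattern in buildRecordListener() is to inspect the operation type, process the record, and then commit it. The following self-contained sketch illustrates the same pattern with a custom listener that counts data changes per operation type instead of printing them. The types ChangeRecord, Listener, SimpleRecord, and CountingListener are simplified stand-ins defined only for this example. They are not the SDK's RecordListener or DefaultUserRecord classes, so adapt the idea to the real interfaces in the demo project.

```java
import java.util.EnumMap;
import java.util.Map;

public class ListenerSketch {
    // Stand-in for the SDK's OperationType; only the values used in the demo.
    public enum OperationType { INSERT, UPDATE, DELETE, HEARTBEAT }

    // Stand-in for the SDK's record type.
    public interface ChangeRecord {
        OperationType getOperationType();
        void commit(String metadata);
    }

    // Stand-in for the SDK's RecordListener.
    public interface Listener {
        void consume(ChangeRecord record);
    }

    // Minimal record used to exercise the listener in this example.
    public static class SimpleRecord implements ChangeRecord {
        private final OperationType type;
        public boolean committed = false;

        public SimpleRecord(OperationType type) { this.type = type; }
        @Override public OperationType getOperationType() { return type; }
        @Override public void commit(String metadata) { committed = true; }
    }

    // Custom listener: counts INSERT, UPDATE, and DELETE records, skips
    // counting heartbeats, and commits every record it receives.
    public static class CountingListener implements Listener {
        public final Map<OperationType, Integer> counts = new EnumMap<>(OperationType.class);

        @Override
        public void consume(ChangeRecord record) {
            OperationType type = record.getOperationType();
            if (type != OperationType.HEARTBEAT) {
                counts.merge(type, 1, Integer::sum);
            }
            record.commit("");
        }
    }

    public static void main(String[] args) {
        CountingListener listener = new CountingListener();
        listener.consume(new SimpleRecord(OperationType.INSERT));
        listener.consume(new SimpleRecord(OperationType.INSERT));
        listener.consume(new SimpleRecord(OperationType.HEARTBEAT));
        System.out.println(listener.counts); // prints {INSERT=2}
    }
}
```

Committing after processing keeps the pushed checkpoint aligned with the records that the listener has actually handled.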