This topic describes how to use the demo code of a Kafka client to consume tracked data. The new version of the change tracking feature allows you to consume tracked data by using a Kafka client of versions 0.11 to 1.1.

Precautions

  • If you enable auto commit when you use the change tracking feature, some data may be committed before it is consumed, which can cause data loss. We recommend that you commit offsets manually, as shown in the manual-commit sketch after this list.
    Note If a commit fails due to a fault, you can restart the client to continue consuming data from the last recorded consumer offset. However, duplicate data may be generated during this period. You must filter out the duplicate data yourself.
  • Data is serialized and stored in the Avro format. For more information, see Record.avsc.
    Warning If you are not using the Kafka client that is described in this topic, you must parse the tracked data based on the Avro schema. A deserialization sketch follows this list.
  • When DTS calls the offsetsForTimes operation, the timestamp is measured in seconds. When a native Kafka client calls this operation, the timestamp is measured in milliseconds.
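
The following minimal sketch shows the manual-commit pattern with a native Kafka consumer. The endpoint, topic, and consumer group values are placeholders, and the authentication settings described later in this topic are omitted for brevity:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder endpoint and consumer group; use the values from your DTS console.
        props.put("bootstrap.servers", "dts-cn-hangzhou.aliyuncs.com:18001");
        props.put("group.id", "dtsae******bpv");
        // Disable auto commit so that offsets are committed only after processing.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("your_dts_topic")); // placeholder topic
            while (true) {
                // poll(long) matches Kafka clients 0.11 to 1.1.
                ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    process(record);
                }
                // Committing after processing means a crash replays records
                // instead of losing them; filter out the duplicates downstream.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<byte[], byte[]> record) {
        // Deserialize record.value() based on the Avro schema in Record.avsc.
    }
}
```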
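
If you use your own client, the tracked data must be parsed based on the Avro schema, as the warning above notes. The following is a minimal sketch that uses Apache Avro's generic API, assuming the Kafka message value is the Avro-serialized record and that Record.avsc is available locally (the file path is an assumption):

```java
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

public class AvroParseSketch {
    public static GenericRecord parse(byte[] kafkaValue) throws IOException {
        // Load the schema shipped with the demo; adjust the path to your checkout.
        Schema schema = new Schema.Parser().parse(new File("Record.avsc"));
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(kafkaValue, null);
        return reader.read(null, decoder);
    }
}
```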

Download and run the demo code of the Kafka client

Click here to download the demo code of the Kafka client. For more information about how to use the demo code, see Readme.

Table 1. Download and run the demo code of the Kafka client

Step | File or directory
1. Use the native Kafka consumer to obtain incremental data from the change tracking task. | subscribe_example-master/javaimpl/src/main/java/recordgenerator/
2. Deserialize the image of the incremental data, and obtain the pre-image, the post-image, and other attributes. | subscribe_example-master/javaimpl/src/main/java/boot/MysqlRecordPrinter.java
3. Convert the dataTypeNumber values in the deserialized data into MySQL data types. | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/

Procedure

IntelliJ IDEA (Community Edition 2018.1.4 for Windows) is used in this example.

  1. Download the demo code of the Kafka client, and then decompress the package.
  2. Open IntelliJ IDEA. In the window that appears, click Open.
  3. In the dialog box that appears, navigate to the directory where the decompressed demo code resides and find the pom.xml file.
  4. In the dialog box that appears, select Open as Project.
  5. On the IntelliJ IDEA page, expand folders to find the demo file of the Kafka client, and double-click the file. The file name is NotifyDemo.java.
  6. Set the parameters in the NotifyDemo.java file. The following list describes each parameter and how to obtain its value. A hypothetical example of the finished settings is provided after this procedure.
    • USER_NAME: The username of the consumer group account.
      Warning If you are not using the Kafka client that is described in this topic, you must specify the username in the following format: <Consumer group account>-<Consumer group ID>, for example, dtstest-dtsae******bpv. Otherwise, the connection fails.
    • PASSWORD_NAME: The password of the consumer group account.
    • SID_NAME: The ID of the consumer group.
    • GROUP_NAME: The name of the consumer group. Set this parameter to the consumer group ID.
      Note To obtain the consumer group ID and account, log on to the DTS console, click the instance ID, and then click Data Consume. The password of the consumer group account is specified when you create the consumer group.
    • KAFKA_TOPIC: The topic of the change tracking task.
    • KAFKA_BROKER_URL_NAME: The network address and port number of the change tracking task.
      Note To obtain the topic, network address, and port number, log on to the DTS console, click the instance ID, and go to the Track Data Changes page. If the ECS instance on which you deploy the Kafka client belongs to the same VPC or classic network as the change tracking instance, consume data over the internal network to minimize network latency.
    • INITIAL_CHECKPOINT_NAME: The consumer offset from which consumption starts. The value is a UNIX timestamp, in seconds.
      Note You must save the consumer offset. If the consumption process is interrupted, you can specify the saved consumer offset on the change tracking client to resume data consumption and prevent data loss. When you use the Kafka client to track data changes for the first time, view the data range of the change tracking instance and convert the required point in time into a UNIX timestamp. For more information, see FAQ.
    • USE_CONFIG_CHECKPOINT_NAME: Specifies whether to force the client to consume data from the specified consumer offset, which retains the data that is received but not yet processed. Default value: true.
    • SUBSCRIBE_MODE_NAME: The deployment mode of the Kafka clients in the consumer group. Default value: assign, which indicates that only one Kafka client is deployed. To implement disaster recovery, you can deploy two Kafka clients for the consumer group and set this parameter to subscribe.
  7. In the top menu bar of IntelliJ IDEA, choose Run > Run to run the client.
    Note When you run IntelliJ IDEA for the first time, it takes some time to load and install the relevant dependencies.
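
For reference, the following is a hypothetical sketch of what the finished settings might look like. Every value is a placeholder, and the exact way NotifyDemo.java declares these parameters may differ across demo versions, so treat this as an illustration only:

```java
import java.util.Properties;

public class NotifyDemoSettingsSketch {
    // Hypothetical example values; replace each one with the information
    // from your own DTS console.
    public static Properties exampleSettings() {
        Properties p = new Properties();
        p.put("USER_NAME", "dtstest");                  // consumer group account
        p.put("PASSWORD_NAME", "yourPassword");         // password of the account
        p.put("SID_NAME", "dtsae******bpv");            // consumer group ID
        p.put("GROUP_NAME", "dtsae******bpv");          // same as the consumer group ID
        p.put("KAFKA_TOPIC", "your_dts_topic");         // topic from the Track Data Changes page
        p.put("KAFKA_BROKER_URL_NAME", "dts-cn-hangzhou.aliyuncs.com:18001"); // placeholder endpoint
        p.put("INITIAL_CHECKPOINT_NAME", "1592269238"); // UNIX timestamp, in seconds
        p.put("USE_CONFIG_CHECKPOINT_NAME", "true");    // force consumption from this offset
        p.put("SUBSCRIBE_MODE_NAME", "assign");         // use "subscribe" to run two clients
        return p;
    }
}
```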

Running result of the Kafka client

If the client runs as expected, the console output shows that the Kafka client is tracking data changes from the source database.

To view the detailed data change information, delete the // characters from the //log.info(ret); statement in line 25 of the NotifyDemo.java file, and then run the client again.

FAQ

  • Q: How do I set the value of the INITIAL_CHECKPOINT_NAME parameter when I use the Kafka client for the first time?
    A: You can view the data range of the change tracking instance and then convert the required point in time into a UNIX timestamp. For example, if the data range starts at 09:00:38 on June 16, 2020, you can convert that time into the UNIX timestamp 1592269238 and set the INITIAL_CHECKPOINT_NAME parameter to 1592269238. A worked example of the conversion appears after this list.
  • Q: Why do I need to record the consumer offset of the Kafka client?

    A: The consumer offset that DTS records is the time when DTS receives a commit operation from the Kafka client. The recorded consumer offset may differ from the actual consumption time. If a business application or the Kafka client is unexpectedly interrupted, you can specify an accurate consumer offset to continue data consumption. This prevents data loss and duplicate data consumption.
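
As a worked example of the conversion in the first question, the following sketch computes the timestamp with java.time, assuming the time shown in the console is in UTC+8 (Asia/Shanghai):

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

public class CheckpointSketch {
    public static void main(String[] args) {
        // 09:00:38 on June 16, 2020, interpreted in UTC+8, is 1592269238.
        long unixSeconds = LocalDateTime.of(2020, 6, 16, 9, 0, 38)
                .atZone(ZoneId.of("Asia/Shanghai"))
                .toEpochSecond();
        System.out.println(unixSeconds); // 1592269238

        // DTS expects the consumer offset in seconds. The native Kafka
        // offsetsForTimes operation expects milliseconds instead.
        long kafkaMillis = unixSeconds * 1000L;
        System.out.println(kafkaMillis); // 1592269238000
    }
}
```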

Mappings between MySQL data types and dataTypeNumber values

MySQL data type | Value of dataTypeNumber
MYSQL_TYPE_DECIMAL | 0
MYSQL_TYPE_INT8 | 1
MYSQL_TYPE_INT16 | 2
MYSQL_TYPE_INT32 | 3
MYSQL_TYPE_FLOAT | 4
MYSQL_TYPE_DOUBLE | 5
MYSQL_TYPE_NULL | 6
MYSQL_TYPE_TIMESTAMP | 7
MYSQL_TYPE_INT64 | 8
MYSQL_TYPE_INT24 | 9
MYSQL_TYPE_DATE | 10
MYSQL_TYPE_TIME | 11
MYSQL_TYPE_DATETIME | 12
MYSQL_TYPE_YEAR | 13
MYSQL_TYPE_DATE_NEW | 14
MYSQL_TYPE_VARCHAR | 15
MYSQL_TYPE_BIT | 16
MYSQL_TYPE_TIMESTAMP_NEW | 17
MYSQL_TYPE_DATETIME_NEW | 18
MYSQL_TYPE_TIME_NEW | 19
MYSQL_TYPE_JSON | 245
MYSQL_TYPE_DECIMAL_NEW | 246
MYSQL_TYPE_ENUM | 247
MYSQL_TYPE_SET | 248
MYSQL_TYPE_TINY_BLOB | 249
MYSQL_TYPE_MEDIUM_BLOB | 250
MYSQL_TYPE_LONG_BLOB | 251
MYSQL_TYPE_BLOB | 252
MYSQL_TYPE_VAR_STRING | 253
MYSQL_TYPE_STRING | 254
MYSQL_TYPE_GEOMETRY | 255
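
In the demo, this conversion lives in subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/. As a minimal sketch of the idea (the class and method names below are illustrative, not the demo's), a client can restate the table above as a lookup:

```java
import java.util.HashMap;
import java.util.Map;

public class MysqlTypeSketch {
    // Restates a few rows of the mapping table above; fill in the rest as needed.
    private static final Map<Integer, String> MYSQL_TYPES = new HashMap<>();
    static {
        MYSQL_TYPES.put(0, "MYSQL_TYPE_DECIMAL");
        MYSQL_TYPES.put(3, "MYSQL_TYPE_INT32");
        MYSQL_TYPES.put(12, "MYSQL_TYPE_DATETIME");
        MYSQL_TYPES.put(245, "MYSQL_TYPE_JSON");
        MYSQL_TYPES.put(253, "MYSQL_TYPE_VAR_STRING");
    }

    // Returns the MySQL type name for a dataTypeNumber value from a
    // deserialized record, or a marker for numbers outside the table.
    public static String nameOf(int dataTypeNumber) {
        return MYSQL_TYPES.getOrDefault(dataTypeNumber, "UNKNOWN(" + dataTypeNumber + ")");
    }
}
```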