This topic describes how to use the demo code of a Kafka client to consume tracked data. The change tracking feature of the new version allows you to consume tracked data by using a Kafka client (V0.11 to V2.0).

Precautions

  • If you enable auto commit when you use the change tracking feature, the offsets of some records may be committed before the records are processed. This can cause data loss. We recommend that you commit offsets manually (see the sketch after this list).
    Note If data fails to be committed, you can restart the client to continue consuming data from the last recorded consumer offset. However, duplicate data may be generated during this period. You must manually filter out the duplicate data.
  • Data is serialized and stored in the Avro format. For more information, see Record.avsc.
    Warning If you are not using the Kafka client that is described in this topic, you must parse the tracked data based on the Avro schema.
  • When Data Transmission Service (DTS) calls the offsetsForTimes operation, the search unit is seconds. When a native Kafka client calls this operation, the search unit is milliseconds.
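
The following minimal sketch shows how these precautions translate into consumer code: auto commit is disabled in favor of manual commits, and the start offset is looked up with a second-based timestamp. This is not the demo code itself; the endpoint, topic, and consumer group ID are placeholders, the SASL authentication settings that DTS requires are omitted, and a 2.0 Kafka client is assumed for poll(Duration).

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<change tracking instance endpoint>");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "<consumer group ID>");
        // Disable auto commit so that offsets are committed only after processing.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // The SASL settings required by DTS are omitted here; see the demo code.

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("<change tracking topic>", 0);
            consumer.assign(Collections.singletonList(partition));

            // DTS interprets this timestamp in seconds, whereas a native
            // Kafka broker expects milliseconds.
            long startTimestampInSeconds = 1592269238L;
            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(
                    Collections.singletonMap(partition, startTimestampInSeconds));
            OffsetAndTimestamp start = offsets.get(partition);
            if (start != null) {
                consumer.seek(partition, start.offset());
            }

            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Process the record (for example, deserialize the Avro payload) here.
                }
                // Commit manually only after the polled batch has been processed.
                consumer.commitSync();
            }
        }
    }
}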

Run the Kafka client

Click Kafka client demo code to download the demo code of the Kafka client. For more information about how to use the demo code, see Readme.

Note If you use a Kafka client of version 2.0, you must change the version number in the subscribe_example-master/javaimpl/pom.xml file to 2.0.0.
Table 1. Steps

Step 1: Use the native Kafka consumer to obtain incremental data from the change tracking instance.
File or directory: subscribe_example-master/javaimpl/src/main/java/recordgenerator/

Step 2: Deserialize the image of the incremental data, and obtain the pre-image, post-image, and other attributes. (A deserialization sketch follows this table.)
Warning
  • If the source instance is a self-managed Oracle database, you must enable supplemental logging for all columns. This ensures that the client can consume the tracked data and guarantees the integrity of the pre-image and post-image. You can submit a ticket to enable supplemental logging for all columns.
  • If the source instance is not a self-managed Oracle database, DTS does not guarantee the integrity of the pre-image. We recommend that you verify the obtained pre-image.
File or directory: subscribe_example-master/javaimpl/src/main/java/boot/MysqlRecordPrinter.java

Step 3: Convert the dataTypeNumber values in the deserialized data into the data types of the corresponding database. For the mappings, see the tables at the end of this topic.
File or directory: subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/
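
As a reference for step 2, the following is a minimal deserialization sketch. It assumes a Record class generated from Record.avsc by the Avro code generator; the demo's own deserializer may be structured differently.

import java.io.IOException;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

public class AvroRecordDeserializer {
    // Record is assumed to be the class generated from Record.avsc.
    private final SpecificDatumReader<Record> reader = new SpecificDatumReader<>(Record.class);

    public Record deserialize(byte[] payload) throws IOException {
        // Decode the Avro binary payload of a Kafka message value.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return reader.read(null, decoder);
    }
}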

Procedure

The following steps show how to run the Kafka client to consume tracked data. In this example, IntelliJ IDEA (Community Edition 2018.1.4 Windows) is used.

  1. Create a change tracking task. For more information, see Track data changes from an ApsaraDB RDS for MySQL instance, Track data changes from a PolarDB for MySQL cluster, or Track data changes from a self-managed Oracle database.
  2. Create one or more consumer groups. For more information, see Create consumer groups.
  3. Download the demo code of the Kafka client and decompress the package.
  4. Open IntelliJ IDEA. In the window that appears, click Open.
  5. In the dialog box that is displayed, go to the directory where the downloaded demo code resides. Find the pom.xml file.
  6. In the dialog box that is displayed, select Open as Project.
  7. In the Project tool window of IntelliJ IDEA, expand folders to find the demo file of the Kafka client, and double-click the file. The file name is NotifyDemoDB.java.
  8. Set the parameters in the NotifyDemoDB.java file.
    Parameter: USER_NAME
    Description: The username of the consumer group.
      Warning: If you are not using the Kafka client demo described in this topic, you must specify this parameter in the following format: <Username>-<Consumer group ID>. Example: dtstest-dtsae******bpv. Otherwise, the connection fails.
    Method to obtain: In the DTS console, click the instance ID. In the left-side navigation pane, click Consume Data to view the consumer group ID and the corresponding account.
      Note: The password of the consumer group account is specified when you create the consumer group.

    Parameter: PASSWORD_NAME
    Description: The password of the account.

    Parameter: SID_NAME
    Description: The ID of the consumer group.

    Parameter: GROUP_NAME
    Description: The name of the consumer group. Set this parameter to the consumer group ID.

    Parameter: KAFKA_TOPIC
    Description: The topic of the change tracking instance.
    Method to obtain: In the DTS console, click the instance ID. On the Task Management page, view the tracked topic and the network information.

    Parameter: KAFKA_BROKER_URL_NAME
    Description: The endpoint of the change tracking instance.
      Note: If you track data changes over internal networks, the network latency is minimal. This applies if the Elastic Compute Service (ECS) instance on which you deploy the Kafka client belongs to the classic network or to the same virtual private cloud (VPC) as the change tracking instance.

    Parameter: INITIAL_CHECKPOINT_NAME
    Description: The consumer offset from which consumption starts. The value is a UNIX timestamp in seconds. Example: 1592269238.
      Note: You must save the consumer offset.
      • If the consumption process is interrupted, you can specify the consumer offset on the change tracking client to resume data consumption. This prevents data loss.
      • When you start the change tracking client, you can specify the consumer offset to consume data on demand.
    Method to obtain: The consumer offset must be within the data range of the change tracking instance and must be converted into a UNIX timestamp.
      Note:
      • For more information about how to view the data range, see View tracked data changes.
      • You can use a search engine to find a UNIX timestamp converter, or compute the value as shown in the sketch that follows this procedure.

    Parameter: USE_CONFIG_CHECKPOINT_NAME
    Description: Specifies whether to force the client to consume data from the specified consumer offset. Default value: true. The default value retains data that has been received but not processed.
    Method to obtain: N/A

    Parameter: SUBSCRIBE_MODE_NAME
    Description: Specifies whether to run two or more Kafka clients in a consumer group. To use this feature, set this parameter to subscribe for these Kafka clients. Default value: assign, which indicates that only one Kafka client is deployed.
    Method to obtain: N/A
  9. In the top menu bar of IntelliJ IDEA, choose Run > Run to run the client.
    Note The first time you run the client, IntelliJ IDEA requires some time to load and install the relevant dependencies.
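
As referenced in the description of INITIAL_CHECKPOINT_NAME, the following sketch converts a date and time within the instance's data range into the required UNIX timestamp in seconds. The date, time, and time zone are examples only.

import java.time.LocalDateTime;
import java.time.ZoneId;

public class CheckpointTimestamp {
    public static void main(String[] args) {
        // Example: 2020-06-16 09:00:38 in the UTC+8 time zone.
        LocalDateTime checkpoint = LocalDateTime.of(2020, 6, 16, 9, 0, 38);
        long unixSeconds = checkpoint.atZone(ZoneId.of("Asia/Shanghai")).toEpochSecond();
        System.out.println(unixSeconds); // Prints 1592269238.
    }
}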

Running result of the Kafka client

The following figure shows that the Kafka client can track data changes from the source database.

(Figure: running result of the Kafka client)

You can delete the // characters before the log.info(ret); statement in line 25 of the NotifyDemoDB.java file. Then, run the client again to view detailed data change information.


FAQ

  • Q: Why do I need to record the consumer offset of the Kafka client?

    A: The consumer offset recorded by DTS is the time when DTS receives a commit operation from the Kafka client. The recorded consumer offset may be different from the actual consumption time. If a business application or the Kafka client is unexpectedly interrupted, you can specify an accurate consumer offset to continue data consumption. This prevents data loss or duplicate data consumption.
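
One way to keep such a record is to persist, for each processed message, a timestamp that can later serve as the consumer offset. The following hypothetical helper is not part of the demo; it assumes that the Kafka record timestamp is close enough to the time of the change to resume from.

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CheckpointStore {
    private volatile long lastProcessedTimestampInSeconds;

    // Call this after a record has been fully processed, before its offset is committed.
    public void record(ConsumerRecord<byte[], byte[]> rec) {
        // Kafka record timestamps are in milliseconds; DTS consumer offsets are in seconds.
        lastProcessedTimestampInSeconds = rec.timestamp() / 1000;
    }

    // Use this value as INITIAL_CHECKPOINT_NAME to resume after an interruption.
    public long lastCheckpoint() {
        return lastProcessedTimestampInSeconds;
    }
}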

Mappings between MySQL data types and dataTypeNumber values

MySQL data type Value of dataTypeNumber
MYSQL_TYPE_DECIMAL 0
MYSQL_TYPE_INT8 1
MYSQL_TYPE_INT16 2
MYSQL_TYPE_INT32 3
MYSQL_TYPE_FLOAT 4
MYSQL_TYPE_DOUBLE 5
MYSQL_TYPE_NULL 6
MYSQL_TYPE_TIMESTAMP 7
MYSQL_TYPE_INT64 8
MYSQL_TYPE_INT24 9
MYSQL_TYPE_DATE 10
MYSQL_TYPE_TIME 11
MYSQL_TYPE_DATETIME 12
MYSQL_TYPE_YEAR 13
MYSQL_TYPE_DATE_NEW 14
MYSQL_TYPE_VARCHAR 15
MYSQL_TYPE_BIT 16
MYSQL_TYPE_TIMESTAMP_NEW 17
MYSQL_TYPE_DATETIME_NEW 18
MYSQL_TYPE_TIME_NEW 19
MYSQL_TYPE_JSON 245
MYSQL_TYPE_DECIMAL_NEW 246
MYSQL_TYPE_ENUM 247
MYSQL_TYPE_SET 248
MYSQL_TYPE_TINY_BLOB 249
MYSQL_TYPE_MEDIUM_BLOB 250
MYSQL_TYPE_LONG_BLOB 251
MYSQL_TYPE_BLOB 252
MYSQL_TYPE_VAR_STRING 253
MYSQL_TYPE_STRING 254
MYSQL_TYPE_GEOMETRY 255
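
For step 3 of Table 1, a lookup like the following hypothetical helper can resolve a dataTypeNumber into the MySQL type name listed above. The demo's converters in subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ implement the full conversion.

import java.util.HashMap;
import java.util.Map;

public class MysqlTypeLookup {
    private static final Map<Integer, String> TYPES = new HashMap<>();
    static {
        // A few entries from the mapping table above; fill in the rest as needed.
        TYPES.put(0, "MYSQL_TYPE_DECIMAL");
        TYPES.put(3, "MYSQL_TYPE_INT32");
        TYPES.put(12, "MYSQL_TYPE_DATETIME");
        TYPES.put(245, "MYSQL_TYPE_JSON");
        TYPES.put(252, "MYSQL_TYPE_BLOB");
        TYPES.put(254, "MYSQL_TYPE_STRING");
    }

    public static String nameOf(int dataTypeNumber) {
        return TYPES.getOrDefault(dataTypeNumber, "UNKNOWN(" + dataTypeNumber + ")");
    }
}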

Mappings between Oracle data types and dataTypeNumber values

Oracle data type Value of dataTypeNumber
VARCHAR2/NVARCHAR2 1
NUMBER/FLOAT 2
LONG 8
DATE 12
RAW 23
LONG_RAW 24
UNDEFINED 29
XMLTYPE 58
ROWID 69
CHAR and NCHAR 96
BINARY_FLOAT 100
BINARY_DOUBLE 101
CLOB/NCLOB 112
BLOB 113
BFILE 114
TIMESTAMP 180
TIMESTAMP_WITH_TIME_ZONE 181
INTERVAL_YEAR_TO_MONTH 182
INTERVAL_DAY_TO_SECOND 183
UROWID 208
TIMESTAMP_WITH_LOCAL_TIME_ZONE 231

Mappings between PostgreSQL data types and dataTypeNumber values

PostgreSQL data type Value of dataTypeNumber
INT2/SMALLINT 21
INT4/INTEGER/SERIAL 23
INT8/BIGINT 20
CHARACTER 18
CHARACTER VARYING 1043
REAL 700
DOUBLE PRECISION 701
NUMERIC 1700
MONEY 790
DATE 1082
TIME/TIME WITHOUT TIME ZONE 1083
TIME WITH TIME ZONE 1266
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE 1114
TIMESTAMP WITH TIME ZONE 1184
BYTEA 17
TEXT 25
JSON 114
JSONB 3082
XML 142
UUID 2950
POINT 600
LSEG 601
PATH 602
BOX 603
POLYGON 604
LINE 628
CIDR 650
CIRCLE 718
MACADDR 829
INET 869
INTERVAL 1186
TXID_SNAPSHOT 2970
PG_LSN 3220
TSVECTOR 3614
TSQUERY 3615