This topic describes how to use the demo code of a Kafka client to consume tracked data. The new version of the change tracking feature allows you to consume tracked data by using a Kafka client (versions 0.11 through 2.0).
Precautions
- If you enable auto commit when you use the change tracking feature, some data may be committed before it is consumed, which results in data loss. We recommend that you commit data manually.
Note If data fails to be committed, you can restart the client to continue consuming data from the last recorded consumer offset. However, duplicate data may be generated during this period. You must manually filter out the duplicate data.
- Data is serialized and stored in the Avro format. For more information, see Record.avsc.
Warning If you are not using the Kafka client that is described in this topic, you must parse the tracked data based on the Avro schema.
- The search unit of the offsetForTimes operation is seconds when the operation is called by Data Transmission Service (DTS), but milliseconds when it is called by a native Kafka client.
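The manual-commit recommendation above can be sketched as follows. This is a minimal example, not the demo code from this topic: the broker address, group ID, and topic name are placeholders, and ByteArrayDeserializer is used because the tracked data arrives as Avro-serialized bytes. Auto commit is disabled, and offsets are committed only after each batch has been processed.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitSketch {

    // Build consumer properties with auto commit disabled, as recommended above.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false"); // commit manually, after processing
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address, group, and topic; replace with your own values.
        try (KafkaConsumer<byte[], byte[]> consumer =
                new KafkaConsumer<>(consumerProps("localhost:9092", "dts-demo-group"))) {
            consumer.subscribe(Collections.singletonList("your_dts_topic"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Deserialize the Avro payload and process the record here.
                }
                // Commit only after the batch is processed, so an interrupted
                // client resumes from unprocessed data instead of losing it.
                consumer.commitSync();
            }
        }
    }
}
```

Committing after processing trades a small amount of possible duplicate consumption (which you must filter out, as noted above) for protection against data loss.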
Run the Kafka client
Click Kafka client demo code to download the demo code. For more information about how to use the demo code, see Readme.

Step | File or directory |
---|---|
1. Use the native Kafka consumer to obtain incremental data from the change tracking instance. | subscribe_example-master/javaimpl/src/main/java/recordgenerator/ |
2. Deserialize the image of the incremental data, and obtain the pre-image, post-image, and other attributes. | subscribe_example-master/javaimpl/src/main/java/boot/MysqlRecordPrinter.java |
3. Convert the dataTypeNumber values in the deserialized data into the data types of the corresponding database. For more information, see the data type mappings later in this topic. | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ |
Procedure
The following steps show how to run the Kafka client to consume tracked data. In this example, IntelliJ IDEA (Community Edition 2018.1.4 Windows) is used.
Running result of the Kafka client
The following figure shows that the Kafka client can track data changes from the source database.

To view the data change information, uncomment the log.info(ret); statement in line 25 of the NotifyDemoDB.java file (delete the // characters), and then run the client again.

FAQ
- Q: Why do I need to record the consumer offset of the Kafka client?
A: The consumer offset recorded by DTS is the time when DTS receives a commit operation from the Kafka client. The recorded consumer offset may be different from the actual consumption time. If a business application or the Kafka client is unexpectedly interrupted, you can specify an accurate consumer offset to continue data consumption. This prevents data loss or duplicate data consumption.
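Resuming from a recorded consumer offset can be sketched with the native consumer's offsetsForTimes method. This is an illustrative helper, not part of the demo code: the native client searches in milliseconds, while DTS records offsets in seconds (see the precaution above), so the timestamp is converted before the lookup.

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class SeekByTimestampSketch {

    // DTS records consumer offsets in seconds; the native Kafka client
    // searches in milliseconds, so convert before calling offsetsForTimes.
    static long secondsToMillis(long seconds) {
        return seconds * 1000L;
    }

    // Seek the given partition to the first offset at or after the timestamp.
    static void seekTo(KafkaConsumer<byte[], byte[]> consumer,
                       TopicPartition partition, long epochSeconds) {
        Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(
                Collections.singletonMap(partition, secondsToMillis(epochSeconds)));
        OffsetAndTimestamp hit = result.get(partition);
        if (hit != null) {
            consumer.seek(partition, hit.offset()); // resume consumption here
        }
    }
}
```

Seeking to a timestamp slightly earlier than the last processed record, combined with filtering out duplicates, is one way to restart without losing data.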
Mappings between MySQL data types and dataTypeNumber values
MySQL data type | Value of dataTypeNumber |
---|---|
MYSQL_TYPE_DECIMAL | 0 |
MYSQL_TYPE_INT8 | 1 |
MYSQL_TYPE_INT16 | 2 |
MYSQL_TYPE_INT32 | 3 |
MYSQL_TYPE_FLOAT | 4 |
MYSQL_TYPE_DOUBLE | 5 |
MYSQL_TYPE_NULL | 6 |
MYSQL_TYPE_TIMESTAMP | 7 |
MYSQL_TYPE_INT64 | 8 |
MYSQL_TYPE_INT24 | 9 |
MYSQL_TYPE_DATE | 10 |
MYSQL_TYPE_TIME | 11 |
MYSQL_TYPE_DATETIME | 12 |
MYSQL_TYPE_YEAR | 13 |
MYSQL_TYPE_DATE_NEW | 14 |
MYSQL_TYPE_VARCHAR | 15 |
MYSQL_TYPE_BIT | 16 |
MYSQL_TYPE_TIMESTAMP_NEW | 17 |
MYSQL_TYPE_DATETIME_NEW | 18 |
MYSQL_TYPE_TIME_NEW | 19 |
MYSQL_TYPE_JSON | 245 |
MYSQL_TYPE_DECIMAL_NEW | 246 |
MYSQL_TYPE_ENUM | 247 |
MYSQL_TYPE_SET | 248 |
MYSQL_TYPE_TINY_BLOB | 249 |
MYSQL_TYPE_MEDIUM_BLOB | 250 |
MYSQL_TYPE_LONG_BLOB | 251 |
MYSQL_TYPE_BLOB | 252 |
MYSQL_TYPE_VAR_STRING | 253 |
MYSQL_TYPE_STRING | 254 |
MYSQL_TYPE_GEOMETRY | 255 |
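The MySQL mapping table above can be turned into a simple lookup helper for step 3 of the demo code. The class name and the UNKNOWN fallback below are illustrative, not part of the demo code.

```java
import java.util.HashMap;
import java.util.Map;

public class MysqlTypeNames {

    // dataTypeNumber -> MySQL type name, copied from the mapping table above.
    static final Map<Integer, String> NAMES = new HashMap<>();
    static {
        NAMES.put(0, "MYSQL_TYPE_DECIMAL");
        NAMES.put(1, "MYSQL_TYPE_INT8");
        NAMES.put(2, "MYSQL_TYPE_INT16");
        NAMES.put(3, "MYSQL_TYPE_INT32");
        NAMES.put(4, "MYSQL_TYPE_FLOAT");
        NAMES.put(5, "MYSQL_TYPE_DOUBLE");
        NAMES.put(6, "MYSQL_TYPE_NULL");
        NAMES.put(7, "MYSQL_TYPE_TIMESTAMP");
        NAMES.put(8, "MYSQL_TYPE_INT64");
        NAMES.put(9, "MYSQL_TYPE_INT24");
        NAMES.put(10, "MYSQL_TYPE_DATE");
        NAMES.put(11, "MYSQL_TYPE_TIME");
        NAMES.put(12, "MYSQL_TYPE_DATETIME");
        NAMES.put(13, "MYSQL_TYPE_YEAR");
        NAMES.put(14, "MYSQL_TYPE_DATE_NEW");
        NAMES.put(15, "MYSQL_TYPE_VARCHAR");
        NAMES.put(16, "MYSQL_TYPE_BIT");
        NAMES.put(17, "MYSQL_TYPE_TIMESTAMP_NEW");
        NAMES.put(18, "MYSQL_TYPE_DATETIME_NEW");
        NAMES.put(19, "MYSQL_TYPE_TIME_NEW");
        NAMES.put(245, "MYSQL_TYPE_JSON");
        NAMES.put(246, "MYSQL_TYPE_DECIMAL_NEW");
        NAMES.put(247, "MYSQL_TYPE_ENUM");
        NAMES.put(248, "MYSQL_TYPE_SET");
        NAMES.put(249, "MYSQL_TYPE_TINY_BLOB");
        NAMES.put(250, "MYSQL_TYPE_MEDIUM_BLOB");
        NAMES.put(251, "MYSQL_TYPE_LONG_BLOB");
        NAMES.put(252, "MYSQL_TYPE_BLOB");
        NAMES.put(253, "MYSQL_TYPE_VAR_STRING");
        NAMES.put(254, "MYSQL_TYPE_STRING");
        NAMES.put(255, "MYSQL_TYPE_GEOMETRY");
    }

    // Return the type name, or a labeled fallback for unmapped values.
    static String nameOf(int dataTypeNumber) {
        return NAMES.getOrDefault(dataTypeNumber, "UNKNOWN(" + dataTypeNumber + ")");
    }
}
```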
Mappings between Oracle data types and dataTypeNumber values
Oracle data type | Value of dataTypeNumber |
---|---|
VARCHAR2/NVARCHAR2 | 1 |
NUMBER/FLOAT | 2 |
LONG | 8 |
DATE | 12 |
RAW | 23 |
LONG_RAW | 24 |
UNDEFINED | 29 |
XMLTYPE | 58 |
ROWID | 69 |
CHAR/NCHAR | 96 |
BINARY_FLOAT | 100 |
BINARY_DOUBLE | 101 |
CLOB/NCLOB | 112 |
BLOB | 113 |
BFILE | 114 |
TIMESTAMP | 180 |
TIMESTAMP_WITH_TIME_ZONE | 181 |
INTERVAL_YEAR_TO_MONTH | 182 |
INTERVAL_DAY_TO_SECOND | 183 |
UROWID | 208 |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | 231 |
Mappings between PostgreSQL data types and dataTypeNumber values
PostgreSQL data type | Value of dataTypeNumber |
---|---|
INT2/SMALLINT | 21 |
INT4/INTEGER/SERIAL | 23 |
INT8/BIGINT | 20 |
CHARACTER | 18 |
CHARACTER VARYING | 1043 |
REAL | 700 |
DOUBLE PRECISION | 701 |
NUMERIC | 1700 |
MONEY | 790 |
DATE | 1082 |
TIME/TIME WITHOUT TIME ZONE | 1083 |
TIME WITH TIME ZONE | 1266 |
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE | 1114 |
TIMESTAMP WITH TIME ZONE | 1184 |
BYTEA | 17 |
TEXT | 25 |
JSON | 114 |
JSONB | 3082 |
XML | 142 |
UUID | 2950 |
POINT | 600 |
LSEG | 601 |
PATH | 602 |
BOX | 603 |
POLYGON | 604 |
LINE | 628 |
CIDR | 650 |
CIRCLE | 718 |
MACADDR | 829 |
INET | 869 |
INTERVAL | 1186 |
TXID_SNAPSHOT | 2970 |
PG_LSN | 3220 |
TSVECTOR | 3614 |
TSQUERY | 3615 |