The change tracking feature of Data Transmission Service (DTS) exposes tracked data over a Kafka-compatible interface. Kafka clients from V0.11 to V2.7 can connect directly to a change tracking instance and consume incremental data serialized in Avro format.
This topic walks you through running the Kafka client demo, explains how to configure each parameter, and shows how to manage consumption checkpoints to avoid data loss.
Usage notes
Disable auto commit. When auto commit is enabled, the Kafka client commits offsets on a periodic timer — independently of whether your application has finished processing those records. If the client restarts before processing completes, it resumes from the committed offset and skips unprocessed data. Disable auto commit and commit offsets manually after your application successfully processes each batch.
If a commit fails and the client restarts, consumption resumes from the last successfully committed checkpoint. Records between the failed commit and the restart may be delivered again — filter out duplicates in your application logic.
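The following is a minimal sketch of this commit pattern, using only standard Kafka client APIs. The endpoint, topic, and processing logic are placeholders, and the DTS authentication settings described later in this topic are omitted.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "yourKafkaBrokerUrl"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "yourConsumerGroupId");         // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // disable auto commit
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("yourTopic")); // placeholder
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> { /* process the record; make processing idempotent */ });
                if (!records.isEmpty()) {
                    consumer.commitSync(); // commit only after the whole batch is processed
                }
            }
        }
    }
}
```

Because redelivery is possible after a failed commit, idempotent processing matters as much as the commit placement.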
Additional notes:
Tracked data is serialized in Avro format. See Record.avsc for the schema definition.
If you are not using the demo client described in this topic, parse tracked data using the Avro schema and validate the parsed output.
The offsetsForTimes operation uses seconds as the time unit in DTS, whereas a standard Kafka client uses milliseconds (see the sketch after these notes).
The Kafka client may experience transient disconnections from the change tracking server, for example during disaster recovery. If you are not using the demo client, implement automatic reconnection in your client.
In subscribe mode, when DTS switches the incremental data collection module, the consumption checkpoint saved on the DTS server is removed. Specify a consumption checkpoint explicitly based on your requirements. For more information, see Use the SDK demo to consume tracked data and Manage the consumption checkpoint.
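As a concrete illustration of the time-unit difference noted above, the following sketch seeks a consumer to a checkpoint expressed in seconds. The method and variable names are illustrative.

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class SeekBySecondsSketch {
    // Seek to a checkpoint. Against DTS, the queried timestamp is in seconds,
    // not the milliseconds a standard Kafka broker expects.
    static void seekToCheckpoint(KafkaConsumer<byte[], byte[]> consumer,
                                 TopicPartition tp, long checkpointSeconds) {
        Map<TopicPartition, OffsetAndTimestamp> offsets =
                consumer.offsetsForTimes(Collections.singletonMap(tp, checkpointSeconds));
        OffsetAndTimestamp oat = offsets.get(tp);
        if (oat != null) {
            consumer.seek(tp, oat.offset()); // resume from the matched offset
        }
    }
}
```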
How it works
The Kafka client demo follows a three-stage pipeline:
| Stage | What it does | Source location |
|---|---|---|
| 1. Fetch | Uses a native Kafka consumer to pull incremental data from the change tracking instance | subscribe_example-master/javaimpl/src/main/java/recordgenerator/ |
| 2. Deserialize | Decodes the Avro-serialized record to extract the pre-image, post-image, and metadata | subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java |
| 3. Type conversion | Maps dataTypeNumber values to the corresponding database data types | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ |
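As an illustration of stage 2, the following sketch decodes the Avro-serialized value of one Kafka message. It assumes classes generated from Record.avsc are on the classpath; the Record class and its package are assumptions based on that schema reference.

```java
import java.io.IOException;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import com.alibaba.dts.formats.avro.Record; // assumption: generated from Record.avsc

public class AvroDecodeSketch {
    // Decode one Kafka message value into a DTS record carrying the pre-image,
    // post-image, and metadata.
    static Record decode(byte[] value) throws IOException {
        SpecificDatumReader<Record> reader = new SpecificDatumReader<>(Record.class);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(value, null);
        return reader.read(null, decoder);
    }
}
```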
Pre-image completeness depends on the source database type:
Self-managed Oracle: Enable supplemental logging for all columns before consuming. Without it, the pre-image and post-image may be incomplete.
Other sources: DTS does not guarantee pre-image completeness. Validate the pre-image in your application before use.
Prerequisites
Before you begin, ensure that you have:
A change tracking instance. See Overview of change tracking scenarios.
One or more consumer groups. See Create consumer groups.
IntelliJ IDEA (this example uses Community Edition 2018.1.4 for Windows).
Run the Kafka client demo
Download the Kafka client demo package: select Download ZIP to download the package.
If you are using Kafka client V2.0, open subscribe_example-master/javaimpl/pom.xml and change the Kafka client version number to 2.0.0.
Open IntelliJ IDEA and click Open.

Navigate to the directory where you extracted the demo, select the pom.xml file, and click Open as Project.
In the Project tool window, navigate to and double-click NotifyDemoDB.java.
Set the parameters in NotifyDemoDB.java.
Authentication parameters (required)
Warning: If you are not using this demo client, format USER_NAME as <Username>-<Consumer group ID>, for example dtstest-dtsae******bpv. An incorrect format causes the connection to fail.
| Parameter | Description | How to get the value |
|---|---|---|
| USER_NAME | Username of the consumer group account. | In the DTS console, click the change tracking instance ID. In the left navigation pane, click Consume Data to view consumer group details. |
| PASSWORD_NAME | Password of the consumer group account. | Set when you create the consumer group. |
| SID_NAME | Consumer group ID. | In the DTS console, click the change tracking instance ID. In the left navigation pane, click Consume Data. |
| GROUP_NAME | Consumer group name. Set this to the consumer group ID. | Same page as SID_NAME. |
Connection parameters (required)
Use an internal network endpoint when possible; it provides lower latency than a public endpoint. An internal network endpoint applies when the Elastic Compute Service (ECS) instance running your Kafka client is on the classic network or in the same virtual private cloud (VPC) as the change tracking instance.
| Parameter | Description | How to get the value |
|---|---|---|
| KAFKA_TOPIC | Name of the tracked topic. | In the DTS console, click the change tracking instance ID. On the Basic Information page, find the topic and network information. |
| KAFKA_BROKER_URL_NAME | Endpoint of the change tracking instance. | Same page as KAFKA_TOPIC. |
Consumption behavior parameters
| Parameter | Description | Default |
|---|---|---|
| INITIAL_CHECKPOINT_NAME | Starting consumption checkpoint as a UNIX timestamp (for example, 1592269238). The value must fall within the data range of the change tracking instance. View the data range in the Data Range column on the Change Tracking Tasks page. In subscribe mode, this value only takes effect on the first start. | None |
| USE_CONFIG_CHECKPOINT_NAME | Forces the client to start from the specified INITIAL_CHECKPOINT_NAME. Set to true to prevent unprocessed records from being skipped after a restart. | true |
| SUBSCRIBE_MODE_NAME | Run mode for the consumer group. Set to subscribe to allow multiple Kafka clients in the same consumer group. Keep the default assign for single-client deployments. | assign |
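The following sketch shows how these parameters might map to consumer properties if you build a client from scratch. The SASL/PLAIN settings are an assumption about how the change tracking endpoint authenticates; verify them against the demo source before relying on this.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class DtsConsumerConfigSketch {
    static Properties buildProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "yourKafkaBrokerUrl"); // KAFKA_BROKER_URL_NAME
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dtsae******bpv");              // GROUP_NAME (consumer group ID)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");             // see Usage notes
        // Assumption: SASL/PLAIN authentication. USER_NAME must follow
        // <Username>-<Consumer group ID>, as the warning above requires.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"dtstest-dtsae******bpv\" password=\"yourPassword\";");
        return props;
    }
}
```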
From the top menu bar, choose Run > Run to start the client.
The first run may take some time to download and install dependencies.
Verify the result
The following figure shows the Kafka client successfully consuming tracked data from the source database.

Manage the consumption checkpoint
When DTS switches the incremental data collection module — for example, during a failover — the consumption checkpoint stored on the DTS server is removed. Configure the client to detect this switch and reset the checkpoint to avoid data loss.
Step 1: Listen for cluster switches
Register ClusterSwitchListener as a consumer interceptor:
```java
properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());
```

The following shows an example implementation of ClusterSwitchListener:
```java
public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor {
    private final static Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class);
    private ClusterResource originClusterResource = null;
    private ClusterResource currentClusterResource = null;

    public ConsumerRecords onConsume(ConsumerRecords records) {
        return records;
    }

    public void close() {
    }

    public void onCommit(Map offsets) {
    }

    public void onUpdate(ClusterResource clusterResource) {
        synchronized (this) {
            originClusterResource = currentClusterResource;
            currentClusterResource = clusterResource;
            if (null == originClusterResource) {
                LOG.info("Cluster updated to " + currentClusterResource.clusterId());
            } else if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) {
                LOG.info("Cluster not changed on update: " + clusterResource.clusterId());
            } else {
                LOG.error("Cluster changed");
                throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId()
                        + " to " + currentClusterResource.clusterId() + ", consumer requires restart");
            }
        }
    }

    public boolean isClusterResourceChanged() {
        if (null == originClusterResource) {
            return false;
        }
        return !originClusterResource.clusterId().equals(currentClusterResource.clusterId());
    }

    public void configure(Map<String, ?> configs) {
    }

    public static class ClusterSwitchException extends KafkaException {
        public ClusterSwitchException(String message, Throwable cause) {
            super(message, cause);
        }

        public ClusterSwitchException(String message) {
            super(message);
        }

        public ClusterSwitchException(Throwable cause) {
            super(cause);
        }

        public ClusterSwitchException() {
            super();
        }
    }
}
```

Step 2: Reset the checkpoint on a cluster switch
When ClusterSwitchException is caught, reset the consumption checkpoint to the timestamp of the last successfully consumed record:
```java
try {
    // Consume and process records.
} catch (ClusterSwitchListener.ClusterSwitchException e) {
    reset();
}

// Reset the consumption checkpoint. Assume `tp` is the TopicPartition being consumed
// and `timestamp` is the checkpoint, in seconds, of the last record the application
// successfully processed.
public void reset() {
    Map<TopicPartition, Long> query = Collections.singletonMap(tp, timestamp);
    OffsetAndTimestamp offset = kafkaConsumer.offsetsForTimes(query).get(tp);
    if (offset != null) {
        kafkaConsumer.seek(tp, offset.offset());
    }
}
```

For a complete implementation, see KafkaRecordFetcher.
FAQ
Why do I need to manage consumption checkpoints manually?
The checkpoint recorded by DTS reflects when DTS received the Kafka commit — not when your application finished processing the data. If your application or the client exits unexpectedly, specifying an accurate checkpoint lets you resume from exactly where processing stopped, preventing both data loss and unnecessary duplicate consumption.
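A pattern implied by this answer is to persist your own checkpoint, for example the source timestamp of the last record your application fully processed, and feed it back as INITIAL_CHECKPOINT_NAME on restart. A sketch, with durable storage left abstract:

```java
import java.util.concurrent.atomic.AtomicLong;

public class CheckpointTrackerSketch {
    private final AtomicLong lastProcessedSeconds = new AtomicLong(-1);

    // Call after a record is fully processed; `sourceTimestampSeconds` is assumed
    // to come from the record's metadata.
    void onProcessed(long sourceTimestampSeconds) {
        lastProcessedSeconds.set(sourceTimestampSeconds);
        // Persist the value here so a restart can resume from it.
    }

    long checkpointForRestart() {
        return lastProcessedSeconds.get();
    }
}
```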
Data type mappings
Use these tables to convert dataTypeNumber values in deserialized records to the corresponding database data types.
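For example, a minimal lookup for a few of the MySQL values below could look like this; the method name and the selected cases are illustrative only.

```java
public class DataTypeSketch {
    // Map a few MySQL dataTypeNumber values (see the MySQL table below) to type names.
    static String mysqlTypeName(int dataTypeNumber) {
        switch (dataTypeNumber) {
            case 0:   return "MYSQL_TYPE_DECIMAL";
            case 3:   return "MYSQL_TYPE_INT32";
            case 12:  return "MYSQL_TYPE_DATETIME";
            case 245: return "MYSQL_TYPE_JSON";
            case 252: return "MYSQL_TYPE_BLOB";
            default:  return "UNKNOWN(" + dataTypeNumber + ")";
        }
    }
}
```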
MySQL data types
| MySQL data type | dataTypeNumber |
|---|---|
| MYSQL_TYPE_DECIMAL | 0 |
| MYSQL_TYPE_INT8 | 1 |
| MYSQL_TYPE_INT16 | 2 |
| MYSQL_TYPE_INT32 | 3 |
| MYSQL_TYPE_FLOAT | 4 |
| MYSQL_TYPE_DOUBLE | 5 |
| MYSQL_TYPE_NULL | 6 |
| MYSQL_TYPE_TIMESTAMP | 7 |
| MYSQL_TYPE_INT64 | 8 |
| MYSQL_TYPE_INT24 | 9 |
| MYSQL_TYPE_DATE | 10 |
| MYSQL_TYPE_TIME | 11 |
| MYSQL_TYPE_DATETIME | 12 |
| MYSQL_TYPE_YEAR | 13 |
| MYSQL_TYPE_DATE_NEW | 14 |
| MYSQL_TYPE_VARCHAR | 15 |
| MYSQL_TYPE_BIT | 16 |
| MYSQL_TYPE_TIMESTAMP_NEW | 17 |
| MYSQL_TYPE_DATETIME_NEW | 18 |
| MYSQL_TYPE_TIME_NEW | 19 |
| MYSQL_TYPE_JSON | 245 |
| MYSQL_TYPE_DECIMAL_NEW | 246 |
| MYSQL_TYPE_ENUM | 247 |
| MYSQL_TYPE_SET | 248 |
| MYSQL_TYPE_TINY_BLOB | 249 |
| MYSQL_TYPE_MEDIUM_BLOB | 250 |
| MYSQL_TYPE_LONG_BLOB | 251 |
| MYSQL_TYPE_BLOB | 252 |
| MYSQL_TYPE_VAR_STRING | 253 |
| MYSQL_TYPE_STRING | 254 |
| MYSQL_TYPE_GEOMETRY | 255 |
Oracle data types
| Oracle data type | dataTypeNumber |
|---|---|
| VARCHAR2/NVARCHAR2 | 1 |
| NUMBER/FLOAT | 2 |
| LONG | 8 |
| DATE | 12 |
| RAW | 23 |
| LONG_RAW | 24 |
| UNDEFINED | 29 |
| XMLTYPE | 58 |
| ROWID | 69 |
| CHAR/NCHAR | 96 |
| BINARY_FLOAT | 100 |
| BINARY_DOUBLE | 101 |
| CLOB/NCLOB | 112 |
| BLOB | 113 |
| BFILE | 114 |
| TIMESTAMP | 180 |
| TIMESTAMP_WITH_TIME_ZONE | 181 |
| INTERVAL_YEAR_TO_MONTH | 182 |
| INTERVAL_DAY_TO_SECOND | 183 |
| UROWID | 208 |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | 231 |
PostgreSQL data types
| PostgreSQL data type | dataTypeNumber |
|---|---|
| INT2/SMALLINT | 21 |
| INT4/INTEGER/SERIAL | 23 |
| INT8/BIGINT | 20 |
| CHARACTER | 18 |
| CHARACTER VARYING | 1043 |
| REAL | 700 |
| DOUBLE PRECISION | 701 |
| NUMERIC | 1700 |
| MONEY | 790 |
| DATE | 1082 |
| TIME/TIME WITHOUT TIME ZONE | 1083 |
| TIME WITH TIME ZONE | 1266 |
| TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE | 1114 |
| TIMESTAMP WITH TIME ZONE | 1184 |
| BYTEA | 17 |
| TEXT | 25 |
| JSON | 114 |
| JSONB | 3802 |
| XML | 142 |
| UUID | 2950 |
| POINT | 600 |
| LSEG | 601 |
| PATH | 602 |
| BOX | 603 |
| POLYGON | 604 |
| LINE | 628 |
| CIDR | 650 |
| CIRCLE | 718 |
| MACADDR | 829 |
| INET | 869 |
| INTERVAL | 1186 |
| TXID_SNAPSHOT | 2970 |
| PG_LSN | 3220 |
| TSVECTOR | 3614 |
| TSQUERY | 3615 |