
Data Transmission Service: Use a Kafka client to consume tracked data

Last Updated: Mar 28, 2026

The change tracking feature of Data Transmission Service (DTS) exposes tracked data over a Kafka-compatible interface. Kafka clients from V0.11 to V2.7 can connect directly to a change tracking instance and consume incremental data serialized in Avro format.

This topic walks you through running the Kafka client demo, explains how to configure each parameter, and shows how to manage consumption checkpoints to avoid data loss.

Usage notes

Disable auto commit. When auto commit is enabled, the Kafka client commits offsets on a periodic timer — independently of whether your application has finished processing those records. If the client restarts before processing completes, it resumes from the committed offset and skips unprocessed data. Disable auto commit and commit offsets manually after your application successfully processes each batch.

If a commit fails and the client restarts, consumption resumes from the last successfully committed checkpoint. Records between the failed commit and the restart may be delivered again — filter out duplicates in your application logic.
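As a minimal sketch (not taken from the demo), the following consumer disables auto commit and commits offsets only after a batch is fully processed. The endpoint, topic, and group ID are placeholders, and the authentication settings described later in this topic are omitted:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ManualCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholders; see the parameter tables later in this topic for the real values.
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<KAFKA_BROKER_URL>");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "<Consumer group ID>");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Disable auto commit so offsets are committed only after processing succeeds.
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("<KAFKA_TOPIC>"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    process(record); // your application logic
                }
                // Commit only after the whole batch has been processed.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<byte[], byte[]> record) {
        // Parse the Avro payload and apply it downstream.
    }
}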

Additional notes:

  • Tracked data is serialized in Avro format. See Record.avsc for the schema definition.

  • If you are not using the demo client described in this topic, parse tracked data using the Avro schema and validate the parsed output.

  • The offsetsForTimes operation uses seconds as the time unit in DTS, whereas a standard Kafka client uses milliseconds (see the sketch after this list).

  • The Kafka client may experience transient disconnections from the change tracking server — for example, during disaster recovery. If you are not using the demo client, implement automatic reconnection in your client.

  • In subscribe mode, when DTS switches the incremental data collection module, the consumption checkpoint saved on the DTS server is removed. Specify a consumption checkpoint explicitly based on your requirements. For more information, see Use the SDK demo to consume tracked data and Manage the consumption checkpoint.
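To make the time-unit difference concrete, the following minimal sketch seeks a consumer to a checkpoint by timestamp. It assumes an already-constructed consumer and topic partition; the only DTS-specific detail is that the timestamp is passed in seconds rather than milliseconds:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class SeekByTimestamp {
    // Seeks the consumer to the offset at the given checkpoint.
    // DTS expects the timestamp in seconds; a standard Kafka broker expects milliseconds.
    static void seekToCheckpoint(KafkaConsumer<?, ?> consumer, TopicPartition tp, long checkpointSeconds) {
        Map<TopicPartition, Long> query = Collections.singletonMap(tp, checkpointSeconds);
        Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
        OffsetAndTimestamp offset = result.get(tp);
        if (offset != null) {
            consumer.seek(tp, offset.offset());
        }
    }
}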

How it works

The Kafka client demo follows a three-stage pipeline:

Stage | What it does | Source location
1. Fetch | Uses a native Kafka consumer to pull incremental data from the change tracking instance | subscribe_example-master/javaimpl/src/main/java/recordgenerator/
2. Deserialize | Decodes the Avro-serialized record to extract the pre-image, post-image, and metadata | subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java
3. Type conversion | Maps dataTypeNumber values to the corresponding database data types | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/
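
The following sketch condenses stages 1 and 2 into one loop. It assumes a Record class generated from Record.avsc with the Avro compiler (the generated class name and package may differ in the demo) and leaves out stage 3:

import java.time.Duration;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchAndDeserialize {
    // Assumed: a Record class generated from Record.avsc (package omitted here).
    private static final SpecificDatumReader<Record> READER =
            new SpecificDatumReader<>(Record.class);

    static void consumeOnce(KafkaConsumer<byte[], byte[]> consumer) throws Exception {
        // Stage 1: fetch raw messages from the change tracking instance.
        ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<byte[], byte[]> raw : batch) {
            // Stage 2: decode the Avro payload into a structured record.
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(raw.value(), null);
            Record record = READER.read(null, decoder);
            // The record now exposes the pre-image, post-image, and metadata.
            System.out.println(record);
        }
    }
}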

Pre-image completeness depends on the source database type:

  • Self-managed Oracle: Enable supplemental logging for all columns before consuming. Without it, the pre-image and post-image may be incomplete.

  • Other sources: DTS does not guarantee pre-image completeness. Validate the pre-image in your application before use.

Prerequisites

Before you begin, ensure that you have:

  • A change tracking instance whose topic and endpoint are available on its Basic Information page in the DTS console.
  • A consumer group created for the change tracking instance, together with its account username and password.

Run the Kafka client demo

  1. Download the Kafka client demo package: on the repository page, click Code and select Download ZIP. If you are using Kafka client V2.0, open subscribe_example-master/javaimpl/pom.xml and change the Kafka version number to 2.0.0, as shown in the snippet after this step.
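    The following shows where the version would change, assuming the demo declares kafka-clients as a direct dependency; the actual pom.xml may instead reference the version through a property, so adapt accordingly:

    <!-- Hypothetical dependency block in subscribe_example-master/javaimpl/pom.xml -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version>
    </dependency>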

  2. Open IntelliJ IDEA and click Open.


  3. Navigate to the directory where you extracted the demo, select the pom.xml file, and click Open as Project.


  4. In the Project tool window, navigate to and double-click NotifyDemoDB.java.

  5. Set the following parameters in NotifyDemoDB.java. The authentication and connection parameters are required. A sketch showing how these values feed a Kafka consumer configuration follows the tables.

    Warning

    If you are not using this demo client, format USER_NAME as <Username>-<Consumer group ID> — for example, dtstest-dtsae******bpv. An incorrect format causes the connection to fail.

    Use an internal network endpoint when possible — it provides lower latency than a public endpoint. An internal network endpoint applies when the Elastic Compute Service (ECS) instance running your Kafka client is on the classic network or in the same virtual private cloud (VPC) as the change tracking instance.
    Authentication parameters (required)

    Parameter | Description | How to get the value
    USER_NAME | Username of the consumer group account. | In the DTS console, click the change tracking instance ID. In the left navigation pane, click Consume Data to view consumer group details.
    PASSWORD_NAME | Password of the consumer group account. | Set when you create the consumer group.
    SID_NAME | Consumer group ID. | In the DTS console, click the change tracking instance ID. In the left navigation pane, click Consume Data.
    GROUP_NAME | Consumer group name. Set this to the consumer group ID. | Same page as SID_NAME.
    Connection parameters (required)

    Parameter | Description | How to get the value
    KAFKA_TOPIC | Name of the tracked topic. | In the DTS console, click the change tracking instance ID. On the Basic Information page, find the topic and network information.
    KAFKA_BROKER_URL_NAME | Endpoint of the change tracking instance. | Same page as KAFKA_TOPIC.

    Consumption behavior parameters

    Parameter | Description | Default
    INITIAL_CHECKPOINT_NAME | Starting consumption checkpoint as a UNIX timestamp (for example, 1592269238). The value must fall within the data range of the change tracking instance, shown in the Data Range column on the Change Tracking Tasks page. In subscribe mode, this value takes effect only on the first start. | None
    USE_CONFIG_CHECKPOINT_NAME | Forces the client to start from the specified INITIAL_CHECKPOINT_NAME. Set to true to prevent unprocessed records from being skipped after a restart. | true
    SUBSCRIBE_MODE_NAME | Run mode for the consumer group. Set to subscribe to allow multiple Kafka clients in the same consumer group. Keep the default assign for single-client deployments. | assign
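
    For orientation, the following sketch shows one plausible way these values feed a Kafka consumer configuration. The SASL/PLAIN settings are an assumption about how the change tracking endpoint authenticates; defer to the demo source if it differs:

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.config.SaslConfigs;

    public class DtsConsumerProperties {
        static Properties build(String userName, String password, String sid, String brokerUrl) {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, sid);
            props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            // Assumption: the change tracking endpoint authenticates with SASL/PLAIN.
            props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
            // USER_NAME must be formatted as <Username>-<Consumer group ID>.
            props.setProperty(SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.plain.PlainLoginModule required"
                            + " username=\"" + userName + "-" + sid + "\""
                            + " password=\"" + password + "\";");
            return props;
        }
    }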


  6. From the top menu bar, choose Run > Run to start the client.

    The first run may take some time to download and install dependencies.

Verify the result

When the client starts successfully, it consumes tracked data from the source database and prints the records to the console.

Manage the consumption checkpoint

When DTS switches the incremental data collection module — for example, during a failover — the consumption checkpoint stored on the DTS server is removed. Configure the client to detect this switch and reset the checkpoint to avoid data loss.

Step 1: Listen for cluster switches

Register ClusterSwitchListener as a consumer interceptor:

properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());

The following shows an example implementation of ClusterSwitchListener:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor<Object, Object> {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class);
    private ClusterResource originClusterResource = null;
    private ClusterResource currentClusterResource = null;

    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        return records;
    }

    @Override
    public void close() {
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    // Called whenever cluster metadata is updated. A changed cluster ID means
    // DTS has switched the incremental data collection module, so the consumer
    // must restart and reset its checkpoint.
    @Override
    public void onUpdate(ClusterResource clusterResource) {
        synchronized (this) {
            originClusterResource = currentClusterResource;
            currentClusterResource = clusterResource;
            if (null == originClusterResource) {
                LOG.info("Cluster updated to " + currentClusterResource.clusterId());
            } else if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) {
                LOG.info("Cluster not changed on update: " + clusterResource.clusterId());
            } else {
                LOG.error("Cluster changed");
                throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId()
                        + " to " + currentClusterResource.clusterId() + ", consumer requires restart");
            }
        }
    }

    public boolean isClusterResourceChanged() {
        if (null == originClusterResource) {
            return false;
        }
        return !originClusterResource.clusterId().equals(currentClusterResource.clusterId());
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    public static class ClusterSwitchException extends KafkaException {
        public ClusterSwitchException(String message, Throwable cause) {
            super(message, cause);
        }

        public ClusterSwitchException(String message) {
            super(message);
        }

        public ClusterSwitchException(Throwable cause) {
            super(cause);
        }

        public ClusterSwitchException() {
            super();
        }
    }
}

Step 2: Reset the checkpoint on a cluster switch

When ClusterSwitchException is caught, reset the consumption checkpoint to the timestamp of the last successfully consumed record:

try {
    // Normal consumption logic goes here.
} catch (ClusterSwitchListener.ClusterSwitchException e) {
    reset(lastConsumedTimestampSeconds);
}

// Reset the consumption checkpoint to the timestamp (in seconds) of the
// last successfully consumed record.
private void reset(long timestampSeconds) {
    Map<TopicPartition, Long> query = Collections.singletonMap(topicPartition, timestampSeconds);
    Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(query);
    OffsetAndTimestamp offset = offsets.get(topicPartition);
    if (null != offset) {
        kafkaConsumer.seek(topicPartition, offset.offset());
    }
}
For a complete implementation, see KafkaRecordFetcher.

FAQ

Why do I need to manage consumption checkpoints manually?

The checkpoint recorded by DTS reflects when DTS received the Kafka commit — not when your application finished processing the data. If your application or the client exits unexpectedly, specifying an accurate checkpoint lets you resume from exactly where processing stopped, preventing both data loss and unnecessary duplicate consumption.

Data type mappings

Use these tables to convert dataTypeNumber values in deserialized records to the corresponding database data types.

MySQL data types

MySQL data type | dataTypeNumber
MYSQL_TYPE_DECIMAL | 0
MYSQL_TYPE_INT8 | 1
MYSQL_TYPE_INT16 | 2
MYSQL_TYPE_INT32 | 3
MYSQL_TYPE_FLOAT | 4
MYSQL_TYPE_DOUBLE | 5
MYSQL_TYPE_NULL | 6
MYSQL_TYPE_TIMESTAMP | 7
MYSQL_TYPE_INT64 | 8
MYSQL_TYPE_INT24 | 9
MYSQL_TYPE_DATE | 10
MYSQL_TYPE_TIME | 11
MYSQL_TYPE_DATETIME | 12
MYSQL_TYPE_YEAR | 13
MYSQL_TYPE_DATE_NEW | 14
MYSQL_TYPE_VARCHAR | 15
MYSQL_TYPE_BIT | 16
MYSQL_TYPE_TIMESTAMP_NEW | 17
MYSQL_TYPE_DATETIME_NEW | 18
MYSQL_TYPE_TIME_NEW | 19
MYSQL_TYPE_JSON | 245
MYSQL_TYPE_DECIMAL_NEW | 246
MYSQL_TYPE_ENUM | 247
MYSQL_TYPE_SET | 248
MYSQL_TYPE_TINY_BLOB | 249
MYSQL_TYPE_MEDIUM_BLOB | 250
MYSQL_TYPE_LONG_BLOB | 251
MYSQL_TYPE_BLOB | 252
MYSQL_TYPE_VAR_STRING | 253
MYSQL_TYPE_STRING | 254
MYSQL_TYPE_GEOMETRY | 255
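
As a usage illustration, a small lookup helper such as the following (hypothetical, not part of the demo) turns dataTypeNumber values into readable names; extend the map with the remaining rows as needed:

import java.util.HashMap;
import java.util.Map;

public class MysqlTypeNames {
    private static final Map<Integer, String> NAMES = new HashMap<>();

    static {
        // A few entries from the table above; fill in the remaining rows as needed.
        NAMES.put(0, "MYSQL_TYPE_DECIMAL");
        NAMES.put(3, "MYSQL_TYPE_INT32");
        NAMES.put(12, "MYSQL_TYPE_DATETIME");
        NAMES.put(245, "MYSQL_TYPE_JSON");
        NAMES.put(252, "MYSQL_TYPE_BLOB");
    }

    static String nameOf(int dataTypeNumber) {
        return NAMES.getOrDefault(dataTypeNumber, "UNKNOWN(" + dataTypeNumber + ")");
    }

    public static void main(String[] args) {
        System.out.println(nameOf(245)); // prints MYSQL_TYPE_JSON
    }
}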

Oracle data types

Oracle data type | dataTypeNumber
VARCHAR2/NVARCHAR2 | 1
NUMBER/FLOAT | 2
LONG | 8
DATE | 12
RAW | 23
LONG_RAW | 24
UNDEFINED | 29
XMLTYPE | 58
ROWID | 69
CHAR and NCHAR | 96
BINARY_FLOAT | 100
BINARY_DOUBLE | 101
CLOB/NCLOB | 112
BLOB | 113
BFILE | 114
TIMESTAMP | 180
TIMESTAMP_WITH_TIME_ZONE | 181
INTERVAL_YEAR_TO_MONTH | 182
INTERVAL_DAY_TO_SECOND | 183
UROWID | 208
TIMESTAMP_WITH_LOCAL_TIME_ZONE | 231

PostgreSQL data types

PostgreSQL data type | dataTypeNumber
INT2/SMALLINT | 21
INT4/INTEGER/SERIAL | 23
INT8/BIGINT | 20
CHARACTER | 18
CHARACTER VARYING | 1043
REAL | 700
DOUBLE PRECISION | 701
NUMERIC | 1700
MONEY | 790
DATE | 1082
TIME/TIME WITHOUT TIME ZONE | 1083
TIME WITH TIME ZONE | 1266
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE | 1114
TIMESTAMP WITH TIME ZONE | 1184
BYTEA | 17
TEXT | 25
JSON | 114
JSONB | 3802
XML | 142
UUID | 2950
POINT | 600
LSEG | 601
PATH | 602
BOX | 603
POLYGON | 604
LINE | 628
CIDR | 650
CIRCLE | 718
MACADDR | 829
INET | 869
INTERVAL | 1186
TXID_SNAPSHOT | 2970
PG_LSN | 3220
TSVECTOR | 3614
TSQUERY | 3615
