Data Transmission Service: Use a Kafka client to consume tracked data

Last Updated: Nov 10, 2023

This topic describes how to use the demo of a Kafka client to consume tracked data. The change tracking feature of the new version allows you to consume tracked data by using a Kafka client of versions 0.11 to 2.7.

Usage notes

  • If you enable auto commit when you use the change tracking feature, some data may be committed before it is consumed, which results in data loss. We recommend that you manually commit data, as shown in the sketch after this list.

    Note

    If data fails to be committed, you can restart the client to continue consuming data from the last recorded consumption checkpoint. However, duplicate data may be generated during this period. You must manually filter out the duplicate data.

  • Data is serialized and stored in the Avro format. For more information, see Record.avsc.

    Warning

    If you are not using the Kafka client that is described in this topic, you must parse the tracked data based on the Avro schema and check the parsed data.

  • The search unit is seconds when Data Transmission Service (DTS) calls the offsetForTimes operation. The search unit is milliseconds when a native Kafka client calls this operation.

  • Transient disconnections may occur between a Kafka client and the change tracking server for several reasons, such as disaster recovery. If you are not using the Kafka client that is described in this topic, your Kafka client must be able to reconnect to the server after a disconnection.

  • If you use a native Kafka client to consume tracked data, the incremental data collection module in DTS may be switched over. In subscribe mode, the consumption checkpoint that the Kafka client saved to the DTS server is then removed, and you must specify a consumption checkpoint based on your business requirements to continue consuming tracked data. If you want to consume data in subscribe mode, we recommend that you use the SDK demo that is provided by DTS to track and consume data, or manually manage the consumption checkpoint. For more information, see Use the SDK demo to consume tracked data and the Manage the consumption checkpoint section of this topic.
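
The following minimal sketch shows one way to disable auto commit and commit offsets manually with a native Kafka consumer, assuming a 2.x client. The values in angle brackets are placeholders, the class name ManualCommitExample is illustrative, and the authentication settings that the change tracking instance requires are omitted.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class ManualCommitExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholders: replace with the endpoint, topic, and consumer group of your instance.
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<KAFKA_BROKER_URL>");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "<CONSUMER_GROUP_ID>");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            // Disable auto commit so that offsets are committed only after the records are processed.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            // Authentication settings required by the change tracking instance are omitted here.

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("<KAFKA_TOPIC>"));
                while (true) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        // Process the tracked data record here (business logic placeholder).
                    }
                    // Commit synchronously only after all polled records have been processed.
                    consumer.commitSync();
                }
            }
        }
    }

Committing only after the polled records have been processed trades a small amount of possible duplicate consumption after a restart for protection against data loss.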

Run the Kafka client

Download the Kafka client demo. For more information about how to use the demo, see Readme.

Note
  • Click code and select Download ZIP to download the package.

  • If you use a Kafka client of version 2.0, you must change the version number in the subscribe_example-master/javaimpl/pom.xml file to 2.0.0.

Table 1. Process description

  1. Use the native Kafka consumer to obtain incremental data from the change tracking instance.

    Related directory or file: subscribe_example-master/javaimpl/src/main/java/recordgenerator/

  2. Deserialize the image of the incremental data, and obtain the pre-image, the post-image, and other attributes.

    Warning
    • If the source instance is a self-managed Oracle database, you must enable supplemental logging for all columns. This ensures that the client can successfully consume the tracked data and ensures the integrity of the pre-image and post-image.

    • If the source instance is not a self-managed Oracle database, DTS does not ensure the integrity of the pre-image. We recommend that you verify the obtained pre-image.

    Related directory or file: subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java

  3. Convert the dataTypeNumber values in the deserialized data into the data types of the corresponding database.

    Related directory or file: subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/

Procedure

The following steps show how to run the Kafka client to consume tracked data. In this example, IntelliJ IDEA Community Edition 2018.1.4 for Windows is used.

  1. Create a change tracking instance. For more information, see Overview of change tracking scenarios.

  2. Create one or more consumer groups. For more information, see Create consumer groups.

  3. Download the Kafka client demo package and decompress it.

    Note

    Click code and select Download ZIP to download the package.

  4. Open IntelliJ IDEA. In the window that appears, click Open.

  5. In the dialog box that appears, go to the directory in which the downloaded demo resides. Find the pom.xml file.

  6. In the dialog box that appears, select Open as Project.

  7. In the Project tool window of IntelliJ IDEA, expand the folders to find the demo file of the Kafka client, and double-click the file. The file name is NotifyDemoDB.java.

  8. Specify the parameters in the NotifyDemoDB.java file.


    • USER_NAME: The username of the consumer group account.

    • PASSWORD_NAME: The password of the consumer group account.

    • SID_NAME: The ID of the consumer group.

    • GROUP_NAME: The name of the consumer group. Set this parameter to the consumer group ID.

      To obtain these values, go to the DTS console, find the change tracking instance that you want to manage, and click the instance ID. In the left-side navigation pane, click Consume Data. On the page that appears, you can view the information about a consumer group, such as the ID, the name, and the account of the consumer group.

      Note

      The password of the consumer group account is specified when you create the consumer group.

      Warning

      If you are not using the Kafka client that is described in this topic, you must specify USER_NAME in the following format: <Username>-<Consumer group ID>. Example: dtstest-dtsae******bpv. Otherwise, the connection fails.

    • KAFKA_TOPIC: The name of the tracked topic of the change tracking instance.

    • KAFKA_BROKER_URL_NAME: The endpoint of the change tracking instance.

      To obtain these values, go to the DTS console, find the change tracking instance that you want to manage, and click the instance ID. On the Basic Information page, you can view the topic and network information.

      Note

      If you track data changes over internal networks, the network latency is minimal. This applies if the Elastic Compute Service (ECS) instance on which you deploy the Kafka client resides in the classic network or in the same virtual private cloud (VPC) as the change tracking instance.

    • INITIAL_CHECKPOINT_NAME: The consumption checkpoint of consumed data. The value is a UNIX timestamp, such as 1592269238, and must be within the data range of the change tracking instance.

      Note
      • You must save the consumption checkpoint for the following reasons:

        • If the consumption process is interrupted, you can specify the consumption checkpoint on the Kafka client to resume data consumption. This prevents data loss.

        • When you start the Kafka client, you can specify the consumption checkpoint to consume data based on your business requirements.

      • If the SUBSCRIBE_MODE_NAME parameter is set to subscribe, the INITIAL_CHECKPOINT_NAME parameter that you specify takes effect only the first time that you start the Kafka client.

      • You can view the data range of the change tracking instance in the Data Range column on the Change Tracking Tasks page.

      • You can use a search engine to find a UNIX timestamp converter.

    • USE_CONFIG_CHECKPOINT_NAME: Specifies whether to force the client to consume data from the specified consumption checkpoint. Default value: true. Setting this parameter to true prevents the data that is received but not yet processed from being lost.

    • SUBSCRIBE_MODE_NAME: Specifies whether to run two or more Kafka clients for a consumer group. If you want to use this feature, set this parameter to subscribe for each of these Kafka clients. The default value is assign, which indicates that the feature is not used. We recommend that you deploy only one Kafka client for each consumer group.

    For one way to map these parameters to native Kafka consumer calls and seek to the specified consumption checkpoint, see the sketch after this procedure.

  9. In the top menu bar of IntelliJ IDEA, choose Run > Run to run the client.

    Note

    If you run IntelliJ IDEA for the first time, it takes some time to load and install the relevant dependencies.
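
The following hedged sketch illustrates how the parameters above can relate to native Kafka consumer calls when SUBSCRIBE_MODE_NAME is left at its default value assign. The class and method names are illustrative, partition 0 and the checkpoint value are placeholders, and the consumer is assumed to be already configured and authenticated as described in step 8.

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    public class CheckpointSeeker {
        /**
         * Seeks the consumer to the offset that corresponds to the given consumption checkpoint.
         * In assign mode the client manages the partition assignment itself, so the checkpoint
         * can be applied on every start; in subscribe mode the checkpoint takes effect only the
         * first time that the client is started.
         */
        public static void seekToCheckpoint(KafkaConsumer<byte[], byte[]> consumer,
                                            String topic, long checkpointInSeconds) {
            TopicPartition tp = new TopicPartition(topic, 0); // placeholder partition
            consumer.assign(Collections.singletonList(tp));
            // The DTS change tracking server interprets this timestamp in seconds,
            // whereas a native Kafka broker expects milliseconds.
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, checkpointInSeconds));
            OffsetAndTimestamp offset = offsets.get(tp);
            if (offset != null) {
                consumer.seek(tp, offset.offset());
            }
        }
    }

Because the DTS server interprets the timestamp that is passed to offsetsForTimes in seconds, pass the UNIX timestamp directly instead of multiplying it by 1,000 as you would for a native Kafka broker.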

Results on the Kafka client

The following figure shows that the Kafka client can track data changes from the source database.


To view the details of the data changes, delete the double forward slashes (//) before the log.info(ret) statement in line 25 of the NotifyDemoDB.java file, and then run the client again.

FAQ

  • Q: Why do I need to record the consumption checkpoint of the Kafka client?

    A: The consumption checkpoint recorded by DTS is the point in time when DTS receives a commit operation from the Kafka client. The recorded consumption checkpoint may be different from the actual consumption time. If a business application or the Kafka client is unexpectedly interrupted, you can specify an accurate consumption checkpoint to continue data consumption. This prevents data loss or duplicate data consumption.

Manage the consumption checkpoint

  1. Configure the Kafka client to listen to the switchover of the data collection module in DTS.

    You can configure the consumer properties of the Kafka client to listen to the switchover of the data collection module in DTS. The following code provides an example on how to configure the consumer properties:

    properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());

    The following code provides an example on how to implement ClusterSwitchListener:

    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.ClusterResource;
    import org.apache.kafka.common.ClusterResourceListener;
    import org.apache.kafka.common.KafkaException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor {
        private final static Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class);
        // The cluster that the consumer was connected to before the latest metadata update.
        private ClusterResource originClusterResource = null;
        // The cluster that the consumer is currently connected to.
        private ClusterResource currentClusterResource = null;

        // The interceptor does not modify the consumed records.
        public ConsumerRecords onConsume(ConsumerRecords records) {
            return records;
        }

        public void close() {
        }

        public void onCommit(Map offsets) {
        }

        // Invoked when the cluster metadata is updated. If the cluster ID changes, the data
        // collection module in DTS has been switched over and the consumer must be restarted
        // from a consumption checkpoint that it manages itself.
        public void onUpdate(ClusterResource clusterResource) {
            synchronized (this) {
                originClusterResource = currentClusterResource;
                currentClusterResource = clusterResource;
                if (null == originClusterResource) {
                    LOG.info("Cluster updated to " + currentClusterResource.clusterId());
                } else {
                    if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) {
                        LOG.info("Cluster not changed on update: " + clusterResource.clusterId());
                    } else {
                        LOG.error("Cluster changed");
                        throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId()
                                + " to " + currentClusterResource.clusterId() + ", consumer requires restart");
                    }
                }
            }
        }

        public boolean isClusterResourceChanged() {
            if (null == originClusterResource) {
                return false;
            }
            if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) {
                return false;
            }
            return true;
        }

        public void configure(Map<String, ?> configs) {
        }

        // Thrown when a switchover of the data collection module in DTS is detected.
        public static class ClusterSwitchException extends KafkaException {
            public ClusterSwitchException(String message, Throwable cause) {
                super(message, cause);
            }

            public ClusterSwitchException(String message) {
                super(message);
            }

            public ClusterSwitchException(Throwable cause) {
                super(cause);
            }

            public ClusterSwitchException() {
                super();
            }
        }
    }
  2. Specify the consumption checkpoint based on the captured switchover of the data collection module in DTS.

    Set the start consumption checkpoint of the next data tracking to the timestamp of the latest tracked data entry that was consumed by the client. The following code provides an example on how to specify the consumption checkpoint:

    try {
        // Consume and process records here.
    } catch (ClusterSwitchListener.ClusterSwitchException e) {
        // A switchover of the data collection module in DTS was detected.
        reset();
    }

    // Reset the consumption checkpoint. kafkaConsumer, tp (the TopicPartition), and
    // timestamp (the checkpoint; the DTS server interprets it in seconds) are fields
    // of the surrounding consumer class.
    public void reset() {
        Map<TopicPartition, OffsetAndTimestamp> offsets =
                kafkaConsumer.offsetsForTimes(Collections.singletonMap(tp, timestamp));
        OffsetAndTimestamp offset = offsets.get(tp);
        if (offset != null) {
            kafkaConsumer.seek(tp, offset.offset());
        }
    }
    Note

    For more information about the examples, see KafkaRecordFetcher.

Mappings between MySQL data types and dataTypeNumber values

  • MYSQL_TYPE_DECIMAL: 0
  • MYSQL_TYPE_INT8: 1
  • MYSQL_TYPE_INT16: 2
  • MYSQL_TYPE_INT32: 3
  • MYSQL_TYPE_FLOAT: 4
  • MYSQL_TYPE_DOUBLE: 5
  • MYSQL_TYPE_NULL: 6
  • MYSQL_TYPE_TIMESTAMP: 7
  • MYSQL_TYPE_INT64: 8
  • MYSQL_TYPE_INT24: 9
  • MYSQL_TYPE_DATE: 10
  • MYSQL_TYPE_TIME: 11
  • MYSQL_TYPE_DATETIME: 12
  • MYSQL_TYPE_YEAR: 13
  • MYSQL_TYPE_DATE_NEW: 14
  • MYSQL_TYPE_VARCHAR: 15
  • MYSQL_TYPE_BIT: 16
  • MYSQL_TYPE_TIMESTAMP_NEW: 17
  • MYSQL_TYPE_DATETIME_NEW: 18
  • MYSQL_TYPE_TIME_NEW: 19
  • MYSQL_TYPE_JSON: 245
  • MYSQL_TYPE_DECIMAL_NEW: 246
  • MYSQL_TYPE_ENUM: 247
  • MYSQL_TYPE_SET: 248
  • MYSQL_TYPE_TINY_BLOB: 249
  • MYSQL_TYPE_MEDIUM_BLOB: 250
  • MYSQL_TYPE_LONG_BLOB: 251
  • MYSQL_TYPE_BLOB: 252
  • MYSQL_TYPE_VAR_STRING: 253
  • MYSQL_TYPE_STRING: 254
  • MYSQL_TYPE_GEOMETRY: 255
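
When the demo converts dataTypeNumber values into database data types (step 3 in Table 1. Process description), a simple lookup table along the lines of the following sketch can be used. The class and method names are illustrative, and the map contains only a few entries from the MySQL table above; extend it as needed.

    import java.util.HashMap;
    import java.util.Map;

    public class MysqlTypeNumberMapper {
        private static final Map<Integer, String> MYSQL_TYPES = new HashMap<>();

        static {
            // A few entries from the MySQL mapping table above; extend as needed.
            MYSQL_TYPES.put(0, "MYSQL_TYPE_DECIMAL");
            MYSQL_TYPES.put(3, "MYSQL_TYPE_INT32");
            MYSQL_TYPES.put(12, "MYSQL_TYPE_DATETIME");
            MYSQL_TYPES.put(15, "MYSQL_TYPE_VARCHAR");
            MYSQL_TYPES.put(245, "MYSQL_TYPE_JSON");
            MYSQL_TYPES.put(252, "MYSQL_TYPE_BLOB");
            MYSQL_TYPES.put(254, "MYSQL_TYPE_STRING");
        }

        /** Returns the MySQL type name for a dataTypeNumber value, or null if it is unknown. */
        public static String toMysqlType(int dataTypeNumber) {
            return MYSQL_TYPES.get(dataTypeNumber);
        }
    }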

Mappings between Oracle data types and dataTypeNumber values

  • VARCHAR2/NVARCHAR2: 1
  • NUMBER/FLOAT: 2
  • LONG: 8
  • DATE: 12
  • RAW: 23
  • LONG_RAW: 24
  • UNDEFINED: 29
  • XMLTYPE: 58
  • ROWID: 69
  • CHAR and NCHAR: 96
  • BINARY_FLOAT: 100
  • BINARY_DOUBLE: 101
  • CLOB/NCLOB: 112
  • BLOB: 113
  • BFILE: 114
  • TIMESTAMP: 180
  • TIMESTAMP_WITH_TIME_ZONE: 181
  • INTERVAL_YEAR_TO_MONTH: 182
  • INTERVAL_DAY_TO_SECOND: 183
  • UROWID: 208
  • TIMESTAMP_WITH_LOCAL_TIME_ZONE: 231

Mappings between PostgreSQL data types and dataTypeNumber values

  • INT2/SMALLINT: 21
  • INT4/INTEGER/SERIAL: 23
  • INT8/BIGINT: 20
  • CHARACTER: 18
  • CHARACTER VARYING: 1043
  • REAL: 700
  • DOUBLE PRECISION: 701
  • NUMERIC: 1700
  • MONEY: 790
  • DATE: 1082
  • TIME/TIME WITHOUT TIME ZONE: 1083
  • TIME WITH TIME ZONE: 1266
  • TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE: 1114
  • TIMESTAMP WITH TIME ZONE: 1184
  • BYTEA: 17
  • TEXT: 25
  • JSON: 114
  • JSONB: 3082
  • XML: 142
  • UUID: 2950
  • POINT: 600
  • LSEG: 601
  • PATH: 602
  • BOX: 603
  • POLYGON: 604
  • LINE: 628
  • CIDR: 650
  • CIRCLE: 718
  • MACADDR: 829
  • INET: 869
  • INTERVAL: 1186
  • TXID_SNAPSHOT: 2970
  • PG_LSN: 3220
  • TSVECTOR: 3614
  • TSQUERY: 3615