新版数据订阅支持使用0.11版本至2.0版本的Kafka客户端消费订阅数据,DTS为您提供了Kafka客户端Demo,本文将介绍该客户端的使用说明。
注意事项
- 使用本文提供的Demo消费数据时,如果采用auto commit(自动提交),可能会因为数据还没被消费完就执行了提交操作,从而丢失部分数据,建议采用手动提交的方式以避免该问题。
说明 如果发生故障没有提交成功,重启客户端后会从上一个记录的位点进行数据消费,期间会有部分重复数据,您需要手动过滤。
- 数据以Avro序列化存储,详细格式请参见Record.avsc文档。
警告 如果您使用的不是本文提供的Kafka客户端,在进行反序列化解析时,可能出现解析的数据有误,您需要自行验证数据的正确性。
- 关于
offsetFotTimes
接口,DTS的搜索单位为秒,原生Kafka的搜索单位为毫秒。
Kafka客户端运行流程说明
请下载Kafka客户端Demo代码。更多关于代码使用的详细介绍,请参见Demo中的Readme文档。
说明 如需使用Kafka客户端2.0版本,您需要修改subscribe_example-master/javaimpl/pom.xml文件,将kafka客户端的版本号修改成2.0.0。

步骤 | 相关目录或文件 |
---|---|
1、使用原生的Kafka consumer从数据订阅通道中获取增量数据。 | subscribe_example-master/javaimpl/src/main/java/recordgenerator/ |
2、将获取的增量数据镜像执行反序列化,并从中获取 前镜像 、 后镜像 和其他属性。 | subscribe_example-master/javaimpl/src/main/java/boot/MysqlRecordPrinter.java |
3、将反序列化后的数据中的dataTypeNumber字段转换为对应的MySQL字段类型。
说明 关于对应关系的详情,请参见文末的MySQL字段类型与dataTypeNumber数值的对应关系。
|
subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ |
操作步骤
本文以IntelliJ IDEA软件(Community Edition 2018.1.4 Windows版本)为例,介绍如何运行该客户端消费订阅通道中的数据。
执行结果
运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。

您也可以去除NotifyDemo.java文件中的打印日志详情的注释(即删除第25行//log.info(ret);
中的//
),然后再次运行该客户端即可查看详细的数据变更信息。

常见问题
- Q:为什么需要自行记录客户端的消费位点?
A:由于DTS记录的消费位点是接收到Kafka消费客户端执行commit操作的时间点,可能与当前实际消费到的时间点存在一定的时间差。当业务程序或Kafka消费客户端异常中断后,您可以传入自行记录的消费位点以继续消费,避免消费到重复的数据或缺失部分数据。
MySQL字段类型与dataTypeNumber数值的对应关系
MySQL字段类型 | 对应dataTypeNumber数值 |
---|---|
MYSQL_TYPE_DECIMAL | 0 |
MYSQL_TYPE_INT8 | 1 |
MYSQL_TYPE_INT16 | 2 |
MYSQL_TYPE_INT32 | 3 |
MYSQL_TYPE_FLOAT | 4 |
MYSQL_TYPE_DOUBLE | 5 |
MYSQL_TYPE_NULL | 6 |
MYSQL_TYPE_TIMESTAMP | 7 |
MYSQL_TYPE_INT64 | 8 |
MYSQL_TYPE_INT24 | 9 |
MYSQL_TYPE_DATE | 10 |
MYSQL_TYPE_TIME | 11 |
MYSQL_TYPE_DATETIME | 12 |
MYSQL_TYPE_YEAR | 13 |
MYSQL_TYPE_DATE_NEW | 14 |
MYSQL_TYPE_VARCHAR | 15 |
MYSQL_TYPE_BIT | 16 |
MYSQL_TYPE_TIMESTAMP_NEW | 17 |
MYSQL_TYPE_DATETIME_NEW | 18 |
MYSQL_TYPE_TIME_NEW | 19 |
MYSQL_TYPE_JSON | 245 |
MYSQL_TYPE_DECIMAL_NEW | 246 |
MYSQL_TYPE_ENUM | 247 |
MYSQL_TYPE_SET | 248 |
MYSQL_TYPE_TINY_BLOB | 249 |
MYSQL_TYPE_MEDIUM_BLOB | 250 |
MYSQL_TYPE_LONG_BLOB | 251 |
MYSQL_TYPE_BLOB | 252 |
MYSQL_TYPE_VAR_STRING | 253 |
MYSQL_TYPE_STRING | 254 |
MYSQL_TYPE_GEOMETRY | 255 |