新版数据订阅支持使用0.11版本至2.0版本的Kafka客户端消费订阅数据,DTS为您提供了Kafka客户端Demo,本文将介绍该客户端的使用说明。

注意事项

  • 使用本文提供的Demo消费数据时,如果采用auto commit(自动提交),可能会因为数据还没被消费完就执行了提交操作,从而丢失部分数据,建议采用手动提交的方式以避免该问题。
    说明 如果发生故障没有提交成功,重启客户端后会从上一个记录的位点进行数据消费,期间会有部分重复数据,您需要手动过滤。
  • 数据以Avro序列化存储,详细格式请参见Record.avsc文档。
    警告 如果您使用的不是本文提供的Kafka客户端,在进行反序列化解析时,可能出现解析的数据有误,您需要自行验证数据的正确性。
  • 关于offsetForTimes接口,DTS的搜索单位为秒,原生Kafka的搜索单位为毫秒。
  • 由于数据订阅服务端会因容灾等原因导致网络闪断,若您未使用本文提供的Kafka客户端,您使用的Kafka客户端需具备网络重试能力。

Kafka客户端运行流程说明

请下载Kafka客户端Demo代码。更多关于代码使用的详细介绍,请参见Demo中的Readme文档。

说明
  • 单击code,然后选择Download ZIP下载文件。
  • 如需使用Kafka客户端2.0版本,您需要修改subscribe_example-master/javaimpl/pom.xml文件,将kafka客户端的版本号修改成2.0.0。
kafka2.0
表 1. 运行流程说明
步骤 相关目录或文件
1、使用原生的Kafka consumer从数据订阅通道中获取增量数据。 subscribe_example-master/javaimpl/src/main/java/recordgenerator/
2、将获取的增量数据镜像执行反序列化,并从中获取 前镜像后镜像 和其他属性。
警告
  • 如源实例为自建Oracle数据库,则为确保客户端成功消费订阅数据,并保证前后镜像完整性,您需要开启全列补偿日志。
  • 如源实例不为自建Oracle数据库,则DTS暂时不能保证前镜像的完整性,建议您对所获得的前镜像进行校验。
subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java
3、将反序列化后的数据中的dataTypeNumber字段转换为对应数据库的字段类型。 subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/

操作步骤

本文以IntelliJ IDEA软件(Community Edition 2018.1.4 Windows版本)为例,介绍如何运行该客户端消费订阅通道中的数据。

  1. 创建新版数据订阅通道,详情请参见创建RDS MySQL数据订阅通道创建PolarDB MySQL数据订阅通道创建Oracle数据订阅通道
  2. 创建一个或多个消费组,详情请参见新增消费组
  3. 下载Kafka客户端Demo代码,然后解压该文件。
    说明 单击code,然后选择Download ZIP下载文件。
  4. 打开IntelliJ IDEA软件,然后单击Open
    打开项目
  5. 在弹出的对话框中,定位至Kafka客户端Demo代码下载的目录,参照下图依次展开文件夹,找到项目对象模型文件:pom.xml
    打开项目文件
  6. 在弹出对话框中,选择Open as Project
  7. 在IntelliJ IDEA软件界面,依次展开文件夹,找到并双击打开Kafka客户端Demo文件:NotifyDemoDB.java
  8. 设置NotifyDemoDB.java文件中的各参数对应的值。
    设置参数值
    参数 说明 获取方式
    USER_NAME 消费组的账号。
    警告 如您未使用本文提供的客户端,请按照<消费组的账号>-<消费组ID>的格式设置用户名(例如:dtstest-dtsae******bpv),否则无法正常连接。
    在DTS控制台单击目标订阅实例ID,然后单击数据消费,您可以获取到消费组ID和消费组的账号信息。
    说明 消费组账号的密码已在您新建消费组时指定。
    查看消费组和账号
    PASSWORD_NAME 该账号的密码。
    SID_NAME 消费组ID。
    GROUP_NAME 消费组名称,需保持和消费组ID相同(即本参数也填入消费组ID)。
    KAFKA_TOPIC 数据订阅通道的订阅Topic。 在DTS控制台单击目标订阅实例ID,在任务管理页面,您可以获取到订阅Topic、网络地址信息。获取topic和网络信息
    KAFKA_BROKER_URL_NAME 数据订阅通道的网络地址信息。
    说明 如果您部署Kafka客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。
    INITIAL_CHECKPOINT_NAME 消费的数据时间点,格式为Unix时间戳,例如1592269238。
    说明 您需要自行保存时间点信息,以便:
    • 当业务程序中断后,传入已消费的数据时间点继续消费数据,防止数据丢失。
    • 在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。
    消费的数据时间点必须在订阅实例的数据范围(如图示)之内,并需转化为Unix时间戳。数据范围
    说明
    • 数据范围查看方式,请参见查看订阅数据
    • Unix时间戳转换工具可用搜索引擎获取。
    USE_CONFIG_CHECKPOINT_NAME 默认取值为true,即强制使用指定的数据时间点来消费数据,避免丢失已接收到的但未处理的数据。
    SUBSCRIBE_MODE_NAME 一个消费组下支持同时启动两个及以上Kafka客户端,如需实现该功能,请将所有客户端该参数的值设置为subscribe

    默认值为assign,即不使用该功能,只部署一个客户端。

  9. 在IntelliJ IDEA软件界面的顶部,选择Run > Run运行该客户端。
    说明 首次运行时,软件需要一定时间自动加载相关依赖包并完成安装。

执行结果

运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。

Kafka客户端订阅结果

您也可以去除NotifyDemoDB.java文件中的打印日志详情的注释(即删除第25行//log.info(ret);中的//),然后再次运行该客户端即可查看详细的数据变更信息。

kafka

常见问题

  • Q:为什么需要自行记录客户端的消费位点?

    A:由于DTS记录的消费位点是接收到Kafka消费客户端执行commit操作的时间点,可能与当前实际消费到的时间点存在一定的时间差。当业务程序或Kafka消费客户端异常中断后,您可以传入自行记录的消费位点以继续消费,避免消费到重复的数据或缺失部分数据。

MySQL字段类型与dataTypeNumber数值的对应关系

MySQL字段类型 对应dataTypeNumber数值
MYSQL_TYPE_DECIMAL 0
MYSQL_TYPE_INT8 1
MYSQL_TYPE_INT16 2
MYSQL_TYPE_INT32 3
MYSQL_TYPE_FLOAT 4
MYSQL_TYPE_DOUBLE 5
MYSQL_TYPE_NULL 6
MYSQL_TYPE_TIMESTAMP 7
MYSQL_TYPE_INT64 8
MYSQL_TYPE_INT24 9
MYSQL_TYPE_DATE 10
MYSQL_TYPE_TIME 11
MYSQL_TYPE_DATETIME 12
MYSQL_TYPE_YEAR 13
MYSQL_TYPE_DATE_NEW 14
MYSQL_TYPE_VARCHAR 15
MYSQL_TYPE_BIT 16
MYSQL_TYPE_TIMESTAMP_NEW 17
MYSQL_TYPE_DATETIME_NEW 18
MYSQL_TYPE_TIME_NEW 19
MYSQL_TYPE_JSON 245
MYSQL_TYPE_DECIMAL_NEW 246
MYSQL_TYPE_ENUM 247
MYSQL_TYPE_SET 248
MYSQL_TYPE_TINY_BLOB 249
MYSQL_TYPE_MEDIUM_BLOB 250
MYSQL_TYPE_LONG_BLOB 251
MYSQL_TYPE_BLOB 252
MYSQL_TYPE_VAR_STRING 253
MYSQL_TYPE_STRING 254
MYSQL_TYPE_GEOMETRY 255

Oracle字段类型与dataTypeNumber数值的对应关系

Oracle字段类型 对应dataTypeNumber数值
VARCHAR2/NVARCHAR2 1
NUMBER/FLOAT 2
LONG 8
DATE 12
RAW 23
LONG_RAW 24
UNDEFINED 29
XMLTYPE 58
ROWID 69
CHAR、NCHAR 96
BINARY_FLOAT 100
BINARY_DOUBLE 101
CLOB/NCLOB 112
BLOB 113
BFILE 114
TIMESTAMP 180
TIMESTAMP_WITH_TIME_ZONE 181
INTERVAL_YEAR_TO_MONTH 182
INTERVAL_DAY_TO_SECOND 183
UROWID 208
TIMESTAMP_WITH_LOCAL_TIME_ZONE 231

PostgreSQL字段类型与dataTypeNumber数值的对应关系

PostgreSQL字段类型 对应dataTypeNumber数值
INT2/SMALLINT 21
INT4/INTEGER/SERIAL 23
INT8/BIGINT 20
CHARACTER 18
CHARACTER VARYING 1043
REAL 700
DOUBLE PRECISION 701
NUMERIC 1700
MONEY 790
DATE 1082
TIME/TIME WITHOUT TIME ZONE 1083
TIME WITH TIME ZONE 1266
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE 1114
TIMESTAMP WITH TIME ZONE 1184
BYTEA 17
TEXT 25
JSON 114
JSONB 3082
XML 142
UUID 2950
POINT 600
LSEG 601
PATH 602
BOX 603
POLYGON 604
LINE 628
CIDR 650
CIRCLE 718
MACADDR 829
INET 869
INTERVAL 1186
TXID_SNAPSHOT 2970
PG_LSN 3220
TSVECTOR 3614
TSQUERY 3615