完成数据订阅通道的配置后,您可以使用flink-dts-connector文件消费通道中的数据,用于Flink客户端消费。本文介绍如何flink-dts-connector文件的使用说明。

注意事项

  • 仅支持Flink客户端使用DataStream API、Table API和SQL。
  • 如您的Flink客户端使用Table API和SQL,则单次配置时仅支持消费单张表的数据,如需消费多张表的数据,您需进行多次配置独立的任务。

操作步骤

本文以IntelliJ IDEA软件(Community Edition 2020.1 Windows版本)为例,介绍如何使用flink-dts-connector文件来消费订阅通道中的数据。

  1. 创建新版数据订阅通道,详情请参见创建RDS MySQL数据订阅通道创建PolarDB MySQL数据订阅通道创建Oracle数据订阅通道
  2. 创建一个或多个消费组,详情请参见新增消费组
  3. 下载flink-dts-connector文件并解压。
  4. 运行IntelliJ IDEA工具,然后单击Open or Import
    打开工程
  5. 在弹出的对话框中,定位至flink-dts-connector文件所在目录,依次展开文件夹,找到项目对象模型文件:pom.xml
    pom模型
  6. 在弹出对话框中,选择Open as Project
  7. pom.xml文件中添加如下依赖:
    <dependency>
          <groupId>com.alibaba.flink</groupId>
          <artifactId>flink-dts-connector</artifactId>
          <version>1.1.1-SNAPSHOT</version>
          <classifier>jar-with-dependencies</classifier>
    </dependency>
  8. 在IntelliJ IDEA软件界面,依次展开文件夹,并根据您所使用的Flink Connector的程序类型,选择对应的Java文件。
    • 如Flink客户端类型为DataStream API,您需双击打开flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java文件,并执行如下操作:
      1. 在IntelliJ IDEA软件界面的顶部,单击如下图标。run图标
      2. 在弹跳框中单击DtsExample > Editedit
      3. 在弹跳框的Program arguments中,按如下示例输入参数及对应的值,并单击下方的Run,启动flink-dts-connector。
        说明 具体参数说明及查询方式,请参见 参数说明
        --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
      4. 运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。数据变更信息(DataStream)
        说明 如需查询数据变更的具体记录,您可登录Flink客户端的Task Manager界面进行查看。
    • 如Flink客户端类型为Table API和SQL,您需双击打开flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java文件,并执行如下操作:
      说明 单个 DtsTableISelectTCaseTest.java文件,仅支持配置并消费单张表的订阅数据。如需配置并消费多张表中的数据,您需要重复配置,并运行多个独立任务。
      1. 在如下位置添加前导字符//,注释该行代码信息。注释掉一行
      2. 设置所需消费的单张表的信息,支持使用SQL语句。
      3. 设置订阅通道参数,具体参数说明及查询方式,请参见参数说明table api和sql的参数配置
      4. 在IntelliJ IDEA软件界面的顶部,单击Run'DtsTableISelectTCaseTest',启动flink-dts-connector。
      5. 运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。tableapi和sql-数据变更信息
        说明 如需查询数据变更的具体记录,您可登录Flink客户端的Task Manager界面进行查看。

参数说明

DstExample文件中的参数 DtsTableISelectTCaseTest文件中的参数 说明 查询方式
broker-url dts.server 数据订阅通道的网络地址及端口号信息。
说明 如果您部署的Flink所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。
在DTS控制台单击目标订阅实例ID,在订阅配置页面,您可以获取到订阅Topic、网络地址及端口号信息。订阅配置
topic topic 数据订阅通道的订阅Topic。
sid dts.sid 消费组ID。 在DTS控制台单击目标订阅实例ID,然后单击数据消费,您可以获取到消费组ID和消费组的账号信息。
说明 消费组账号的密码已在您新建消费组时指定。
数据消费
user dts.user 消费组的账号。
警告 如您未使用本文提供的flink-dts-connector文件,请按照 <消费组的账号>-<消费组ID>的格式设置用户名(例如: dtstest-dtsae******bpv),否则无法正常连接。
password dts.password 该账号的密码。
checkpoint dts.checkpoint 消费位点,即flink-dts-connector消费第一条数据的时间戳,格式为Unix时间戳,例如1624440043。
说明 消费位点信息可用于:
  • 当业务程序中断后,传入已消费位点继续消费数据,防止数据丢失。
  • 在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。
消费位点必须在订阅实例的数据范围(如图所示)之内,并需转化为Unix时间戳。数据范围
说明 Unix时间戳转换工具可用搜索引擎获取。
dts.cdc.table.name 订阅对象。仅支持传入单张表,且格式为<数据库名称>.<表名称>,例如dtstestdata.order 在DTS控制台单击目标订阅实例ID,在订阅配置页面,单击右上方的查看订阅对象,查询订阅对象所属数据库和表。

常见问题

报错提示 可能的原因 解决方式
Cluster changed from *** to ***, consumer require restart.
DTS用于读取增量数据的模块DStore发生切换,导致Flink客户端的消费位点丢失。 您无需重启客户端,仅需查询客户端的消费位点,并在DtsExample.javaDtsTableISelectTCaseTest.java文件中重新传入消费位点checkpointdts.checkpoint,即可重新消费订阅数据。