全部产品
Search
文档中心

数据传输服务 DTS:使用SDK消费PolarDB-X 1.0订阅数据

更新时间:Mar 21, 2025

完成数据订阅任务后,您可以使用DTS提供的SDK来订阅数据变更信息。本文介绍通过SDK代码消费分布式订阅数据,支持的数据源包括PolarDB-X 1.0和DMS LogicDB。

前提条件

注意事项

  • 在消费订阅数据时,您需要调用DefaultUserRecord的commit方法以提交位点信息,否则会导致数据重复消费。

  • 不同的消费之间是相互独立的。

操作步骤

  1. 下载并解压SDK示例代码

  2. 确认SDK代码的版本。

    1. 定位至SDK示例代码解压的目录。

    2. 使用文本编辑工具打开目录中的pom.xml文件。

    3. 将数据订阅SDK的版本(version)修改为最新版本。

      说明

      您可以在dts-new-subscribe-sdk页面查看最新Maven依赖。

      SDK版本参数的位置(单击展开)

      <name>dts-new-subscribe-sdk</name>
      <url>https://www.aliyun.com/product/dts</url>
      <description>The Aliyun new Subscribe SDK for Java used for accessing Data Transmission Service</description>
      <packaging>jar</packaging>
      <groupId>com.aliyun.dts</groupId>
      <artifactId>dts-new-subscribe-sdk</artifactId>
      <version>2.1.4</version>
  3. 编辑SDK代码。

    1. 使用编码软件打开解压后的文件。

    2. 根据SDK客户端的使用模式,打开对应模式的Java文件DistributedDTSConsumerDemo.java

      说明

      Java文件的路径为aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/

    3. 设置Java代码中的参数。

      public static void main(String[] args) throws ClientException {
              //分布式类型数据源的订阅配置方式,例如PolarDBX10(原DRDS)。配置AccessKey、实例Id、主任务id,订阅消费组等相关信息。
              String accessKeyId = "LTA***********99reZ";
              String accessKeySecret = "****************";
              String regionId = "cn-hangzhou";
              String dtsInstanceId = "dtse5212sed162****";
              String jobId = "l791216x16d****";
              String sid = "dtsip412t13160****";
              String userName = "xftest";
              String password = "******";
              String proxyUrl = "dts-cn-****.com:18001";
              // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019))
              String checkpoint = "1639620090";
      
              // Convert physical database/table name to logical database/table name
              boolean mapping = true;
              // if force use config checkpoint when start. for checkpoint reset, only assign mode works
              boolean isForceUseInitCheckpoint = false;
      
              ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
              DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId,
                      jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl,
                      checkpoint, isForceUseInitCheckpoint, mapping);
              demo.start();
          }

      参数

      说明

      获取方法

      accessKeyId

      访问密钥ID。

      获取方法请参见获取AccessKey

      accessKeySecret

      访问密钥ID的密码。

      regionId

      数据订阅任务的地域ID。

      在DTS控制台单击目标订阅实例ID,在基本信息页面,您可以获取地域信息,例如:地域为华东1(杭州),代码中需要填写为cn-hangzhou,详情请参见地域列表

      dtsInstanceId

      数据订阅任务实例ID。

      在DTS控制台单击目标订阅实例ID,在基本信息页面,您可以获取数据订阅实例的任务实例ID

      jobId

      数据订阅任务ID。

      您可以调用DescribeDtsJobs接口获取数据订阅任务ID(DtsJobId)。

      sid

      消费组ID。

      在DTS控制台单击目标订阅实例ID,然后单击左侧导航栏的数据消费,您可以获取到消费组ID/名称和消费组的账号信息。

      说明

      消费组账号的密码已在您新建消费组时指定。

      userName

      消费组的账号。

      password

      消费组账号的密码。

      proxyUrl

      数据订阅通道的网络地址及端口号信息。

      说明
      • 如果您部署SDK客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。

      • 不建议使用公网地址。

      在DTS控制台单击目标订阅实例ID,在基本信息页面,您可以获取到网络信息。

      checkpoint

      消费位点,即SDK客户端消费第一条数据的时间戳,格式为Unix时间戳,使用秒级时间戳。

      说明

      消费位点信息可用于:

      • 当业务程序中断后,传入已消费位点继续消费数据,防止数据丢失。

      • 在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。

      消费位点必须在订阅实例的数据范围之内,并需转化为Unix时间戳。

      说明
      • 您可以在订阅任务列表的数据范围列,查看订阅实例的数据范围。

      • Unix时间戳转换工具可通过搜索引擎获取。

  4. 可选:如果您需要修改订阅数据的数据类型,可以在buildRecordListener()方法中进行修改或者自定义类。

    public static Map<String, RecordListener> buildRecordListener() {
            // user can impl their own listener
            RecordListener mysqlRecordPrintListener = new RecordListener() {
                @Override
                public void consume(DefaultUserRecord record) {
    
                    OperationType operationType = record.getOperationType();
    
                    if (operationType.equals(OperationType.INSERT)
                            || operationType.equals(OperationType.UPDATE)
                            || operationType.equals(OperationType.DELETE)
                            || operationType.equals(OperationType.HEARTBEAT)) {
    
                        // consume record
                        RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL);
    
                        recordPrintListener.consume(record);
    
                        //commit method push the checkpoint update
                        record.commit("");
                    }
                }
            };
            return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener);
        }
  5. 打开编码软件的项目结构,确保此项目的OpenJDK版本为1.8。

  6. 运行该客户端代码。

    • 运行结果中显示该客户端可正常订阅到源库的数据变更信息。

    • SDK客户端每隔一定时间会统计并显示消费数据的信息,包括数据发送和接收时数据总数、数据总量、每秒请求数接收RPS等。

      表 1. 消费数据的统计信息

      参数

      说明

      outCounts

      SDK客户端所消费的数据总数。

      outBytes

      SDK客户端所消费的数据总量,单位为Byte。

      outRps

      SDK客户端消费数据时的每秒请求数。

      outBps

      SDK客户端消费数据时每秒传送的比特数。

      count

      暂无。

      inBytes

      DTS服务器发送的数据总量,单位为Byte。

      DStoreRecordQueue

      DTS服务器发送数据时,当前数据缓存队列的大小。

      inCounts

      DTS服务器发送数据总数。

      inRps

      DTS服务器发送数据时的每秒请求数。

      inBps

      DTS服务器发送数据时每秒传送的比特数。

      __dt

      SDK客户端接收到数据的当前时间戳,单位为毫秒。

      DefaultUserRecordQueue

      序列化后,当前数据缓存队列的大小。