全部產品
Search
文件中心

Data Transmission Service:使用flink-dts-connector消費訂閱資料

更新時間:May 26, 2026

完成資料訂閱通道的配置後,您可以使用flink-dts-connector檔案消費通道中的資料,用於Flink用戶端消費。本文介紹flink-dts-connector檔案的使用說明。

注意事項

  • 僅支援Flink用戶端使用DataStream API、Table API和SQL。

  • 如您的Flink用戶端使用Table API和SQL,則單次配置時僅支援消費單張表的資料,如需消費多張表的資料,您需進行多次配置獨立的任務。

操作步驟

本文以IntelliJ IDEA軟體(Community Edition 2020.1 Windows版本)為例,介紹如何使用flink-dts-connector檔案來消費訂閱通道中的資料。

  1. 建立新版資料訂閱通道,詳情請參見建立RDS MySQL資料訂閱通道建立PolarDB MySQL版資料訂閱通道建立Oracle資料訂閱通道

  2. 建立一個或多個消費組,詳情請參見新增消費組

  3. 下載flink-dts-connector檔案並解壓。

  4. 運行IntelliJ IDEA工具,然後單擊Open or Import

  5. 在彈出的對話方塊中,定位至flink-dts-connector檔案所在目錄,依次展開檔案夾,找到專案物件模型檔案:pom.xml

  6. 在彈出對話方塊中,選擇Open as Project

  7. pom.xml檔案中添加如下依賴:

    <dependency>
          <groupId>com.alibaba.flink</groupId>
          <artifactId>flink-dts-connector</artifactId>
          <version>1.1.1-SNAPSHOT</version>
          <classifier>jar-with-dependencies</classifier>
    </dependency>
  8. 在IntelliJ IDEA軟體介面,依次展開檔案夾,並根據您所使用的Flink Connector的程式類型,選擇對應的Java檔案。

    • 如Flink用戶端類型為DataStream API,您需雙擊開啟flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java檔案,並執行如下操作:

      1. 在頂部功能表列中,選擇 Run > Run...

      2. 在彈跳框中單擊DtsExample > Edit

      3. 在彈跳框的Program arguments中,按如下樣本輸入參數及對應的值,並單擊下方的Run,啟動flink-dts-connector。

        說明

        具體參數說明及查詢方式,請參見參數說明

        --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
      4. 運行結果表明,該用戶端可正常訂閱到源庫的資料變更資訊。啟動後,DataStream中輸出的資料變更記錄樣本如下。

        LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006303@289540@2688@1625045211000]}
        LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006307@290047@2688@1625045212000]}
        LazyParseRecord {operationType [UPDATE], checkpoint [0@12006305@290016@2688@1625045212000]}
        LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006308@290047@2688@1625045214000]}
        LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006309@290047@2688@1625045215000]}
        LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006310@290047@2688@1625045216000]}
        LazyParseRecord {operationType [HEARTBEAT], checkpoint [0@12006311@290047@2688@1625045217000]}
        說明

        如需查詢資料變更的具體記錄,您可登入Flink用戶端的Task Manager介面進行查看。

    • 如Flink用戶端類型為Table API和SQL,您需雙擊開啟flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java檔案,並執行如下操作:

      說明

      單個DtsTableISelectTCaseTest.java檔案,僅支援配置並消費單張表的訂閱資料。如需配置並消費多張表中的資料,您需要重複配置,並運行多個獨立任務。

      1. 在如下位置添加前置字元//,注釋該行代碼資訊。注釋掉 properties.load 程式碼(在行首添加 //)。

        /*將平台頁面中設定的參數值載入到Properties對象中。*/
        //properties.load(new StringReader(new String(Files.readAllBytes(Paths.get(configFilePath)), StandardCharsets.UTF_8)));
      2. 設定所需消費的單張表的資訊,支援使用SQL語句。

      3. 設定訂閱通道參數,具體參數說明及查詢方式,請參見參數說明

        public static void main(String[] args) throws Exception {
            setup(args);
            final String createTable =
                    "create table `dts` (\n"
                    + " `ts` TIMESTAMP(3) METADATA FROM 'timestamp',\n"
                    + " `id` bigint,\n"
                    + " `name` varchar,\n"
                    + " `age` bigint,\n"
                    + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                    + ") with (\n"
                    + "'connector' = 'dts',"
                    + "'dts.server' = 'dts-cn-xxx:18001',"
                    + "'topic' = 'cn_hangzhou_rm_xxx_dtstest_version2',"
                    + "'dts.sid' = 'dtsxxx', "
                    + "'dts.user' = 'dtstest', "
                    + "'dts.password' = 'xxx',"
                    + "'dts.checkpoint' = '1624440043', "
                    + "'dts-cdc.table.name' = 'dtstestdata.order',"
                    + "'format' = 'dts-cdc')";
        }
      4. 在IntelliJ IDEA軟體介面的頂部,單擊Run'DtsTableISelectTCaseTest',啟動flink-dts-connector。

      5. 運行結果表明,該用戶端可正常訂閱到源庫的資料變更資訊。啟動後,終端輸出資料變更(changelog)記錄,更新前(-U)和更新後(+U)成對出現,樣本如下。

        ######> (false,-U(2021-06-23T20:32:17.391,null,null,null))
        ######> (true,+U(2021-06-23T20:32:17.391,null,null,null))
        ######> (false,-U(2021-06-23T20:32:45.604,null,null,null))
        ######> (true,+U(2021-06-23T20:32:45.604,null,null,null))
        ######> (false,-U(2021-06-30T17:26:52.201,null,null,null))
        ######> (true,+U(2021-06-30T17:26:52.201,null,null,null))
        ######> (false,-U(2021-06-30T19:19:26.975,null,null,null))
        ######> (true,+U(2021-06-30T19:19:26.975,null,null,null))
        說明

        如需查詢資料變更的具體記錄,您可登入Flink用戶端的Task Manager介面進行查看。

參數說明

DstExample檔案中的參數

DtsTableISelectTCaseTest檔案中的參數

說明

查詢方式

broker-url

dts.server

資料訂閱通道的網路地址及連接埠號碼資訊。

說明
  • 如果您部署的Flink所屬的ECS執行個體與資料訂閱通道屬於傳統網路或同一專用網路,建議通過內網地址進行資料訂閱,網路延遲最小。

  • 鑒於網路穩定性因素,不建議使用公網地址。

在DTS控制台單擊目標訂閱執行個體ID,在訂閱配置頁面,您可以擷取到訂閱Topic、網路地址及連接埠號碼資訊。

topic

topic

資料訂閱通道的訂閱Topic。

sid

dts.sid

消費組ID。

在DTS控制台單擊目標訂閱執行個體ID,然後單擊數據消費,您可以擷取到消費組ID和消費組的帳號資訊。

說明

消費組帳號的密碼已在您建立消費組時指定。

user

dts.user

消費組的帳號。

警告

如您未使用本文提供的flink-dts-connector檔案,請按照<消費組的帳號>-<消費組ID>的格式設定使用者名稱(例如:dtstest-dtsae******bpv),否則無法正常串連。

password

dts.password

該帳號的密碼。

checkpoint

dts.checkpoint

消費位點,即flink-dts-connector消費第一條資料的時間戳記,格式為Unix時間戳記,例如1624440043。

說明

消費位點資訊可用於:

  • 當業務程式中斷後,傳入已消費位點繼續消費資料,防止資料丟失。

  • 在訂閱用戶端啟動時,傳入所需的消費位點,調整訂閱位點,實現按需消費資料。

消費位點必須在訂閱執行個體的資料範圍之內,並需轉換為Unix時間戳記。在 DTS 訂閱任務詳情頁面,可查看資料範圍欄位所顯示的起止時間,消費位點需設定在該時間範圍內。

說明

Unix時間戳記轉換工具可通過搜尋引擎擷取。

dts-cdc.table.name

訂閱對象。僅支援傳入單張表,且格式要求如下:

  • 當資料庫類型為MySQLPolarDB for MySQLPolarDB-X 1.0PolarDB-X 2.0時:格式為<資料庫名稱>.<表名稱>

  • 當資料庫為其他類型時:格式為<Schema名稱>.<表名稱>

在DTS控制台單擊目標訂閱執行個體ID,在訂閱配置頁面,單擊右上方的查看訂閱對象,查詢訂閱對象所屬資料庫和表。

常見問題

報錯提示

可能的原因

解決方式

Cluster changed from *** to ***, consumer require restart.

DTS用於讀取增量資料的模組DStore發生切換,導致Flink用戶端的消費位點丟失。

您無需重啟用戶端,僅需查詢用戶端的消費位點,並在DtsExample.javaDtsTableISelectTCaseTest.java檔案中重新傳入消費位點checkpointdts.checkpoint,即可重新消費訂閱資料。