全部產品
Search
文件中心

E-MapReduce:基於Realtime ComputeFlink使用CTAS語句同步MySQL資料至StarRocks

更新時間:Aug 29, 2025

本文為您介紹如何使用阿里雲Realtime ComputeFlink,通過CTAS語句將MySQL資料同步至EMR Serverless StarRocks中。

背景資訊

您可以通過CTAS或CDAS語句將MySQL資料同步至EMR Serverless StarRocks,CTAS可以實現單表的結構和資料同步,CDAS可以實現整庫同步或者同一庫中的多表結構和資料同步。本文使用CTAS語句,CDAS語句的使用方法與CTAS類似,具體請參見CDAS介紹

通過CTAS(CREATE TABLE AS)語句,您可以在StarRocks中自動建立和MySQL中表結構一致的表,並進行資料同步。同時還能即時同步上遊表結構(Schema)的變更到下遊表,提高您在目標儲存中建立表和維護源表結構變更的效率。

當執行CTAS語句時,Flink會按照以下流程執行:

  1. 檢查目標儲存中是否存在該目標表。

    • 如果不存在,則通過目標端Catalog在目標儲存中建立相應的目標表,該目標表具有和資料來源相同的Schema。

    • 如果存在,則跳過建表。如果已存在的目標表與源表Schema不一致,則會報錯提示。

  2. 提交和啟動相應的資料同步作業。同步資料來源的資料以及Schema的變更到目標表中。

表結構變更同步策略通過CTAS語句,在即時同步資料的同時,還能同步源表Schema的變更到目標表中。

Schema變更包括初始表的建立以及未來表的變更。

  • 當前支援同步的Schema變更:

    • 添加可空列:會自動在目標表Schema末尾添加對應的列,並自動同步新增列的資料。

    • 刪除可空列:不會直接在目標表中刪除該列,而是將該列的資料自動填滿為NULL值。

    • 重新命名列:直接在目標表中末尾添加重新命名後的列,並將重新命名前的列資料自動填滿為NULL值。

      例如,如果col_a重新命名為col_b,則會在目標表末尾添加col_b,並自動將col_a的資料填充為NULL值。

  • 暫不支援同步的Schema變更:

    • 資料類型的變更。

      例如,由VARCHAR變為BIGINT,由NOT NULL變為NULLABLE屬性。

    • 主鍵或索引等約束的變更。

    • 非空列的增加或刪除的變更。

說明
  • 如果遇到不支援的Schema變更,則需要您手動刪除下遊目標表,重新啟動CTAS作業,即重新建立目標表並重新同步歷史資料。
  • CTAS不會識別具體的DDL類型,而是對比前後兩條資料的Schema差異。因此,如果您先刪除了某列後,又加回了該列,且這兩個DDL之間無資料變化,則CTAS會認為沒有發生結構變更。同理,如果您添加了一列,直到該表有資料變化,CTAS才會感知到結構變更,才會同步結構變更到目標表。
  • 通過CTAS建表支援的欄位類型資訊,請參見Flink與StarRocks的資料類型映射關係

前提條件

說明

本文以5.7版本的MySQL和vvr-8.0.11-flink-1.17版本的Flink為例介紹。

使用限制

  • 建立的Flink叢集、EMR Serverless StarRocks執行個體以及RDS MySQL執行個體需要在同一個VPC下。

  • RDS MySQL須為5.7及以上版本。

步驟一:準備測試資料

  1. 建立測試的資料庫和帳號,詳情請參見建立資料庫和帳號

    建立完資料庫和帳號後,需要授權測試帳號的讀寫權限。

    說明

    本文建立的資料庫名稱為test_cdc,帳號為test。

  2. 使用建立的測試帳號串連MySQL執行個體,詳情請參見通過DMS登入RDS MySQL

  3. 在MySQL中執行以下命令,建立資料表。

    use test_cdc;
    -- 建立表
    CREATE TABLE IF NOT EXISTS `runoob_tbl`(
       `runoob_id` INT UNSIGNED AUTO_INCREMENT,
       `runoob_title` VARCHAR(100) NOT NULL,
       `runoob_author` VARCHAR(40) NOT NULL,
       `submission_date` DATE,
       `add_col` int DEFAULT NULL,
       PRIMARY KEY ( `runoob_id` )
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- 插入資料
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2025-06-22 17:13:44',3)
  4. 登入並串連EMR Serverless StarRocks執行個體,詳情請參見通過MySQL用戶端方式串連StarRocks執行個體

  5. 執行以下命令,建立資料庫test_cdc、建立超級管理使用者test(樣本密碼為1qaz!QAZ)或者建立普通使用者test並給普通使用者授予該資料庫及表的許可權,詳情請參見系統管理使用者

    -- 建立資料庫
    CREATE DATABASE test_cdc;
    -- 建立使用者
    CREATE USER 'test' IDENTIFIED by '1qaz!QAZ';
    -- 給使用者授權資料庫許可權
    GRANT ALL on test_cdc to test;
    -- 給使用者授權表許可權
    GRANT ALL ON ALL TABLES IN DATABASE test_cdc to test;

步驟二:在Realtime ComputeFlink控制台通過SQL用戶端建立Catalog

在阿里雲Realtime ComputeFlink控制台的資料管理頁面中,建立MySQL和StarRocks的Catalog。詳情請參見資料管理

說明

參數配置僅供參考,具體內容請根據實際情況配置。

  • MySQL Catalog

    • 程式碼範例

      CREATE CATALOG mysql WITH (
        'type' = 'mysql',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'emr-test',
        'password' = '123456',
        'default-database' = 'test_cdc'
      );
    • 參數配置

      參數

      說明

      type

      類型,固定值為mysql。

      hostname

      RDS的內網地址。您可以在RDS的資料庫連接頁面,單擊內網地址進行複製。例如,rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com。

      port

      MySQL資料庫服務的連接埠號碼,預設值為3306。

      username

      MySQL資料庫服務的使用者名稱。

      填寫步驟一:準備測試資料中帳號的使用者名稱。本樣本為test。

      password

      MySQL資料庫服務的密碼。

      填寫步驟一:準備測試資料中帳號的密碼。

      default-database

      預設的MySQL資料庫名稱。

      填寫步驟一:準備測試資料中建立的資料庫名。本樣本為test_cdc。

  • StarRocks Catalog

    • 程式碼範例

      CREATE CATALOG sr  WITH (
        'type' = 'starrocks',
        'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
        'username' = 'test',
        'password' = '1qaz!QAZ',
        'dbname' = 'test_cdc'
      );
    • 參數配置

      參數

      說明

      type

      類型,固定值為starrocks。

      endpoint

      FE的內網地址和查詢連接埠,格式為EMR Serverless StarRocks執行個體FE節點的內網地址:9030

      例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。

      說明

      關於如何擷取EMR Serverless StarRocks執行個體FE節點的內網地址,請參見查看執行個體列表與詳情

      username

      StarRocks的使用者名稱。

      填寫步驟一:準備測試資料中帳號的使用者名稱。本樣本為test。

      password

      StarRocks資料庫服務的密碼。

      填寫步驟一:準備測試資料中帳號的密碼。

      dbname

      StarRocks資料庫名稱。

      填寫步驟一:準備測試資料中建立的資料庫名。本樣本為test_cdc。

步驟三:建立並上線作業

  1. 在阿里雲Realtime ComputeFlink控制台的資料開發 > ETL頁面,編寫CTAS語句。

    以下是三種樣本:

    • AtLeast once語義:通過sink.buffer-flush.interval-ms配置項,配置每次寫入StarRocks的時間間隔,優點是寫入間隔時間短,佔用記憶體較少。

      /*
            AtLeast once 語義
      */
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl;
                                      
    • Exactly once語義:需要定義checkpoint間隔,優點是在各種異常情況下保障資料不丟失不重複,缺點是資料可見時間取決於checkpoint間隔。更多資訊,請參見Checkpointing

      /*
            Exactly once 語義。
      */
      set 'execution.checkpointing.interval' = '1 min';
      set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
      set 'execution.checkpointing.timeout' = '10 min';
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.semantic' = 'exactly-once',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl;
                                      
    • Simple模式:優點是建立表時不需要關注原表有哪些欄位,會按照MySQL的表格式照搬過來,開發人員使用比較方便。缺點是不能建立分區,對於需要分區的表,仍需要通過normal模式建立。

      /*
            上面兩個為normal模式,本樣本示範simple模式
      */
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl with (
      'starrocks.create.table.mode'='simple',
       'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl;
                                      

    表 1. WITH參數

    參數

    是否必選

    描述

    starrocks.create.table.properties

    StarRocks建表語句中除了欄位定義以外的其他尾碼定義,例如樣本中的engine、key和buckets等。

    database-name

    StarRocks資料庫名稱。

    本樣本為test_cdc。

    jdbc-url

    用於在StarRocks中執行查詢操作。

    例如,jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。其中,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com為EMR Serverless StarRocks執行個體FE節點的內網地址。

    說明

    關於如何擷取EMR Serverless StarRocks執行個體FE節點的內網地址,請參見查看執行個體列表與詳情

    load-url

    指定FE的內網地址和查詢連接埠,格式為EMR Serverless StarRocks執行個體FE節點的內網地址:8030

    例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。

    說明

    關於如何擷取EMR Serverless StarRocks執行個體FE節點的內網地址,請參見查看執行個體列表與詳情

    sink.semantic

    填寫exactly-once可以保障資料一致性語義,預設為at-least-once。

    starrocks.create.table.mode

    支援以下參數值:

    • normal模式(預設值):必須像樣本一樣在starrocks.create.table.properties配置中填寫engine、key和buckets等完整的配置。

    • simple模式:預設選擇engine為olap,選擇key類型為primary key,且主鍵與MySQL的主鍵保持完全一致,預設distributed by hash(所有的主鍵),預設無分區。需要在starrocks.create.table.properties配置中填寫的必填內容為buckets ,選填內容為properties等配置。

    sink.properties.row_delimiter

    自訂行分隔字元。

    sink.properties.column_separator

    自訂欄分隔字元。

    說明
    • 因為vvr-6.0.5-flink-1.15及以上版本移除了sink.use.new-api,所以使用vvr-6.0.5-flink-1.15之前的版本時,請在with參數中添加'sink.use.new-api'='false',

    • 其他配置請參見Continuously load data from Apache Flink

    表 2. OPTIONS參數

    參數

    描述

    connector

    類型,固定值為mysql-cdc。

    hostname

    RDS的內網地址。

    您可以在RDS的資料庫連接頁面,單擊內網地址進行複製。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

    port

    MySQL資料庫服務的連接埠號碼,預設值為3306。

    username

    MySQL資料庫服務的使用者名稱。

    填寫步驟一:準備測試資料中帳號的使用者名稱。本樣本為test。

    password

    MySQL資料庫服務的密碼。

    填寫步驟一:準備測試資料中帳號的密碼。

    table-name

    StarRocks中的表名稱。

    填寫步驟一:準備測試資料中建立的表名。本樣本為runoob_tbl。

    database-name

    預設的MySQL資料庫名稱。

    填寫步驟一:準備測試資料中建立的資料庫名。本樣本為test_cdc。

  2. 單擊部署

  3. 在部署新版本頁面選擇好部署目標,單擊確定

  4. 在作業營運頁面,單擊目標作業操作列的啟動

說明

阿里雲Realtime ComputeFlink控制台不支援調試CTAS語句。

步驟四:驗證資料同步結果

查詢資料

  1. 登入並串連EMR Serverless StarRocks執行個體,詳情請參見通過MySQL用戶端方式串連StarRocks執行個體

  2. 在StarRocks串連視窗執行以下命令,查看錶資料。

    use test_cdc;
    select * from runoob_tbl;

    返回資訊如下,表示MySQL上的資料已同步至StarRocks。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | first        | tom           | 2025-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

插入資料同步

  1. 在RDS資料庫視窗執行以下命令,插入資料。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`)  values(1,'second','tom2','2022-06-23',1)
  2. 在StarRocks串連視窗執行以下命令,查看錶資料。

    select * from runoob_tbl;

    返回資訊如下,表示資料已成功插入。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2025-06-23      |       1 |
    |        18 | first        | tom           | 2025-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

更新資料同步

  1. 在RDS資料庫視窗執行以下命令,更新指定資料。

    update runoob_tbl set runoob_title= 'new' where runoob_id = 18
  2. 在StarRocks串連視窗執行以下命令,查看錶資料。

    select * from runoob_tbl;

    返回資訊如下,表示資料已同步更新。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2025-06-23      |       1 |
    |        18 | new          | tom           | 2025-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

刪除資料同步

  1. 在RDS資料庫視窗執行以下命令,刪除指定資料。

    DELETE FROM runoob_tbl WHERE runoob_id = 1
  2. 在StarRocks串連視窗執行以下命令,查看錶資料。

    select * from runoob_tbl;

    返回資訊如下,表示資料已同步刪除。

    +-----------+--------------+---------------+-----------------+---------+ 
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | 
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | new          | tom           | 2025-06-22      |       3 | 
    +-----------+--------------+---------------+-----------------+---------+

增加可空列同步

  1. 在RDS資料庫視窗執行以下命令,增加可空列。

    alter table `runoob_tbl` add COLUMN `add_col2` INT;
  2. 執行以下命令 ,插入資料。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`)  values(1,'second','tom2','2022-06-23',1,2)
  3. 在StarRocks串連視窗執行以下命令,查看錶資料。

    select * from runoob_tbl;

    返回資訊如下,表示Schema已經成功變更。

    +-----------+--------------+---------------+-----------------+---------+----------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 | 
    +-----------+--------------+---------------+-----------------+---------+----------+ 
    |        18 | new          | tom           | 2025-06-22      |       3 |     NULL |
    +-----------+--------------+---------------+-----------------+---------+----------+ 
    |         1 | second       | tom2          | 2025-06-23      |       1 |      2   |
    +-----------+--------------+---------------+-----------------+---------+----------+ 

CDAS介紹

CDAS是CTAS的一個文法糖。通過CDAS語句,可以實現MySQL中的整庫同步,即產生一個Flink Job,源表是MySQL中的Database,目標表是StarRocks中對應的多張表,同時可以使用including table文法,只選擇一個Database中的部分表進行CDAS操作。

與CTAS的執行相同,需要在建立MySQL和StarRocks相應的Catalog後,執行CDAS語句。建立文法樣本如下。

CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
 as DATABASE mysql.test_cdc including table
 'tabl1','tbl2','tbl3';

相關文檔

Realtime ComputeFlink不僅支援CTAS語句同步資料至StarRocks,還支援通過資料攝入YAML方式同步資料至StarRocks。詳情請參見資料攝入