全部產品
Search
文件中心

Realtime Compute for Apache Flink:管理MySQL Catalog

更新時間:Sep 10, 2025

建立MySQL Catalog後,您可以在Flink控制台直接存取Catalog下MySQL執行個體中的表,並在Flink SQL作業中使用。本文為您介紹如何建立及使用MySQL Catalog。

背景資訊

MySQL Catalog具有以下功能特點:

  • 直接存取MySQL執行個體中的表,無需通過DDL語句手動註冊MySQL表,提升開發效率和正確性。

  • MySQL Catalog提供的表可以直接作為Flink SQL作業中的MySQL CDC源表、MySQL結果表和MySQL維表。

  • 支援RDS MySQL、PolarDB MySQL或自建MySQL。

  • 支援直接存取分庫分表邏輯表。

  • 支援配合CDAS和CTAS文法完成基於MySQL資料來源的整庫同步、分庫分表合并同步、表結構變更同步。

使用限制

  • MySQL與Flink需在相同VPC下,跨VPC或公網訪問時需要打通網路,詳情請參見網路連通性

  • 建立後不支援修改Catalog配置資訊。如需修改,請刪除後重新建立。

  • 僅支援查詢已有資料庫和表,不支援通過Flink建立資料庫和表。

  • 作為源表時僅支援流讀、不支援批讀。

    說明

    MySQL Catalog提供的表作為MySQL CDC源表時,需要在RDS MySQL、PolarDB MySQL或者自建MySQL上開啟Binlog等配置,詳情請參見配置MySQL

  • 無法識別建表語句中使用PolarDB特有文法的表。

    例如PARTITION BY KEY(`idempotent_id`) PARTITIONS 16, UNIQUE KEY `uk_order_id` (`order_id`)。

  • Realtime Compute引擎VVR 8.0.7及以上版本,建立後不支援使用視圖作為Flink的表。

  • MySQL僅支援5.7和8.0.x版本。

建立MySQL Catalog

支援UI與SQL命令兩種方式建立MySQL Catalog。

UI方式(推薦)

  1. 進入資料管理頁面。

    1. 登入Realtime Compute控制台,單擊目標工作空間操作列下的控制台

    2. 在左側導覽列,單擊資料管理

  2. 單擊建立Catalog,選擇MySQL,單擊下一步

  3. 填寫參數配置資訊。

    重要

    Catalog建立完成後不支援修改以下配置資訊。如需修改,請刪除已建立的Catalog後重新建立。

    配置資訊

    參數

    說明

    是否必填

    catalogname

    自訂MySQL Catalog名稱。

    hostname

    MySQL資料庫的IP地址或者Hostname。

    說明

    跨VPC公網訪問時需要打通網路,詳情請參見網路連通性

    port

    MySQL資料庫服務的連接埠號碼,預設值為3306。

    default-database

    預設的MySQL資料庫名稱。

    username

    MySQL資料庫服務的使用者名稱。

    password

    MySQL資料庫服務的密碼。

    為了避免AK明文等風險,建議通過變數方式填寫(圖片樣本使用了名為mysqlpw的變數),詳情請參見新增變數

  4. 單擊確定

    在左側中繼資料地區,可以查看建立的Catalog。

SQL命令

  1. 進入資料查詢頁面。

    1. 登入Realtime Compute控制台,單擊目標工作空間操作列下的控制台

    2. 在左側導覽列,單擊資料開發 > 資料查詢

  2. 單擊image,單擊建立查詢指令碼,填寫檔案名稱儲存位置後,單擊儲存

  3. 填寫如下代碼。

    CREATE CATALOG YourCatalogName WITH(
      'type' = 'mysql',
      'hostname' = 'rm-bp1gcn0q0j0******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'usertest',
      'password' = '${secret_values.mysqlpw}',
      'default-database' = 'flinktest',
      'catalog.table.metadata-columns'='table_name'
    );

    參數

    說明

    是否必填

    YourCatalogName

    自訂MySQL Catalog名稱。

    type

    類型,固定值為mysql。

    hostname

    MySQL資料庫的IP地址或者Hostname。

    說明

    跨VPC公網訪問時需要打通網路,詳情請參見網路連通性

    port

    MySQL資料庫服務的連接埠號碼,預設值為3306。

    default-database

    預設的MySQL資料庫名稱。

    username

    MySQL資料庫服務的使用者名稱。

    password

    MySQL資料庫服務的密碼。

    為了避免AK明文等風險,建議通過變數方式填寫(樣本使用了名為mysqlpw的變數),詳情請參見新增變數

    property-version

    Catalog的參數版本。填寫值為0(預設值)或1(推薦)。

    不同參數版本可用的參數集合和參數的預設值可能不同,區別詳情會在具體參數的說明部分描述。

    說明
    • 僅VVR 8.0.6及以上版本支援配置該參數。

    • VVR 11.1及以上版本預設值為1。

    catalog.table.metadata-columns

    指定擷取資料表時,表的Schema需要添加MySQL CDC源表的中繼資料列。預設不添加中繼資料列。

    多個中繼資料列使用英文分號(;)分隔,例如:op_ts;table_name;database_name

    說明
    • 僅Realtime Compute引擎VVR 6.0.5及以上版本支援該參數。

    • 當配置該參數時,返回的表Schema會額外添加指定的中繼資料列,這些列只適用於MySQL CDC源表,所以該Catalog返回的表只能用作資料來源表,不可以用作結果表或維表。

    catalog.table.treat-tinyint1-as-boolean

    擷取資料表Schema時,是否將MySQL的TinyInt(1)和Boolean類型映射為Flink Boolean類型。參數取值如下:

    • true:映射為Boolean類型。

    • false:映射為TINYINT類型。

    參數預設值:

    • property-version=0時,預設值為true;

    • property-version=1時,預設值為false。

    說明
    • 僅Realtime Compute引擎VVR 8.0.4及以上版本支援配置該參數。

    • 不建議MySQL使用TinyInt(1)儲存0和1以外的數值,請選擇合適的資料類型做映射,參見類型映射

  4. 選中建立Catalog的代碼後,單擊左側程式碼數上的運行

    返回The following statement has been executed successfully!表示建立成功。

    image

查看和刪除MySQL Catalog

UI方式(推薦)

資料管理頁面,單擊Catalog列表下查看已建立的Catalog名稱

  • 查看:單擊目標Catalog名稱對應操作列的查看,查看Catalog下的資料庫和表資訊。

    表結構詳情暫不展示欄位comment資訊。

  • 刪除:單擊目標Catalog名稱對應操作列的刪除

    刪除操作僅刪除建立的Catalog,不會在相應服務中刪除這些表。刪除Catalog後使用該Catalog下表的已運行作業不受影響,但重新部署或啟動時會報錯(無法找到該表),請您謹慎操作。

SQL命令

  1. 資料查詢文本編輯地區,輸入以下命令。

    --查看Catalog下表在Flink中對應的Schema資訊,暫不支援展示欄位comment資訊。
    DESCRIBE `<catalogname>`.`<dbname>`.`<tablename>`;
    
    --刪除Catalog
    DROP CATALOG `<catalogname>`;
    說明

    刪除操作僅刪除建立的Catalog,不會在相應服務中刪除這些表。刪除Catalog後使用該Catalog下表的已運行作業不受影響,但重新部署或啟動時會報錯(無法找到該表),請您謹慎操作。

  2. 選中對應的命令,滑鼠右鍵選擇運行

    image

使用MySQL Catalog

從MySQL源表中讀取資料

INSERT INTO `<othersinktable>`
SELECT ...
FROM `<mysqlcatalog>`.`<dbname>`.`<tablename>` /*+ OPTIONS('server-id' = '6000-6008') */;

    MySQL Catalog作為MySQL CDC源表時,建議使用Table Hints來為作業指定不同的 server-id。如果源表需要多並發讀取,server-id還需要配置成範圍格式,範圍中的server-id個數需要大於等於並發度。

    讀取MySQL分庫分表邏輯表

    MySQL Catalog支援使用Regex,將庫名和表名作為邏輯表名,來讀取分庫分表的資料。

    例如,有一個分庫分表的MySQL資料庫,包括user01、user02和user99等多個表,分散在db01~db10等資料庫中,且所有表的Schema都相互相容,則可以通過如下Regex的庫名表名訪問到所有user的分庫分表。

    SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;

    分庫分表的邏輯表會返回額外的_db_name (STRING) 和_table_name (STRING)兩個系統欄位,且這兩個欄位與原分表的主鍵會作為邏輯表的新聯合主鍵以保證主鍵的唯一性。如果user01~user99的主鍵均為id,則user邏輯表的聯合主鍵為(_db_name, _table_name, id)。

    MySQL Catalog支援結合Regex匹配所要同步的多張表,實現分庫分表合并同步,具體樣本請參見分庫分表合并同步

    使用CTAS和CDAS即時同步MySQL資料變更和結構變更

    CTAS支援單表同步、表結構變更同步、分庫分表合并同步、自訂計算列同步,支援新增CTAS語句加入資料同步作業,具體樣本及詳情請參見CREATE TABLE AS(CTAS)語句。CDAS支援整庫層級的表結構和資料的即時同步,以及表結構變更的同步,詳情請參見CREATE DATABASE AS(CDAS)語句

    -- 單表同步,即時同步表層級的表結構變更和資料變更。
    CREATE TABLE IF NOT EXISTS `<targetcatalog>`.`<targetdbname>`.`<targettablename>`
    WITH (...)
    AS TABLE `<mysqlcatalog>`.`<dbname>`.`<tablename>`
    /*+ OPTIONS('server-id'='6000-6018') */;
    
    -- 整庫同步,即時同步整庫層級的表結構變更和資料變更。
    CREATE DATABASE `<targetcatalog>`.`<targetdbname>` WITH (...)
    AS DATABASE `<mysqlcatalog>`.`<dbname>` INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='6000-6018') */;   

    例如,將MySQL資料同步到Hologres中,樣本及詳情請參見使用Hologres Catalog

    USE CATALOG holocatalog; --指定使用的catalog
    
    CREATE TABLE IF NOT EXISTS holotable  --指定同步後的表名,未填寫資料庫層級時自動同步到catalog下預設的資料庫  
    WITH ('jdbcWriteBatchSize' = '1024')   -- 可選,指定結果表的參數。
    AS TABLE mysqlcatalog.dbmysql.mysqltable   
    /*+ OPTIONS('server-id'='8001-8004') */;  -- 指定mysql-cdc源表的額外參數。

    從MySQL維表中讀取資料

    INSERT INTO `<othersinktable>`
    SELECT ...
    FROM `<othersourcetable>` AS e
    JOIN `<mysqlcatalog>`.`<dbname>`.`<tablename>` FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;

    寫入資料至MySQL表

    INSERT INTO `<mysqlcatalog>`.`<dbname>`.`<tablename>`
    SELECT ...
    FROM `<othersourcetable>`