本文介紹基於Flink建立Paimon DLF Catalog,讀取MySQL CDC資料並寫入OSS,進一步將中繼資料同步到DLF,進而使用MaxCompute的external schema進行資料湖聯邦查詢。
適用範圍
支援地區
地區名稱
地區ID
華東1(杭州)
cn-hangzhou
華東2(上海)
cn-shanghai
華北2(北京)
cn-beijing
華北3(張家口)
cn-zhangjiakou
華南1(深圳)
cn-shenzhen
中國香港
cn-hongkong
新加坡
ap-southeast-1
德國(法蘭克福)
eu-central-1
MaxCompute、OSS、DLF、Flink必須部署在同一地區。
操作步驟
前置準備
步驟一:授予MaxCompute訪問DLF和OSS的許可權
操作MaxCompute專案的帳號未經授權無法訪問DLF和OSS服務,授權方式包含如下兩種:
步驟二:準備MySQL測試資料
如有其他MySQL測試資料,可跳過此步驟。
登入RDS 控制台。
在左側導覽列,選擇執行個體列表,在左上方選擇地區。
在執行個體列表頁面,單擊目標執行個體ID/名稱,進入執行個體詳情頁。
在左側導覽列,單擊資料庫管理。
單擊建立資料庫。配置如下參數:
參數
是否必填
說明
樣本
資料庫(DB)名稱
必填
長度為2~64個字元。
以字母開頭,以字母或數字結尾。
由小寫字母、數字、底線或中劃線組成。
資料庫名稱在執行個體內必須是唯一的。
資料庫名稱中如果包含
-,建立出的資料庫的檔案夾的名字中的-會變成@002d。
mysql_paimon支援字元集
必填
請按需選擇字元集。
utf8授權帳號
選填
選中需要訪問本資料庫的帳號。本參數可以留空,建立資料庫後再綁定帳號。
此處僅會顯示普通帳號。高許可權帳號擁有所有資料庫的所有許可權,無需授權。
預設備忘說明
選填
用於備忘該資料庫的相關資訊,便於後續資料庫管理,最多支援256個字元。
建立flink測試庫。單擊登入資料庫,在左側導覽列選擇資料庫執行個體,雙擊選中已建立的資料庫,在右側SQLConsole頁面執行下列語句,建立測試表並寫入測試資料。
如果執行個體存在,但執行個體展開後未找到目標資料庫,可能是:
登入帳號無目標資料庫的存取權限:可前往RDS執行個體詳情頁的帳號管理頁面手動修改帳號許可權或更換登入的資料庫帳號
中繼資料未同步導致目錄無法顯示:請將滑鼠懸浮在目標資料庫所屬執行個體上,單擊執行個體名右側的
按鈕,即可重新整理資料庫列表,顯示目標資料庫。
-- 建立表 CREATE TABLE sales ( id INT NOT NULL AUTO_INCREMENT, year INT NOT NULL, amount DECIMAL(10,2) NOT NULL, product_name VARCHAR(100) NOT NULL, customer_name VARCHAR(100) NOT NULL, order_date DATE NOT NULL, region VARCHAR(50) NOT NULL, status VARCHAR(20) NOT NULL, PRIMARY KEY (id,year) ) PARTITION BY RANGE (year) ( PARTITION p2020 VALUES LESS THAN (2021), PARTITION p2021 VALUES LESS THAN (2022), PARTITION p2022 VALUES LESS THAN (2023), PARTITION p2023 VALUES LESS THAN (2024) ); -- 寫入資料 INSERT INTO sales (year, amount, product_name, customer_name, order_date, region, status) VALUES (2020, 100.00, 'Product A', 'Customer 1', '2020-01-01', 'Region 1', 'Completed'), (2020, 200.00, 'Product B', 'Customer 2', '2020-02-01', 'Region 2', 'Pending'), (2021, 150.00, 'Product C', 'Customer 3', '2021-03-01', 'Region 3', 'Completed'), (2021, 300.00, 'Product D', 'Customer 4', '2021-04-01', 'Region 4', 'Pending'), (2022, 250.00, 'Product E', 'Customer 5', '2022-05-01', 'Region 5', 'Completed'), (2022, 400.00, 'Product F', 'Customer 6', '2022-06-01', 'Region 6', 'Pending'), (2023, 350.00, 'Product G', 'Customer 7', '2023-07-01', 'Region 7', 'Completed'), (2023, 500.00, 'Product H', 'Customer 8', '2023-08-01', 'Region 8', 'Pending'), (2020, 450.00, 'Product I', 'Customer 9', '2020-09-01', 'Region 1', 'Completed'), (2021, 600.00, 'Product J', 'Customer 10', '2021-10-01', 'Region 2', 'Pending');查詢測試表資料。
SELECT * FROM sales;返回結果:

步驟三:準備DLF中繼資料庫
登入OSS控制台,建立Bucket,本樣本中Bucket名為
mc-lakehouse-dlf-oss。詳情請參見建立儲存空間。在Bucket下建立目錄
flink_paimon。登入資料湖構建(DLF)控制台,在左上方選擇地區。
在左側導覽列,選擇。
在元数据管理頁面,單擊数据库頁簽。
在default数据目录下單擊新建数据库。配置如下參數:
參數
是否必填
說明
所属数据目录
必填
樣本中是default資料目錄。
数据库名称:
必填
自訂資料庫名稱,以字母開頭,長度為1-128位,允許字元為a-z、A-Z、0-9_,例如
db_dlf_oss。数据库描述:
選填
自訂描述。
选择路径:
必填
資料庫儲存位置,例如
oss://mc-lakehouse-dlf-oss/flink_paimon/。
步驟四:基於Flink建立Paimon、MySQL catalog
建立Paimon catalog:
登入Flink控制台,在左上方選擇地區。
單擊目標工作空間名稱,然後在左側導覽列,選擇資料管理。
在右側Catalog列表 介面,單擊建立Catalog 。在彈出的建立 Catalog 對話方塊裡,選擇Apache Paimon,單擊下一步 並配置如下參數:
參數
是否必填
說明
metastore
必填
中繼資料存放區類型。本樣本中選擇
dlf。catalog name
必填
選擇需要關聯版本的DLF Catalog,本樣本選擇
v1.0版本。warehouse
必填
OSS服務中所指定的數倉目錄。本樣本中
oss://mc-lakehouse-dlf-oss/flink_paimon/。fs.oss.endpoint
必填
OSS服務的endpoint,例如杭州地區為
oss-cn-hangzhou-internal.aliyuncs.com。fs.oss.accessKeyId
必填
訪問OSS服務所需的Access Key ID。
fs.oss.accessKeySecret
必填
訪問OSS服務所需的Access Key Secret。
dlf.catalog.accessKeyId
必填
訪問DLF服務所需的Access Key ID。
dlf.catalog.accessKeySecret
必填
訪問DLF服務所需的Access Key Secret。
建立MySQL catalog:
登入Flink控制台,在左上方選擇地區。
添加白名單。
單擊目標工作空間對應的操作列詳情。
在彈出的工作空間詳情互動面板中,複製交換器的網段資訊。
登入RDS 控制台。
在左側導覽列,選擇執行個體列表,在左上方選擇地區。
在執行個體列表頁面,單擊目標執行個體ID/名稱,進入執行個體詳情頁。
在左側導覽列,單擊白名單與安全性群組。
在白名單設定頁簽,單擊修改。
在彈出的修改白名單分組對話方塊,組內白名單位置添加複製下來的網段資訊,單擊確定。
登入Flink控制台,在左上方選擇地區。
單擊目標工作空間名稱,然後在左側導覽列,選擇資料管理。
在右側Catalog列表 介面,單擊建立Catalog 。在彈出的建立 Catalog 對話方塊裡,選擇MySQL,單擊下一步 並配置如下參數:
參數
是否必填
說明
catalog name
必填
自訂MySQL Catalog名稱。例如
mysql-catalog。hostname
必填
MySQL資料庫的IP地址或者Hostname。
可登入RDS MySQL控制台,在資料庫執行個體詳情頁,單擊資料庫連接查看資料庫內網地址、外網地址及內網連接埠。
在跨VPC或公網訪問時需要打通網路,詳情請參見網路連通性。
port
預設
串連到伺服器的連接埠,預設為3306。
default database
必填
預設資料庫名稱。例如
mysql_paimon。username
必填
串連MySQL資料庫伺服器時使用的使用者名稱。可登入RDS MySQL控制台,在資料庫執行個體詳情頁,單擊帳號管理查看。
password
必填
串連MySQL資料庫伺服器時使用的密碼。可登入RDS MySQL控制台,在資料庫執行個體詳情頁,單擊帳號管理查看。
步驟五:基於Flink讀MySQL寫Paimon並同步中繼資料到DLF
登入Flink控制台,在左上方選擇地區。
單擊目標工作空間名稱,然後在左側導覽列,選擇。
在作業草稿頁簽,單擊
,建立檔案夾。右鍵檔案夾,選擇建立流作業,在彈出的新增作業草稿對話方塊,填寫檔案名稱並選擇引擎版本。
在檔案中寫入如下CREATE TABLE AS(CTAS)SQL語句。注意根據實際命名修改代碼中的相關命名。
CREATE TABLE IF NOT EXISTS `<dlf_meta_db_name>`.`<OSS_bucket_name>`.`sales` AS TABLE `<mysql_catalog_name>`.`<RDS_mysql_name>`.`sales`; -- 按照本文樣本命名可直接複製下方代碼 CREATE TABLE IF NOT EXISTS `db_dlf_oss`.`flink_paimon`.`sales` AS TABLE `mysql-catalog`.`mysql_paimon`.`sales`;(可選)單擊右上方的深度檢查,確認作業Flink SQL語句中是否存在語法錯誤。
單擊右上方部署,在彈出的部署新版本對話方塊中填寫備忘、作業標籤和部署目標等資訊,然後單擊確定。
單擊目標工作空間名稱,然後在左側導覽列,選擇。
在作業營運頁面,單擊目標作業名稱,進入作業部署詳情頁面。
在目標作業部署詳情頁右上方,單擊啟動,選擇無狀態啟動後,單擊啟動。
查詢Paimon資料。
在左側導覽列,選擇。
在查詢指令碼頁簽,單擊
,建立查詢指令碼。運行如下代碼:
SELECT * FROM `<paimon_catalog_name>`.`flink_paimon`.`sales`;返回結果如下:

進入OSS控制台,查看
mc-lakehouse-dlf-oss/flink_paimon/,產生sales/檔案夾,組建檔案如圖所示:
登入資料湖構建(DLF)控制台,在左上方選擇地區。
在左側導覽列,選擇。
單擊資料庫名
flink_paimon,可查看到已產生的表,如圖所示:
步驟六:MaxCompute建立DLF+OSS外部資料源
登入MaxCompute控制台,在左上方選擇地區。
在左側導覽列,選擇 。
在外部数据源頁面,單擊创建外部数据源。
在彈出的新增外部数据源對話方塊,根據介面提示配置相關參數。參數說明如下:
參數
是否必填
說明
外部数据源类型
必填
選擇DLF+OSS。
外部数据源名称
必填
可自訂命名。命名規則如下:
以字母開頭,且只能包含小寫字母、底線和數字。
不能超過128個字元。
例如
mysql_paimon_dlf。外部数据源描述
選填
根據需要填寫。
地域
必填
預設為當前地區。
DLF Endpoint
必填
預設為當前地區的DLF Endpoint。
OSS Endpoint
必填
預設為當前地區的OSS Endpoint。
RoleARN
必填
RAM角色的ARN資訊。此角色需要包含能夠同時訪問DLF和OSS服務的許可權。
登入RAM控制台。
在左側導覽列選擇。
在基礎資訊地區,可以擷取ARN資訊。
樣本:
acs:ram::124****:role/aliyunodpsdefaultrole。外部数据源补充属性
選填
特殊聲明的外部資料源補充屬性。指定後,使用此外部資料源的任務可以按照參數定義的行為訪問源系統。
說明支援的具體參數請關注後續官網文檔更新說明,具體參數將隨產品能力演化逐步放開。
單擊確認,完成外部資料源的建立。
在外部数据源頁面,單擊目標資料來源對應的操作的详情,可查看資料來源詳細資料。
步驟七:建立外部schema
串連至MaxCompute,輸入以下命令:
SET odps.namespace.schema=true;
CREATE EXTERNAL SCHEMA IF NOT EXISTS <external_schema>
WITH <external_data_source>
ON '<dlf_data_catalogue>.dlf_database';參數說明如下:
步驟八:使用SQL訪問OSS資料
登入MaxCompute用戶端,查詢external schema內的表。
SET odps.namespace.schema=true;
use schema es_mc_dlf_oss_paimon;
SHOW tables IN es_mc_dlf_oss_paimon;
-- 返回結果:
ALIYUN$xxx:sales
OK查詢external schema內表資料。
SET odps.namespace.schema=true;
SELECT * FROM <maxcompute_project_name>.es_mc_dlf_oss_paimon.sales;
-- 返回結果如下:
+------------+------------+------------+--------------+---------------+------------+------------+------------+
| id | year | amount | product_name | customer_name | order_date | region | status |
+------------+------------+------------+--------------+---------------+------------+------------+------------+
| 1 | 2020 | 100 | Product A | Customer 1 | 2020-01-01 | Region 1 | Completed |
| 2 | 2020 | 200 | Product B | Customer 2 | 2020-02-01 | Region 2 | Pending |
| 3 | 2021 | 150 | Product C | Customer 3 | 2021-03-01 | Region 3 | Completed |
| 4 | 2021 | 300 | Product D | Customer 4 | 2021-04-01 | Region 4 | Pending |
| 5 | 2022 | 250 | Product E | Customer 5 | 2022-05-01 | Region 5 | Completed |
| 6 | 2022 | 400 | Product F | Customer 6 | 2022-06-01 | Region 6 | Pending |
| 7 | 2023 | 350 | Product G | Customer 7 | 2023-07-01 | Region 7 | Completed |
| 8 | 2023 | 500 | Product H | Customer 8 | 2023-08-01 | Region 8 | Pending |
| 9 | 2020 | 450 | Product I | Customer 9 | 2020-09-01 | Region 1 | Completed |
| 10 | 2021 | 600 | Product J | Customer 10 | 2021-10-01 | Region 2 | Pending |
+------------+------------+------------+--------------+---------------+------------+------------+------------+