本文介绍基于Flink创建Paimon DLF Catalog,读取MySQL CDC数据并写入OSS,进一步将元数据同步到DLF,进而使用MaxCompute的external schema进行数据湖联邦查询。
适用范围
支持地域
地域名称
地域ID
华东1(杭州)
cn-hangzhou
华东2(上海)
cn-shanghai
华北2(北京)
cn-beijing
华北3(张家口)
cn-zhangjiakou
华南1(深圳)
cn-shenzhen
中国香港
cn-hongkong
新加坡
ap-southeast-1
德国(法兰克福)
eu-central-1
MaxCompute、OSS、DLF、Flink必须部署在同一地域。
操作步骤
前置准备
步骤一:授予MaxCompute访问DLF和OSS的权限
操作MaxCompute项目的账号未经授权无法访问DLF和OSS服务,授权方式包含如下两种:
步骤二:准备MySQL测试数据
如有其他MySQL测试数据,可跳过此步骤。
登录RDS 控制台。
在左侧导航栏,选择实例列表,在左上角选择地域。
在实例列表页面,单击目标实例ID/名称,进入实例详情页。
在左侧导航栏,单击数据库管理。
单击新建数据库。配置如下参数:
参数
是否必填
说明
示例
数据库(DB)名称
必填
长度为2~64个字符。
以字母开头,以字母或数字结尾。
由小写字母、数字、下划线或中划线组成。
数据库名称在实例内必须是唯一的。
数据库名称中如果包含
-,创建出的数据库的文件夹的名字中的-会变成@002d。
mysql_paimon支持字符集
必填
请按需选择字符集。
utf8授权账号
选填
选中需要访问本数据库的账号。本参数可以留空,创建数据库后再绑定账号。
此处仅会显示普通账号。高权限账号拥有所有数据库的所有权限,无需授权。
默认备注说明
选填
用于备注该数据库的相关信息,便于后续数据库管理,最多支持256个字符。
创建flink测试库。单击登录数据库,在左侧导航栏选择数据库实例,双击选中已创建的数据库,在右侧SQLConsole页面执行下列语句,创建测试表并写入测试数据。
如果实例存在,但实例展开后未找到目标数据库,可能是:
登录账号无目标数据库的访问权限:可前往RDS实例详情页的账号管理页面手动修改账号权限或更换登录的数据库账号
元数据未同步导致目录无法显示:请将鼠标悬浮在目标数据库所属实例上,单击实例名右侧的
按钮,即可刷新数据库列表,显示目标数据库。
-- 创建表 CREATE TABLE sales ( id INT NOT NULL AUTO_INCREMENT, year INT NOT NULL, amount DECIMAL(10,2) NOT NULL, product_name VARCHAR(100) NOT NULL, customer_name VARCHAR(100) NOT NULL, order_date DATE NOT NULL, region VARCHAR(50) NOT NULL, status VARCHAR(20) NOT NULL, PRIMARY KEY (id,year) ) PARTITION BY RANGE (year) ( PARTITION p2020 VALUES LESS THAN (2021), PARTITION p2021 VALUES LESS THAN (2022), PARTITION p2022 VALUES LESS THAN (2023), PARTITION p2023 VALUES LESS THAN (2024) ); -- 写入数据 INSERT INTO sales (year, amount, product_name, customer_name, order_date, region, status) VALUES (2020, 100.00, 'Product A', 'Customer 1', '2020-01-01', 'Region 1', 'Completed'), (2020, 200.00, 'Product B', 'Customer 2', '2020-02-01', 'Region 2', 'Pending'), (2021, 150.00, 'Product C', 'Customer 3', '2021-03-01', 'Region 3', 'Completed'), (2021, 300.00, 'Product D', 'Customer 4', '2021-04-01', 'Region 4', 'Pending'), (2022, 250.00, 'Product E', 'Customer 5', '2022-05-01', 'Region 5', 'Completed'), (2022, 400.00, 'Product F', 'Customer 6', '2022-06-01', 'Region 6', 'Pending'), (2023, 350.00, 'Product G', 'Customer 7', '2023-07-01', 'Region 7', 'Completed'), (2023, 500.00, 'Product H', 'Customer 8', '2023-08-01', 'Region 8', 'Pending'), (2020, 450.00, 'Product I', 'Customer 9', '2020-09-01', 'Region 1', 'Completed'), (2021, 600.00, 'Product J', 'Customer 10', '2021-10-01', 'Region 2', 'Pending');查询测试表数据。
SELECT * FROM sales;返回结果:

步骤三:准备DLF元数据库
登录OSS控制台,创建Bucket,本示例中Bucket名为
mc-lakehouse-dlf-oss。详情请参见创建存储空间。在Bucket下新建目录
flink_paimon。登录数据湖构建(DLF)控制台,在左上角选择地域。
在左侧导航栏,选择。
在元数据管理页面,单击数据库页签。
在default数据目录下单击新建数据库。配置如下参数:
参数
是否必填
说明
所属数据目录
必填
示例中是default数据目录。
数据库名称:
必填
自定义数据库名称,以字母开头,长度为1-128位,允许字符为a-z、A-Z、0-9_,例如
db_dlf_oss。数据库描述:
选填
自定义描述。
选择路径:
必填
数据库存储位置,例如
oss://mc-lakehouse-dlf-oss/flink_paimon/。
步骤四:基于Flink创建Paimon、MySQL catalog
创建Paimon catalog:
登录Flink控制台,在左上角选择地域。
单击目标工作空间名称,然后在左侧导航栏,选择数据管理。
在右侧Catalog列表 界面,单击创建Catalog 。在弹出的创建 Catalog 对话框里,选择Apache Paimon,单击下一步 并配置如下参数:
参数
是否必填
说明
metastore
必填
元数据存储类型。本示例中选择
dlf。catalog name
必填
选择需要关联版本的DLF Catalog,本示例选择
v1.0版本。warehouse
必填
OSS服务中所指定的数仓目录。本示例中
oss://mc-lakehouse-dlf-oss/flink_paimon/。fs.oss.endpoint
必填
OSS服务的endpoint,例如杭州地域为
oss-cn-hangzhou-internal.aliyuncs.com。fs.oss.accessKeyId
必填
访问OSS服务所需的Access Key ID。
fs.oss.accessKeySecret
必填
访问OSS服务所需的Access Key Secret。
dlf.catalog.accessKeyId
必填
访问DLF服务所需的Access Key ID。
dlf.catalog.accessKeySecret
必填
访问DLF服务所需的Access Key Secret。
创建MySQL catalog:
登录Flink控制台,在左上角选择地域。
添加白名单。
单击目标工作空间对应的操作列详情。
在弹出的工作空间详情交互面板中,复制交换机的网段信息。
登录RDS 控制台。
在左侧导航栏,选择实例列表,在左上角选择地域。
在实例列表页面,单击目标实例ID/名称,进入实例详情页。
在左侧导航栏,单击白名单与安全组。
在白名单设置页签,单击修改。
在弹出的修改白名单分组对话框,组内白名单位置添加复制下来的网段信息,单击确定。
登录Flink控制台,在左上角选择地域。
单击目标工作空间名称,然后在左侧导航栏,选择数据管理。
在右侧Catalog列表 界面,单击创建Catalog 。在弹出的创建 Catalog 对话框里,选择MySQL,单击下一步 并配置如下参数:
参数
是否必填
说明
catalog name
必填
自定义MySQL Catalog名称。例如
mysql-catalog。hostname
必填
MySQL数据库的IP地址或者Hostname。
可登录RDS MySQL控制台,在数据库实例详情页,单击数据库连接查看数据库内网地址、外网地址及内网端口。
在跨VPC或公网访问时需要打通网络,详情请参见网络连通性。
port
默认
连接到服务器的端口,默认为3306。
default database
必填
默认数据库名称。例如
mysql_paimon。username
必填
连接MySQL数据库服务器时使用的用户名。可登录RDS MySQL控制台,在数据库实例详情页,单击账号管理查看。
password
必填
连接MySQL数据库服务器时使用的密码。可登录RDS MySQL控制台,在数据库实例详情页,单击账号管理查看。
步骤五:基于Flink读MySQL写Paimon并同步元数据到DLF
登录Flink控制台,在左上角选择地域。
单击目标工作空间名称,然后在左侧导航栏,选择。
在作业草稿页签,单击
,新建文件夹。右键文件夹,选择新建流作业,在弹出的新建作业草稿对话框,填写文件名称并选择引擎版本。
在文件中写入如下CREATE TABLE AS(CTAS)SQL语句。注意根据实际命名修改代码中的相关命名。
CREATE TABLE IF NOT EXISTS `<dlf_meta_db_name>`.`<OSS_bucket_name>`.`sales` AS TABLE `<mysql_catalog_name>`.`<RDS_mysql_name>`.`sales`; -- 按照本文示例命名可直接复制下方代码 CREATE TABLE IF NOT EXISTS `db_dlf_oss`.`flink_paimon`.`sales` AS TABLE `mysql-catalog`.`mysql_paimon`.`sales`;(可选)单击右上方的深度检查,确认作业Flink SQL语句中是否存在语法错误。
单击右上角部署,在弹出的部署新版本对话框中填写备注、作业标签和部署目标等信息,然后单击确定。
单击目标工作空间名称,然后在左侧导航栏,选择。
在作业运维页面,单击目标作业名称,进入作业部署详情页面。
在目标作业部署详情页右上角,单击启动,选择无状态启动后,单击启动。
查询Paimon数据。
在左侧导航栏,选择。
在查询脚本页签,单击
,新建查询脚本。运行如下代码:
SELECT * FROM `<paimon_catalog_name>`.`flink_paimon`.`sales`;返回结果如下:

进入OSS控制台,查看
mc-lakehouse-dlf-oss/flink_paimon/,生成sales/文件夹,生成文件如图所示:
登录数据湖构建(DLF)控制台,在左上角选择地域。
在左侧导航栏,选择。
单击数据库名
flink_paimon,可查看到已生成的表,如图所示:
步骤六:MaxCompute创建DLF+OSS外部数据源
登录MaxCompute控制台,在左上角选择地域。
在左侧导航栏,选择 。
在外部数据源页面,单击创建外部数据源。
在弹出的新增外部数据源对话框,根据界面提示配置相关参数。参数说明如下:
参数
是否必填
说明
外部数据源类型
必填
选择DLF+OSS。
外部数据源名称
必填
可自定义命名。命名规则如下:
以字母开头,且只能包含小写字母、下划线和数字。
不能超过128个字符。
例如
mysql_paimon_dlf。外部数据源描述
选填
根据需要填写。
地域
必填
默认为当前地域。
DLF Endpoint
必填
默认为当前地域的DLF Endpoint。
OSS Endpoint
必填
默认为当前地域的OSS Endpoint。
RoleARN
必填
RAM角色的ARN信息。此角色需要包含能够同时访问DLF和OSS服务的权限。
登录RAM控制台。
在左侧导航栏选择。
在基础信息区域,可以获取ARN信息。
示例:
acs:ram::124****:role/aliyunodpsdefaultrole。外部数据源补充属性
选填
特殊声明的外部数据源补充属性。指定后,使用此外部数据源的任务可以按照参数定义的行为访问源系统。
说明支持的具体参数请关注后续官网文档更新说明,具体参数将随产品能力演进逐步放开。
单击确认,完成外部数据源的创建。
在外部数据源页面,单击目标数据源对应的操作的详情,可查看数据源详细信息。
步骤七:创建外部schema
连接至MaxCompute,输入以下命令:
SET odps.namespace.schema=true;
CREATE EXTERNAL SCHEMA IF NOT EXISTS <external_schema>
WITH <external_data_source>
ON '<dlf_data_catalogue>.dlf_database';参数说明如下:
步骤八:使用SQL访问OSS数据
登录MaxCompute客户端,查询external schema内的表。
SET odps.namespace.schema=true;
use schema es_mc_dlf_oss_paimon;
SHOW tables IN es_mc_dlf_oss_paimon;
-- 返回结果:
ALIYUN$xxx:sales
OK查询external schema内表数据。
SET odps.namespace.schema=true;
SELECT * FROM <maxcompute_project_name>.es_mc_dlf_oss_paimon.sales;
-- 返回结果如下:
+------------+------------+------------+--------------+---------------+------------+------------+------------+
| id | year | amount | product_name | customer_name | order_date | region | status |
+------------+------------+------------+--------------+---------------+------------+------------+------------+
| 1 | 2020 | 100 | Product A | Customer 1 | 2020-01-01 | Region 1 | Completed |
| 2 | 2020 | 200 | Product B | Customer 2 | 2020-02-01 | Region 2 | Pending |
| 3 | 2021 | 150 | Product C | Customer 3 | 2021-03-01 | Region 3 | Completed |
| 4 | 2021 | 300 | Product D | Customer 4 | 2021-04-01 | Region 4 | Pending |
| 5 | 2022 | 250 | Product E | Customer 5 | 2022-05-01 | Region 5 | Completed |
| 6 | 2022 | 400 | Product F | Customer 6 | 2022-06-01 | Region 6 | Pending |
| 7 | 2023 | 350 | Product G | Customer 7 | 2023-07-01 | Region 7 | Completed |
| 8 | 2023 | 500 | Product H | Customer 8 | 2023-08-01 | Region 8 | Pending |
| 9 | 2020 | 450 | Product I | Customer 9 | 2020-09-01 | Region 1 | Completed |
| 10 | 2021 | 600 | Product J | Customer 10 | 2021-10-01 | Region 2 | Pending |
+------------+------------+------------+--------------+---------------+------------+------------+------------+