本文为您介绍在EMR on ECS Spark环境中如何以Iceberg REST访问DLF Catalog。
前提条件
版本要求:已创建EMR集群且版本 ≥ 5.12.0,组件选择Spark3,且Spark3使用JDK 11。
地域要求:EMR与DLF在同一地域,且已将EMR集群所在的VPC加入DLF的白名单中。
权限要求:如果是RAM用户,在进行数据操作之前,需要先授予相应的资源权限。详情请参见数据授权管理。
下载依赖配置
授予角色DLF权限
授予AliyunECSInstanceForEMRRole角色RAM权限(EMR产品化集成后可以省略该步骤)。
使用阿里云账号或RAM管理员登录RAM控制台。
单击,查询AliyunECSInstanceForEMRRole角色。
单击操作列的新增授权,进入新增授权页面。
在权限策略中,查询并勾选AliyunDLFFullAccess,单击确认新增授权。

授予AliyunECSInstanceForEMRRole角色DLF权限。
登录数据湖构建控制台。
在Catalogs列表页面,单击Catalog名称,进入Catalog详情页。
单击权限页签,单击授权。
在授权页面,配置以下信息,单击确定。
用户/角色:选择RAM用户/RAM角色。
选择授权对象:在下拉列表中选择AliyunECSInstanceForEMRRole。
说明如果用户下拉列表中未找到AliyunECSInstanceForEMRRole,可以在用户管理页面单击同步。
预置权限类型:选择Data Editor。
连接Catalog
依赖配置
直接将四个依赖包加入到$SPARK_HOME/jars中。
会话启动
在Terminal中执行spark-sql命令,注意替换对应参数。
${regionID}:为实际region,如cn-hangzhou。
${catalogName}:替换成在DLF中创建好的catalog名称。
spark-sql \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg_catalog.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
--conf spark.sql.catalog.iceberg_catalog.uri=http://${regionID}-vpc.dlf.aliyuncs.com/iceberg \
--conf spark.sql.catalog.iceberg_catalog.warehouse=${catalogName} \
--conf spark.sql.catalog.iceberg_catalog.io-impl=org.apache.iceberg.rest.DlfFileIO \
--conf spark.sql.catalog.iceberg_catalog.rest.auth.type=sigv4 \
--conf spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type=none \
--conf spark.sql.catalog.iceberg_catalog.rest.signing-region=${regionID} \
--conf spark.sql.catalog.iceberg_catalog.client.credentials-provider=org.apache.iceberg.rest.credentials.DlfEcsTokenCredentialsProvider \
--conf spark.sql.catalog.iceberg_catalog.rest.signing-name=DlfNext \
--master local读写数据
Iceberg 表不具备自动存储回收机制。为避免存储成本激增,请务必参考Spark Procedures以定期清理过期快照和孤立文件。
不指定数据库时,创建数据表会默认建在Catalog的default数据库中,也可创建并指定其他数据库。
-- 切换到 Iceberg Catalog
USE iceberg_catalog;
-- 创建数据库
CREATE DATABASE db;
-- 创建非分区表
CREATE TABLE iceberg_catalog.db.tbl (
id BIGINT NOT NULL COMMENT 'unique id',
data STRING
)
USING iceberg;
-- 插入非分区表数据
INSERT INTO iceberg_catalog.db.tbl VALUES
(1, 'Alice'),
(2, 'Bob'),
(3, 'Charlie');
-- 查询非分区表所有数据
SELECT * FROM iceberg_catalog.db.tbl;
-- 按条件查询非分区表
SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;
-- 更新非分区表数据
UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;
-- 再次查询确认更新
SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;
-- 删除非分区表数据
DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;
-- 再次查询确认删除
SELECT * FROM iceberg_catalog.db.tbl;
-- 创建分区表
CREATE TABLE iceberg_catalog.db.part_tbl (
id BIGINT,
data STRING,
category STRING,
ts TIMESTAMP
)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);
-- 插入分区表数据
INSERT INTO iceberg_catalog.db.part_tbl VALUES
(100, 'Data1', 'A', to_timestamp('2025-01-01 12:00:00')),
(200, 'Data2', 'B', to_timestamp('2025-01-02 14:00:00')),
(300, 'Data3', 'A', to_timestamp('2025-01-01 15:00:00')),
(400, 'Data4', 'C', to_timestamp('2025-01-03 10:00:00'));
-- 查询分区表所有数据
SELECT * FROM iceberg_catalog.db.part_tbl;
-- 查询 bucket(16, id) = 0 的数据
SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0;
-- 查询 day(ts) = '2025-01-01' 的数据
SELECT * FROM iceberg_catalog.db.part_tbl WHERE days(ts) = '2025-01-01';
-- 查询某个 category 的数据
SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';
-- 多条件组合查询(bucket + day + category)
SELECT * FROM iceberg_catalog.db.part_tbl
WHERE bucket(16, id) = 0
AND days(ts) = '2025-01-01'
AND category = 'A';
-- 聚合统计每个分类的数据数量
SELECT category, COUNT(*) AS count
FROM iceberg_catalog.db.part_tbl
GROUP BY category;
-- 清理测试数据(可选)
TRUNCATE TABLE iceberg_catalog.db.tbl;
TRUNCATE TABLE iceberg_catalog.db.part_tbl;
-- 删除数据库(谨慎操作)
DROP DATABASE iceberg_catalog.db;