Iceberg是一种开放的数据湖表格式。您可以借助Iceberg快速地在HDFS或者阿里云OSS上构建自己的数据湖存储服务。本文为您介绍如何在EMR Serverless Spark中实现Iceberg表的读取与写入操作。
前提条件
已创建工作空间,详情请参见创建工作空间。
操作流程
SparkSQL与Notebook均支持对Iceberg表的读写操作。本文将以SparkSQL任务为例进行介绍。
步骤一:创建会话资源
进入会话管理页面。
在左侧导航栏,选择。
在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,单击左侧导航栏中的会话管理。
在SQL会话页面,单击创建SQL会话。
在创建SQL会话页面的Spark配置区域,配置以下信息,单击创建。详情请参见管理SQL会话。
Spark对Iceberg的读写基于Catalog,您可以根据具体场景进行选择。更多Catalog信息,请参见管理数据目录。
使用数据目录Catalog
若采用数据目录Catalog方式,则无需在会话中配置参数,只需在数据目录页面单击添加数据目录,然后在SparkSQL开发中直接选择数据目录即可。
说明如果要访问DLF(原DLF2.5)中的Iceberg,引擎版本请使用esr-4.7.0、esr-3.6.0及以上版本。
如果要访问DLF-Legacy(原DLF1.0)或Hive MetaStore中的Iceberg,引擎版本推荐使用esr-4.3.0、esr-3.3.0、esr-2.7.0及以上版本。
使用自定义Catalog
DLF(原DLF 2.5)
说明引擎版本要求esr-4.7.0、esr-3.6.0及以上版本。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog spark.sql.catalog.iceberg_catalog.uri http://<regionID>-vpc.dlf.aliyuncs.com spark.sql.catalog.iceberg_catalog.warehouse <catalog_name> spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.rest.DlfFileIO spark.sql.catalog.iceberg_catalog.rest.auth.type sigv4 spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type none spark.sql.catalog.iceberg_catalog.rest.signing-region <regionID> spark.sql.catalog.iceberg_catalog.rest.signing-name DlfNext spark.sql.catalog.iceberg_catalog.rest.access-key-id <access_key_id> spark.sql.catalog.iceberg_catalog.rest.secret-access-key <access_key_secret>涉及参数如下所示。
参数
说明
示例值
spark.sql.extensions启用Iceberg Spark扩展。
固定值:
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsspark.sql.catalog.iceberg_catalog注册名为 iceberg_catalog 的 Spark Catalog。
固定值:
org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.iceberg_catalog.catalog-impl指定底层Catalog实现为Iceberg REST Catalog
org.apache.iceberg.rest.RESTCatalogspark.sql.catalog.iceberg_catalog.uriDLF Iceberg 服务的 REST API 地址,格式为
http://<regionID>-vpc.dlf.aliyuncs.com。http://cn-hangzhou-vpc.dlf.aliyuncs.comspark.sql.catalog.iceberg_catalog.warehouse指定关联的 DLF Catalog 名称。
说明不建议关联数据共享创建的DLF Catalog。
<catalog_name>spark.sql.catalog.iceberg_catalog.io-impl使用 DLF 定制的 FileIO 实现。
固定值:
org.apache.iceberg.rest.DlfFileIOspark.sql.catalog.iceberg_catalog.rest.auth.type启用 AWS SigV4 签名认证机制,用于对 REST 请求进行身份验证。
sigv4spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type禁用委托认证,由客户端直接提供 AK/SK 进行签名。
nonespark.sql.catalog.iceberg_catalog.rest.signing-region指定签名所用的地域(Region),必须与 DLF 服务所在地域一致。
cn-hangzhouspark.sql.catalog.iceberg_catalog.rest.signing-name指定签名所用的服务名称
固定值:
DlfNextspark.sql.catalog.iceberg_catalog.rest.access-key-id阿里云账号或者RAM用户的AccessKey ID。
<access_key_id>spark.sql.catalog.iceberg_catalog.rest.secret-access-key阿里云账号或者RAM用户的AccessKey Secret。
<access_key_secret>DLF-Legacy(原DLF1.0)
说明引擎版本要求esr-4.3.0、esr-3.3.0、esr-2.7.0及以上版本。
元数据保存在DLF-Legacy(原DLF1.0)中。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.catalog-impl org.apache.iceberg.aliyun.dlf.hive.DlfCatalog spark.sql.catalog.<catalogName>.dlf.catalog.id <catalog_name>涉及参数说明如下所示。
参数
说明
示例值
spark.sql.extensions启用Iceberg Spark扩展。
固定值:
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsspark.sql.catalog.<catalogName>注册一个名为
<catalogName>的 Catalog。固定值:
org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.<catalogName>.catalog-impl采用阿里云 DLF-Legacy 专用的 Hive 兼容实现,直连 DLF-Legacy 元数据服务。
固定值:
org.apache.iceberg.aliyun.dlf.hive.DlfCatalogspark.sql.catalog.<catalogName>.dlf.catalog.id指定关联的 DLF Catalog 名称。
<catalog_name>Hive MetaStore
元数据保存在指定的Hive MetaStore中。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.catalog-impl org.apache.iceberg.hive.HiveCatalog spark.sql.catalog.<catalogName>.uri thrift://<yourHMSUri>:<port>涉及参数如下表所示。
参数
说明
示例值
spark.sql.extensions启用Iceberg Spark扩展。
固定值:
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsspark.sql.catalog.<catalogName>注册一个名为
<catalogName>的 Catalog固定值:
org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.<catalogName>.catalog-impl指定该 Catalog 使用 Iceberg 官方的 HiveCatalog 实现,通过 Hive Metastore 存储和读取 Iceberg 表的元数据。
固定值:
org.apache.iceberg.hive.HiveCatalogspark.sql.catalog.<catalogName>.uriHive MetaStore的URI。格式为
thrift://<Hive metastore的IP地址>:9083。<Hive metastore的IP地址>为HMS服务的内网IP地址。如果您需要指定外部Metastore服务,请参见连接外部Hive Metastore Service。thrift://192.168.**.**:9083FileSystem
元数据保存在文件系统中。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.type hadoop spark.sql.catalog.<catalogName>.warehouse oss://<yourBucketName>/warehouse涉及参数如下表所示。
参数
说明
示例值
spark.sql.extensions启用Iceberg Spark扩展。
固定值:
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsspark.sql.catalog.<catalogName>注册一个名为
<catalogName>的 Catalog。固定值:
org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.<catalogName>.type指定 Catalog 类型为
hadoop。表示使用HadoopCatalog,将元数据直接存储在文件系统中,无需 Hive Metastore。hadoopspark.sql.catalog.<catalogName>.warehouse指定元数据存储路径。代码中的
<yourBucketName>表示OSS上的Bucket名称。oss://<yourBucketName>/warehouse
步骤二:读写Iceberg表
进入SQL开发页面。
在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
在开发目录页签下,单击
图标。在新建对话框中,输入名称(例如users_task),类型使用默认的SparkSQL,然后单击确定。
拷贝如下代码到新增的SparkSQL页签(users_task)中。
说明在未指定数据库的情况下,创建数据表将默认位于Catalog下的default数据库中,用户亦可创建并指定其他数据库。
-- 创建数据库 CREATE DATABASE IF NOT EXISTS iceberg_catalog.db; -- 创建非分区表 CREATE TABLE iceberg_catalog.db.tbl ( id BIGINT NOT NULL COMMENT 'unique id', data STRING ) USING iceberg; -- 插入非分区表数据 INSERT INTO iceberg_catalog.db.tbl VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); -- 查询非分区表所有数据 SELECT * FROM iceberg_catalog.db.tbl; -- 按条件查询非分区表 SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2; -- 更新非分区表数据 UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3; -- 再次查询确认更新 SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3; -- 删除非分区表数据 DELETE FROM iceberg_catalog.db.tbl WHERE id = 1; -- 再次查询确认删除 SELECT * FROM iceberg_catalog.db.tbl; -- 创建分区表 CREATE TABLE iceberg_catalog.db.part_tbl ( id BIGINT, data STRING, category STRING, ts TIMESTAMP, dt DATE ) USING iceberg PARTITIONED BY (dt, category); -- 插入分区表数据 INSERT INTO iceberg_catalog.db.part_tbl VALUES (1 , 'data-01', 'A', timestamp'2026-01-01 10:00:00', date'2026-01-01'), (2 , 'data-02', 'A', timestamp'2026-01-01 11:00:00', date'2026-01-01'), (3 , 'data-03', 'A', timestamp'2026-01-02 09:30:00', date'2026-01-02'), (4 , 'data-04', 'B', timestamp'2026-01-02 12:15:00', date'2026-01-02'), (5 , 'data-05', 'B', timestamp'2026-01-03 08:05:00', date'2026-01-03'), (6 , 'data-06', 'B', timestamp'2026-01-03 14:20:00', date'2026-01-03'), (7 , 'data-07', 'C', timestamp'2026-01-04 16:45:00', date'2026-01-04'), (8 , 'data-08', 'C', timestamp'2026-01-04 18:10:00', date'2026-01-04'), (9 , 'data-09', 'C', timestamp'2026-01-05 07:55:00', date'2026-01-05'), (10, 'data-10', 'A', timestamp'2026-01-05 13:35:00', date'2026-01-05'); -- 查询分区表所有数据 SELECT * FROM iceberg_catalog.db.part_tbl; -- 查询 dt='2026-01-01' 的数据 SELECT * FROM iceberg_catalog.db.part_tbl WHERE dt='2026-01-01'; -- 查询某个 category 的数据 SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A'; -- 多条件组合查询(bucket + day + category) SELECT * FROM iceberg_catalog.db.part_tbl WHERE dt='2026-01-01' AND category = 'A'; -- 聚合统计每个分类的数据数量 SELECT category, COUNT(*) AS count FROM iceberg_catalog.db.part_tbl GROUP BY category; -- 删除数据库(谨慎操作),删除前需确保该db下tbl为空 -- DROP DATABASE iceberg_catalog.db;在会话下拉列表中选择刚刚创建的SQL会话实例,并单击“运行”按钮。成功执行后,您可以在下方查看运行结果。

相关文档
SQL任务和任务编排完整的开发流程示例,请参见SparkSQL开发快速入门。
更多Iceberg相关用法和配置,请参见Apache Iceberg。
创建SQL会话资源的具体操作,请参见管理SQL会话。
创建Notebook会话资源的具体操作,请参见管理Notebook会话。