全部产品
Search
文档中心

开源大数据平台E-MapReduce:使用Iceberg

更新时间:Feb 04, 2026

Iceberg是一种开放的数据湖表格式。您可以借助Iceberg快速地在HDFS或者阿里云OSS上构建自己的数据湖存储服务。本文为您介绍如何在EMR Serverless Spark中实现Iceberg表的读取与写入操作。

前提条件

已创建工作空间,详情请参见创建工作空间

操作流程

说明

SparkSQL与Notebook均支持对Iceberg表的读写操作。本文将以SparkSQL任务为例进行介绍。

步骤一:创建会话资源

  1. 进入会话管理页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,单击左侧导航栏中的会话管理

  2. SQL会话页面,单击创建SQL会话

  3. 创建SQL会话页面的Spark配置区域,配置以下信息,单击创建。详情请参见管理SQL会话

    Spark对Iceberg的读写基于Catalog,您可以根据具体场景进行选择。更多Catalog信息,请参见管理数据目录

    使用数据目录Catalog

    若采用数据目录Catalog方式,则无需在会话中配置参数,只需在数据目录页面单击添加数据目录,然后在SparkSQL开发中直接选择数据目录即可。

    说明
    • 如果要访问DLF(原DLF2.5)中的Iceberg,引擎版本请使用esr-4.7.0、esr-3.6.0及以上版本。

    • 如果要访问DLF-Legacy(原DLF1.0)或Hive MetaStore中的Iceberg,引擎版本推荐使用esr-4.3.0、esr-3.3.0、esr-2.7.0及以上版本。

    使用自定义Catalog

    DLF(原DLF 2.5)

    说明

    引擎版本要求esr-4.7.0、esr-3.6.0及以上版本。

    spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog
    spark.sql.catalog.iceberg_catalog.uri http://<regionID>-vpc.dlf.aliyuncs.com
    spark.sql.catalog.iceberg_catalog.warehouse  <catalog_name>
    spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.rest.DlfFileIO
    spark.sql.catalog.iceberg_catalog.rest.auth.type sigv4
    spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type none
    spark.sql.catalog.iceberg_catalog.rest.signing-region <regionID>
    spark.sql.catalog.iceberg_catalog.rest.signing-name DlfNext
    spark.sql.catalog.iceberg_catalog.rest.access-key-id <access_key_id>
    spark.sql.catalog.iceberg_catalog.rest.secret-access-key <access_key_secret>

    涉及参数如下所示。

    参数

    说明

    示例值

    spark.sql.extensions

    启用Iceberg Spark扩展。

    固定值:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    spark.sql.catalog.iceberg_catalog

    注册名为 iceberg_catalog 的 Spark Catalog。

    固定值:org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.iceberg_catalog.catalog-impl

    指定底层Catalog实现为Iceberg REST Catalog

    org.apache.iceberg.rest.RESTCatalog

    spark.sql.catalog.iceberg_catalog.uri

    DLF Iceberg 服务的 REST API 地址,格式为http://<regionID>-vpc.dlf.aliyuncs.com

    http://cn-hangzhou-vpc.dlf.aliyuncs.com

    spark.sql.catalog.iceberg_catalog.warehouse

    指定关联的 DLF Catalog 名称。

    说明

    不建议关联数据共享创建的DLF Catalog。

    <catalog_name>

    spark.sql.catalog.iceberg_catalog.io-impl

    使用 DLF 定制的 FileIO 实现。

    固定值:org.apache.iceberg.rest.DlfFileIO

    spark.sql.catalog.iceberg_catalog.rest.auth.type

    启用 AWS SigV4 签名认证机制,用于对 REST 请求进行身份验证。

    sigv4

    spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type

    禁用委托认证,由客户端直接提供 AK/SK 进行签名。

    none

    spark.sql.catalog.iceberg_catalog.rest.signing-region

    指定签名所用的地域(Region),必须与 DLF 服务所在地域一致。

    cn-hangzhou

    spark.sql.catalog.iceberg_catalog.rest.signing-name

    指定签名所用的服务名称

    固定值:DlfNext

    spark.sql.catalog.iceberg_catalog.rest.access-key-id

    阿里云账号或者RAM用户的AccessKey ID。

    <access_key_id>

    spark.sql.catalog.iceberg_catalog.rest.secret-access-key

    阿里云账号或者RAM用户的AccessKey Secret。

    <access_key_secret>

    DLF-Legacy(原DLF1.0)

    说明

    引擎版本要求esr-4.3.0、esr-3.3.0、esr-2.7.0及以上版本。

    元数据保存在DLF-Legacy(原DLF1.0)中。

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.<catalogName>               org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.catalog-impl  org.apache.iceberg.aliyun.dlf.hive.DlfCatalog
    spark.sql.catalog.<catalogName>.dlf.catalog.id <catalog_name>

    涉及参数说明如下所示。

    参数

    说明

    示例值

    spark.sql.extensions

    启用Iceberg Spark扩展。

    固定值:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    spark.sql.catalog.<catalogName>

    注册一个名为 <catalogName> 的 Catalog。

    固定值:org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.catalog-impl

    采用阿里云 DLF-Legacy 专用的 Hive 兼容实现,直连 DLF-Legacy 元数据服务。

    固定值:org.apache.iceberg.aliyun.dlf.hive.DlfCatalog

    spark.sql.catalog.<catalogName>.dlf.catalog.id

    指定关联的 DLF Catalog 名称。

    <catalog_name>

    Hive MetaStore

    元数据保存在指定的Hive MetaStore中。

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.<catalogName>               org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.catalog-impl  org.apache.iceberg.hive.HiveCatalog
    spark.sql.catalog.<catalogName>.uri           thrift://<yourHMSUri>:<port>

    涉及参数如下表所示。

    参数

    说明

    示例值

    spark.sql.extensions

    启用Iceberg Spark扩展。

    固定值:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    spark.sql.catalog.<catalogName>

    注册一个名为 <catalogName> 的 Catalog

    固定值:org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.catalog-impl

    指定该 Catalog 使用 Iceberg 官方的 HiveCatalog 实现,通过 Hive Metastore 存储和读取 Iceberg 表的元数据。

    固定值:org.apache.iceberg.hive.HiveCatalog

    spark.sql.catalog.<catalogName>.uri

    Hive MetaStore的URI。格式为thrift://<Hive metastore的IP地址>:9083

    <Hive metastore的IP地址>为HMS服务的内网IP地址。如果您需要指定外部Metastore服务,请参见连接外部Hive Metastore Service

    thrift://192.168.**.**:9083

    FileSystem

    元数据保存在文件系统中。

    spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.<catalogName>               org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.type          hadoop
    spark.sql.catalog.<catalogName>.warehouse     oss://<yourBucketName>/warehouse

    涉及参数如下表所示。

    参数

    说明

    示例值

    spark.sql.extensions

    启用Iceberg Spark扩展。

    固定值:org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    spark.sql.catalog.<catalogName>

    注册一个名为 <catalogName> 的 Catalog。

    固定值:org.apache.iceberg.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.type

    指定 Catalog 类型为 hadoop。表示使用HadoopCatalog,将元数据直接存储在文件系统中,无需 Hive Metastore。

    hadoop

    spark.sql.catalog.<catalogName>.warehouse

    指定元数据存储路径。代码中的<yourBucketName>表示OSS上的Bucket名称。

    oss://<yourBucketName>/warehouse

步骤二:读写Iceberg

  1. 进入SQL开发页面。

    EMR Serverless Spark页面,单击左侧导航栏中的数据开发

  2. 开发目录页签下,单击image图标。

  3. 新建对话框中,输入名称(例如users_task),类型使用默认的SparkSQL,然后单击确定

  4. 拷贝如下代码到新增的SparkSQL页签(users_task)中。

    说明

    在未指定数据库的情况下,创建数据表将默认位于Catalog下的default数据库中,用户亦可创建并指定其他数据库。

    -- 创建数据库
    CREATE DATABASE IF NOT EXISTS iceberg_catalog.db;
    
    -- 创建非分区表
    CREATE TABLE iceberg_catalog.db.tbl (
        id BIGINT NOT NULL COMMENT 'unique id',
        data STRING
    )
    USING iceberg;
    
    -- 插入非分区表数据
    INSERT INTO iceberg_catalog.db.tbl VALUES
    (1, 'Alice'),
    (2, 'Bob'),
    (3, 'Charlie');
    
    -- 查询非分区表所有数据
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- 按条件查询非分区表
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;
    
    -- 更新非分区表数据
    UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;
    
    -- 再次查询确认更新
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;
    
    -- 删除非分区表数据
    DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;
    
    -- 再次查询确认删除
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- 创建分区表
    CREATE TABLE iceberg_catalog.db.part_tbl (
        id BIGINT,
        data STRING,
        category STRING,
        ts TIMESTAMP,
        dt DATE
    )
    USING iceberg
    PARTITIONED BY (dt, category);
    
    -- 插入分区表数据
    INSERT INTO iceberg_catalog.db.part_tbl VALUES
      (1 , 'data-01', 'A', timestamp'2026-01-01 10:00:00', date'2026-01-01'),
      (2 , 'data-02', 'A', timestamp'2026-01-01 11:00:00', date'2026-01-01'),
      (3 , 'data-03', 'A', timestamp'2026-01-02 09:30:00', date'2026-01-02'),
      (4 , 'data-04', 'B', timestamp'2026-01-02 12:15:00', date'2026-01-02'),
      (5 , 'data-05', 'B', timestamp'2026-01-03 08:05:00', date'2026-01-03'),
      (6 , 'data-06', 'B', timestamp'2026-01-03 14:20:00', date'2026-01-03'),
      (7 , 'data-07', 'C', timestamp'2026-01-04 16:45:00', date'2026-01-04'),
      (8 , 'data-08', 'C', timestamp'2026-01-04 18:10:00', date'2026-01-04'),
      (9 , 'data-09', 'C', timestamp'2026-01-05 07:55:00', date'2026-01-05'),
      (10, 'data-10', 'A', timestamp'2026-01-05 13:35:00', date'2026-01-05');
    
    
    -- 查询分区表所有数据
    SELECT * FROM iceberg_catalog.db.part_tbl;
    
    -- 查询 dt='2026-01-01' 的数据
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE dt='2026-01-01';
    
    -- 查询某个 category 的数据
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';
    
    -- 多条件组合查询(bucket + day + category)
    SELECT * FROM iceberg_catalog.db.part_tbl 
    WHERE dt='2026-01-01'
      AND category = 'A';
    
    -- 聚合统计每个分类的数据数量
    SELECT category, COUNT(*) AS count 
    FROM iceberg_catalog.db.part_tbl 
    GROUP BY category;
    
    -- 删除数据库(谨慎操作),删除前需确保该db下tbl为空
    -- DROP DATABASE iceberg_catalog.db;
  5. 在会话下拉列表中选择刚刚创建的SQL会话实例,并单击“运行”按钮。成功执行后,您可以在下方查看运行结果。image

相关文档