Iceberg是一種開放的資料湖表格式。您可以藉助Iceberg快速地在HDFS或者阿里雲OSS上構建自己的資料湖儲存服務。本文為您介紹如何在EMR Serverless Spark中實現Iceberg表的讀取與寫入操作。
前提條件
已建立工作空間,詳情請參見建立工作空間。
操作流程
SparkSQL與Notebook均支援對Iceberg表的讀寫操作。本文將以SparkSQL任務為例進行介紹。
步驟一:建立會話資源
進入會話管理頁面。
在左側導覽列,選擇。
在Spark頁面,單擊目標工作空間名稱。
在EMR Serverless Spark頁面,單擊左側導覽列中的會話管理。
在SQL會話頁面,單擊建立SQL會話。
在建立SQL會話頁面的Spark配置地區,配置以下資訊,單擊创建。詳情請參見管理SQL會話。
Spark對Iceberg的讀寫基於Catalog,您可以根據具體情境進行選擇。更多Catalog資訊,請參見管理資料目錄。
使用資料目錄Catalog
若採用資料目錄Catalog方式,則無需在會話中配置參數,只需在数据目录頁面單擊添加数据目录,然後在SparkSQL開發中直接選擇資料目錄即可。
說明如果要訪問DLF(原DLF2.5)中的Iceberg,引擎版本請使用esr-4.7.0、esr-3.6.0及以上版本。
如果要訪問DLF-Legacy(原DLF1.0)或Hive MetaStore中的Iceberg,引擎版本推薦使用esr-4.3.0、esr-3.3.0、esr-2.7.0及以上版本。
使用自訂Catalog
DLF(原DLF 2.5)
說明引擎版本要求esr-4.7.0、esr-3.6.0及以上版本。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog spark.sql.catalog.iceberg_catalog.uri http://<regionID>-vpc.dlf.aliyuncs.com spark.sql.catalog.iceberg_catalog.warehouse <catalog_name> spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.rest.DlfFileIO spark.sql.catalog.iceberg_catalog.rest.auth.type sigv4 spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type none spark.sql.catalog.iceberg_catalog.rest.signing-region <regionID> spark.sql.catalog.iceberg_catalog.rest.signing-name DlfNext spark.sql.catalog.iceberg_catalog.rest.access-key-id <access_key_id> spark.sql.catalog.iceberg_catalog.rest.secret-access-key <access_key_secret>涉及參數如下所示。
參數
說明
樣本值
spark.sql.extensions啟用Iceberg Spark擴充。
固定值:
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsspark.sql.catalog.iceberg_catalog註冊名為 iceberg_catalog 的 Spark Catalog。
固定值:
org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.iceberg_catalog.catalog-impl指定底層Catalog實現為Iceberg REST Catalog
org.apache.iceberg.rest.RESTCatalogspark.sql.catalog.iceberg_catalog.uriDLF Iceberg 服務的 REST API 地址,格式為
http://<regionID>-vpc.dlf.aliyuncs.com。http://cn-hangzhou-vpc.dlf.aliyuncs.comspark.sql.catalog.iceberg_catalog.warehouse指定關聯的 DLF Catalog 名稱。
說明不建議關聯資料共用建立的DLF Catalog。
<catalog_name>spark.sql.catalog.iceberg_catalog.io-impl使用 DLF 定製的 FileIO 實現。
固定值:
org.apache.iceberg.rest.DlfFileIOspark.sql.catalog.iceberg_catalog.rest.auth.type啟用 AWS SigV4 簽名認證機制,用於對 REST 請求進行身分識別驗證。
sigv4spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type禁用委託認證,由用戶端直接提供 AK/SK 進行簽名。
nonespark.sql.catalog.iceberg_catalog.rest.signing-region指定簽名所用的地區(Region),必須與 DLF 服務所在地區一致。
cn-hangzhouspark.sql.catalog.iceberg_catalog.rest.signing-name指定簽名所用的服務名稱
固定值:
DlfNextspark.sql.catalog.iceberg_catalog.rest.access-key-id阿里雲帳號或者RAM使用者的AccessKey ID。
<access_key_id>spark.sql.catalog.iceberg_catalog.rest.secret-access-key阿里雲帳號或者RAM使用者的AccessKey Secret。
<access_key_secret>DLF-Legacy(原DLF1.0)
說明引擎版本要求esr-4.3.0、esr-3.3.0、esr-2.7.0及以上版本。
中繼資料儲存在DLF-Legacy(原DLF1.0)中。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.catalog-impl org.apache.iceberg.aliyun.dlf.hive.DlfCatalog spark.sql.catalog.<catalogName>.dlf.catalog.id <catalog_name>涉及參數說明如下所示。
參數
說明
樣本值
spark.sql.extensions啟用Iceberg Spark擴充。
固定值:
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsspark.sql.catalog.<catalogName>註冊一個名為
<catalogName>的 Catalog。固定值:
org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.<catalogName>.catalog-impl採用阿里雲 DLF-Legacy 專用的 Hive 相容實現,直連 DLF-Legacy 中繼資料服務。
固定值:
org.apache.iceberg.aliyun.dlf.hive.DlfCatalogspark.sql.catalog.<catalogName>.dlf.catalog.id指定關聯的 DLF Catalog 名稱。
<catalog_name>Hive MetaStore
中繼資料儲存在指定的Hive MetaStore中。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.catalog-impl org.apache.iceberg.hive.HiveCatalog spark.sql.catalog.<catalogName>.uri thrift://<yourHMSUri>:<port>涉及參數如下表所示。
參數
說明
樣本值
spark.sql.extensions啟用Iceberg Spark擴充。
固定值:
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsspark.sql.catalog.<catalogName>註冊一個名為
<catalogName>的 Catalog固定值:
org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.<catalogName>.catalog-impl指定該 Catalog 使用 Iceberg 官方的 HiveCatalog 實現,通過 Hive Metastore 儲存和讀取 Iceberg 表的中繼資料。
固定值:
org.apache.iceberg.hive.HiveCatalogspark.sql.catalog.<catalogName>.uriHive MetaStore的URI。格式為
thrift://<Hive metastore的IP地址>:9083。<Hive metastore的IP地址>為HMS服務的內網IP地址。如果您需要指定外部Metastore服務,請參見串連外部Hive Metastore Service。thrift://192.168.**.**:9083FileSystem
中繼資料儲存在檔案系統中。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.type hadoop spark.sql.catalog.<catalogName>.warehouse oss://<yourBucketName>/warehouse涉及參數如下表所示。
參數
說明
樣本值
spark.sql.extensions啟用Iceberg Spark擴充。
固定值:
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsspark.sql.catalog.<catalogName>註冊一個名為
<catalogName>的 Catalog。固定值:
org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.<catalogName>.type指定 Catalog 類型為
hadoop。表示使用HadoopCatalog,將中繼資料直接儲存在檔案系統中,無需 Hive Metastore。hadoopspark.sql.catalog.<catalogName>.warehouse指定中繼資料存放區路徑。代碼中的
<yourBucketName>表示OSS上的Bucket名稱。oss://<yourBucketName>/warehouse
步驟二:讀寫Iceberg表
進入SQL開發頁面。
在EMR Serverless Spark頁面,單擊左側導覽列中的数据开发。
在開發目錄頁簽下,單擊
表徵圖。在建立對話方塊中,輸入名稱(例如users_task),類型使用預設的SparkSQL,然後單擊確定。
拷貝如下代碼到新增的SparkSQL頁簽(users_task)中。
說明在未指定資料庫的情況下,建立資料表將預設位於Catalog下的default資料庫中,使用者亦可建立並指定其他資料庫。
-- 建立資料庫 CREATE DATABASE IF NOT EXISTS iceberg_catalog.db; -- 建立非分區表 CREATE TABLE iceberg_catalog.db.tbl ( id BIGINT NOT NULL COMMENT 'unique id', data STRING ) USING iceberg; -- 插入非分區表資料 INSERT INTO iceberg_catalog.db.tbl VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); -- 查詢非分區表所有資料 SELECT * FROM iceberg_catalog.db.tbl; -- 按條件查詢非分區表 SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2; -- 更新非分區表資料 UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3; -- 重新查詢確認更新 SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3; -- 刪除非分區表資料 DELETE FROM iceberg_catalog.db.tbl WHERE id = 1; -- 重新查詢確認刪除 SELECT * FROM iceberg_catalog.db.tbl; -- 建立分區表 CREATE TABLE iceberg_catalog.db.part_tbl ( id BIGINT, data STRING, category STRING, ts TIMESTAMP, dt DATE ) USING iceberg PARTITIONED BY (dt, category); -- 插入分區表資料 INSERT INTO iceberg_catalog.db.part_tbl VALUES (1 , 'data-01', 'A', timestamp'2026-01-01 10:00:00', date'2026-01-01'), (2 , 'data-02', 'A', timestamp'2026-01-01 11:00:00', date'2026-01-01'), (3 , 'data-03', 'A', timestamp'2026-01-02 09:30:00', date'2026-01-02'), (4 , 'data-04', 'B', timestamp'2026-01-02 12:15:00', date'2026-01-02'), (5 , 'data-05', 'B', timestamp'2026-01-03 08:05:00', date'2026-01-03'), (6 , 'data-06', 'B', timestamp'2026-01-03 14:20:00', date'2026-01-03'), (7 , 'data-07', 'C', timestamp'2026-01-04 16:45:00', date'2026-01-04'), (8 , 'data-08', 'C', timestamp'2026-01-04 18:10:00', date'2026-01-04'), (9 , 'data-09', 'C', timestamp'2026-01-05 07:55:00', date'2026-01-05'), (10, 'data-10', 'A', timestamp'2026-01-05 13:35:00', date'2026-01-05'); -- 查詢分區表所有資料 SELECT * FROM iceberg_catalog.db.part_tbl; -- 查詢 dt='2026-01-01' 的資料 SELECT * FROM iceberg_catalog.db.part_tbl WHERE dt='2026-01-01'; -- 查詢某個 category 的資料 SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A'; -- 多條件組合查詢(day + category) SELECT * FROM iceberg_catalog.db.part_tbl WHERE dt='2026-01-01' AND category = 'A'; -- 彙總統計每個分類的資料數量 SELECT category, COUNT(*) AS count FROM iceberg_catalog.db.part_tbl GROUP BY category; -- 刪除資料庫(謹慎操作),刪除前需確保該db下tbl為空白 -- DROP DATABASE iceberg_catalog.db;在會話下拉式清單中選擇剛剛建立的SQL會話執行個體,並單擊“運行”按鈕。成功執行後,您可以在下方查看運行結果。

相關文檔
SQL任務和任務編排完整的開發流程樣本,請參見SparkSQL開發快速入門。
更多Iceberg相關用法和配置,請參見Apache Iceberg。
建立SQL會話資源的具體操作,請參見管理SQL會話。
建立Notebook會話資源的具體操作,請參見管理Notebook會話。