本文為您介紹在EMR Serverless Spark叢集中如何以Iceberg REST對接 DLF Catalog。
前提條件
已建立與DLF執行個體同地區的Serverless Spark工作空間,詳情請參見建立工作空間。
如果是RAM使用者,在進行資料操作之前,需要先授予相應的資源許可權。詳情請參見資料授權管理。
使用限制
目前支援以下類型的任務:
SQL會話:管理SQL會話。
Thrift Server:管理Spark Thrift Server會話。
批任務:批任務開發。
步驟一:Catalog授權
登入資料湖構建控制台。
在Catalogs列表頁面,單擊Catalog名稱,進入Catalog詳情頁。
如果要授予整個 catalog 許可權,則直接單擊許可權。否則點擊進入對應資料庫或者表,再點擊許可權目錄,進行許可權授予。
在授權頁面,配置以下資訊,單擊確定。
使用者/角色:選擇RAM使用者/RAM角色。
選擇授權對象:在下拉式清單中選擇AliyunECSInstanceForEMRRole。
說明如果使用者下拉式清單中未找到AliyunECSInstanceForEMRRole,可以在使用者管理頁面單擊同步。
預置權限類別型:可自訂選擇讀取許可權,或者直接使用 Data Reader/Data Editor。
步驟二:讀寫資料
串連Catalog
建立SQL會話,請參見管理SQL會話。引擎版本請使用esr-4.7.0、esr-3.6.0及以上版本。
使用資料目錄Catalog
採用資料目錄Catalog方式,則無需在會話中配置參數,只需在資料目錄頁面單擊添加資料目錄,然後在SparkSQL開發中直接選擇資料目錄即可。
使用自訂Catalog
在自訂配置下的Spark 配置中添加以下配置。
重要以下配置樣本中,
iceberg_catalog為自訂的 Catalog 名稱,用於在 Spark 中註冊一個基於 Iceberg REST Catalog 的 Iceberg 表管理服務。該Catalog 通過 REST API 與阿里雲 DLF服務對接,您可根據實際環境修改 Catalog 名稱及相關參數。${regionID}:替換為實際的地區ID,例如cn-hangzhou,請參見服務存取點。${catalogName}:替換為在DLF中建立的Catalog 名稱。${access_key_id},${access_key_secret}:阿里雲賬戶AK/SK。
# 啟用Iceberg Spark擴充 spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions # 註冊名為 iceberg_catalog 的 Spark Catalog spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog # 指定底層Catalog實現為Iceberg REST Catalog spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog # DLF Iceberg 服務的 REST API 地址(需替換 ${regionID} 為實際地區 ID) spark.sql.catalog.iceberg_catalog.uri http://${regionID}-vpc.dlf.aliyuncs.com/iceberg # 指定關聯的 DLF Catalog 名稱 spark.sql.catalog.iceberg_catalog.warehouse ${catalogName} # 使用 DLF 定製的 FileIO 實現 spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.rest.DlfFileIO # 啟用 SigV4 簽名認證 spark.sql.catalog.iceberg_catalog.rest.auth.type sigv4 spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type none spark.sql.catalog.iceberg_catalog.rest.signing-region ${regionID} spark.sql.catalog.iceberg_catalog.rest.signing-name DlfNext # 訪問憑證 spark.sql.catalog.iceberg_catalog.rest.access-key-id ${access_key_id} spark.sql.catalog.iceberg_catalog.rest.secret-access-key ${access_key_secret}讀寫資料
SQL任務完整的開發流程樣本,請參見SparkSQL開發快速入門。
說明不指定資料庫時,建立資料表會預設建在Catalog下的
default資料庫中,也可建立並指定其他資料庫。-- 建立資料庫 CREATE DATABASE IF NOT EXISTS db; -- 建立非分區表 CREATE TABLE iceberg_catalog.db.tbl ( id BIGINT NOT NULL COMMENT 'unique id', data STRING ) USING iceberg; -- 插入非分區表資料 INSERT INTO iceberg_catalog.db.tbl VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); -- 查詢非分區表所有資料 SELECT * FROM iceberg_catalog.db.tbl; -- 按條件查詢非分區表 SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2; -- 更新非分區表資料 UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3; -- 重新查詢確認更新 SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3; -- 刪除非分區表資料 DELETE FROM iceberg_catalog.db.tbl WHERE id = 1; -- 重新查詢確認刪除 SELECT * FROM iceberg_catalog.db.tbl; -- 建立分區表 CREATE TABLE iceberg_catalog.db.part_tbl ( id BIGINT, data STRING, category STRING, ts TIMESTAMP ) USING iceberg PARTITIONED BY (category); -- 插入分區表資料 INSERT INTO iceberg_catalog.db.part_tbl VALUES (100, 'Data1', 'A', to_timestamp('2025-01-01 12:00:00')), (200, 'Data2', 'B', to_timestamp('2025-01-02 14:00:00')), (300, 'Data3', 'A', to_timestamp('2025-01-01 15:00:00')), (400, 'Data4', 'C', to_timestamp('2025-01-03 10:00:00')); -- 查詢分區表所有資料 SELECT * FROM iceberg_catalog.db.part_tbl; -- 查詢 bucket(16, id) = 0 的資料 SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0; -- 查詢 day(ts) = '2025-01-01' 的資料 SELECT * FROM iceberg_catalog.db.part_tbl WHERE days(ts) = '2025-01-01'; -- 查詢某個 category 的資料 SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A'; -- 多條件組合查詢(bucket + day + category) SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0 AND days(ts) = '2025-01-01' AND category = 'A'; -- 彙總統計每個分類的資料數量 SELECT category, COUNT(*) AS count FROM iceberg_catalog.db.part_tbl GROUP BY category; -- 刪除資料庫(謹慎操作),刪除前需確保該db下tbl為空白 -- DROP DATABASE iceberg_catalog.db;