本文为您介绍如何在已经创建好的E-MapReduce(简称EMR)集群中使用Iceberg。

背景信息

阿里云EMR-5.3.0及后续版本,已经将Iceberg集成到各个开源组件中,包括Spark和Hive等引擎。本文以数据湖元数据为例,详细配置请参见数据湖元数据配置

前提条件

已在E-MapReduce控制台上,创建Hadoop的EMR-5.3.0及后续版本的集群,详情请参见创建集群

使用限制

仅EMR-5.3.0及后续版本的Hadoop集群支持Iceberg。

操作步骤

  1. 使用SSH方式登录到集群,详情信息请参见登录集群
  2. 执行以下命令,通过Spark SQL读写Iceberg配置。

    在Spark SQL中操作Iceberg,首先需要配置Catalog。Catalog的配置以spark.sql.catalog.<catalog_name>作为前缀。

    以下是在Spark SQL中使用数据湖元数据的配置,Catalog名称为dlf_catalog,其他选项的值请参见数据湖元数据配置
    spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
     --conf spark.sql.catalog.dlf_catalog=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.dlf_catalog.catalog-impl=org.apache.iceberg.aliyun.dlf.DlfCatalog \
     --conf spark.sql.catalog.dlf_catalog.io-impl=org.apache.iceberg.hadoop.HadoopFileIO \
     --conf spark.sql.catalog.dlf_catalog.oss.endpoint=<yourOSSEndpoint> \
     --conf spark.sql.catalog.dlf_catalog.warehouse=<yourOSSWarehousePath> \
     --conf spark.sql.catalog.dlf_catalog.access.key.id=<yourAccessKeyId> \
     --conf spark.sql.catalog.dlf_catalog.access.key.secret=<yourAccessKeySecret> \
     --conf spark.sql.catalog.dlf_catalog.dlf.catalog-id=<yourCatalogId> \
     --conf spark.sql.catalog.dlf_catalog.dlf.endpoint=<yourDLFEndpoint> \
     --conf spark.sql.catalog.dlf_catalog.dlf.region-id=<yourDLFRegionId>
    当返回信息中包含如下信息时,表示已进入spark-sql命令行。
    spark-sql>
  3. 基本操作。
    说明 本文示例中的dlf_catalog为您创建的Catalog名称。为保证操作的数据库和表都是在指定的Catalog下,您需要在命令中的数据库和表名前带上dlf_catalog。例如:
    • 访问dlf_catalog下的数据库iceberg_db,代码中应为dlf_catalog.iceberg_db
    • 访问dlf_catalog下,数据库名为iceberg_db下的表sample,代码中应为dlf_catalog.iceberg_db.sample
    • 创建库
      CREATE DATABASE IF NOT EXISTS dlf_catalog.iceberg_db;
    • 创建表
      CREATE TABLE IF NOT EXISTS dlf_catalog.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg;
      Iceberg表支持COMMENT、PARTITIONED BY、LOCATION和TBLPROPERTIES等语法。如果通过TBLPROPERTIES设置表级别属性,代码示例如下。
      CREATE TABLE IF NOT EXISTS dlf_catalog.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg
      TBLPROPERTIES (
          'write.format.default'='parquet'
      );
    • 写入数据
      INSERT INTO dlf_catalog.iceberg_db.sample VALUES (1, 'a'), (2, 'b'), (3, 'c');
    • 查询数据
      SELECT * FROM dlf_catalog.iceberg_db.sample;
      SELECT count(1) AS count, data FROM dlf_catalog.iceberg_db.sample GROUP BY data;
    • 更新数据
      UPDATE dlf_catalog.iceberg_db.sample SET data = 'x' WHERE id = 3;
    • 删除数据
      DELETE FROM dlf_catalog.iceberg_db.sample WHERE id = 3;