This topic describes how to use Iceberg in an E-MapReduce (EMR) cluster.

Background information

In this topic, metadata is managed by using Data Lake Formation (DLF). For more information about metadata configurations, see Configuration of DLF metadata.

Prerequisites

A Hadoop cluster of EMR V5.3.0 or later is created in the EMR console. For more information, see Create a cluster.

Limits

Iceberg Spark SQL extensions are not supported in Spark 2.4. Therefore, if you use a cluster of EMR V3.38.X or a later minor version, which runs Spark 2.4, you can use only the Spark DataFrame API to perform operations related to Iceberg. This topic describes how to use Spark SQL to perform operations related to Iceberg in a cluster of EMR V5.3.0 or later.

Procedure

  1. Log on to your EMR cluster in SSH mode. For more information, see Log on to a cluster.
  2. Run one of the following commands to configure parameters related to Iceberg and start the Spark SQL CLI.

    Before you can use Spark SQL to perform operations related to Iceberg, you must configure a catalog. The catalog is configured by using parameters whose names are in the spark.sql.catalog.<catalog_name> format, where <catalog_name> is the name of the catalog.

    The following commands show how to configure a catalog. The default name of the catalog and the parameters that you must configure vary based on the version of your cluster. For more information, see Configuration of DLF metadata. In the following configurations, DLF is used to manage metadata.

    • EMR V5.6.0 or later
      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.iceberg.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog
    • EMR V5.5.X
      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.dlf=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.dlf.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog \
       --conf spark.sql.catalog.dlf.warehouse=<yourOSSWarehousePath>
      Note The spark.sql.catalog.dlf.warehouse parameter is optional. If you do not specify it, the default warehouse path is used.
    • EMR V5.3.X and EMR V5.4.X
      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.dlf_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.dlf_catalog.catalog-impl=org.apache.iceberg.aliyun.dlf.DlfCatalog \
       --conf spark.sql.catalog.dlf_catalog.io-impl=org.apache.iceberg.hadoop.HadoopFileIO \
       --conf spark.sql.catalog.dlf_catalog.oss.endpoint=<yourOSSEndpoint> \
       --conf spark.sql.catalog.dlf_catalog.warehouse=<yourOSSWarehousePath> \
       --conf spark.sql.catalog.dlf_catalog.access.key.id=<yourAccessKeyId> \
       --conf spark.sql.catalog.dlf_catalog.access.key.secret=<yourAccessKeySecret> \
       --conf spark.sql.catalog.dlf_catalog.dlf.catalog-id=<yourCatalogId> \
       --conf spark.sql.catalog.dlf_catalog.dlf.endpoint=<yourDLFEndpoint> \
       --conf spark.sql.catalog.dlf_catalog.dlf.region-id=<yourDLFRegionId>
    If the output contains the following information, the Spark SQL CLI is started:
    spark-sql>
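    After the CLI starts, you can run a simple statement to verify that the configured catalog is accessible. The following statement is a minimal sanity check that assumes the catalog is named iceberg, as in the preceding EMR V5.6.0 example:
    -- List the namespaces (databases) that are visible in the catalog.
    SHOW NAMESPACES IN iceberg;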
  3. Perform basic operations.
    Notice In the following examples, replace <yourCatalogName> with the name of the catalog that you configured in the previous step, such as iceberg for EMR V5.6.0 or later, dlf for EMR V5.5.X, or dlf_catalog for EMR V5.3.X and EMR V5.4.X.
    • Create a database
      CREATE DATABASE IF NOT EXISTS <yourCatalogName>.iceberg_db;
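      Optionally, you can set the current catalog and database for the session so that you do not have to fully qualify table names. This step is not required, and the remaining examples use fully qualified names. The following statement is a sketch that assumes a Spark 3.x cluster:
      -- Make <yourCatalogName>.iceberg_db the current catalog and database.
      USE <yourCatalogName>.iceberg_db;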
    • Create a table
      CREATE TABLE IF NOT EXISTS <yourCatalogName>.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg;
      The statement that is used to create an Iceberg table can also include the COMMENT, PARTITIONED BY, LOCATION, and TBLPROPERTIES clauses. The following code shows how to use the TBLPROPERTIES clause to configure table-level properties; a PARTITIONED BY example follows:
      CREATE TABLE IF NOT EXISTS <yourCatalogName>.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg
      TBLPROPERTIES (
          'write.format.default'='parquet'
      );
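      The following code is a minimal sketch of the PARTITIONED BY clause. For illustration, it adds a hypothetical ts column to the sample schema and uses the hypothetical table name sample_partitioned:
      CREATE TABLE IF NOT EXISTS <yourCatalogName>.iceberg_db.sample_partitioned(
          id BIGINT COMMENT 'unique id', 
          data STRING,
          ts TIMESTAMP
      ) 
      USING iceberg
      -- days(ts) is an Iceberg partition transform that groups rows by the day of the ts value.
      PARTITIONED BY (days(ts));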
    • Insert data into the table
      INSERT INTO <yourCatalogName>.iceberg_db.sample VALUES (1, 'a'), (2, 'b'), (3, 'c');
    • Query data
      SELECT * FROM <yourCatalogName>.iceberg_db.sample;
      SELECT count(1) AS count, data FROM <yourCatalogName>.iceberg_db.sample GROUP BY data;
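      Iceberg also exposes table metadata as queryable system tables. The following statement is an optional example that assumes the default name of the snapshots metadata table:
      -- Inspect the snapshots that have been committed to the table.
      SELECT committed_at, snapshot_id, operation FROM <yourCatalogName>.iceberg_db.sample.snapshots;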
    • Update data
      UPDATE <yourCatalogName>.iceberg_db.sample SET data = 'x' WHERE id = 3;
    • Delete data
      DELETE FROM <yourCatalogName>.iceberg_db.sample WHERE id = 3;
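    • Merge data
      The Iceberg Spark SQL extensions also enable the MERGE INTO statement for row-level upserts. The following statement is a sketch in which <yourCatalogName>.iceberg_db.updates is a hypothetical source table that has the same schema as the sample table:
      MERGE INTO <yourCatalogName>.iceberg_db.sample t
      USING <yourCatalogName>.iceberg_db.updates s
      ON t.id = s.id
      WHEN MATCHED THEN UPDATE SET t.data = s.data
      WHEN NOT MATCHED THEN INSERT *;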