E-MapReduce: Use Iceberg

Last Updated: Mar 12, 2024

This topic describes how to use Iceberg in an E-MapReduce (EMR) cluster.

Background information

In this topic, metadata is managed by using Data Lake Formation (DLF). For more information about metadata configurations, see Configuration of DLF metadata.

Prerequisites

A Hadoop cluster of EMR V5.3.0 or later is created in the EMR console. For more information, see Create a cluster.

Limits

Iceberg Spark SQL extensions are not supported in Spark 2.4. Therefore, if you use a cluster of EMR V3.38.X or a later minor version, which runs Spark 2.4, you can use only the Spark DataFrame API to work with Iceberg. This topic describes how to use Spark SQL to work with Iceberg in a cluster of EMR V5.3.0 or later.

Procedure

  1. Log on to your EMR cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run one of the following commands to start the Spark SQL CLI with the Iceberg-related parameters configured.

    Before you can use Spark SQL to work with Iceberg, you must configure a catalog. The catalog is configured by using parameters whose names are in the spark.sql.catalog.<catalog_name> format.

    The following commands show how to configure a catalog. The default name of the catalog and the parameters that you must configure vary based on the version of your cluster. For more information, see Configuration of DLF metadata. In the following configurations, DLF is used to manage metadata.

    • EMR V5.6.0 or later

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.iceberg.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog
    • EMR V5.5.X

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.dlf=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.dlf.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog \
       --conf spark.sql.catalog.dlf.warehouse=<yourOSSWarehousePath>
      Note

      The spark.sql.catalog.dlf.warehouse parameter is optional. If you do not specify this parameter, the default warehouse path is used.

    • EMR V5.3.X and EMR V5.4.X

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.dlf_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.dlf_catalog.catalog-impl=org.apache.iceberg.aliyun.dlf.DlfCatalog \
       --conf spark.sql.catalog.dlf_catalog.io-impl=org.apache.iceberg.hadoop.HadoopFileIO \
       --conf spark.sql.catalog.dlf_catalog.oss.endpoint=<yourOSSEndpoint> \
       --conf spark.sql.catalog.dlf_catalog.warehouse=<yourOSSWarehousePath> \
       --conf spark.sql.catalog.dlf_catalog.access.key.id=<yourAccessKeyId> \
       --conf spark.sql.catalog.dlf_catalog.access.key.secret=<yourAccessKeySecret> \
       --conf spark.sql.catalog.dlf_catalog.dlf.catalog-id=<yourCatalogId> \
       --conf spark.sql.catalog.dlf_catalog.dlf.endpoint=<yourDLFEndpoint> \
       --conf spark.sql.catalog.dlf_catalog.dlf.region-id=<yourDLFRegionId>

    If the output contains the following information, the Spark SQL CLI is started:

    spark-sql>
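
    As a quick check, you can run the following statement to list the namespaces in the catalog that you configured. This is a minimal verification example; the iceberg_db database appears in the output only after you create it in the next step:

    SHOW NAMESPACES IN <yourCatalogName>;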
  3. Perform basic operations.

    Important

    In the following examples, replace <yourCatalogName> with the name of the catalog that you configured in the previous step, such as iceberg, dlf, or dlf_catalog.

    • Create a database

      CREATE DATABASE IF NOT EXISTS <yourCatalogName>.iceberg_db;
    • Create a table

      CREATE TABLE IF NOT EXISTS <yourCatalogName>.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg;

      The statement that is used to create an Iceberg table can include the COMMENT, PARTITIONED BY, LOCATION, and TBLPROPERTIES clauses. The following code shows how to use the TBLPROPERTIES clause to configure table-level properties, followed by a PARTITIONED BY example:

      CREATE TABLE IF NOT EXISTS <yourCatalogName>.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg
      TBLPROPERTIES (
          'write.format.default'='parquet'
      );
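
      The following statement is a minimal sketch of a partitioned table. The sample_partitioned table name and the ts column are illustrative; days(ts) is an Iceberg partition transform that derives day-level partitions from the timestamp column:

      CREATE TABLE IF NOT EXISTS <yourCatalogName>.iceberg_db.sample_partitioned(
          id BIGINT COMMENT 'unique id',
          data STRING,
          ts TIMESTAMP
      )
      USING iceberg
      PARTITIONED BY (days(ts));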
    • Insert data into the table

      INSERT INTO <yourCatalogName>.iceberg_db.sample VALUES (1, 'a'), (2, 'b'), (3, 'c');
    • Query data

      SELECT * FROM <yourCatalogName>.iceberg_db.sample;
      SELECT count(1) AS count, data FROM <yourCatalogName>.iceberg_db.sample GROUP BY data;
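
      Iceberg also exposes table metadata, such as snapshots, as queryable metadata tables. The following statement is a minimal example that lists the snapshots of the sample table:

      SELECT * FROM <yourCatalogName>.iceberg_db.sample.snapshots;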
    • Update data

      UPDATE <yourCatalogName>.iceberg_db.sample SET data = 'x' WHERE id = 3;
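
      Because the Iceberg Spark SQL extensions are enabled, you can also use a MERGE INTO statement for row-level upserts. The following statement is a minimal sketch in which the source rows come from an inline query:

      MERGE INTO <yourCatalogName>.iceberg_db.sample t
      USING (SELECT 2 AS id, 'y' AS data) s
      ON t.id = s.id
      WHEN MATCHED THEN UPDATE SET t.data = s.data
      WHEN NOT MATCHED THEN INSERT *;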
    • Delete data

      DELETE FROM <yourCatalogName>.iceberg_db.sample WHERE id = 3;
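
      After you finish testing, you can drop the sample table to clean up. This step is optional:

      DROP TABLE <yourCatalogName>.iceberg_db.sample;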