E-MapReduce:Use Spark to write data to an Iceberg table and read data from the table in batch mode

Last Updated: Aug 14, 2023

This topic describes how to use the Spark DataFrame API to write data to an Iceberg table and read data from the table in batch mode. In this topic, Spark 3.x is used.

Prerequisites

An EMR Hadoop cluster is created. For more information, see Create a cluster.
Note This topic applies only to Hadoop clusters of EMR V3.38.0 or a later minor version, or of EMR V5.4.0 or a later minor version.

Procedure

  1. Create a Maven project and add dependencies to the project object model (POM) file of the project.

    Add Spark dependencies and Iceberg dependencies to the POM file. In the following code, dependencies for Spark 3.1.1 and Iceberg 0.12.0 are added with the provided dependency scope, so they are used only to compile the code. We recommend that you run the code against the Iceberg software package that is deployed in your E-MapReduce (EMR) cluster.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.1</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.1</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>0.12.0</version>
        <scope>provided</scope>
    </dependency>
    Note

    The Iceberg software package in your EMR cluster differs from the open source Iceberg dependency package. For example, a Data Lake Formation (DLF) catalog is automatically integrated with the Iceberg software package in your EMR cluster. We recommend that you declare the open source Iceberg dependencies with the provided dependency scope to compile and package the code on your on-premises machine, and then run the code against the Iceberg dependencies that are provided in your cluster environment.

  2. Configure a catalog.

    Before you call a Spark API to perform operations on an Iceberg table, add the required configuration items to a SparkConf object to configure a catalog.

    The following configurations show how to configure a catalog. The default name of the catalog and the parameters that you must configure vary based on the version of your cluster. For more information, see Configuration of DLF metadata. In the following configurations, DLF is used to manage metadata.

    • EMR V3.40 or a later minor version, and EMR V5.6.0 or later

      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")
    • EMR V3.39.X and EMR V5.5.X

      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf.warehouse", "<yourOSSWarehousePath>")
    • EMR V3.38.X, EMR V5.3.X, and EMR V5.4.X

      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
      Note

      You must configure environment variables before you can run the sample code. For more information about how to configure environment variables, see the Configure environment variables section in this topic.
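
    The following snippet is a minimal sketch of how the catalog configuration is applied when a SparkSession is created. It assumes the EMR V3.40 or later configuration above, in which the catalog is named iceberg; the configured name becomes the prefix of table identifiers, such as iceberg.iceberg_db.sample. The application name is hypothetical.

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // Apply the catalog configuration (EMR V3.40 or a later minor version, and EMR V5.6.0 or later).
    val sparkConf = new SparkConf()
    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    sparkConf.set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")

    // Tables in this catalog are then referenced as iceberg.<database>.<table>.
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("IcebergCatalogSketch")
      .getOrCreate()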

  3. Write data to an Iceberg table.

    If you use Spark 3.x, you can use the DataFrameWriterV2 API to write data to an Iceberg table. We recommend that you do not use the DataFrameWriterV1 API. The following sample code provides an example on how to use the DataFrameWriterV2 API to write data to an Iceberg table.

    In the following example, <yourCatalogName> specifies the name of your catalog. You can replace <yourCatalogName> with the actual catalog name.

    Run the following code to create a data table:

    val df: DataFrame = ...
    df.writeTo("<yourCatalogName>.iceberg_db.sample").create()
    Note

    You can call the create, replace, or createOrReplace method to create a data table. You can also call the tableProperty method to configure properties for the data table and the partitionedBy method to configure partition fields for the data table. A sketch that combines these methods is provided at the end of this step.

    You can run the following commands to append and overwrite data:

    • Write data to the table in append mode

      val df: DataFrame = ...
      df.writeTo("<yourCatalogName>.iceberg_db.sample").append()
    • Write data to the table in overwrite mode

      val df: DataFrame = ...
      df.writeTo("<yourCatalogName>.iceberg_db.sample").overwritePartitions()
  4. Read data from the table.

    Select a data read method based on the version of Spark that you use:

    • Spark 3.x (recommended)

       val df = spark.table("<yourCatalogName>.iceberg_db.sample")
    • Spark 2.4

      val df = spark.read.format("iceberg").load("<yourCatalogName>.iceberg_db.sample")
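
    The returned DataFrame supports regular Spark batch operations. The following snippet is a minimal sketch of a filtered batch read in Spark 3.x; the filter condition and the selected columns are assumptions for illustration only.

    val df = spark.table("<yourCatalogName>.iceberg_db.sample")

    // Assumed filter for illustration: read only the rows whose id is greater than 2.
    df.where("id > 2")
      .select("id", "data")
      .show()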

Example

In this example, the Spark DataFrame API is called to write data to an Iceberg table and read data from the table in batch mode.

Important

The parameters and the default name of the catalog vary based on the version of your cluster. In this example, DLF is used to manage metadata, an EMR V5.3.0 cluster is used, and the catalog is named dlf_catalog. For more information, see Configuration of DLF metadata.

  1. Use Spark SQL to create a database named iceberg_db for your test. For more information, see Use Iceberg.
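
    The following statement is a minimal sketch, assuming that the catalog is named dlf_catalog as in this example. You can pass the statement to the spark.sql() method in a Spark application that uses the preceding catalog configuration, or run the inner SQL statement in the Spark SQL CLI.

    spark.sql("CREATE DATABASE IF NOT EXISTS dlf_catalog.iceberg_db")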

  2. Write Spark code.

    Sample code in Scala:

    package com.aliyun.iceberg

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object IcebergTest {
      def main(args: Array[String]): Unit = {

        // Configure the parameters for the catalog.
        val sparkConf = new SparkConf()
        sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
        sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
        sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
        sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
        sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
        sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
        sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
        sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
        sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
        sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")

        // Create a SparkSession that uses the configured catalog.
        val spark = SparkSession
          .builder()
          .config(sparkConf)
          .appName("IcebergReadWriteTest")
          .getOrCreate()

        // Create or replace an Iceberg table from a DataFrame.
        val firstDF = spark.createDataFrame(Seq(
          (1, "a"), (2, "b"), (3, "c")
        )).toDF("id", "data")

        firstDF.writeTo("dlf_catalog.iceberg_db.sample").createOrReplace()

        // Append data from a DataFrame to the Iceberg table.
        val secondDF = spark.createDataFrame(Seq(
          (4, "d"), (5, "e"), (6, "f")
        )).toDF("id", "data")

        secondDF.writeTo("dlf_catalog.iceberg_db.sample").append()

        // Read data from the Iceberg table.
        val icebergTable = spark.table("dlf_catalog.iceberg_db.sample")

        icebergTable.show()
      }
    }
  3. Package the code and deploy it to the EMR cluster.

    1. Make sure that a Maven plug-in that compiles Scala source code is configured. You can add the following plug-in to the pom.xml file:

      <build>
          <plugins>
              <!-- the Maven Scala plugin will compile Scala source files -->
              <plugin>
                  <groupId>net.alchim31.maven</groupId>
                  <artifactId>scala-maven-plugin</artifactId>
                  <version>3.2.2</version>
                  <executions>
                      <execution>
                          <goals>
                              <goal>compile</goal>
                              <goal>testCompile</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
    2. Debug the code on your on-premises machine and run the following command to package the code:

      mvn clean install
    3. Log on to your EMR cluster in SSH mode. For more information, see Log on to a cluster.

    4. Upload the JAR package to the EMR cluster.

      In this example, the JAR package is uploaded to the root directory of the EMR cluster.

  4. Run the spark-submit command to submit the Spark job:

    spark-submit \
     --master yarn \
     --deploy-mode cluster \
     --driver-memory 1g \
     --executor-cores 1 \
     --executor-memory 1g \
     --num-executors 1 \
     --class com.aliyun.iceberg.IcebergTest \
     iceberg-demos.jar
    Note

    In this example, a JAR package named iceberg-demos.jar is used. You can change the value of the --class parameter and the name of the JAR package based on your business requirements.

    The following result is returned:

    +---+----+
    | id|data|
    +---+----+
    |  4|   d|
    |  1|   a|
    |  5|   e|
    |  6|   f|
    |  2|   b|
    |  3|   c|
    +---+----+