本文为您介绍如何通过Spark DataFrame API以批处理的方式读写Iceberg表。本文以Spark 3.x操作Iceberg表为例。

前提条件

已在E-MapReduce控制台上,创建Hadoop的EMR-5.3.0及后续版本的集群,详情请参见创建集群

使用限制

仅EMR-5.3.0及后续版本的Hadoop集群支持Iceberg。

操作步骤

  1. 新建Maven项目,引入Pom依赖。
    引入Spark及Iceberg的依赖,以下代码示例指定了Spark 3.1.1与Iceberg 0.12.0版本,使用provided引包编译,运行时使用集群上的软件包。
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.1</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.1</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>0.12.0</version>
        <scope>provided</scope>
    </dependency>
    说明 由于EMR集群的Iceberg软件包与开源依赖包存在一定差异,例如EMR Iceberg默认集成了DLF Catalog,所以建议您在本地使用provided方式引入开源Iceberg依赖进行代码编译,打包放到集群上运行时使用集群环境中的依赖。
  2. 配置Catalog。
    使用Spark API操作Iceberg表,首先需要配置Catalog,在SparkConf中加入必要配置项即可。
    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
    sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
    sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
    sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
    说明 示例中的dlf_catalog为Catalog名称,其他配置项含义请参见数据湖元数据配置
  3. 写表。
    Spark 3.x支持DataFrameWriterV2 API写入数据到Iceberg表。目前v1 DataFrame API已不推荐使用,以下代码以V2 API写Iceberg表sample为例。
    创建数据表
    val df: DataFrame = ...
    df.writeTo("dlf_catalog.iceberg_db.sample").create()
    说明 创建表支持create、replace以及createOrReplace语义,另外支持通过tableProperty和partitionedBy配置表的属性与分区字段。
    您可以通过以下命令追加或覆盖数据:
    • 追加数据
      val df: DataFrame = ...
      df.writeTo("dlf_catalog.iceberg_db.sample").append()
    • 覆盖数据
      val df: DataFrame = ...
      df.writeTo("dlf_catalog.iceberg_db.sample").overwritePartitions()
  4. 读表。
    请根据您Spark的版本,选择读表的方式:
    • Spark 3.x(推荐)
       val df = spark.table("dlf_catalog.iceberg_db.sample")
    • Spark 2.4
      val df = spark.read.format("iceberg").load("dlf_catalog.iceberg_db.sample")

示例

本示例是使用Spark DataFrame API批式读写Iceberg表。

  1. 通过Spark SQL创建测试使用的数据库iceberg_db,详细信息请参见基础使用
  2. 编写Spark代码。
    以Scala版代码为例,代码示例如下。
    def main(args: Array[String]): Unit = {
    
      // 配置使用数据湖元数据
      val sparkConf = new SparkConf()
      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
    
      val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("IcebergReadWriteTest")
      .getOrCreate()
    
      // 从DataFrame中创建或替换Iceberg表
      val firstDF = spark.createDataFrame(Seq(
      (1, "a"), (2, "b"), (3, "c")
      )).toDF("id", "data")
    
      firstDF.writeTo("dlf_catalog.iceberg_db.sample").createOrReplace()
    
      // 将DataFrame写入Iceberg表
      val secondDF = spark.createDataFrame(Seq(
      (4, "d"), (5, "e"), (6, "f")
      )).toDF("id", "data")
    
      secondDF.writeTo("dlf_catalog.iceberg_db.sample").append()
    
      // 读Iceberg表
      val icebergTable = spark.table("dlf_catalog.iceberg_db.sample")
    
      icebergTable.show()
    }
    说明 示例中的dlf_catalog为Catalog名称,其他配置项含义请参见数据湖元数据配置
  3. 打包程序并部署到EMR集群。
    1. 检查编译Scala代码的Maven插件,可以在pom.xml中配置如下插件。
      <build>
          <plugins>
              <!-- the Maven Scala plugin will compile Scala source files -->
              <plugin>
                  <groupId>net.alchim31.maven</groupId>
                  <artifactId>scala-maven-plugin</artifactId>
                  <version>3.2.2</version>
                  <executions>
                      <execution>
                          <goals>
                              <goal>compile</goal>
                              <goal>testCompile</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
    2. 您可以在本地完成代码调试后,通过如下命令打包。
      mvn clean install
    3. 使用SSH方式登录到集群,详情信息请参见登录集群
    4. 上传JAR包至EMR集群。
      本示例是上传到EMR集群的根目录下。
  4. 执行以下命令,通过spark-submit运行Spark作业。
    spark-submit \
     --master yarn \
     --deploy-mode cluster \
     --driver-memory 1g \
     --executor-cores 1 \
     --executor-memory 1g \
     --num-executors 1 \
     --class com.aliyun.iceberg.IcebergTest \
     iceberg-demos.jar
    说明 iceberg-demos.jar为您打包好的JAR包。--class和JAR包请根据您实际信息修改。
    运行结果如下。
    +---+----+
    | id|data|
    +---+----+
    |  4|   d|
    |  1|   a|
    |  5|   e|
    |  6|   f|
    |  2|   b|
    |  3|   c|
    +---+----+