本文为您介绍如何通过Spark读取或写入数据至Hologres的操作方法。

背景信息

Spark是用于大规模数据处理的统一分析引擎,Hologres已经与Spark(社区版以及EMR Spark版)高效打通,快速助力企业搭建数据仓库。Hologres提供的Spark Connector,支持Spark以批处理的方式将数据写入Hologres,同时Spark支持读取多种数据源(例如文件、Hive、MySQL、PostgreSQL等)。

Hologres兼容PostgreSQL,因此Spark也可以用读取PostgreSQL的方式直接读取Hologres数据,进行ETL处理,再写入Hologres及其他数据源,完成大数据开发抽取、处理、加载的完整闭环。

前提条件

  • 实例版本需为V0.9及以上版本。请在Hologres管控台的实例详情页查看当前实例版本,如实例是V0.9以下版本,请您使用自助升级或通过搜索(钉钉群号:32314975)加入实时数仓Hologres交流群申请升级实例。
  • 需要安装对应版本的Spark环境,能够运行spark-shell命令。

通过Spark Connector写入(推荐使用)

Hologres当前支持使用内置的Spark Connector将Spark数据写入Hologres,相比其他写入方式,调用基于Holo Client实现Connector写入的方式性能更优。具体操作步骤如下,阿里云也为您提供了相关的使用示例,详情请参见通过Spark Connector写入使用示例

  1. 获取JAR包。
    Spark2和Spark3上均已支持Connector写入,Spark写入Hologres时需要引用connector的JAR包,当前已经发布到Maven中央仓库,在项目中参照如下pom文件进行配置。
    说明 相关Connector也已开源,详情请参见hologres-connectors
    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-2.x</artifactId>
        <version>1.0.1</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>
    当前Hologres已自动生成JAR文件,下载链接如下。
  2. 使用JAR包。
    执行如下命令,启动Spark。
    spark-shell --jars hologres-connector-spark-2.x-1.0-SNAPSHOT-jar-with-dependencies.jar
  3. 配置Spark。
    在Spark中执行如下命令,以连接Hologres将数据加载至目标表中。
    • 代码:
      df.write
        .format("hologres")
        .option(SourceProvider.USERNAME, "AccessKey ID") //阿里云账号的AccessKey ID。
        .option(SourceProvider.PASSWORD, "Accesskey SECRET") //阿里云账号的Accesskey SECRET。
        .option(SourceProvider.ENDPOINT, "Ip:Port") //Hologres的Ip和Port。
        .option(SourceProvider.DATABASE, "Database")
        .option(SourceProvider.TABLE, "Table")
        .save()
      说明 其中如下代码可以进行替换。
      • 原代码:
        .option(SourceProvider.endpoint, "Ip:Port")
        .option(SourceProvider.database, "Database")
      • 替换代码:
        .option(SourceProvider.jdbcUrl, "jdbc:postgresql://Ip:Port/Database")
    • 参数释义:
      参数名默认值是否必填参数描述
      USERNAME登录Hologres账号的AccessKey ID。您可以单击AccessKey 管理
      PASSWORD登录Hologres账号的AccessKey Secret。您可以单击AccessKey 管理
      TABLEHologres用于接收数据的表名称。
      ENDPOINTJDBCURL二选一Hologres实例的网络域名。

      您可以进入Hologres管理控制台实例详情页,从实例配置获取主机和端口号。

      DATABASEJDBCURL二选一Hologres接收数据的表所在数据库名称。
      JDBCURLENDPOINT+DATABASE组合设置二选一Hologres的JDBCURL
      INPUT_DATA_SCHEMA_DDLspark3.x必填,值为<your_DataFrame>.schema.toDDLSpark中DataFrame的DDL。
      WRITE_MODEINSERT_OR_REPLACE当INSERT目标表为有主键的表时采用不同策略。
      • INSERT_OR_IGNORE:当主键冲突时,不写入。
      • INSERT_OR_UPDATE:当主键冲突时,更新相应列。
      • INSERT_OR_REPLACE:当主键冲突时,更新所有列。
      WRITE_BATCH_SIZE512每个写入线程的最大批次大小,在经过WRITE_MODE合并后的Put数量达到WRITE_BATCH_SIZE时进行一次批量提交。
      WRITE_BATCH_BYTE_SIZE2 MB每个写入线程的最大批次Byte大小,在经过WRITE_MODE合并后的Put数据字节数达到WRITE_BATCH_BYTE_SIZE时进行一次批量提交。
      WRITE_MAX_INTERVAL_MS10000 ms距离上次提交超过WRITE_MAX_INTERVAL_MS会触发一次批量提交。
      WRITE_FAIL_STRATEGYTYR_ONE_BY_ONE当某一批次提交失败时,会将批次内的记录逐条提交(保序),单条提交失败的记录将会跟随异常被抛出。
      WRITE_THREAD_SIZE1写入并发线程数(每个并发占用1个数据库连接)。

      在一个Spark作业中,占用的总连接数与Spark并发相关,关系为总连接数= spark.default.parallelism * WRITE_THREAD_SIZE

      DYNAMIC_PARTITIONfalse若为true,写入分区表父表时,当分区不存在时自动创建分区。
      RETRY_COUNT3当连接故障时,写入和查询的重试次数。
      RETRY_SLEEP_INIT_MS1000 ms每次重试的等待时间=RETRY_SLEEP_INIT_MS+RETRY_COUNT*RETRY_SLEEP_STEP_MS
      RETRY_SLEEP_STEP_MS10*1000 ms每次重试的等待时间=RETRY_SLEEP_INIT_MS+RETRY_COUNT*RETRY_SLEEP_STEP_MS
      CONNECTION_MAX_IDLE_MS60000 ms写入线程和点查线程数据库连接的最大IDLE时间,超过此时间的连接将被释放。

通过Spark Connector写入使用示例

根据如下示例步骤为您介绍,如何通过Spark Connector将数据写入Hologres。

  1. 创建Hologres表。
    在Hologres中执行如下SQL命令创建目标表,用来接受数据。
    CREATE TABLE tb008 (
      id BIGINT primary key,
      counts INT,
      name TEXT,
      price NUMERIC(38, 18),
      out_of_stock BOOL,
      weight DOUBLE PRECISION,
      thick FLOAT,
      time TIMESTAMPTZ,
      dt DATE, 
      by bytea,
      inta int4[],
      longa int8[],
      floata float4[],
      doublea float8[],
      boola boolean[],
      stringa text[]
    );
  2. Spark准备数据并写入Hologres。
    1. 在命令行运行命令开启Spark。
      spark-shell --jars hologres-connector-spark-2.x-1.0-SNAPSHOT-jar-with-dependencies.jar
    2. spark-shell里使用命令load spark-test.scala执行测试文件,加载测试示例。
      spark-test.scala文件示例如下。
      import java.sql.{Timestamp, Date}
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.Row
      import com.alibaba.hologres.spark2.sink.SourceProvider
      
      val byteArray = Array(1.toByte, 2.toByte, 3.toByte, 'b'.toByte, 'a'.toByte)
      val intArray = Array(1, 2, 3)
      val longArray = Array(1L, 2L, 3L)
      val floatArray = Array(1.2F, 2.44F, 3.77F)
      val doubleArray = Array(1.222, 2.333, 3.444)
      val booleanArray = Array(true, false, false)
      val stringArray = Array("abcd", "bcde", "defg")
      
      val data = Seq(
        Row(-7L, 100, "phone1", BigDecimal(1234.567891234), false, 199.35, 6.7F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("2021-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(6L, -10, "phone2", BigDecimal(1234.56), true, 188.45, 7.8F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("1970-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(1L, 10, "phone3\"", BigDecimal(1234.56), true, 111.45, null, Timestamp.valueOf("2020-02-29 00:12:33"), Date.valueOf("2020-07-23"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray)
      )
      
      
      val schema = StructType(Array(
        StructField("id", LongType),
        StructField("counts", IntegerType),
        StructField("name", StringType, false), //false表示此Field不允许为null
        StructField("price", DecimalType(38, 12)),
        StructField("out_of_stock", BooleanType),
        StructField("weight", DoubleType),
        StructField("thick", FloatType),
        StructField("time", TimestampType),
        StructField("dt", DateType),
        StructField("by", BinaryType),
        StructField("inta", ArrayType(IntegerType)),
        StructField("longa", ArrayType(LongType)),
        StructField("floata", ArrayType(FloatType)),
        StructField("doublea", ArrayType(DoubleType)),
        StructField("boola", ArrayType(BooleanType)),
        StructField("stringa", ArrayType(StringType))
      ))
      
      
      val df = spark.createDataFrame(
        spark.sparkContext.parallelize(data),
        schema
      )
      df.show()
      
      //配置导入数据至Hologres的信息。
      df.write.format("hologres") //必须配置为hologres
        .option(SourceProvider.USERNAME, "your_username") //阿里云账号的AccessKey ID。
        .option(SourceProvider.PASSWORD, "your_password") //阿里云账号的Accesskey SECRET。
        .option(SourceProvider.ENDPOINT, "Ip:Port") //Hologres的Ip和Port。
        .option(SourceProvider.DATABASE, "test_database") //Hologres的数据库名称,示例为test_database。
        .option(SourceProvider.TABLE, "tb008") //Hologres用于接收数据的表名称,示例为tb008。
        .option(SourceProvider.WRITE_BATCH_SIZE, 512) // 写入攒批大小
        .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) // Dataframe对应的DDL,仅spark3.x需要
        .mode(SaveMode.Append) // 仅spark3.x需要
        .save()
  3. 查询写入的数据。
    在Hologres侧查询目标表,即可确认写入的数据,示例如下图所示。测试示例数据

通过Spark读取数据源数据并写入Hologres

  1. Spark从数据源读取数据。
    Spark支持从不同数据源读取数据,具体数据源分类如下。
    • Spark支持以Hologres为数据源。
      Hologres兼容PostgreSQL,因为Spark可以用读取PostgreSQL的方式读取Hologres中的数据。读取代码如下。
      说明 在使用JDBC方式进行读取前,请前往官网下载Postgresql JDBC Jar,本文以postgresql-42.2.18版本为例,在spark-shell启动时执行./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar加载该jar,可以与hologres-connector的jar包一同加载。
      // Read from some table, for example: tb008
      val readDf = spark.read
        .format("jdbc") //使用postgresql jdbc driver读取holo
        .option("driver","org.postgresql.Driver")
        .option("url", "jdbc:postgresql://Ip:Por/test_database")
        .option("dbtable", "tb008")
        .option("user", "your_username")
        .option("password", "your_password")
        .load()
    • Spark支持其他数据源(如Parquet格式的文件)。
      Spark支持从其他数据源中读取数据写入Hologres中,例如使用Spark从Hive中读取数据,代码如下。
      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.hive.HiveContext
      
      val sparkConf = new SparkConf()
      val sc = new SparkContext(sparkConf)
      val hiveContext = new HiveContext(sc)
      
      // Read from some table, for example: phone
      val readDf = hiveContext.sql("select * from hive_database.phone")
  2. Spark将读到的数据写入Hologres。
    import com.alibaba.hologres.spark2.sink.SourceProvider
    
    -- Write to hologres table
    df.write
      .format("hologres")
      .option(SourceProvider.USERNAME, "your_username")
      .option(SourceProvider.PASSWORD, "your_password")
      .option(SourceProvider.ENDPOINT, "Ip:Port")
      .option(SourceProvider.DATABASE, "test_database")
      .option(SourceProvider.TABLE, table)
      .option(SourceProvider.WRITE_BATCH_SIZE, 512) -- 写入攒批大小
      .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) -- 仅spark3.x需要
      .mode(SaveMode.Append) // 仅spark3.x需要
      .save()

通过Spark实时写入数据至Hologres

  1. 在Hologres创建一张表,用于接收数据,创建代码如下。
    CREATE TABLE test_table_stream
    (
        value text,
        count bigint
    );
  2. 读取本地端口输入行,进行词频统计并实时写入Hologres中,相关示例代码如下。
     val spark = SparkSession
          .builder
          .appName("StreamToHologres")
          .master("local[*]")
          .getOrCreate()
    
        spark.sparkContext.setLogLevel("WARN")
        import spark.implicits._
    
        val lines = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", 9999)
          .load()
    
        -- Split the lines into words
        val words = lines.as[String].flatMap(_.split(" "))
    
        -- Generate running word count
        val wordCounts = words.groupBy("value").count()
    
        wordCounts.writeStream
            .outputMode(OutputMode.Complete())
            .format("hologres")
            .option(SourceProvider.USERNAME, "your_username")
            .option(SourceProvider.PASSWORD, "your_password")
            .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname")
            .option(SourceProvider.TABLE, "test_table_stream")
            .option("batchsize", 1)
            .option("isolationLevel", "NONE")
            .option("checkpointLocation", checkpointLocation)
            .start()
            .awaitTermination()

数据类型映射

Spark与Hologres的数据类型映射如下表所示。

Spark类型Hologres类型
IntegerTypeINT
LongTypeBIGINT
StringTypeTEXT
DecimalTypeNUMERIC(38, 18)
BooleanTypeBOOL
DoubleTypeDOUBLE PRECISION
FloatTypeFLOAT
TimestampTypeTIMESTAMPTZ
DateTypeDATE
BinaryTypeBYTEA
ArrayType(IntegerType)int4[]
ArrayType(LongType)int8[]
ArrayType(FloatTypefloat4[]
ArrayType(DoubleType)float8[]
ArrayType(BooleanType)boolean[]
ArrayType(StringType)text[]