本文为您介绍如何通过Spark读取或写入数据至Hologres的操作方法。

背景信息

Spark是用于大规模数据处理的统一分析引擎,Hologres已经与Saprk(社区版以及EMR Spark版)高效打通,快速助力企业搭建数据仓库。Hologres提供的Spark Connector,支持Spark以批处理的方式将数据写入Hologres,同时Spark支持读取多种数据源(例如文件、Hive、MySQPostgre等)。

Hologres兼容PostgreSQL,因此Spark也可以用读取PostgreSQL的方式直接读取Hologres数据,进行ETL处理,再写入Hologres及其他数据源,完成大数据开发抽取、处理、加载的完整闭环。

前提条件

  • 实例版本需为V0.9及以上版本。请在Hologres管控台的实例详情页查看当前实例版本,如实例是V0.9以下版本,请您提交工单由技术人员协助您升级实例。
  • 需要安装对应版本的Spark环境,能够运行spark-shell命令。

通过Spark Connector写入(推荐使用)

Hologres当前支持使用内置的Spark Connector将Spark数据写入Hologres,相比其他写入方式,调用基于Holo Client实现Connector写入的方式性能更优。具体操作步骤如下,阿里云也为您提供了相关的使用示例,详情请参见通过Spark Connector写入使用示例

  1. 获取JAR包。
    Spark2和Spark3上均已支持Connector写入,Spark写入Hologres时需要引用JAR包,当前Hologres已自动生成JAR文件,下载链接如下。
    说明 您可在GitHub中查看JAR文件的源代码,GitHub地址请参见hologres-connectors
  2. 使用JAR包。
    执行如下命令,启动Spark。
    spark-shell --jars hologres-connector-spark-2.x-1.0-SNAPSHOT-jar-with-dependencies.jar
  3. 配置Spark。
    在Spark中执行如下命令,以连接Hologres将数据加载至目标表中。
    • 代码:
      df.write
        .format("hologres")
        .option(SourceProvider.USERNAME, "AccessKey ID") //阿里云账号的AccessKey ID。
        .option(SourceProvider.PASSWORD, "Accesskey SECRET") //阿里云账号的Accesskey SECRET。
        .option(SourceProvider.ENDPOINT, "Ip:Port") //Hologres的Ip和Port。
        .option(SourceProvider.DATABASE, "Database")
        .option(SourceProvider.TABLE, "Table")
        .save()
      说明
      • batch.size是为了攒批写入提高效率,默认是1。即默认不赞批,对于批处理请设置合适的值。
      • 其中如下代码可进行替换。
        • 源代码:
          .option(SourceProvider.endpoint, "Ip:Port")
          .option(SourceProvider.database, "Database")
        • 替换代码:
          .option(SourceProvider.jdbcUrl, "jdbc:postgresql://Ip:Port/Database")
    • 参数释义:
      参数名 默认值 是否必填 参数描述
      USERNAME 登录Hologres账号的AccessKey ID。您可以单击AccessKey 管理
      PASSWORD 登录Hologres账号的AccessKey Secret。您可以单击AccessKey 管理
      TABLE Hologres用于接收数据的表名称。
      ENDPOINT JDBCURL二选一 Hologres实例的网络域名。

      您可以进入Hologres管理控制台实例详情页,从实例配置获取主机和端口号。

      DATABASE JDBCURL二选一 Hologres接收数据的表所在数据库名称。
      JDBCURL ENDPOINT+DATABASE组合设置二选一 Hologres的JDBCURL
      WRITE_MODE INSERT_OR_REPLACE 当INSERT目标表为有主键的表时采用不同策略。
      • INSERT_OR_IGNORE:当主键冲突时,不写入。
      • INSERT_OR_UPDATE:当主键冲突时,更新相应列。
      • INSERT_OR_REPLACE:当主键冲突时,更新所有列。
      WRITE_BUFFER_SIZE 512 每个写入线程的最大批次大小,在经过WRITE_MODE合并后的Put数量达到writeBatchSize时进行一次批量提交。
      WRITE_BATCH_BYTE_SIZE 2 MB 每个写入线程的最大批次Bytes大小,在经过WRITE_MODE合并后的Put数据字节数达到WRITE_BATCH_BYTE_SIZE时进行一次批量提交。
      WRITE_MAX_INTERVAL_MS 10000 ms 距离上次提交超过WRITE_MAX_INTERVAL_MS会触发一次批量提交。
      WRITE_FAIL_STRATEGY TYR_ONE_BY_ONE 当某一批次提交失败时,会将批次内的记录逐条提交(保序),单条提交失败的记录将会跟随异常被抛出。
      WRITE_THREAD_SIZE 1 写入并发线程数(每个并发占用1个数据库连接)。
      RETRY_COUNT 3 当连接故障时,写入和查询的重试次数。
      RETRY_SLEEP_INIT_MS 1000 ms 每次重试的等待时间=RETRY_SLEEP_INIT_MS+RETRY*RETRY_SLEEP_STEP_MS
      RETRY_SLEEP_STEP_MS 10*1000 ms 每次重试的等待时间=RETRY_SLEEP_INIT_MS+RETRY*RETRY_SLEEP_STEP_MS
      CONNECTION_MAX_IDLE_MS 60000 ms 写入线程和点查线程数据库连接的最大IDLE时间,超过连接将被释放。

通过Spark Connector写入使用示例

根据如下示例步骤为您介绍,如何通过Spark Connector将数据写入Hologres。

  1. 创建Hologres表。
    在Hologres中执行如下SQL命令创建目标表,用来接受数据。
    CREATE TABLE tb008 (
      id BIGINT primary key,
      counts INT,
      name TEXT,
      price NUMERIC(38, 18),
      out_of_stock BOOL,
      weight DOUBLE PRECISION,
      thick FLOAT,
      time TIMESTAMPTZ,
      dt DATE, 
      by bytea,
      inta int4[],
      longa int8[],
      floata float4[],
      doublea float8[],
      boola boolean[],
      stringa text[]
    );
  2. Spark准备数据并写入Hologres。
    1. 在命令行运行命令开启spark。
      spark-shell --jars hologres-connector-spark-2.x-1.0-SNAPSHOT-jar-with-dependencies.jar
    2. spark-shell里使用命令load spark-test.scala执行测试文件,加载测试示例。
      spark-test.scala文件示例如下。
      import java.sql.{Timestamp, Date}
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.Row
      import com.alibaba.hologres.spark2.sink.SourceProvider
      
      val byteArray = Array(1.toByte, 2.toByte, 3.toByte, 'b'.toByte, 'a'.toByte)
      val intArray = Array(1, 2, 3)
      val longArray = Array(1L, 2L, 3L)
      val floatArray = Array(1.2F, 2.44F, 3.77F)
      val doubleArray = Array(1.222, 2.333, 3.444)
      val booleanArray = Array(true, false, false)
      val stringArray = Array("abcd", "bcde", "defg")
      
      val data = Seq(
        Row(-7L, 100, "phone1", BigDecimal(1234.567891234), false, 199.35, 6.7F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("2021-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(6L, -10, "phone2", BigDecimal(1234.56), true, 188.45, 7.8F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("1970-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(1L, 10, "phone3\"", BigDecimal(1234.56), true, 111.45, null, Timestamp.valueOf("2020-02-29 00:12:33"), Date.valueOf("2020-07-23"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray)
      )
      
      
      val schema = StructType(Array(
        StructField("id", LongType),
        StructField("counts", IntegerType),
        StructField("name", StringType, false), //false表示此Field不允许为null
        StructField("price", DecimalType(38, 12)),
        StructField("out_of_stock", BooleanType),
        StructField("weight", DoubleType),
        StructField("thick", FloatType),
        StructField("time", TimestampType),
        StructField("dt", DateType),
        StructField("by", BinaryType),
        StructField("inta", ArrayType(IntegerType)),
        StructField("longa", ArrayType(LongType)),
        StructField("floata", ArrayType(FloatType)),
        StructField("doublea", ArrayType(DoubleType)),
        StructField("boola", ArrayType(BooleanType)),
        StructField("stringa", ArrayType(StringType))
      ))
      
      
      val df = spark.createDataFrame(
        spark.sparkContext.parallelize(data),
        schema
      )
      df.show()
      
      //配置导入数据至Hologres的信息。
      df.write.format("hologres") //必须配置为hologres
        .option(SourceProvider.USERNAME, "your_username") //阿里云账号的AccessKey ID。
        .option(SourceProvider.PASSWORD, "your_password") //阿里云账号的Accesskey SECRET。
        .option(SourceProvider.ENDPOINT, "Ip:Port") //Hologres的Ip和Port。
        .option(SourceProvider.DATABASE, "test_database") //Hologres的数据库名称,示例为test_database。
        .option(SourceProvider.TABLE, "tb008") //Hologres用于接收数据的表名称,示例为tb008。
        .save()
  3. 查询写入的数据。
    在Hologres侧查询目标表,即可确认写入的数据,示例如下图所示。测试示例数据

通过Spark读取数据源数据并写入Hologres

  1. Spark从数据源读取数据。
    Spark支持从不同数据源读取数据,具体数据源分类如下。
    • Spark支持以Hologres为数据源。
      Hologres兼容PostgreSQL,因为Spark可以用读取PostgreSQL的方式读取Hologres中的数据。读取代码如下。
      // Read from some table, for example: tb008
      val readDf = spark.read
        .format("jdbc") //
        .option("driver","org.postgresql.Driver")
        .option("url", "jdbc:postgresql://Ip:Por/test_database")
        .option("dbtable", "tb008")
        .option("user", "your_username")
        .option("password", "your_password")
        .load()
    • Spark支持其他数据源(如Parquet格式的文件)。
      Spark支持从其他数据源中读取数据写入Hologres中,例如使用Spark从Hive中读取数据,代码如下。
      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.hive.HiveContext
      
      val sparkConf = new SparkConf()
      val sc = new SparkContext(sparkConf)
      val hiveContext = new HiveContext(sc)
      
      // Read from some table, for example: phone
      val readDf = hiveContext.sql("select * from hive_database.phone")
  2. Spark将读到的数据写入Hologres。
    import com.alibaba.hologres.spark2.sink.SourceProvider
    
    // 函数实现见测试用例,也可以手动创建数据表
    val table = createTableSql(readDf.schema, "tb009")
    
    val df = spark.createDataFrame(
      readDf.rdd,
      readDf.schema
    )
    
    // Write to hologres table, for example: tb009
    df.write
      .format("hologres")
      .option(SourceProvider.USERNAME, "your_username")
      .option(SourceProvider.PASSWORD, "your_password")
      .option(SourceProvider.ENDPOINT, "Ip:Port")
      .option(SourceProvider.DATABASE, "test_database")
      .option(SourceProvider.TABLE, table)
      .save()

通过Spark实时写入数据至Hologres

  1. 在Hologres创建一张表,用于接收数据,创建代码如下。
    CREATE TABLE test_table_stream
    (
        value text,
        count bigint
    );
  2. 读取本地端口输入行,进行词频统计并实时写入Hologres中,相关示例代码如下。
     val spark = SparkSession
          .builder
          .appName("StreamToHologres")
          .master("local[*]")
          .getOrCreate()
    
        spark.sparkContext.setLogLevel("WARN")
        import spark.implicits._
    
        val lines = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", 9999)
          .load()
    
        // Split the lines into words
        val words = lines.as[String].flatMap(_.split(" "))
    
        // Generate running word count
        val wordCounts = words.groupBy("value").count()
    
        wordCounts.writeStream
            .outputMode(OutputMode.Complete())
            .format("hologres")
            .option(SourceProvider.USERNAME, "your_username")
            .option(SourceProvider.PASSWORD, "your_password")
            .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname")
            .option(SourceProvider.TABLE, "test_table_stream")
            .option("batchsize", 1)
            .option("isolationLevel", "NONE")
            .option("checkpointLocation", checkpointLocation)
            .start()
            .awaitTermination()