This topic describes how to use the Spark connector provided by Hologres to read data from Apache Spark and write the data to Hologres.

Background information

Apache Spark is a unified analytics engine that processes large amounts of data. Hologres is integrated with both the Apache Spark community and Apache Spark on Amazon EMR to help you build data warehouses. Hologres provides the Spark connector to allow you to batch write data from Apache Spark to Hologres. You can use the connector to read data from multiple types of sources, such as files, Hive tables, MySQL tables, and PostgreSQL tables.

Hologres is compatible with PostgreSQL. You can use the Spark connector to read Hologres data based on the PostgreSQL Java Database Connectivity (JDBC) driver. Then, you can extract, transform, and load (ETL) the data and write it to Hologres or other destinations.

Prerequisites

  • The version of your Hologres instance is V0.9 or later. You can view the version of your Hologres instance on the instance details page in the Hologres console. If the version of your Hologres instance is earlier than V0.9, submit a ticket to update the instance.
  • A version of Apache Spark that supports the spark-shell command is installed.

Use the Spark connector to write data to Hologres (recommended)

We recommend that you use the built-in Spark connector of Hologres to write data to Hologres. The Spark connector works together with Holo Client. Compared with other methods of writing data, the Spark connector provides better write performance. To use the Spark connector to write data, perform the following steps. For information about the sample code, see Example of using the Spark connector to write data to Hologres.

  1. Obtain a JAR package.
    The Spark connector supports Apache Spark 2 and Apache Spark 3. When you use the Spark connector to write data, you must reference the JAR package of the Spark connector. The JAR package is published in the Maven Central Repository. You can refer to the following pom.xml configuration.
    Note The Spark connector is open sourced on the alibabacloud-hologres-connectors page on GitHub.
    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-2.x</artifactId>
        <version>1.0.1</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>
    You can click the following links to download the packages provided by Hologres:
  2. Use the JAR package.
    Run the following command to start the Spark shell with the JAR package of the Spark connector loaded:
    spark-shell --jars hologres-connector-spark-2.x-1.0-SNAPSHOT-jar-with-dependencies.jar
  3. Configure the Apache Spark service.
    Run the following commands in Apache Spark to connect to Hologres and write data to the destination table:
    • df.write
        .format("hologres")
        .option(SourceProvider.USERNAME, "AccessKey ID") // The AccessKey ID of your Alibaba Cloud account. 
        .option(SourceProvider.PASSWORD, "AccessKey Secret") // The AccessKey secret of your Alibaba Cloud account. 
        .option(SourceProvider.ENDPOINT, "Ip:Port") // The IP address and port of the Hologres instance. 
        .option(SourceProvider.DATABASE, "Database")
        .option(SourceProvider.TABLE, "Table")
        .save()
      Note Use either of the following two settings to specify the connection:
      • Setting 1:
        .option(SourceProvider.ENDPOINT, "Ip:Port")
        .option(SourceProvider.DATABASE, "Database")
      • Setting 2:
        .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/Database")
    • Parameters
      Parameter | Default value | Required | Description
      USERNAME | N/A | Yes | The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID from the Security Management page.
      PASSWORD | N/A | Yes | The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey secret from the Security Management page.
      TABLE | N/A | Yes | The name of the Hologres table to which to write data.
      ENDPOINT | N/A | Set the JDBCURL parameter or this parameter. | The endpoint of the Hologres instance. You can view the IP address and port number of the Hologres instance on the Configurations tab of the instance details page in the Hologres console.
      DATABASE | N/A | Set the JDBCURL parameter or this parameter. | The name of the Hologres database where the destination table resides.
      JDBCURL | N/A | Set the ENDPOINT and DATABASE parameters or only this parameter. | The JDBC URL of the Hologres instance.
      INPUT_DATA_SCHEMA_DDL | N/A | Required for Spark 3.x. Set the value to <your_DataFrame>.schema.toDDL. | The DDL statement for the DataFrame in Spark.
      WRITE_MODE | INSERT_OR_REPLACE | No | The policy that is used to handle primary key conflicts. This parameter takes effect if the destination table contains a primary key. Valid values: INSERT_OR_IGNORE (discards the data to be written if a primary key conflict occurs); INSERT_OR_UPDATE (updates the corresponding columns of the destination table if a primary key conflict occurs); INSERT_OR_REPLACE (updates all the columns of the destination table if a primary key conflict occurs).
      WRITE_BUFFER_SIZE | 512 | No | The maximum number of requests allowed in a batch in a write thread. If the total number of PUT requests reaches the WRITE_BUFFER_SIZE value after conflicts are handled based on the WRITE_MODE parameter, the data is submitted in a batch.
      WRITE_BATCH_BYTE_SIZE | 2 MB | No | The maximum number of bytes allowed in a batch in a write thread. If the total number of bytes of the PUT requests reaches the WRITE_BATCH_BYTE_SIZE value after conflicts are handled based on the WRITE_MODE parameter, the data is submitted in a batch.
      WRITE_MAX_INTERVAL_MS | 10000 ms | No | The maximum interval between batch submissions.
      WRITE_FAIL_STRATEGY | TRY_ONE_BY_ONE | No | The policy that is used to handle submission failures. If a batch cannot be submitted, Holo Client submits the data entries in the batch one by one, in sequence. If a single data entry cannot be submitted, Holo Client returns an error message that contains the information of the data entry.
      WRITE_THREAD_SIZE | 1 | No | The number of concurrent threads to write data. Each thread occupies one connection. The total number of connections occupied by a Spark job depends on the Spark parallelism and can be calculated by using the following formula: Total number of connections = spark.default.parallelism × Value of the WRITE_THREAD_SIZE parameter.
      DYNAMIC_PARTITION | false | No | Specifies whether to automatically create a partition if the data is written to a parent table that has no partitions. Valid values: true and false.
      RETRY_COUNT | 3 | No | The maximum number of retries allowed to read and write data if a connection failure occurs.
      RETRY_SLEEP_INIT_MS | 1000 ms | No | The interval at which retries are performed. The amount of time consumed by the retries for a request is calculated by using the following formula: Value of the RETRY_SLEEP_INIT_MS parameter + Value of the RETRY_COUNT parameter × Value of the RETRY_SLEEP_STEP_MS parameter.
      RETRY_SLEEP_STEP_MS | 10000 ms | No | The amount of time required for a retry. The amount of time consumed by the retries for a request is calculated by using the following formula: Value of the RETRY_SLEEP_INIT_MS parameter + Value of the RETRY_COUNT parameter × Value of the RETRY_SLEEP_STEP_MS parameter.
      CONNECTION_MAX_IDLE_MS | 60000 ms | No | The idle timeout period that applies to the connections used to read and write data. If a connection does not send or receive data by the time the idle timeout period ends, Holo Client automatically releases the connection.
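
The formulas in the parameter descriptions can be checked with a few lines of plain Scala before you size a job. The following is a minimal sketch, not part of the connector: the object name HologresSizing and its function names are made up for illustration, and the arithmetic simply restates the formulas given for WRITE_THREAD_SIZE and RETRY_SLEEP_INIT_MS, plus the JDBC URL pattern from the note on the two alternative connection settings.

```scala
// Hypothetical helper for illustration only; it is not shipped with the Spark connector.
object HologresSizing {

  // Builds the JDBC URL that corresponds to setting ENDPOINT and DATABASE separately,
  // following the pattern jdbc:postgresql://Ip:Port/Database.
  def jdbcUrl(endpoint: String, database: String): String =
    s"jdbc:postgresql://$endpoint/$database"

  // Total number of connections = spark.default.parallelism x WRITE_THREAD_SIZE.
  def totalConnections(parallelism: Int, writeThreadSize: Int): Int =
    parallelism * writeThreadSize

  // Retry time = RETRY_SLEEP_INIT_MS + RETRY_COUNT x RETRY_SLEEP_STEP_MS.
  def retryTimeMs(initMs: Long, retryCount: Int, stepMs: Long): Long =
    initMs + retryCount * stepMs

  def main(args: Array[String]): Unit = {
    // Default values from the parameter table: 1000 ms, 3 retries, 10000 ms.
    println(retryTimeMs(1000L, 3, 10000L))
    // Example: a parallelism of 8 with 2 write threads per task.
    println(totalConnections(8, 2))
    println(jdbcUrl("Ip:Port", "Database"))
  }
}
```

With the default retry settings, the formula gives 1000 + 3 × 10000 = 31000 ms. Comparing the connection count with the connection limit of your instance before you raise WRITE_THREAD_SIZE is one reasonable use of such a check.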

Example of using the Spark connector to write data to Hologres

The following example shows how to use the Spark connector to write data to Hologres.

  1. Create a table in Hologres.
    Execute the following SQL statement in Hologres to create a Hologres table to which to write data:
    CREATE TABLE tb008 (
      id BIGINT primary key,
      counts INT,
      name TEXT,
      price NUMERIC(38, 18),
      out_of_stock BOOL,
      weight DOUBLE PRECISION,
      thick FLOAT,
      time TIMESTAMPTZ,
      dt DATE, 
      by bytea,
      inta int4[],
      longa int8[],
      floata float4[],
      doublea float8[],
      boola boolean[],
      stringa text[]
    );
  2. Use the Spark connector to prepare data and write the data to Hologres.
    1. Run the following command to enable the Spark connector:
      spark-shell --jars hologres-connector-spark-2.x-1.0-SNAPSHOT-jar-with-dependencies.jar
    2. Run the :load spark-test.scala command in the Spark shell to write the sample data to Hologres.
      The spark-test.scala file contains the following sample code:
      import java.sql.{Timestamp, Date}
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.{Row, SaveMode}
      import com.alibaba.hologres.spark2.sink.SourceProvider
      
      val byteArray = Array(1.toByte, 2.toByte, 3.toByte, 'b'.toByte, 'a'.toByte)
      val intArray = Array(1, 2, 3)
      val longArray = Array(1L, 2L, 3L)
      val floatArray = Array(1.2F, 2.44F, 3.77F)
      val doubleArray = Array(1.222, 2.333, 3.444)
      val booleanArray = Array(true, false, false)
      val stringArray = Array("abcd", "bcde", "defg")
      
      val data = Seq(
        Row(-7L, 100, "phone1", BigDecimal(1234.567891234), false, 199.35, 6.7F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("2021-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(6L, -10, "phone2", BigDecimal(1234.56), true, 188.45, 7.8F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("1970-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(1L, 10, "phone3\"", BigDecimal(1234.56), true, 111.45, null, Timestamp.valueOf("2020-02-29 00:12:33"), Date.valueOf("2020-07-23"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray)
      )
      
      
      val schema = StructType(Array(
        StructField("id", LongType),
        StructField("counts", IntegerType),
        StructField("name", StringType, false), // The false value indicates that the value of this field cannot be null in the table.
        StructField("price", DecimalType(38, 12)),
        StructField("out_of_stock", BooleanType),
        StructField("weight", DoubleType),
        StructField("thick", FloatType),
        StructField("time", TimestampType),
        StructField("dt", DateType),
        StructField("by", BinaryType),
        StructField("inta", ArrayType(IntegerType)),
        StructField("longa", ArrayType(LongType)),
        StructField("floata", ArrayType(FloatType)),
        StructField("doublea", ArrayType(DoubleType)),
        StructField("boola", ArrayType(BooleanType)),
        StructField("stringa", ArrayType(StringType))
      ))
      
      
      val df = spark.createDataFrame(
        spark.sparkContext.parallelize(data),
        schema
      )
      df.show()
      
      // Set the following parameters to write the sample data to Hologres.
      df.write.format("hologres") // Set the value to hologres.
        .option(SourceProvider.USERNAME, "your_username") // The AccessKey ID of your Alibaba Cloud account.
        .option(SourceProvider.PASSWORD, "your_password") // The AccessKey secret of your Alibaba Cloud account.
        .option(SourceProvider.ENDPOINT, "Ip:Port") // The IP address and port of the Hologres instance.
        .option(SourceProvider.DATABASE, "test_database") // The name of the Hologres database. In this example, the database name is test_database.
        .option(SourceProvider.TABLE, "tb008") // The name of the Hologres table to which to write data. In this example, the table name is tb008.
        .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) // The DDL statement of the DataFrame. This parameter is required only for Spark 3.x.
        .mode(SaveMode.Append) // The save mode. This parameter is required only for Spark 3.x.
        .save()
  3. Query the written data.
    You can query the destination Hologres table in the Hologres console to check the written data.
    Figure: Sample data
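
Because tb008 has a primary key on id, writing the same id again triggers the conflict handling selected by the WRITE_MODE parameter. The following plain-Scala simulation is a sketch of the three policies as they are described in the parameter list. It does not call the connector. The names WriteModeDemo and upsert are made up for illustration, and modeling a column that INSERT_OR_REPLACE does not receive as absent from the row is an assumption of this sketch.

```scala
// In-memory simulation for illustration only; it does not use Hologres or Spark.
object WriteModeDemo {
  // A row is modeled as column name -> value; the table is keyed by the primary key.
  type Row = Map[String, Any]

  sealed trait WriteMode
  case object InsertOrIgnore extends WriteMode
  case object InsertOrUpdate extends WriteMode
  case object InsertOrReplace extends WriteMode

  def upsert(table: Map[Long, Row], id: Long, incoming: Row, mode: WriteMode): Map[Long, Row] =
    table.get(id) match {
      // No primary key conflict: the row is inserted in every mode.
      case None => table + (id -> incoming)
      case Some(existing) =>
        mode match {
          // Discard the incoming data and keep the existing row.
          case InsertOrIgnore => table
          // Update only the columns that the incoming data provides.
          case InsertOrUpdate => table + (id -> (existing ++ incoming))
          // Overwrite the whole row (assumption: columns not provided are dropped here).
          case InsertOrReplace => table + (id -> incoming)
        }
    }

  def main(args: Array[String]): Unit = {
    val table: Map[Long, Row] = Map(1L -> Map("name" -> "phone3", "counts" -> 10))
    val incoming: Row = Map("counts" -> 99)
    Seq(InsertOrIgnore, InsertOrUpdate, InsertOrReplace).foreach { mode =>
      println(s"$mode: ${upsert(table, 1L, incoming, mode)}")
    }
  }
}
```

In this sketch, only INSERT_OR_UPDATE keeps the existing name value while changing counts, which is the practical difference to keep in mind when a job writes partial rows.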

Use the Spark connector to read data and write the data to Hologres

  1. Read data from the data source.
    You can use the Spark connector to read data from different types of sources. The following two examples show how to read data from Hologres or another type of source:
    • Read data from Hologres
      Hologres is compatible with PostgreSQL. You can use the Spark connector to read Hologres data based on the PostgreSQL JDBC driver. The following example provides the sample code.
      Note Before you read Hologres data, you must download the PostgreSQL JDBC JAR package and load it when you start the Spark shell, for example, by running the ./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar command. You can also load the PostgreSQL JDBC JAR package together with the JAR package of the Spark connector.
      // Read from some table, for example: tb008
      val readDf = spark.read
        .format("jdbc") // Read Hologres data based on the PostgreSQL JDBC driver.
        .option("driver","org.postgresql.Driver")
        .option("url", "jdbc:postgresql://Ip:Port/test_database")
        .option("dbtable", "tb008")
        .option("user", "your_username")
        .option("password", "your_password")
        .load()
    • Read data from another type of source such as a Parquet file
      You can use Spark to read data from other types of sources, such as a Parquet file or a Hive table. The following sample code reads data from a Hive table:
      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.hive.HiveContext
      
      val sparkConf = new SparkConf()
      val sc = new SparkContext(sparkConf)
      val hiveContext = new HiveContext(sc)
      
      // Read from some table, for example: phone
      val readDf = hiveContext.sql("select * from hive_database.phone")
  2. Write the data to Hologres.
    import org.apache.spark.sql.SaveMode
    import com.alibaba.hologres.spark2.sink.SourceProvider
    
    // createTableSql is a sample helper that is not defined in this topic. Alternatively, you can create the table in Hologres yourself and specify its name.
    val table = createTableSql(readDf.schema, "tb009")
    
    val df = spark.createDataFrame(
      readDf.rdd,
      readDf.schema
    )
    
    // Write to a Hologres table, for example: tb009
    df.write
      .format("hologres")
      .option(SourceProvider.USERNAME, "your_username")
      .option(SourceProvider.PASSWORD, "your_password")
      .option(SourceProvider.ENDPOINT, "Ip:Port")
      .option(SourceProvider.DATABASE, "test_database")
      .option(SourceProvider.TABLE, table)
      .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) // The DDL statement of the DataFrame. This parameter is required only for Spark 3.x.
      .mode(SaveMode.Append) // The save mode. This parameter is required only for Spark 3.x.
      .save()

Use the Spark connector to write data to Hologres in real time

  1. Execute the following statement to create a Hologres table to which to write data:
    CREATE TABLE test_table_stream
    (
        value text,
        count bigint
    );
  2. Read lines of text from a socket on your on-premises machine, collect word frequency statistics, and write them to Hologres. The following sample code provides an example:
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.OutputMode
    import com.alibaba.hologres.spark2.sink.SourceProvider
    
    // Placeholder: replace this value with a directory in which Spark can store streaming checkpoints.
    val checkpointLocation = "/path/to/checkpoint"
    
    val spark = SparkSession
      .builder
      .appName("StreamToHologres")
      .master("local[*]")
      .getOrCreate()
    
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._
    
    // Read lines of text from the socket localhost:9999.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    
    // Split the lines into words.
    val words = lines.as[String].flatMap(_.split(" "))
    
    // Generate the running word count.
    val wordCounts = words.groupBy("value").count()
    
    // Write the complete result to the Hologres table on each trigger.
    wordCounts.writeStream
      .outputMode(OutputMode.Complete())
      .format("hologres")
      .option(SourceProvider.USERNAME, "your_username")
      .option(SourceProvider.PASSWORD, "your_password")
      .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname")
      .option(SourceProvider.TABLE, "test_table_stream")
      .option("batchsize", 1)
      .option("isolationLevel", "NONE")
      .option("checkpointLocation", checkpointLocation)
      .start()
      .awaitTermination()
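
To see which rows the streaming job is expected to produce for the value and count columns of test_table_stream, the same split-and-count logic can be run on an ordinary Scala collection. This is a minimal sketch that uses no Spark APIs; the object name WordCountSketch and the sample input lines are made up for illustration.

```scala
// Plain-collection counterpart of the streaming word count, for illustration only.
object WordCountSketch {

  // Mirrors flatMap(_.split(" ")) followed by groupBy("value").count():
  // each distinct word becomes one (value, count) pair.
  def wordCounts(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size.toLong }

  def main(args: Array[String]): Unit = {
    val counts = wordCounts(Seq("hello spark", "hello hologres"))
    // Print one line per row, sorted by word for readability.
    counts.toSeq.sortBy(_._1).foreach { case (value, count) =>
      println(s"$value\t$count")
    }
  }
}
```

In the streaming job, the Complete output mode emits the full set of such pairs each time new lines arrive, so the counts in the destination table grow as more input is received.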

Data type mapping

The following table describes the data type mappings between Spark and Hologres.

Spark data type | Hologres data type
IntegerType | INT
LongType | BIGINT
StringType | TEXT
DecimalType | NUMERIC(38, 18)
BooleanType | BOOL
DoubleType | DOUBLE PRECISION
FloatType | FLOAT
TimestampType | TIMESTAMPTZ
DateType | DATE
BinaryType | BYTEA
ArrayType(IntegerType) | int4[]
ArrayType(LongType) | int8[]
ArrayType(FloatType) | float4[]
ArrayType(DoubleType) | float8[]
ArrayType(BooleanType) | boolean[]
ArrayType(StringType) | text[]
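
The mapping can also be applied mechanically when you prepare a destination table for an existing DataFrame. The following sketch is plain Scala and does not depend on Spark: Spark types are passed as the type names used in the mapping table, and the object HologresDdlSketch and its createTableDdl function are hypothetical names that are not part of the connector. The sketch does not quote column names or add a primary key, so treat the output as a starting point.

```scala
// Hypothetical DDL generator for illustration only; it is not part of the Spark connector.
object HologresDdlSketch {

  // Spark type name -> Hologres type, copied from the data type mapping table.
  val typeMapping: Map[String, String] = Map(
    "IntegerType" -> "INT",
    "LongType" -> "BIGINT",
    "StringType" -> "TEXT",
    "DecimalType" -> "NUMERIC(38, 18)",
    "BooleanType" -> "BOOL",
    "DoubleType" -> "DOUBLE PRECISION",
    "FloatType" -> "FLOAT",
    "TimestampType" -> "TIMESTAMPTZ",
    "DateType" -> "DATE",
    "BinaryType" -> "BYTEA",
    "ArrayType(IntegerType)" -> "int4[]",
    "ArrayType(LongType)" -> "int8[]",
    "ArrayType(FloatType)" -> "float4[]",
    "ArrayType(DoubleType)" -> "float8[]",
    "ArrayType(BooleanType)" -> "boolean[]",
    "ArrayType(StringType)" -> "text[]"
  )

  // Builds a CREATE TABLE statement from (column name, Spark type name) pairs.
  // Fails fast on a type that the mapping table does not list.
  def createTableDdl(table: String, columns: Seq[(String, String)]): String = {
    val columnDefs = columns.map { case (name, sparkType) =>
      val holoType = typeMapping.getOrElse(
        sparkType,
        throw new IllegalArgumentException(s"No Hologres mapping for Spark type: $sparkType")
      )
      s"  $name $holoType"
    }
    columnDefs.mkString(s"CREATE TABLE $table (\n", ",\n", "\n);")
  }

  def main(args: Array[String]): Unit =
    println(createTableDdl("tb009", Seq("id" -> "LongType", "name" -> "StringType", "inta" -> "ArrayType(IntegerType)")))
}
```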