
Hologres: Use Spark to write data to Hologres

Last Updated: Feb 27, 2024

This topic describes how to use Spark to read data from multiple types of sources and write the data to Hologres.

Background information

Spark is a unified analytics engine for large-scale data processing. Hologres is integrated with both Apache Spark and E-MapReduce (EMR) Spark to help you build data warehouses in an efficient manner. Hologres provides a Spark connector that you can use to write data to Hologres in batch mode. You can use Spark to read data from multiple types of sources, such as files, Hive tables, MySQL tables, and PostgreSQL tables.

Hologres is compatible with PostgreSQL. You can use Spark to read Hologres data based on the PostgreSQL Java Database Connectivity (JDBC) driver. Then, you can extract, transform, and load (ETL) the data and write the processed data back to Hologres or to other destinations.

Prerequisites

  • The version of your Hologres instance is V0.9 or later. You can view the version on the instance details page in the Hologres console. If the version is earlier than V0.9, manually upgrade the instance in the Hologres console or join the Hologres DingTalk group for technical support. For more information, see Instance upgrades and Obtain online support for Hologres.

  • A Spark version that is supported by Hologres is installed, so that you can run the spark-shell command in your Spark environment.

(Recommended) Use the Spark connector to write data to Hologres

We recommend that you use the built-in Spark connector of Hologres to write data to Hologres. The connector works together with Holo Client and provides better write performance than the other methods described in this topic. To use the Spark connector, perform the following steps. For the complete sample code, see the Example of using the Spark connector to write data to Hologres section in this topic.

Preparations

  1. Obtain a JAR package.

    The Spark connector is available for Spark 2 and Spark 3. When you use the Spark connector to write data to Hologres, you must reference a JAR package of the Spark connector. The JAR package is already published in the Maven central repository. You can refer to the following pom.xml file for configuration.

    Note

    Relevant connectors are also open-source. For more information, visit the alibabacloud-hologres-connectors page.

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-2.x</artifactId>
        <version>1.0.1</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>
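
    If you use Spark 3, reference the Spark 3 artifact instead. The following dependency is a sketch: the artifactId comes from the open-source connector repository, but the version number is illustrative, so check the Maven central repository for the latest release.

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.0.1</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>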

    Alternatively, you can download the prebuilt JAR packages that are provided by Hologres.

  2. Use the JAR package.

    Run the following command to start Spark and load the connector:

    spark-shell --jars hologres-connector-spark-2.x-1.0.1-jar-with-dependencies.jar

    You can also run the following command to start PySpark and load the connector:

    pyspark --jars hologres-connector-spark-2.x-1.0.1-jar-with-dependencies.jar

Example of using the Spark connector to write data to Hologres

The following example shows how to use the Spark connector to write data to Hologres.

  1. Create a table in Hologres.

    Execute the following SQL statement in Hologres to create a table to which you want to write data:

    CREATE TABLE tb008 (
      id BIGINT PRIMARY KEY,
      counts INT,
      name TEXT,
      price NUMERIC(38, 18),
      out_of_stock BOOL,
      weight DOUBLE PRECISION,
      thick FLOAT,
      time TIMESTAMPTZ,
      dt DATE,
      "by" BYTEA, -- by is a reserved keyword in PostgreSQL and must be quoted.
      inta INT4[],
      longa INT8[],
      floata FLOAT4[],
      doublea FLOAT8[],
      boola BOOLEAN[],
      stringa TEXT[]
    );
  2. Prepare data in Spark and write the data to Hologres.

    1. Run the following command in the CLI to enable the Spark connector:

      spark-shell --jars hologres-connector-spark-2.x-1.0.1-jar-with-dependencies.jar
    2. Run the :load spark-test.scala command in spark-shell to load the sample code.

      The spark-test.scala file contains the following code:

      import java.sql.{Timestamp, Date}
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.{Row, SaveMode}
      import com.alibaba.hologres.spark2.sink.SourceProvider
      
      val byteArray = Array(1.toByte, 2.toByte, 3.toByte, 'b'.toByte, 'a'.toByte)
      val intArray = Array(1, 2, 3)
      val longArray = Array(1L, 2L, 3L)
      val floatArray = Array(1.2F, 2.44F, 3.77F)
      val doubleArray = Array(1.222, 2.333, 3.444)
      val booleanArray = Array(true, false, false)
      val stringArray = Array("abcd", "bcde", "defg")
      
      val data = Seq(
        Row(-7L, 100, "phone1", BigDecimal(1234.567891234), false, 199.35, 6.7F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("2021-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(6L, -10, "phone2", BigDecimal(1234.56), true, 188.45, 7.8F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("1970-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(1L, 10, "phone3\"", BigDecimal(1234.56), true, 111.45, null, Timestamp.valueOf("2020-02-29 00:12:33"), Date.valueOf("2020-07-23"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray)
      )
      
      
      val schema = StructType(Array(
        StructField("id", LongType),
        StructField("counts", IntegerType),
        StructField("name", StringType, false), // The value false indicates that the value of this field cannot be null in the table.
        StructField("price", DecimalType(38, 12)),
        StructField("out_of_stock", BooleanType),
        StructField("weight", DoubleType),
        StructField("thick", FloatType),
        StructField("time", TimestampType),
        StructField("dt", DateType),
        StructField("by", BinaryType),
        StructField("inta", ArrayType(IntegerType)),
        StructField("longa", ArrayType(LongType)),
        StructField("floata", ArrayType(FloatType)),
        StructField("doublea", ArrayType(DoubleType)),
        StructField("boola", ArrayType(BooleanType)),
        StructField("stringa", ArrayType(StringType))
      ))
      
      
      val df = spark.createDataFrame(
        spark.sparkContext.parallelize(data),
        schema
      )
      df.show()
      
      // Configure the following parameters to write the sample data to Hologres. 
      df.write.format("hologres") // Set the value to hologres.
        .option(SourceProvider.USERNAME, "your_username") // The AccessKey ID of your Alibaba Cloud account. 
        .option(SourceProvider.PASSWORD, "your_password") // The AccessKey secret of your Alibaba Cloud account. 
        .option(SourceProvider.ENDPOINT, "Ip:Port") // The IP address and the port number of your Hologres instance. 
        .option(SourceProvider.DATABASE, "test_database") // The name of the Hologres database. The name is test_database in this example. 
        .option(SourceProvider.TABLE, "tb008") // The name of the Hologres table to which you want to write data. The table name is tb008 in this example. 
        .option(SourceProvider.WRITE_BATCH_SIZE, 512) // The maximum number of requests allowed in a batch.
        .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) // The data definition language (DDL) statement for DataFrame. This parameter is required only for Spark 3.X.
        .mode(SaveMode.Append) // The save mode. This parameter is required only for Spark 3.X.
        .save()
  3. Query data in the destination table.

    You can query data in the destination table in the Hologres console to check the written data.
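
    For example, you can execute the following statement in the SQL editor to verify the result:

    SELECT * FROM tb008 ORDER BY id;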

Example of using the PySpark connector to write data to Hologres

  1. Run the following command to start PySpark and load the PySpark connector:

    pyspark --jars hologres-connector-spark-3.x-1.3-SNAPSHOT-jar-with-dependencies.jar

  2. Create a DataFrame object from sample data and call the connector to write the data to Hologres. The procedure is similar to that of the Spark connector in the Scala examples.

    data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]]
    df = spark.createDataFrame(data, schema="id LONG, name STRING")
    df.show()
    
    df.write.format("hologres").option(
      "username", "your_username").option(
      "password", "your_password").option(
      "endpoint", "hologres_endpoint").option(
      "database", "test_database").option(
      "table", "tb008").save()
    

Use Spark to read data from a specific type of source and write the data to Hologres

  1. Read data from a specific type of source.

    You can use Spark to read data from different types of sources. The following examples show how to read data from Hologres or another type of source:

    • Read data from Hologres

      Hologres is compatible with PostgreSQL. You can use Spark to read Hologres data based on the PostgreSQL JDBC driver. The following sample code is for reference only.

      Note

      Before you read Hologres data, download the PostgreSQL JDBC JAR package from the official website. In this example, postgresql-42.2.18 is used. Then, run the ./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar command to start spark-shell and load the PostgreSQL JDBC JAR package. You can also load the PostgreSQL JDBC JAR package together with the JAR package of the Spark connector.

      // Read from some table, for example: tb008
      val readDf = spark.read
        .format("jdbc") // Read Hologres data based on the PostgreSQL JDBC driver.
        .option("driver","org.postgresql.Driver")
        .option("url", "jdbc:postgresql://Ip:Por/test_database")
        .option("dbtable", "tb008")
        .option("user", "your_username")
        .option("password", "your_password")
        .load()
    • Read data from another type of source such as a Parquet file

      You can use Spark to read data from other types of sources, such as a Parquet file or a Hive table. The following example reads from a Hive table; a Parquet sketch follows it.

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.hive.HiveContext
      
      val sparkConf = new SparkConf()
      val sc = new SparkContext(sparkConf)
      val hiveContext = new HiveContext(sc)
      
      // Read from some table, for example: phone
      val readDf = hiveContext.sql("select * from hive_database.phone")
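
      To read a Parquet file instead, a one-line sketch is enough; the file path below is illustrative:

      // Read from a Parquet file. Replace the path with your own.
      val readDf = spark.read.parquet("/path/to/file.parquet")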
  2. Write the data to Hologres.

    import com.alibaba.hologres.spark2.sink.SourceProvider
    import org.apache.spark.sql.SaveMode

    // Write to the Hologres table.
    readDf.write
      .format("hologres")
      .option(SourceProvider.USERNAME, "your_username")
      .option(SourceProvider.PASSWORD, "your_password")
      .option(SourceProvider.ENDPOINT, "Ip:Port")
      .option(SourceProvider.DATABASE, "test_database")
      .option(SourceProvider.TABLE, "your_table_name")
      .option(SourceProvider.WRITE_BATCH_SIZE, 512) // The maximum number of requests allowed in a batch.
      .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, readDf.schema.toDDL) // The DDL statement for the DataFrame. This parameter is required only for Spark 3.x.
      .mode(SaveMode.Append) // The save mode. This parameter is required only for Spark 3.x.
      .save()

Use Spark to write data to Hologres in real time

  1. Execute the following statement in Hologres to create the table to which you want to write data:

    CREATE TABLE test_table_stream
    (
        value text,
        count bigint
    );
  2. Read data from a local socket on your on-premises machine, collect word frequency statistics, and write the statistics to Hologres in real time. The following example provides sample code:

    • Code

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.streaming.OutputMode
      import com.alibaba.hologres.spark2.sink.SourceProvider

      val spark = SparkSession
        .builder
        .appName("StreamToHologres")
        .master("local[*]")
        .getOrCreate()

      spark.sparkContext.setLogLevel("WARN")
      import spark.implicits._

      val lines = spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load()

      // Split the lines into words.
      val words = lines.as[String].flatMap(_.split(" "))

      // Generate a running word count.
      val wordCounts = words.groupBy("value").count()

      // The checkpoint directory is an example path. Replace it with your own.
      val checkpointLocation = "/tmp/spark-checkpoint"

      wordCounts.writeStream
        .outputMode(OutputMode.Complete())
        .format("hologres")
        .option(SourceProvider.USERNAME, "your_username")
        .option(SourceProvider.PASSWORD, "your_password")
        .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname")
        .option(SourceProvider.TABLE, "test_table_stream")
        .option("batchsize", 1)
        .option("isolationLevel", "NONE")
        .option("checkpointLocation", checkpointLocation)
        .start()
        .awaitTermination()
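
      To test this example, you can first start a local socket server with netcat and type words into it; the socket source then reads each line you enter:

      nc -lk 9999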
    • Parameters

      Parameter | Default value | Required | Description
      --- | --- | --- | ---
      USERNAME | No default value | Yes | The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID on the Security Management page. We recommend that you configure environment variables and read the AccessKey pair from them to reduce the risk of credential leaks.
      PASSWORD | No default value | Yes | The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey secret on the Security Management page. The same environment variable recommendation applies.
      TABLE | No default value | Yes | The name of the Hologres table to which you want to write data.
      ENDPOINT | No default value | Configure this parameter or the JDBCURL parameter. | The endpoint of your Hologres instance. You can obtain the endpoint in the Network Information section of the instance details page in the Hologres console.
      DATABASE | No default value | Configure this parameter or the JDBCURL parameter. | The name of the Hologres database in which the destination table resides.
      JDBCURL | No default value | Configure only this parameter or the ENDPOINT and DATABASE parameters. | The JDBC URL of your Hologres instance.
      COPY_WRITE_MODE | true | No | Specifies whether to write data in fixed copy mode. Fixed copy is a feature introduced in Hologres V1.3. In fixed copy mode, data is written in streaming mode rather than in batches, which provides higher throughput and lower latency and consumes less client memory than writing with INSERT statements. Note: fixed copy requires connector V1.3.0 or later and Hologres engine r1.3.34 or later.
      COPY_WRITE_DIRTY_DATA_CHECK | false | No | Specifies whether to check for dirty data. This parameter takes effect only when COPY_WRITE_MODE is set to true. If you set this parameter to true and dirty data is generated, the system can locate the row that fails to be written. Note: dirty data checking degrades write performance. We recommend that you enable it only for troubleshooting.
      COPY_WRITE_DIRECT_CONNECT | true for scenarios that allow direct connections | No | This parameter takes effect only when COPY_WRITE_MODE is set to true. The amount of data that can be written in copy mode is limited by the throughput of the VPC endpoint, so the system checks whether the environment can directly connect to the Hologres frontend (FE) node and uses a direct connection by default if one is possible. Set this parameter to false to disable direct connections.
      INPUT_DATA_SCHEMA_DDL | No default value | Required only for Spark 3.x. Set this parameter to <your_DataFrame>.schema.toDDL. | The DDL statement of the Spark DataFrame.
      WRITE_MODE | INSERT_OR_REPLACE | No | The policy that is used to handle primary key conflicts. This parameter takes effect if the destination table has a primary key. Valid values: INSERT_OR_IGNORE discards the conflicting data; INSERT_OR_UPDATE updates the relevant columns of the conflicting row; INSERT_OR_REPLACE updates all columns of the conflicting row.
      WRITE_BATCH_SIZE | 512 | No | The maximum number of requests in a batch per write thread. After conflicts are handled based on the WRITE_MODE parameter, the batch is submitted once the number of PUT requests reaches this limit.
      WRITE_BATCH_BYTE_SIZE | 2 MB | No | The maximum number of bytes in a batch per write thread. After conflicts are handled based on the WRITE_MODE parameter, the batch is submitted once the total size of the PUT requests reaches this limit.
      WRITE_MAX_INTERVAL_MS | 10000 ms | No | The maximum interval at which a batch is submitted.
      WRITE_FAIL_STRATEGY | TRY_ONE_BY_ONE | No | The policy that is used to handle submission failures. If a batch fails to be submitted, Holo Client submits the rows of the batch one at a time in the original order. If a single row fails, Holo Client returns an error message that identifies the row.
      WRITE_THREAD_SIZE | 1 | No | The number of parallel threads that are used to write data. Each thread occupies one connection. The total number of connections occupied by a Spark job varies based on the Spark parallelism: Total number of connections = Value of spark.default.parallelism × Value of WRITE_THREAD_SIZE.
      DYNAMIC_PARTITION | false | No | Specifies whether to automatically create a partition when data is written to a partitioned parent table and the destination partition does not exist. Valid values: true and false.
      RETRY_COUNT | 3 | No | The maximum number of retries that are allowed for reads and writes when a connection failure occurs.
      RETRY_SLEEP_INIT_MS | 1000 ms | No | The initial retry interval. The total amount of time consumed by the retries of a request is calculated as follows: Value of RETRY_SLEEP_INIT_MS + Value of RETRY_COUNT × Value of RETRY_SLEEP_STEP_MS.
      RETRY_SLEEP_STEP_MS | 10*1000 ms | No | The incremental wait time for each retry. The total amount of time consumed by the retries of a request is calculated as follows: Value of RETRY_SLEEP_INIT_MS + Value of RETRY_COUNT × Value of RETRY_SLEEP_STEP_MS.
      CONNECTION_MAX_IDLE_MS | 60000 ms | No | The idle timeout period of the connections that are used to read and write data. Holo Client automatically releases a connection that remains idle for longer than this period.
      FIXED_CONNECTION_MODE | false | No | Specifies whether to use fixed connections. In this mode, non-fixed-copy writes (for example, INSERT writes) and point queries do not occupy connections. Note: this feature is in beta and requires connector V1.2.0 or later and Hologres engine r1.3.0 or later.
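
      The following sketch shows how several of these parameters fit together, reading the AccessKey pair from environment variables as recommended above. A batch write is shown for brevity, and the environment variable names are illustrative; use whatever names you configured:

      // Read the AccessKey pair from environment variables instead of hard-coding it.
      // The variable names below are examples, not a fixed convention.
      val accessKeyId = sys.env("ALIBABA_CLOUD_ACCESS_KEY_ID")
      val accessKeySecret = sys.env("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

      df.write.format("hologres")
        .option(SourceProvider.USERNAME, accessKeyId)
        .option(SourceProvider.PASSWORD, accessKeySecret)
        .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/test_database")
        .option(SourceProvider.TABLE, "tb008")
        .option(SourceProvider.WRITE_BATCH_SIZE, 512)
        .mode(SaveMode.Append)
        .save()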

Data type mappings

The following table describes data type mappings between Spark and Hologres.

Spark data type | Hologres data type
--- | ---
IntegerType | INT
LongType | BIGINT
StringType | TEXT
DecimalType | NUMERIC(38, 18)
BooleanType | BOOL
DoubleType | DOUBLE PRECISION
FloatType | FLOAT
TimestampType | TIMESTAMPTZ
DateType | DATE
BinaryType | BYTEA
ArrayType(IntegerType) | int4[]
ArrayType(LongType) | int8[]
ArrayType(FloatType) | float4[]
ArrayType(DoubleType) | float8[]
ArrayType(BooleanType) | boolean[]
ArrayType(StringType) | text[]