This topic describes how to use Spark to read data from multiple types of sources and write the data to Hologres.
Background information
Spark is a unified analytics engine for large-scale data processing. Hologres is integrated with both Apache Spark and E-MapReduce (EMR) Spark to help you build data warehouses efficiently. Hologres provides the Spark connector, which you can use to write data to Hologres in batch mode. You can use Spark to read data from multiple types of sources, such as files, Hive tables, MySQL tables, and PostgreSQL tables.
Hologres is compatible with PostgreSQL. You can use Spark to read Hologres data based on the PostgreSQL Java Database Connectivity (JDBC) driver. Then, you can extract, transform, and load (ETL) the data and write the processed data back to Hologres or to other destinations.
Prerequisites
The version of your Hologres instance is V0.9 or later. You can view the version of your Hologres instance on the instance details page in the Hologres console. If the version of your Hologres instance is earlier than V0.9, manually upgrade your Hologres instance in the Hologres console or join the Hologres DingTalk group for technical support. For more information about how to manually upgrade your Hologres instance in the Hologres console, see Instance upgrades. For more information about how to obtain technical support, see Obtain online support for Hologres.
A Spark environment of a version that Hologres supports is installed, and you can run the spark-shell command in that environment.
(Recommended) Use the Spark connector to write data to Hologres
We recommend that you use the built-in Spark connector of Hologres to write data to Hologres. The Spark connector is used together with Holo Client. Compared with other methods of writing data, the Spark connector provides better write performance. To use the Spark connector to write data, perform the following steps. For more information about the sample code, see the Example of using the Spark connector to write data to Hologres section in this topic.
Preparations
Obtain a JAR package.
The Spark connector is available for Spark 2 and Spark 3. When you use the Spark connector to write data to Hologres, you must reference a JAR package of the Spark connector. The JAR package is already published in the Maven central repository. You can refer to the following pom.xml file for configuration.
Note: The connectors are also open source. For more information, visit the alibabacloud-hologres-connectors page.
<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-spark-2.x</artifactId>
    <version>1.0.1</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>
You can click the following links to download the JAR packages that are provided by Hologres:
Use the JAR package.
Run the following command to start Spark and load the connector:
spark-shell --jars hologres-connector-spark-2.x-1.0-SNAPSHOT-jar-with-dependencies.jar
You can also run the following command to start PySpark and load the connector:
pyspark --jars hologres-connector-spark-2.x-1.0-SNAPSHOT-jar-with-dependencies.jar
Example of using the Spark connector to write data to Hologres
The following example shows how to use the Spark connector to write data to Hologres.
Create a table in Hologres.
Execute the following SQL statement in Hologres to create a table to which you want to write data:
CREATE TABLE tb008 (
    id BIGINT primary key,
    counts INT,
    name TEXT,
    price NUMERIC(38, 18),
    out_of_stock BOOL,
    weight DOUBLE PRECISION,
    thick FLOAT,
    time TIMESTAMPTZ,
    dt DATE,
    by bytea,
    inta int4[],
    longa int8[],
    floata float4[],
    doublea float8[],
    boola boolean[],
    stringa text[]
);
Prepare data in Spark and write the data to Hologres.
Run the following command in the CLI to enable the Spark connector:
spark-shell --jars hologres-connector-spark-2.x-1.0-SNAPSHOT-jar-with-dependencies.jar
Run the :load spark-test.scala command in spark-shell to load the sample data. The spark-test.scala file contains the following code:
import java.sql.{Timestamp, Date}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode}
import com.alibaba.hologres.spark2.sink.SourceProvider

val byteArray = Array(1.toByte, 2.toByte, 3.toByte, 'b'.toByte, 'a'.toByte)
val intArray = Array(1, 2, 3)
val longArray = Array(1L, 2L, 3L)
val floatArray = Array(1.2F, 2.44F, 3.77F)
val doubleArray = Array(1.222, 2.333, 3.444)
val booleanArray = Array(true, false, false)
val stringArray = Array("abcd", "bcde", "defg")

val data = Seq(
  Row(-7L, 100, "phone1", BigDecimal(1234.567891234), false, 199.35, 6.7F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("2021-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
  Row(6L, -10, "phone2", BigDecimal(1234.56), true, 188.45, 7.8F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("1970-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
  Row(1L, 10, "phone3\"", BigDecimal(1234.56), true, 111.45, null, Timestamp.valueOf("2020-02-29 00:12:33"), Date.valueOf("2020-07-23"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray)
)

val schema = StructType(Array(
  StructField("id", LongType),
  StructField("counts", IntegerType),
  StructField("name", StringType, false), // The value false indicates that this field cannot be null.
  StructField("price", DecimalType(38, 12)),
  StructField("out_of_stock", BooleanType),
  StructField("weight", DoubleType),
  StructField("thick", FloatType),
  StructField("time", TimestampType),
  StructField("dt", DateType),
  StructField("by", BinaryType),
  StructField("inta", ArrayType(IntegerType)),
  StructField("longa", ArrayType(LongType)),
  StructField("floata", ArrayType(FloatType)),
  StructField("doublea", ArrayType(DoubleType)),
  StructField("boola", ArrayType(BooleanType)),
  StructField("stringa", ArrayType(StringType))
))

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  schema
)
df.show()

// Configure the following parameters to write the sample data to Hologres.
df.write.format("hologres") // Set the value to hologres.
  .option(SourceProvider.USERNAME, "your_username") // The AccessKey ID of your Alibaba Cloud account.
  .option(SourceProvider.PASSWORD, "your_password") // The AccessKey secret of your Alibaba Cloud account.
  .option(SourceProvider.ENDPOINT, "Ip:Port") // The IP address and the port number of your Hologres instance.
  .option(SourceProvider.DATABASE, "test_database") // The name of the Hologres database. The name is test_database in this example.
  .option(SourceProvider.TABLE, "tb008") // The name of the Hologres table to which you want to write data. The table name is tb008 in this example.
  .option(SourceProvider.WRITE_BATCH_SIZE, 512) // The maximum number of requests allowed in a batch.
  .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) // The data definition language (DDL) statement for the DataFrame. This parameter is required only for Spark 3.X.
  .mode(SaveMode.Append) // The save mode. This parameter is required only for Spark 3.X.
  .save()
Query data in the destination table.
You can query data in the destination table in the Hologres console to check the written data. The following figure shows an example.
Example of using the PySpark connector to write data to Hologres
Run the following command to start PySpark and load the PySpark connector:
pyspark --jars hologres-connector-spark-3.x-1.3-SNAPSHOT-jar-with-dependencies.jar
Use metadata to create a DataFrame object and call the connector to write data to Hologres. The operation is similar to the operation when you use the Spark connector.
data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]]
df = spark.createDataFrame(data, schema="id LONG, name STRING")
df.show()

# Write the DataFrame to Hologres by using the connector.
(df.write.format("hologres")
    .option("username", "your_username")
    .option("password", "your_password")
    .option("endpoint", "hologres_endpoint")
    .option("database", "test_database")
    .option("table", "tb008")
    .save())
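The Parameters section of this topic recommends that you read the AccessKey ID and AccessKey secret from environment variables instead of hard-coding them. The following is a minimal sketch of that approach for the PySpark example. The helper name hologres_write_options and the variable names HOLO_ACCESS_KEY_ID and HOLO_ACCESS_KEY_SECRET are assumptions made for illustration and are not defined by the connector.

```python
import os


def hologres_write_options(table, endpoint, database, env=os.environ):
    """Build the option dictionary for df.write.format("hologres").

    HOLO_ACCESS_KEY_ID and HOLO_ACCESS_KEY_SECRET are hypothetical
    environment variable names chosen for this sketch.
    """
    try:
        username = env["HOLO_ACCESS_KEY_ID"]
        password = env["HOLO_ACCESS_KEY_SECRET"]
    except KeyError as missing:
        # Fail early with a clear message instead of sending empty credentials.
        raise RuntimeError(f"environment variable {missing} is not set") from None
    return {
        "username": username,
        "password": password,
        "endpoint": endpoint,
        "database": database,
        "table": table,
    }


# Usage in a PySpark session (not executed here):
#   opts = hologres_write_options("tb008", "hologres_endpoint", "test_database")
#   df.write.format("hologres").options(**opts).save()
```

With this approach, the credentials stay out of the script and out of version control.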
Use Spark to read data from a specific type of source and write the data to Hologres
Read data from a specific type of source.
You can use Spark to read data from different types of sources. The following examples show how to read data from Hologres or another type of source:
Read data from Hologres
Hologres is compatible with PostgreSQL. You can use Spark to read Hologres data based on the PostgreSQL JDBC driver. The following sample code is for reference only.
Note: Before you read Hologres data, download the PostgreSQL JDBC JAR package from the official website. In this example, postgresql-42.2.18 is used. Then, run the following command to start spark-shell and load the PostgreSQL JDBC JAR package:
./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar
You can also load the PostgreSQL JDBC JAR package together with the JAR package of the Spark connector.
// Read from a table, for example, tb008.
val readDf = spark.read
  .format("jdbc") // Read Hologres data based on the PostgreSQL JDBC driver.
  .option("driver", "org.postgresql.Driver")
  .option("url", "jdbc:postgresql://Ip:Port/test_database")
  .option("dbtable", "tb008")
  .option("user", "your_username")
  .option("password", "your_password")
  .load()
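For PySpark users, the same JDBC read can be sketched as follows. This is an illustrative sketch, not connector code: hologres_jdbc_url and read_hologres_table are hypothetical helper names, and the spark argument is assumed to be an active SparkSession that was started with the PostgreSQL JDBC JAR package loaded. The URL helper also rejects an endpoint whose port is not numeric, which catches a mistyped Ip:Port value early.

```python
def hologres_jdbc_url(endpoint, database):
    """Return a PostgreSQL JDBC URL for an endpoint in host:port form."""
    host, sep, port = endpoint.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"endpoint must look like host:port, got {endpoint!r}")
    return f"jdbc:postgresql://{endpoint}/{database}"


def read_hologres_table(spark, endpoint, database, table, user, password):
    """Read a Hologres table through the PostgreSQL JDBC driver.

    `spark` is assumed to be an active SparkSession; this function is
    defined here but only runs inside a PySpark session.
    """
    return (
        spark.read.format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", hologres_jdbc_url(endpoint, database))
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
        .load()
    )
```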
Read data from another type of source such as a Parquet file
You can use Spark to read data from other types of sources, such as a Parquet file or a Hive table. The following sample code reads data from a Hive table:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sparkConf = new SparkConf()
// Reuse the active SparkContext if one already exists, for example in spark-shell.
val sc = SparkContext.getOrCreate(sparkConf)
val hiveContext = new HiveContext(sc)

// Read from a Hive table, for example, phone.
val readDf = hiveContext.sql("select * from hive_database.phone")
Write the data to Hologres.
import org.apache.spark.sql.SaveMode
import com.alibaba.hologres.spark2.sink.SourceProvider

// Write the DataFrame that was read in the previous step to a Hologres table.
readDf.write
  .format("hologres")
  .option(SourceProvider.USERNAME, "your_username")
  .option(SourceProvider.PASSWORD, "your_password")
  .option(SourceProvider.ENDPOINT, "Ip:Port")
  .option(SourceProvider.DATABASE, "test_database")
  .option(SourceProvider.TABLE, table) // table is a String that holds the name of the destination table.
  .option(SourceProvider.WRITE_BATCH_SIZE, 512) // The maximum number of requests allowed in a batch.
  .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, readDf.schema.toDDL) // The DDL statement for the DataFrame. This parameter is required only for Spark 3.X.
  .mode(SaveMode.Append) // The save mode. This parameter is required only for Spark 3.X.
  .save()
Use Spark to write data to Hologres in real time
Execute the following statement to create a table to which you want to write data in Hologres:
CREATE TABLE test_table_stream ( value text, count bigint );
Read data from your on-premises machine. Collect word frequency statistics and write the statistics to Hologres in real time. The following example provides sample code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import com.alibaba.hologres.spark2.sink.SourceProvider

val spark = SparkSession
  .builder
  .appName("StreamToHologres")
  .master("local[*]")
  .getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._

// Read lines of text from a local socket.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words.
val words = lines.as[String].flatMap(_.split(" "))

// Generate a running word count.
val wordCounts = words.groupBy("value").count()

wordCounts.writeStream
  .outputMode(OutputMode.Complete())
  .format("hologres")
  .option(SourceProvider.USERNAME, "your_username")
  .option(SourceProvider.PASSWORD, "your_password")
  .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname")
  .option(SourceProvider.TABLE, "test_table_stream")
  .option("batchsize", 1)
  .option("isolationLevel", "NONE")
  .option("checkpointLocation", checkpointLocation) // checkpointLocation is a String path to a checkpoint directory that you must define.
  .start()
  .awaitTermination()
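To make the expected content of test_table_stream easier to reason about, the following pure-Python sketch imitates the running word count with the Complete output mode, in which the full result table is emitted after every micro-batch. It does not use Spark. The function name running_word_counts and the sample input are assumptions made for illustration.

```python
from collections import Counter


def running_word_counts(batches):
    """Yield the full (value, count) table after each micro-batch.

    `batches` is an iterable of micro-batches, and each micro-batch is a
    list of text lines, as they would arrive on the socket.
    """
    totals = Counter()
    for lines in batches:
        for line in lines:
            # Same splitting rule as the Scala sample: split on single spaces.
            totals.update(line.split(" "))
        # Complete mode re-emits every row, not only the rows that changed.
        yield sorted(totals.items())


for table in running_word_counts([["a b a"], ["b c"]]):
    print(table)
```

Each tuple corresponds to one row with the value and count columns of the destination table.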
Parameters
Parameter | Default value | Required | Description |
USERNAME | No default value | Yes | The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID on the Security Management page. We recommend that you store the AccessKey ID and AccessKey secret in environment variables and read them from there. This reduces the risk of leaks. |
PASSWORD | No default value | Yes | The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey secret on the Security Management page. We recommend that you store the AccessKey ID and AccessKey secret in environment variables and read them from there. This reduces the risk of leaks. |
TABLE | No default value | Yes | The name of the Hologres table to which you want to write data. |
ENDPOINT | No default value | Configure this parameter or the JDBCURL parameter. | The endpoint of your Hologres instance. You can obtain the endpoint in the Network Information section of the instance details page in the Hologres console. |
DATABASE | No default value | Configure this parameter or the JDBCURL parameter. | The name of the Hologres database in which the destination table resides. |
JDBCURL | No default value | Configure either this parameter or both the ENDPOINT and DATABASE parameters. | The JDBC URL of your Hologres instance. |
COPY_WRITE_MODE | true | No | Specifies whether to write data in the fixed copy mode. Fixed copy is a new feature that is supported in Hologres V1.3. In the fixed copy mode, data is written in streaming mode rather than in batches. Therefore, the fixed copy mode provides higher throughput and lower data latency, and consumes less client memory than writing data by using the INSERT statement. Note: To use the fixed copy mode, the connector version must be V1.3.0 or later, and the Hologres engine version must be r1.3.34 or later. |
COPY_WRITE_DIRTY_DATA_CHECK | false | No | Specifies whether to check for dirty data. This parameter takes effect only when COPY_WRITE_MODE is set to true. If you set this parameter to true and dirty data is generated, the system can locate the row that fails to be written. Note: Checking for dirty data degrades write performance. We recommend that you set this parameter to true only during troubleshooting. |
COPY_WRITE_DIRECT_CONNECT | true for scenarios that allow direct connections | No | This parameter takes effect only when COPY_WRITE_MODE is set to true. The amount of data that can be written in the copy mode is determined by the throughput of the VPC endpoint. When data is written in the copy mode, the system checks whether the environment can directly connect to the Hologres FE node. If it can, direct connections are used by default. If this parameter is set to false, direct connections are not used. |
INPUT_DATA_SCHEMA_DDL | No default value | This parameter is required for Spark 3.X. Set this parameter to <your_DataFrame>.schema.toDDL. | The DDL statement for the DataFrame in Spark. |
WRITE_MODE | INSERT_OR_REPLACE | No | The policy that is used to handle primary key conflicts. This parameter is required if the destination table has a primary key. Valid values: INSERT_OR_IGNORE: discards the data that you want to write if a primary key conflict occurs. INSERT_OR_UPDATE: updates the relevant columns in the destination table if a primary key conflict occurs. INSERT_OR_REPLACE: updates all columns in the destination table if a primary key conflict occurs. |
WRITE_BATCH_SIZE | 512 | No | The maximum number of requests that are allowed in a batch in a write thread. If the total number of PUT requests reaches the value of WRITE_BATCH_SIZE after conflicts are handled based on the WRITE_MODE parameter, the data is submitted in a batch. |
WRITE_BATCH_BYTE_SIZE | 2 MB | No | The maximum number of bytes that are allowed in a batch in a write thread. If the total number of bytes of the PUT requests reaches the value of WRITE_BATCH_BYTE_SIZE after conflicts are handled based on the WRITE_MODE parameter, the data is submitted in a batch. |
WRITE_MAX_INTERVAL_MS | 10000 ms | No | The interval at which data is submitted in a batch. |
WRITE_FAIL_STRATEGY | TRY_ONE_BY_ONE | No | The policy that is used to handle submission failures. If a batch cannot be submitted, Holo Client submits the data entries in the batch one at a time in the specified sequence. If a data entry cannot be submitted, Holo Client returns an error message that contains information about the data entry. |
WRITE_THREAD_SIZE | 1 | No | The number of parallel threads that are used to write data. Each thread occupies one connection. The total number of connections occupied by a Spark job varies based on the Spark parallelism and can be calculated by using the following formula: Total number of connections = Value of spark.default.parallelism × Value of WRITE_THREAD_SIZE. |
DYNAMIC_PARTITION | false | No | Specifies whether to automatically create a partition if data is written to a parent table that has no partitions. Valid values: true and false. If you set this parameter to true, a partition is automatically created. |
RETRY_COUNT | 3 | No | The maximum number of retries allowed to write and query data if a connection failure occurs. |
RETRY_SLEEP_INIT_MS | 1000 ms | No | The interval at which a retry is performed. The amount of time consumed by the retries for a request is calculated by using the following formula: Value of RETRY_SLEEP_INIT_MS + Value of RETRY_COUNT × Value of RETRY_SLEEP_STEP_MS. |
RETRY_SLEEP_STEP_MS | 10000 ms | No | The amount of time required for a retry. The amount of time consumed by the retries for a request is calculated by using the following formula: Value of RETRY_SLEEP_INIT_MS + Value of RETRY_COUNT × Value of RETRY_SLEEP_STEP_MS. |
CONNECTION_MAX_IDLE_MS | 60000 ms | No | The idle timeout period for the connections that are used to read and write data. If a connection remains idle for longer than this period, Holo Client automatically releases the connection. |
FIXED_CONNECTION_MODE | false | No | Specifies whether to use fixed connections when data is not written in the fixed copy mode, for example, in the INSERT mode. With fixed connections, data writes and point queries do not occupy connections. Note: The fixed connection feature is in beta release and is available only when the connector version is V1.2.0 or later and the Hologres engine version is r1.3.0 or later. |
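The two formulas in the table, for the total number of connections and for the time consumed by retries, can be checked with a short calculation. The following sketch only restates those formulas. The function names and the sample parallelism of 200 are assumptions made for illustration, and the default arguments are the default values listed in the table.

```python
def total_connections(spark_parallelism, write_thread_size=1):
    """Total connections = spark.default.parallelism x WRITE_THREAD_SIZE."""
    return spark_parallelism * write_thread_size


def retry_time_ms(retry_count=3, sleep_init_ms=1000, sleep_step_ms=10000):
    """Retry time = RETRY_SLEEP_INIT_MS + RETRY_COUNT x RETRY_SLEEP_STEP_MS."""
    return sleep_init_ms + retry_count * sleep_step_ms


# A job with a parallelism of 200 and WRITE_THREAD_SIZE set to 3.
print(total_connections(200, 3))  # 600
# Retry time with the default values from the table.
print(retry_time_ms())  # 31000
```

Estimating the connection count before you raise WRITE_THREAD_SIZE helps you stay within the connection limit of your instance.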
Data type mappings
The following table describes data type mappings between Spark and Hologres.
Spark data type | Hologres data type |
IntegerType | INT |
LongType | BIGINT |
StringType | TEXT |
DecimalType | NUMERIC(38, 18) |
BooleanType | BOOL |
DoubleType | DOUBLE PRECISION |
FloatType | FLOAT |
TimestampType | TIMESTAMPTZ |
DateType | DATE |
BinaryType | BYTEA |
ArrayType(IntegerType) | int4[] |
ArrayType(LongType) | int8[] |
ArrayType(FloatType) | float4[] |
ArrayType(DoubleType) | float8[] |
ArrayType(BooleanType) | boolean[] |
ArrayType(StringType) | text[] |
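The mappings above can be used to derive a Hologres CREATE TABLE statement from a list of Spark column types. The following is a minimal sketch under that assumption. SPARK_TO_HOLOGRES and create_table_ddl are hypothetical names, the Spark types are written as plain strings, and the connector itself does not provide this helper.

```python
# Spark type name -> Hologres type, copied from the mapping table above.
SPARK_TO_HOLOGRES = {
    "IntegerType": "INT",
    "LongType": "BIGINT",
    "StringType": "TEXT",
    "DecimalType": "NUMERIC(38, 18)",
    "BooleanType": "BOOL",
    "DoubleType": "DOUBLE PRECISION",
    "FloatType": "FLOAT",
    "TimestampType": "TIMESTAMPTZ",
    "DateType": "DATE",
    "BinaryType": "BYTEA",
    "ArrayType(IntegerType)": "int4[]",
    "ArrayType(LongType)": "int8[]",
    "ArrayType(FloatType)": "float4[]",
    "ArrayType(DoubleType)": "float8[]",
    "ArrayType(BooleanType)": "boolean[]",
    "ArrayType(StringType)": "text[]",
}


def create_table_ddl(table, columns, primary_key=None):
    """Build a CREATE TABLE statement from (column name, Spark type) pairs."""
    parts = []
    for name, spark_type in columns:
        holo_type = SPARK_TO_HOLOGRES[spark_type]  # KeyError for unmapped types
        suffix = " PRIMARY KEY" if name == primary_key else ""
        parts.append(f"{name} {holo_type}{suffix}")
    return f"CREATE TABLE {table} ({', '.join(parts)});"


print(create_table_ddl(
    "tb008",
    [("id", "LongType"), ("name", "StringType"), ("inta", "ArrayType(IntegerType)")],
    primary_key="id",
))
```

A type that is not in the table raises a KeyError, which makes unsupported columns visible before any data is written.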