Spark is a unified analytics engine for large-scale data processing. Hologres is integrated with Apache Spark and Alibaba Cloud's EMR Serverless Spark to help enterprises quickly build data warehouses. The Spark connector provided by Hologres lets you create Hologres catalogs in a Spark cluster, so that you can read data from and write data to Hologres in batches through external tables. The Spark connector provides better performance than Spark's built-in JDBC data source.
Version requirement
The Spark connector requires Hologres V1.3+. Check your instance version on the Instance Details page in the Hologres console. If your instance version is earlier than V1.3, you can upgrade your instance or join the Hologres DingTalk group (ID: 32314975) to request an instance upgrade.
Preparations
Install a Spark environment that can run spark-sql, spark-shell, or pyspark commands. We recommend using Spark 3.3.0 or later to avoid dependency issues and access more features.
You can use EMR Serverless Spark to quickly build a Spark environment and connect it to a Hologres instance. For more information, see Enhanced features of Spark in EMR.
You can also set up an independent Spark environment. For more information, see Apache Spark.
To read data from and write data to Hologres using Spark, you need the hologres-connector-spark-3.x connector JAR package. This topic uses version 1.5.2 as an example. You can download the package from the Maven Central Repository. The connector resources are open source. For more information, see Hologres-Connectors.
To develop Spark jobs in Java and perform local debugging in a tool such as IntelliJ IDEA, add the following Maven dependency to the pom.xml file.
<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-spark-3.x</artifactId>
    <version>1.5.2</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>
Hologres catalog
Hologres catalogs are supported in Spark connector 1.5.2 and later. You can use external tables to easily read data from and write data to Hologres.
Each Hologres catalog in Spark maps to a database in Hologres. Each namespace in a Hologres catalog maps to a schema in the mapped database. The following sections describe how to use Hologres catalogs in Spark.
Hologres catalogs do not support table creation.
This topic uses the following database and tables in a Hologres instance:
test_db -- Database
public.test_table1 -- Table in the public schema
public.test_table2
test_schema.test_table3 -- Table in the test_schema schema
Initialize a Hologres catalog
Start spark-sql in the Spark cluster, load the Hologres connector, and specify the catalog parameters.
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
Common Hologres catalog commands
Load a Hologres catalog
A Hologres catalog in Spark maps to a Hologres database. This mapping cannot be changed during the session.
USE hologres_external_test_db;
Query all namespaces.
A namespace in Spark maps to a schema in Hologres. The default schema is public. Run the USE command to change the default schema.
-- View all namespaces in the Hologres catalog, which correspond to all schemas in the Hologres database.
SHOW NAMESPACES;
Query tables in a namespace.
Query all tables.
SHOW TABLES;
Query tables in a specific namespace.
USE test_schema;
SHOW TABLES;
-- Or, use the following statement.
SHOW TABLES IN test_schema;
Read data from and write data to a table.
Use SELECT and INSERT statements to read data from and write data to Hologres external tables in the catalog.
-- Read data from the table.
SELECT * FROM public.test_table1;
-- Write data to the table.
INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;
Import data to Hologres
The test data in this section is from the customer table in a TPC-H dataset. You can download the sample customer data in CSV format. Run the following SQL statement in Hologres to create the customer table.
CREATE TABLE customer_holo_table
(
c_custkey BIGINT ,
c_name TEXT ,
c_address TEXT ,
c_nationkey INT ,
c_phone TEXT ,
c_acctbal DECIMAL(15,2) ,
c_mktsegment TEXT ,
c_comment TEXT
);
Import data using Spark SQL
When you use Spark SQL, the most convenient way to load the metadata of a Hologres table is through a catalog. You can also declare a Hologres table by creating a temporary table.
Spark connectors earlier than 1.5.2 do not support catalogs. You can only declare a Hologres table by creating a temporary table.
For more information about the parameters of the Hologres Spark connector, see Parameters.
Initialize a Hologres catalog.
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
Import data from a CSV source table to a Hologres external table.
Note: The INSERT INTO syntax of Spark does not support using a column_list to specify a subset of columns to write to. For example, you cannot use INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable to write data only to the c_custkey field. If you want to write data to specific fields, use the CREATE TEMPORARY VIEW statement to declare a Hologres temporary table that contains only the required fields.
Write data using a catalog
-- Load a Hologres catalog.
USE hologres_external_test_db;
-- Create a CSV data source.
CREATE TEMPORARY VIEW csvTable (
  c_custkey BIGINT,
  c_name STRING,
  c_address STRING,
  c_nationkey INT,
  c_phone STRING,
  c_acctbal DECIMAL(15, 2),
  c_mktsegment STRING,
  c_comment STRING)
USING csv OPTIONS (
  path "resources/customer", -- For local testing, use the absolute path of the file.
  sep ","
);
-- Write data from the CSV table to Hologres.
INSERT INTO public.customer_holo_table SELECT * FROM csvTable;
Write data using a temporary view
-- Create a CSV data source.
CREATE TEMPORARY VIEW csvTable (
  c_custkey BIGINT,
  c_name STRING,
  c_address STRING,
  c_nationkey INT,
  c_phone STRING,
  c_acctbal DECIMAL(15, 2),
  c_mktsegment STRING,
  c_comment STRING)
USING csv OPTIONS (
  path "resources/customer",
  sep ","
);
-- Create a Hologres temporary table that contains only the required fields.
CREATE TEMPORARY VIEW hologresTable (
  c_custkey BIGINT,
  c_name STRING,
  c_phone STRING)
USING hologres OPTIONS (
  jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
  username "***",
  password "***",
  table "customer_holo_table"
);
INSERT INTO hologresTable SELECT c_custkey, c_name, c_phone FROM csvTable;
Import data using DataFrame
You can use tools such as spark-shell or pyspark to develop Spark jobs and call the write method of a DataFrame to write data. Data read from a CSV file is converted to a DataFrame and then written to a Hologres instance. The following sections provide sample code for different programming languages. For more information about the parameters for the Hologres connector for Spark, see Parameters.
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
// The schema of the CSV source.
val schema = StructType(Array(
StructField("c_custkey", LongType),
StructField("c_name", StringType),
StructField("c_address", StringType),
StructField("c_nationkey", IntegerType),
StructField("c_phone", StringType),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType),
StructField("c_comment", StringType)
))
// Read data from a CSV file as a DataFrame.
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")
// Write the DataFrame to Hologres.
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
  .save()
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;
public class SparkTest {
public static void main(String[] args) {
// The schema of the CSV source.
List<StructField> asList =
Arrays.asList(
DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
DataTypes.createStructField("c_name", DataTypes.StringType, true),
DataTypes.createStructField("c_address", DataTypes.StringType, true),
DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
DataTypes.createStructField("c_phone", DataTypes.StringType, true),
DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
DataTypes.createStructField("c_comment", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(asList);
// Run in local mode.
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
// Read data from a CSV file as a DataFrame.
// For local testing, use the absolute path of the customer data.
Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");
// Write the DataFrame to Hologres.
csvDf.write().format("hologres")
        .option("username", "***")
        .option("password", "***")
        .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
        .option("table", "customer_holo_table")
        .mode("append")
        .save();
}
}
The following configuration is required for the Maven file.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>
Python
from pyspark.sql.types import *
# The schema of the CSV source.
schema = StructType([
StructField("c_custkey", LongType()),
StructField("c_name", StringType()),
StructField("c_address", StringType()),
StructField("c_nationkey", IntegerType()),
StructField("c_phone", StringType()),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType()),
StructField("c_comment", StringType())
])
# Read data from a CSV file as a DataFrame.
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')
# Write the DataFrame to Hologres.
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()To use Spark to run jobs in different programming languages, perform the following steps:
Scala
Use the sample code to generate a sparktest.scala file and run the following commands to execute the job.
-- Load dependencies.
spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
-- For local testing, use the absolute path to load the file.
scala> :load D:/sparktest.scala
Alternatively, copy and paste the sample code to execute it after the dependencies are loaded.
Java
Use a development tool to add the sample code and use Maven to package it. For example, if the package name is spark_test.jar, run the following command to execute the job.
-- Use the absolute path of the job JAR package.
spark-submit --class SparkTest --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_test.jar
Python
After the following command is executed, paste the sample code to execute it.
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Read data from Hologres
Starting from Spark connector 1.3.2, you can read data from Hologres. Compared with the default JDBC data source of Spark, the Spark connector reads a Hologres table concurrently based on the table shards, which improves read performance. The read concurrency is determined by the number of shards in the table. Use the read.max_task_count parameter to limit the read concurrency of the Spark connector. A job generates Min(shardCount, max_task_count) read tasks. For example, a table with 40 shards and read.max_task_count set to 80 generates 40 read tasks. Schema inference is also supported: if you do not specify a schema, the Spark schema is inferred from the schema of the Hologres table.
Starting from Spark connector 1.5.0, predicate pushdown, LIMIT pushdown, and field pruning are supported when you read data from Hologres tables. You can also use a Hologres SELECT query to read data. This version also supports batch read mode, which improves read performance by three to four times compared with previous versions.
Read data using Spark SQL
When you use Spark SQL, it is more convenient to load the metadata of a Hologres table using a catalog. You can also declare a Hologres table by creating a temporary table.
Spark connectors earlier than 1.5.2 do not support catalogs. You can only declare a Hologres table by creating a temporary table.
For more information about Hologres-Connector-Spark parameters, see Parameters.
Initialize a Hologres catalog.
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
Read Hologres data.
Read data using a catalog.
-- Load a Hologres catalog.
USE hologres_external_test_db;
-- Read data from a Hologres table. Field pruning and predicate pushdown are supported.
SELECT c_custkey, c_name, c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;
Read data by creating a temporary table.
CREATE TEMPORARY VIEW (table)
CREATE TEMPORARY VIEW hologresTable
USING hologres OPTIONS (
  jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
  username "***",
  password "***",
  read.max_task_count "80", -- The maximum number of tasks into which the Hologres table can be split for reading.
  table "customer_holo_table"
);
-- Field pruning and predicate pushdown are supported.
SELECT c_custkey, c_name, c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;
CREATE TEMPORARY VIEW (query)
CREATE TEMPORARY VIEW hologresTable
USING hologres OPTIONS (
  jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
  username "***",
  password "***",
  read.query "select c_custkey, c_name, c_phone from customer_holo_table where c_custkey < 500 limit 10"
);
SELECT * FROM hologresTable LIMIT 5;
Read Hologres data as a DataFrame
You can use tools such as spark-shell or pyspark to develop Spark jobs and call Spark's read method to read data as a DataFrame for subsequent processing. The following sections provide sample code that shows how to read data from a Hologres table as a DataFrame in different programming languages. For more information about the connector parameters, see Parameters.
Scala
val readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // The maximum number of tasks into which the Hologres table can be split for reading.
.load()
.filter("c_custkey < 500")
)
readDf.select("c_custkey", "c_name", "c_phone").show(10)
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSelect {
public static void main(String[] args) {
// Run in local mode.
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
Dataset<Row> readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // The maximum number of tasks into which the Hologres table can be split for reading.
.load()
.filter("c_custkey < 500")
);
readDf.select("c_custkey", "c_name", "c_phone").show(10);
}
}
The following configuration is required for the Maven file.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>
Python
readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()
readDf.select("c_custkey", "c_name", "c_phone").show(10)
To use Spark to run jobs in different programming languages, perform the following steps:
Scala
Use the sample code to generate a sparkselect.scala file and run the following commands to execute the job.
-- Load dependencies.
spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
-- For local testing, use the absolute path to load the file.
scala> :load D:/sparkselect.scala
Alternatively, you can paste the sample code to execute it after the dependencies are loaded.
Java
You can use a development tool to add the sample code and use Maven to package it. For example, if the package name is spark_select.jar, run the following command to execute the job.
-- Use the absolute path of the job JAR package.
spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_select.jar
Python
After the following command is executed, you can paste the sample code to execute it.
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Parameters
General parameters
| Parameter | Default value | Required | Description |
| --- | --- | --- | --- |
| username | (none) | Yes | The username used to connect to Hologres, which is the AccessKey ID of your Alibaba Cloud account. |
| password | (none) | Yes | The password used to connect to Hologres, which is the AccessKey secret of your Alibaba Cloud account. |
| table | (none) | Yes | The name of the Hologres table for reading and writing data. Note: For reads, you can replace this parameter with the read.query parameter. |
| jdbcurl | (none) | Yes | The JDBC URL of the Hologres real-time data API, in the format jdbc:postgresql://<endpoint>:<port>/<database>. |
| enable_serverless_computing | false | No | Specifies whether to use serverless computing resources. This parameter is valid only for reading data and for writing data in |
| serverless_computing_query_priority |  | No | The execution priority of Serverless Computing. |
| statement_timeout_seconds |  | No | The timeout period for a query, in seconds. |
| retry_count |  | No | The number of retries when the connection fails. |
| direct_connect | See the Description column. | No | The bottleneck of batch reading and writing is usually the network throughput of the endpoint. Therefore, Hologres tests whether the current environment can directly connect to the Hologres FrontEnd (access node). If a direct connection is supported, it is used by default. If you set this parameter to |
Write parameters
The Hologres connector supports the SaveMode parameter of Spark. For SQL, this corresponds to INSERT INTO or INSERT OVERWRITE. For a DataFrame, you can set SaveMode to Append or Overwrite when you write data. If you use the Overwrite mode, a temporary table is created for writing data. After the write operation is successful, the temporary table replaces the original table. Use the Overwrite mode only when necessary.
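For example, the following Scala snippet is a minimal sketch of an Overwrite write. It reuses the csvDf DataFrame and the placeholder endpoint, credentials, and table name from the earlier examples in this topic.
import org.apache.spark.sql.SaveMode
// Overwrite writes to a temporary table first; after the job succeeds, the
// temporary table replaces customer_holo_table, so existing rows are discarded.
csvDf.write
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("table", "customer_holo_table")
  .mode(SaveMode.Overwrite) // Use SaveMode.Append to keep existing rows instead.
  .save()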
| Parameter name (new) | Parameter name (old) | Default value | Required | Description |
| --- | --- | --- | --- | --- |
| write.mode | copy_write_mode | auto | No | The write mode. For more information about the differences between write modes, see Comparison of batch write modes. Valid values: |
| write.copy.max_buffer_size | max_cell_buffer_size | 52428800 (50 MB) | No | The maximum size of the local buffer when you write data in COPY mode. You do not usually need to adjust this value. However, if a buffer overflow occurs when you write large fields, such as extra-long strings, increase this value. |
| write.copy.dirty_data_check | copy_write_dirty_data_check | false | No | Specifies whether to check for dirty data. If you enable this feature, the specific row that fails to be written can be precisely located when dirty data is found. However, this affects write performance. Therefore, do not enable this feature unless you are troubleshooting. |
| write.on_conflict_action | write_mode | INSERT_OR_REPLACE | No | The policy used when the destination table of an INSERT statement has a primary key. Valid values: INSERT_OR_REPLACE (replace the entire existing row), INSERT_OR_UPDATE (update only the written columns of the existing row), and INSERT_OR_IGNORE (discard the new row). |
The following parameters take effect only when write.mode is set to insert.
| Parameter name | Previous parameter name | Default value | Required | Description |
| --- | --- | --- | --- | --- |
| write.insert.dynamic_partition | dynamic_partition | false | No | This parameter takes effect when the destination is a partitioned parent table. If it is set to true, child partitions that do not exist are created automatically during writes. |
| write.insert.batch_size | write_batch_size | 512 | No | The maximum batch size for each write thread. A batch commit is performed when the number of Put operations after merging reaches the value of WriteBatchSize. |
| write.insert.batch_byte_size | write_batch_byte_size | 2097152 (2 × 1024 × 1024) | No | The maximum batch size for each write thread, in bytes. Default value: 2 MB. A batch commit is performed when the byte size of the Put data after merging reaches the value of WriteBatchByteSize. |
| write.insert.max_interval_ms | write_max_interval_ms | 10000 | No | A batch commit is triggered if the time elapsed since the last commit exceeds the value of WriteMaxIntervalMs, in milliseconds. |
| write.insert.thread_size | write_thread_size | 1 | No | The number of concurrent write threads. Each concurrent thread occupies one database connection. |
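The following Scala snippet is a minimal sketch that shows how the preceding INSERT-mode parameters can be passed as connector options in a DataFrame write. It reuses the csvDf DataFrame and the placeholder connection settings from the earlier examples; the option values are illustrative, not recommended settings.
import org.apache.spark.sql.SaveMode
csvDf.write
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("table", "customer_holo_table")
  .option("write.mode", "insert")            // Write with INSERT statements instead of COPY.
  .option("write.insert.batch_size", "1024") // Commit a batch after 1,024 buffered Put operations.
  .option("write.insert.thread_size", "2")   // Two write threads per task; each uses one connection.
  .mode(SaveMode.Append)
  .save()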
Read parameters
| Parameter name | Previous parameter name (1.5.0 and earlier) | Default value | Required | Description |
| --- | --- | --- | --- | --- |
| read.mode | bulk_read | auto | No | The read mode. Valid values: |
| read.max_task_count | max_partition_count | 80 | No | The Hologres table to be read is split into multiple partitions, and each partition corresponds to a Spark task. If the shard count of the Hologres table is less than the value of this parameter, the number of partitions is at most the shard count. |
| read.copy.max_buffer_size | / | 52428800 (50 MB) | No | The maximum size of the local buffer when you read data in COPY mode. If an exception occurs due to a large field size, increase this value. |
| read.push_down_predicate | push_down_predicate | true | No | Specifies whether to perform predicate pushdown, such as pushing down filter conditions applied during queries. The pushdown of common filter conditions and column pruning are supported. |
| read.push_down_limit | push_down_limit | true | No | Specifies whether to perform LIMIT pushdown. |
| read.select.batch_size | scan_batch_size | 256 | No | This parameter takes effect when data is read in SELECT mode. It specifies the maximum number of rows read in each batch. |
| read.select.timeout_seconds | scan_timeout_seconds | 60 | No | This parameter takes effect when data is read in SELECT mode. It specifies the timeout period of each read, in seconds. |
| read.query | query | None | No | The query statement used to read data from Hologres. Note: Specify either this parameter or the table parameter, but not both. |
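The following Scala snippet is a minimal sketch of a DataFrame read that uses the read.query parameter instead of the table parameter, mirroring the SQL example above. The endpoint and credentials are placeholders, and the columns come from the customer table used earlier in this topic.
// Read the result of a Hologres query as a DataFrame.
val queryDf = spark.read
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("read.query", "select c_custkey, c_name, c_phone from customer_holo_table where c_custkey < 500")
  .load()
queryDf.show(10)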
Data type mappings
| Spark type | Hologres type |
| --- | --- |
| ShortType | SMALLINT |
| IntegerType | INT |
| LongType | BIGINT |
| StringType | TEXT |
| StringType | JSON |
| StringType | JSONB |
| DecimalType | NUMERIC(38, 18) |
| BooleanType | BOOL |
| DoubleType | DOUBLE PRECISION |
| FloatType | FLOAT |
| TimestampType | TIMESTAMPTZ |
| DateType | DATE |
| BinaryType | BYTEA |
| BinaryType | ROARINGBITMAP |
| ArrayType(IntegerType) | INT4[] |
| ArrayType(LongType) | INT8[] |
| ArrayType(FloatType) | FLOAT4[] |
| ArrayType(DoubleType) | FLOAT8[] |
| ArrayType(BooleanType) | BOOLEAN[] |
| ArrayType(StringType) | TEXT[] |
Connection count calculation
The Hologres connector for Spark uses a specific number of JDBC connections for read and write operations. The number of connections is determined by the following factors:
The parallelism of Spark, which is the number of concurrent tasks displayed on the Spark UI when your job is running.
The number of connections used by each concurrent task of the connector:
If you write data in COPY mode, each concurrent task uses one JDBC connection.
If you write data in INSERT mode, the number of JDBC connections used by each concurrent task is the same as the value of the write_thread_size parameter.
If you read data, each concurrent task uses one JDBC connection.
The number of connections used for other operations. When a job starts, operations such as schema acquisition are performed, and a connection may be established for a short period.
The total number of JDBC connections used by a job can be calculated using the following formulas.
| Item | Number of connections used |
| --- | --- |
| Query metadata using a catalog | 1 |
| Read data | parallelism × 1 + 1 |
| Write data in COPY mode | parallelism × 1 + 1 |
| Write data in INSERT mode | parallelism × write_thread_size + 1 |
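For example, assume that a job runs 20 tasks in parallel. A read job or a COPY-mode write job uses about 20 × 1 + 1 = 21 connections, and an INSERT-mode write job with write.insert.thread_size (write_thread_size) set to 2 uses about 20 × 2 + 1 = 41 connections. These numbers only illustrate the formulas above.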
The preceding connection count calculation assumes that the number of tasks that Spark can run concurrently is greater than the number of tasks generated by the job.
The number of tasks Spark can run concurrently may be affected by parameter settings, such as spark.executor.instances. It may also be affected by the file block splitting policy of Hadoop. For more information, see Apache Hadoop.
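If you need to cap the number of connections that a write job uses, one common approach is to reduce the number of partitions in the DataFrame before writing, which lowers the write parallelism. The following Scala snippet is a minimal sketch; the partition count of 8 is an arbitrary example value.
import org.apache.spark.sql.SaveMode
// At most 8 write tasks run at the same time, so a COPY-mode write uses at most
// 8 × 1 + 1 = 9 connections.
csvDf.coalesce(8)
  .write
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("table", "customer_holo_table")
  .mode(SaveMode.Append)
  .save()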