Spark is a unified analytics engine for large-scale data processing. Hologres integrates tightly with the community edition of Spark and EMR Spark to help you quickly build a data warehouse. The Spark connector provided by Hologres supports creating a Hologres Catalog in a Spark cluster, enabling high-performance batch reads and imports using foreign tables. This approach delivers better performance than native Java Database Connectivity (JDBC).
Limits
Only Hologres instances version 1.3 or later support the Spark connector. You can view your current instance version on the Instance Details page in the Hologres Management Console. If your instance runs a version earlier than 1.3, use Instance Upgrade or join the Hologres communication group by searching for DingTalk group number 32314975 to request an upgrade.
Preparations
- Install a compatible Spark environment where you can run `spark-sql`, `spark-shell`, or `pyspark` commands. Use Spark 3.3.0 or later to avoid dependency issues and access more features.
  - Use Alibaba Cloud EMR Spark to quickly build a Spark environment and connect to your Hologres instance. For more information, see EMR Spark features.
  - You can also build a Spark environment in your preferred setup. For more information, see Apache Spark.
- To read from or write to Hologres using Spark, reference the connector JAR package hologres-connector-spark-3.x. This topic uses version 1.5.2 of the Spark connector. You can download it from the Maven Central Repository. All connector resources are open source. For more information, see Hologres-Connectors.
- If you develop Spark jobs in Java and debug them locally in a tool such as IntelliJ IDEA, add the following Maven dependency to your pom.xml file.
<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-spark-3.x</artifactId>
    <version>1.5.2</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>
Hologres Catalog
Starting from version 1.5.2, the Spark connector supports Hologres Catalog, allowing you to conveniently read from and write to Hologres using foreign tables.
In Spark, each Hologres Catalog corresponds to a database in Hologres, and each namespace in a Hologres Catalog corresponds to a schema in that database. The following section shows how to use Hologres Catalog in Spark.
Hologres Catalog does not support creating tables.
This topic uses a Hologres instance with the following database and table names:
test_db                  -- database
public.test_table1       -- table in the public schema
public.test_table2       -- table in the public schema
test_schema.test_table3  -- table in the test_schema schema
Initialize a Hologres Catalog
In your Spark cluster, start spark-sql, load the Hologres connector, and specify the Catalog parameters.
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
Common commands for Hologres Catalog
- Load a Hologres Catalog.
  A Hologres Catalog in Spark corresponds exactly to a Hologres database and cannot be changed during use.
  USE hologres_external_test_db;
- Query all namespaces.
  A namespace in Spark corresponds to a schema in Hologres. The default schema is `public`. You can use the USE statement to change the default schema.
  -- View all namespaces in the Hologres Catalog. This corresponds to all schemas in Hologres.
  SHOW NAMESPACES;
- Query tables in a namespace.
  - Query all tables in the current namespace.
    SHOW TABLES;
  - Query tables in a specific namespace.
    USE test_schema;
    SHOW TABLES;
    -- Alternatively: SHOW TABLES IN test_schema;
- Read from and write to tables.
  Use SELECT and INSERT statements to read from and write to Hologres foreign tables in the Catalog.
  -- Read from a table.
  SELECT * FROM public.test_table1;
  -- Write to a table.
  INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;
Import data into Hologres
This section uses the customer table from the TPC-H dataset as the source data. Spark reads the source data in CSV format and imports it into Hologres. Download the customer data. Execute the following SQL statement in Hologres to create the customer table.
CREATE TABLE customer_holo_table
(
c_custkey BIGINT ,
c_name TEXT ,
c_address TEXT ,
c_nationkey INT ,
c_phone TEXT ,
c_acctbal DECIMAL(15,2) ,
c_mktsegment TEXT ,
c_comment TEXT
);
Import data using Spark-SQL
When using Spark-SQL, it is more convenient to load the metadata of a Hologres table using a Catalog. You can also declare a Hologres table by creating a temporary table.
- Hologres-Connector-Spark versions earlier than 1.5.2 do not support catalogs. You must declare a Hologres table by creating a temporary table.
- For more information about the parameters for Hologres-Connector-Spark, see Parameter description.
- Initialize the Hologres Catalog.
  spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
  --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
  --conf spark.sql.catalog.hologres_external_test_db.username=*** \
  --conf spark.sql.catalog.hologres_external_test_db.password=*** \
  --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
- Import data from the source CSV table to the Hologres foreign table.
Note: The Spark INSERT INTO syntax does not support specifying a partial list of columns using column_list. For example, you cannot use INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable to write only the `c_custkey` field. If you want to write only specific fields, declare a Hologres temporary table that contains only those fields using CREATE TEMPORARY VIEW.
Write using CATALOG
-- Load the Hologres Catalog.
USE hologres_external_test_db;
-- Create a CSV data source. For local testing, use the absolute path of the file.
CREATE TEMPORARY VIEW csvTable (
    c_custkey BIGINT,
    c_name STRING,
    c_address STRING,
    c_nationkey INT,
    c_phone STRING,
    c_acctbal DECIMAL(15, 2),
    c_mktsegment STRING,
    c_comment STRING)
USING csv OPTIONS (
    path "resources/customer",
    sep ","
);
-- Write data from the CSV table to Hologres.
INSERT INTO public.customer_holo_table SELECT * FROM csvTable;
Write using TEMPORARY VIEW
-- Create a CSV data source. For local testing, use the absolute path of the file.
CREATE TEMPORARY VIEW csvTable (
    c_custkey BIGINT,
    c_name STRING,
    c_address STRING,
    c_nationkey INT,
    c_phone STRING,
    c_acctbal DECIMAL(15, 2),
    c_mktsegment STRING,
    c_comment STRING)
USING csv OPTIONS (
    path "resources/customer",
    sep ","
);
-- Create a Hologres temporary table that contains only the fields to write.
CREATE TEMPORARY VIEW hologresTable (
    c_custkey BIGINT,
    c_name STRING,
    c_phone STRING)
USING hologres OPTIONS (
    jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
    username "***",
    password "***",
    table "customer_holo_table"
);
INSERT INTO hologresTable SELECT c_custkey, c_name, c_phone FROM csvTable;
Import data using DataFrame
When you develop Spark jobs using spark-shell, pyspark, or other tools, you can also call the DataFrame `write` interface to write data. The following examples show, in different languages, how to read data from a CSV file into a DataFrame and write it to a Hologres instance. For more information about the parameters for Hologres-Connector-Spark, see Parameter description.
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
// Schema of the CSV source.
val schema = StructType(Array(
StructField("c_custkey", LongType),
StructField("c_name", StringType),
StructField("c_address", StringType),
StructField("c_nationkey", IntegerType),
StructField("c_phone", StringType),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType),
StructField("c_comment", StringType)
))
// Read data from the CSV file into a DataFrame.
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")
// Write the DataFrame to Hologres.
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;
public class SparkTest {
public static void main(String[] args) {
// Schema of the CSV source.
List<StructField> asList =
Arrays.asList(
DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
DataTypes.createStructField("c_name", DataTypes.StringType, true),
DataTypes.createStructField("c_address", DataTypes.StringType, true),
DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
DataTypes.createStructField("c_phone", DataTypes.StringType, true),
DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
DataTypes.createStructField("c_comment", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(asList);
// Run in local mode.
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
// Read data from the CSV file into a DataFrame.
// For local testing, use the absolute path of the customer data.
Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");
// Write the DataFrame to Hologres.
        csvDf.write()
                .format("hologres")
                .option("username", "***")
                .option("password", "***")
                .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
                .option("table", "customer_holo_table")
                .mode(SaveMode.Append)
                .save();
}
}
Add the following dependency to the Maven pom.xml file.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>
Python
from pyspark.sql.types import *
# Schema of the CSV source.
schema = StructType([
StructField("c_custkey", LongType()),
StructField("c_name", StringType()),
StructField("c_address", StringType()),
StructField("c_nationkey", IntegerType()),
StructField("c_phone", StringType()),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType()),
StructField("c_comment", StringType())
])
# Read data from the CSV file into a DataFrame.
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')
# Write the DataFrame to Hologres.
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()
To execute Spark jobs in different languages, perform the following operations:
- Scala
  - You can use the sample code to generate a sparktest.scala file and run the job as follows.
    # Load dependencies.
    spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
    # For local testing, use the absolute path to load the file.
    scala> :load D:/sparktest.scala
  - You can also paste the sample code directly after loading the dependencies to run it.
- Java
  You can import the sample code into your development tool and use Maven to package it. For example, the package name could be spark_test.jar. Run the job with the following command.
  # Use the absolute path for the job JAR package.
  spark-submit --class SparkTest --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_test.jar
- Python
  After running the following command, you can paste the sample code directly to run it.
  pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Read data from Hologres
- Starting from spark-connector version 1.3.2, you can read data from Hologres. Compared with Spark's default jdbc-connector, the spark-connector can read data concurrently based on the shards of the Hologres table, which provides better performance. The read concurrency depends on the number of shards in the table and can be capped by the read.max_task_count parameter; the job ultimately generates Min(shardCount, max_task_count) read tasks. The connector also supports schema inference: if you do not provide a schema, the connector infers the Spark-side schema from the Hologres table schema, as shown in the sketch after this list.
- Starting from spark-connector version 1.5.0, reading from a Hologres table supports predicate pushdown, LIMIT pushdown, and column pruning. It also supports reading data by passing a Hologres SELECT query. This version introduces batch mode reading, which improves read performance by three to four times compared with previous versions.
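For example, the following minimal spark-shell sketch (placeholder credentials) relies on schema inference: no Spark-side schema is declared, and the connector derives one from the Hologres table.
// A minimal schema-inference sketch: no schema is declared, so the connector
// derives the Spark-side schema from the Hologres table (placeholder credentials).
val inferredDf = spark.read
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("table", "customer_holo_table")
  .load()
// Inspect the inferred schema; for example, a BIGINT column appears as LongType.
inferredDf.printSchema()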
Read data using Spark-SQL
When using Spark-SQL, it is more convenient to load the metadata of a Hologres table using a Catalog. You can also declare a Hologres table by creating a temporary table.
- Hologres-Connector-Spark versions earlier than 1.5.2 do not support catalogs. You must declare a Hologres table by creating a temporary table.
- For more information about the parameters for Hologres-Connector-Spark, see Parameter description.
- Initialize the Hologres Catalog.
  spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
  --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
  --conf spark.sql.catalog.hologres_external_test_db.username=*** \
  --conf spark.sql.catalog.hologres_external_test_db.password=*** \
  --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
- Read data from Hologres.
  - Read data using a Catalog.
    -- Load the Hologres Catalog.
    USE hologres_external_test_db;
    -- Read from the Hologres table. Column pruning and predicate pushdown are supported.
    SELECT c_custkey, c_name, c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;
  - Read data by creating a temporary table.
    CREATE TEMPORARY VIEW (table)
    CREATE TEMPORARY VIEW hologresTable
    USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***",
        password "***",
        read.max_task_count "80", -- The maximum number of tasks to use for reading the Hologres table.
        table "customer_holo_table"
    );
    -- Column pruning and predicate pushdown are supported.
    SELECT c_custkey, c_name, c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;
    CREATE TEMPORARY VIEW (query)
    CREATE TEMPORARY VIEW hologresTable
    USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***",
        password "***",
        read.query "select c_custkey, c_name, c_phone from customer_holo_table where c_custkey < 500 limit 10"
    );
    SELECT * FROM hologresTable LIMIT 5;
Read Hologres data into a DataFrame
When you develop Spark jobs using spark-shell, pyspark, or other tools, you can call Spark's Read interface to read data into a DataFrame for subsequent calculations. The following are examples of reading a Hologres table into a DataFrame in different languages. For more information about the parameters for Hologres-Connector-Spark, see Parameter description.
Scala
val readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // The maximum number of tasks to use for reading the Hologres table.
.load()
.filter("c_custkey < 500")
)
readDf.select("c_custkey", "c_name", "c_phone").show(10)
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSelect {
public static void main(String[] args) {
// Run in local mode.
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
        Dataset<Row> readDf = spark.read()
                .format("hologres")
                .option("username", "***")
                .option("password", "***")
                .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
                .option("table", "customer_holo_table")
                .option("read.max_task_count", "80") // The maximum number of tasks to use for reading the Hologres table.
                .load()
                .filter("c_custkey < 500");
readDf.select("c_custkey", "c_name", "c_phone").show(10);
}
}
Add the following dependency to the Maven pom.xml file.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>
Python
readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()
readDf.select("c_custkey", "c_name", "c_phone").show(10)
To execute Spark jobs in different languages, perform the following operations:
- Scala
  - You can use the sample code to generate a sparkselect.scala file and run the job as follows.
    # Load dependencies.
    spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
    # For local testing, use the absolute path to load the file.
    scala> :load D:/sparkselect.scala
  - You can also paste the sample code directly after loading the dependencies to run it.
- Java
  You can import the sample code into your development tool and use Maven to package it. For example, the package name could be spark_select.jar. Run the job with the following command.
  # Use the absolute path for the job JAR package.
  spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_select.jar
- Python
  After running the following command, you can paste the sample code directly to run it.
  pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Parameter description
General parameters
| Parameter | Default value | Required | Description |
| --- | --- | --- | --- |
| username | None | Yes | The username used to connect to the Hologres instance. |
| password | None | Yes | The password used to connect to the Hologres instance. |
| table | None | Yes | The name of the Hologres table to read from or write to. Note: When reading data, you can also use the read.query parameter instead of this parameter. |
| jdbcurl | None | Yes | The JDBC URL for the Hologres real-time data API, in the format jdbc:postgresql://<endpoint>:<port>/<db_name>. |
| enable_serverless_computing | false | No | Specifies whether to use Serverless Computing resources. This parameter is valid only for read operations and for write operations in COPY mode. |
| serverless_computing_query_priority | 3 | No | The execution priority for Serverless Computing. |
| statement_timeout_seconds | 28800 (8 hours) | No | The timeout period for query execution, in seconds. |
| retry_count | 3 | No | The number of retries when a connection fails. |
| direct_connect | For environments that support direct connection, direct connection is used by default. | No | The bottleneck for batch data reads and writes is often the network throughput of the endpoint. The system therefore tests whether the current environment can directly connect to the Hologres frontend (access node); if a direct connection is supported, it is used by default. Set this parameter to false to disable direct connection. |
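The general parameters above are passed as connector options. The following is a minimal Scala sketch, with illustrative values, that raises the statement timeout and disables direct connection for a read:
// Sketch: passing general parameters as options (values are illustrative only).
val df = spark.read
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("table", "customer_holo_table")
  .option("statement_timeout_seconds", "3600") // Time out queries after 1 hour.
  .option("direct_connect", "false")           // Disable direct connection to the frontend.
  .load()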
Write parameters
The Hologres connector supports Spark's SaveMode parameter. For SQL, this means INSERT INTO or INSERT OVERWRITE. For DataFrame, this means setting SaveMode to Append or Overwrite when writing. Overwrite creates a temporary table for writing and replaces the original table after the write succeeds. Use this with caution.
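For example, here is a minimal DataFrame sketch of an Overwrite write, reusing csvDf from the import example above; because Overwrite replaces the whole table, treat this as illustrative only.
import org.apache.spark.sql.SaveMode
// Sketch: SaveMode.Overwrite stages the data in a temporary table and swaps it in
// only after the write succeeds, replacing the original table contents.
csvDf.write
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("table", "customer_holo_table")
  .mode(SaveMode.Overwrite)
  .save()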
| Parameter name | Previous parameter name | Default value | Required | Description |
| --- | --- | --- | --- | --- |
| write.mode | copy_write_mode | auto | No | The write mode. For a comparison of the available write modes, see Comparison of batch write modes. |
| write.copy.max_buffer_size | max_cell_buffer_size | 52428800 (50 MB) | No | The maximum length of the local buffer when writing in COPY mode. You usually do not need to adjust it. However, if a buffer overflow occurs when writing large fields, such as very long strings, increase this value. |
| write.copy.dirty_data_check | copy_write_dirty_data_check | false | No | Specifies whether to perform dirty data validation. If you enable this feature, the system can pinpoint the specific row that failed to be written when dirty data is found. However, this affects write performance. Do not enable this feature unless you are troubleshooting. |
| write.on_conflict_action | write_mode | INSERT_OR_REPLACE | No | The policy to use when inserting into a table with a primary key: INSERT_OR_REPLACE replaces the existing row, INSERT_OR_IGNORE discards the new row, and INSERT_OR_UPDATE updates only the columns being written. |
The following parameters take effect only when write.mode is set to insert.
| Parameter name | Previous parameter name | Default value | Required | Description |
| --- | --- | --- | --- | --- |
| write.insert.dynamic_partition | dynamic_partition | false | No | This parameter takes effect only when the target is a partitioned parent table. If set to true, child partitions that do not exist are created automatically. |
| write.insert.batch_size | write_batch_size | 512 | No | The maximum batch size for each write thread. A batch is committed when the number of Put operations, after being merged by WriteMode, reaches this value. |
| write.insert.batch_byte_size | write_batch_byte_size | 2097152 (2 * 1024 * 1024) | No | The maximum batch size for each write thread, in bytes. The default value is 2 MB. A batch is committed when the byte size of the Put data, after being merged by WriteMode, reaches this value. |
| write.insert.max_interval_ms | write_max_interval_ms | 10000 | No | A batch is committed if the time since the last commit exceeds this value, in milliseconds. |
| write.insert.thread_size | write_thread_size | 1 | No | The number of concurrent write threads. Each concurrent thread occupies one database connection. |
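To show how these parameters combine, here is a hedged Scala sketch of an insert-mode write with explicit batching and thread settings; the values are examples, not tuning recommendations.
import org.apache.spark.sql.SaveMode
// Sketch: insert-mode write with illustrative batching and concurrency settings.
csvDf.write
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("table", "customer_holo_table")
  .option("write.mode", "insert")            // Write with INSERT statements instead of COPY.
  .option("write.insert.batch_size", "1024") // Commit a batch after 1024 merged Put operations.
  .option("write.insert.thread_size", "4")   // 4 write threads, that is, 4 connections per task.
  .mode(SaveMode.Append)
  .save()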
Read parameters
| Parameter name | Previous parameter name (version 1.5.0 and earlier) | Default value | Required | Description |
| --- | --- | --- | --- | --- |
| read.mode | bulk_read | auto | No | The read mode. Valid values include auto (the connector chooses the mode automatically), bulk_read (batch mode reading, which offers better performance), and select (reads using SELECT statements). |
| read.max_task_count | max_partition_count | 80 | No | Divides the Hologres table to be read into multiple partitions. Each partition corresponds to one Spark task. If the shard count of the Hologres table is less than this parameter, the number of partitions is at most the shard count. |
| read.copy.max_buffer_size | / | 52428800 (50 MB) | No | The maximum length of the local buffer when reading in COPY mode. If an exception occurs with large fields, increase this value. |
| read.push_down_predicate | push_down_predicate | true | No | Specifies whether to enable predicate pushdown, such as applying filter conditions during a query. This feature supports the pushdown of common filter conditions and column pruning. |
| read.push_down_limit | push_down_limit | true | No | Specifies whether to enable LIMIT pushdown. |
| read.select.batch_size | scan_batch_size | 256 | No | This parameter takes effect only when data is read in select mode. It specifies the maximum number of rows fetched per batch. |
| read.select.timeout_seconds | scan_timeout_seconds | 60 | No | This parameter takes effect only when data is read in select mode. It specifies the timeout period for each scan request, in seconds. |
| read.query | query | None | No | Reads data using the specified SELECT query. Note: This parameter and the table parameter are mutually exclusive; set only one of them. |
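The read.query parameter is also available from the DataFrame API. A minimal Scala sketch, assuming the customer_holo_table from earlier; do not set the table option at the same time.
// Sketch: read using a Hologres SELECT query instead of a table name.
val queryDf = spark.read
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("read.query", "select c_custkey, c_name, c_phone from customer_holo_table where c_custkey < 500")
  .load()
queryDf.show(10)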
Data type mapping
| Spark type | Hologres type |
| --- | --- |
| ShortType | SMALLINT |
| IntegerType | INT |
| LongType | BIGINT |
| StringType | TEXT |
| StringType | JSON |
| StringType | JSONB |
| DecimalType | NUMERIC(38, 18) |
| BooleanType | BOOL |
| DoubleType | DOUBLE PRECISION |
| FloatType | FLOAT |
| TimestampType | TIMESTAMPTZ |
| DateType | DATE |
| BinaryType | BYTEA |
| BinaryType | ROARINGBITMAP |
| ArrayType(IntegerType) | INT4[] |
| ArrayType(LongType) | INT8[] |
| ArrayType(FloatType) | FLOAT4[] |
| ArrayType(DoubleType) | FLOAT8[] |
| ArrayType(BooleanType) | BOOLEAN[] |
| ArrayType(StringType) | TEXT[] |
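As a concrete example of the array mappings, the following hedged Scala sketch writes an ArrayType(IntegerType) column. It assumes a hypothetical Hologres table created with CREATE TABLE array_table (id BIGINT, tags INT4[]).
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode}
// Hypothetical target table: CREATE TABLE array_table (id BIGINT, tags INT4[]);
val arraySchema = StructType(Array(
  StructField("id", LongType),
  StructField("tags", ArrayType(IntegerType)) // Maps to Hologres INT4[].
))
val rows = java.util.Arrays.asList(Row(1L, Seq(1, 2, 3)), Row(2L, Seq(4, 5)))
val arrayDf = spark.createDataFrame(rows, arraySchema)
arrayDf.write
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("table", "array_table")
  .mode(SaveMode.Append)
  .save()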
Connection calculation
When Hologres-Connector-Spark reads or writes data, it uses a certain number of JDBC connections. This number is affected by the following factors:
- Spark concurrency: the number of tasks running concurrently, which can be seen in the Spark UI while the job is running.
- The number of connections used by the connector for each concurrent task:
  - For writing in COPY mode, each concurrent task uses only one JDBC connection.
  - For writing in INSERT mode, each concurrent task uses write_thread_size JDBC connections.
  - For reading, each concurrent task uses one JDBC connection.
- Connections used by other operations: when a job starts, it performs operations such as schema retrieval, which may briefly establish a connection.
Therefore, the total number of connections used by a job can be calculated using the following formulas:
| Work item | Connections used |
| --- | --- |
| Query metadata from Catalog | 1 |
| Read data | parallelism × 1 + 1 |
| Write in COPY mode | parallelism × 1 + 1 |
| Write in INSERT mode | parallelism × write_thread_size + 1 |
The connection calculation above assumes that the number of tasks Spark can run concurrently is greater than the number of tasks generated by the job.
The number of concurrent tasks that Spark can run may be affected by user-set parameters, such as spark.executor.instances, and may also be affected by Hadoop's file splitting policy. For more information, see Apache Hadoop.
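As a sanity check, the arithmetic above can be expressed as a small Scala helper; this is only an illustration of the formulas, not part of the connector API.
// Illustrative helper that mirrors the formulas in the table above.
def estimateConnections(parallelism: Int, writeThreadSize: Int = 1, insertMode: Boolean = false): Int =
  if (insertMode) parallelism * writeThreadSize + 1
  else parallelism + 1 // Reads and COPY-mode writes use one connection per task.
// Example: 20 concurrent tasks writing in INSERT mode with write.insert.thread_size = 4.
println(estimateConnections(20, writeThreadSize = 4, insertMode = true)) // 81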