Apache Spark integrates with Hologres through the Hologres Spark connector, which outperforms native JDBC by reading data concurrently across table shards and writing in batch COPY mode. Use this connector to build high-throughput data pipelines between your Spark cluster and Hologres.
Compatibility
| Spark connector | Spark | Hologres instance |
|---|---|---|
| 1.5.2 (this guide) | 3.3.0 or later | 1.3 or later |
To check your instance version, open the Hologres Management Console and go to the Instance Details page. If your instance is earlier than 1.3, upgrade the instance or join the Hologres DingTalk group (group number 32314975) to request an upgrade.
How it works
The connector maps Hologres concepts to Spark concepts as follows:
One Hologres Catalog in Spark = one Hologres database
One namespace in the Hologres Catalog = one schema in that database
For reads, the connector splits the Hologres table into partitions based on shard count and reads them concurrently. The number of Spark tasks is min(ShardCount, read.max_task_count). Starting from connector version 1.5.0, predicate pushdown, LIMIT pushdown, and column pruning are applied automatically, and batch mode reading improves throughput by 3–4x over older versions.
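As a quick planning aid, the task-count rule for table reads can be written as a one-line function. This is a minimal sketch: `expected_read_tasks` is a hypothetical helper, not part of the connector. It covers only reads that use the `table` option, and it assumes the documented default of 80 for `read.max_task_count`.

```python
def expected_read_tasks(shard_count: int, max_task_count: int = 80) -> int:
    """Estimate the Spark read tasks for a table read: min(ShardCount, read.max_task_count).

    Hypothetical planning helper; 80 is the documented default of read.max_task_count.
    """
    if shard_count < 1 or max_task_count < 1:
        raise ValueError("shard_count and max_task_count must be positive")
    return min(shard_count, max_task_count)


# A 40-shard table with the default limit is read by 40 tasks,
# while a 160-shard table is capped at the 80-task default.
print(expected_read_tasks(40))
print(expected_read_tasks(160))
```

Raising `read.max_task_count` beyond the shard count has no effect, because each task reads at least one shard.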
For writes, the connector uses COPY-based modes by default, which provide higher throughput and lower memory usage than INSERT. The write.mode=auto setting picks the optimal mode based on your Hologres version and table schema — you rarely need to override it.
Understanding these read and write modes helps you interpret parameter choices in the Parameter reference section.
Hologres Catalog does not support creating tables.
Prerequisites
Before you begin, ensure that you have:
A Spark environment (Spark 3.3.0 or later) that supports the spark-sql, spark-shell, or pyspark commands. Use EMR Spark to quickly set up a managed Spark cluster, or set up a community Spark cluster by following the Apache Spark documentation.
The hologres-connector-spark-3.x JAR (version 1.5.2), downloaded from the Maven Central Repository. The connector is open source: see Hologres-Connectors on GitHub.
Your Hologres JDBC URL, username (AccessKey ID or database user), and password (AccessKey secret or user password). Find the host and port on the Instance Details page under Network Information in the Hologres Management Console.
If you develop Spark jobs in Java and debug locally (for example, in IntelliJ IDEA), add the following to your pom.xml:
<dependency>
  <groupId>com.alibaba.hologres</groupId>
  <artifactId>hologres-connector-spark-3.x</artifactId>
  <version>1.5.2</version>
  <classifier>jar-with-dependencies</classifier>
</dependency>
Use Hologres Catalog
The Hologres Catalog (supported from connector version 1.5.2) lets you browse and query Hologres tables using standard Spark SQL, without manually declaring each table.
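A Catalog is registered entirely through Spark configuration keys under spark.sql.catalog.<name>, as the spark-sql command in Initialize a Hologres Catalog shows. If you start Spark from Python rather than spark-sql, a small helper can produce the same key/value pairs. The function name `hologres_catalog_conf` is hypothetical; the keys and the `HoloTableCatalog` class name are copied from that command.

```python
def hologres_catalog_conf(catalog, username, password, host, port, database):
    """Return the Spark config entries that register a Hologres Catalog.

    Hypothetical helper: it mirrors the --conf flags used with spark-sql in this guide
    (the catalog class plus its username, password, and jdbcurl options).
    """
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "com.alibaba.hologres.spark3.HoloTableCatalog",
        f"{prefix}.username": username,
        f"{prefix}.password": password,
        f"{prefix}.jdbcurl": f"jdbc:postgresql://{host}:{port}/{database}",
    }


conf = hologres_catalog_conf(
    "hologres_external_test_db", "<your-username>", "<your-password>", "<host>", "<port>", "test_db"
)
# Print the equivalent spark-sql --conf flags.
for key, value in conf.items():
    print(f"--conf {key}={value}")
```

In PySpark, the same pairs can be applied by calling `builder = builder.config(key, value)` for each entry on `SparkSession.builder` before `getOrCreate()`. The connector JAR still has to be on the classpath, for example through `--jars`.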
This guide uses a Hologres instance with the following structure:
test_db                  -- database
public.test_table1       -- table in the public schema
public.test_table2
test_schema.test_table3  -- table in the test_schema schema
Initialize a Hologres Catalog
Start spark-sql, load the connector JAR, and specify the Catalog configuration:
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
  --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
  --conf spark.sql.catalog.hologres_external_test_db.username=<your-username> \
  --conf spark.sql.catalog.hologres_external_test_db.password=<your-password> \
  --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://<host>:<port>/test_db
Common Catalog commands
Load a Catalog. A Catalog maps to exactly one Hologres database:
USE hologres_external_test_db;
List namespaces. Each namespace corresponds to a Hologres schema; the default is public:
SHOW NAMESPACES;
List tables in a namespace:
-- List tables in the current namespace
SHOW TABLES;
-- Or list tables in a specific namespace
USE test_schema;
SHOW TABLES;
-- Alternatively, without switching namespaces
SHOW TABLES IN test_schema;
Read and write tables:
-- Read from a table
SELECT * FROM public.test_table1;
-- Write from one table to another
INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;
Write data to Hologres
This section uses the customer table from the TPC-H dataset. Download the sample data, then create the target table in Hologres:
CREATE TABLE customer_holo_table
(
    c_custkey    BIGINT,
    c_name       TEXT,
    c_address    TEXT,
    c_nationkey  INT,
    c_phone      TEXT,
    c_acctbal    DECIMAL(15,2),
    c_mktsegment TEXT,
    c_comment    TEXT
);
Write using Spark SQL
Connector versions earlier than 1.5.2 do not support Hologres Catalog. Use the temporary view approach instead.
Initialize the Hologres Catalog (same command as in Initialize a Hologres Catalog).
Load the Catalog, create a temporary view for the CSV source, and write to Hologres:
Write using CATALOG
-- Load the Hologres Catalog
USE hologres_external_test_db;
-- Create a CSV data source
CREATE TEMPORARY VIEW csvTable (
c_custkey BIGINT,
c_name STRING,
c_address STRING,
c_nationkey INT,
c_phone STRING,
c_acctbal DECIMAL(15, 2),
c_mktsegment STRING,
c_comment STRING)
USING csv OPTIONS (
path "resources/customer", sep "," -- Use the absolute path for local testing
);
-- Write data from the CSV source to Hologres
INSERT INTO public.customer_holo_table SELECT * FROM csvTable;
Write using TEMPORARY VIEW
-- Create a CSV data source
CREATE TEMPORARY VIEW csvTable (
c_custkey BIGINT,
c_name STRING,
c_address STRING,
c_nationkey INT,
c_phone STRING,
c_acctbal DECIMAL(15, 2),
c_mktsegment STRING,
c_comment STRING)
USING csv OPTIONS (
path "resources/customer", sep ","
);
-- Declare a Hologres temporary view with only the target columns
CREATE TEMPORARY VIEW hologresTable (
c_custkey BIGINT,
c_name STRING,
c_phone STRING)
USING hologres OPTIONS (
jdbcurl "jdbc:postgresql://<host>:<port>/test_db",
username "<your-username>",
password "<your-password>",
table "customer_holo_table"
);
-- Write only the selected columns
INSERT INTO hologresTable SELECT c_custkey, c_name, c_phone FROM csvTable;
To write only specific columns, declare a Hologres temporary view that contains just those columns, as hologresTable does above.
The Spark INSERT INTO syntax does not support specifying a column list directly (for example, INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable), so declaring a temporary view with only the target columns is the standard workaround.
Write using the DataFrame API
Use the DataFrame write interface when developing Spark jobs with spark-shell, PySpark, or other tools.
All three examples below read CSV data into a DataFrame and write it to Hologres using .format("hologres").
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
// Define the CSV schema
val schema = StructType(Array(
  StructField("c_custkey", LongType),
  StructField("c_name", StringType),
  StructField("c_address", StringType),
  StructField("c_nationkey", IntegerType),
  StructField("c_phone", StringType),
  StructField("c_acctbal", DecimalType(15, 2)),
  StructField("c_mktsegment", StringType),
  StructField("c_comment", StringType)
))
// Read from CSV
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")
// Write to Hologres
csvDf.write
  .format("hologres")
  .option("username", "<your-username>")
  .option("password", "<your-password>")
  .option("jdbcurl", "jdbc:postgresql://<host>:<port>/test_db")
  .option("table", "customer_holo_table")
  .mode(SaveMode.Append)
  .save()
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;
public class SparkTest {
    public static void main(String[] args) {
        // Define the CSV schema
        List<StructField> fields = Arrays.asList(
            DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
            DataTypes.createStructField("c_name", DataTypes.StringType, true),
            DataTypes.createStructField("c_address", DataTypes.StringType, true),
            DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
            DataTypes.createStructField("c_phone", DataTypes.StringType, true),
            DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
            DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
            DataTypes.createStructField("c_comment", DataTypes.StringType, true)
        );
        StructType schema = DataTypes.createStructType(fields);
        SparkSession spark = SparkSession.builder()
            .appName("Spark Hologres Write Example")
            .master("local[*]")
            .getOrCreate();
        // Read from CSV (use the absolute path for local testing)
        Dataset<Row> csvDf = spark.read()
            .format("csv").schema(schema).option("sep", ",")
            .load("resources/customer");
        // Write to Hologres
        csvDf.write()
            .format("hologres")
            .option("username", "<your-username>")
            .option("password", "<your-password>")
            .option("jdbcurl", "jdbc:postgresql://<host>:<port>/test_db")
            .option("table", "customer_holo_table")
            .mode(SaveMode.Append)
            .save();
        // Release the Spark session's resources
        spark.stop();
    }
}
Add the following Maven dependency for Spark SQL:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.13</artifactId>
  <version>3.5.4</version>
  <scope>provided</scope>
</dependency>
Python
from pyspark.sql.types import *
# Define the CSV schema
schema = StructType([
StructField("c_custkey", LongType()),
StructField("c_name", StringType()),
StructField("c_address", StringType()),
StructField("c_nationkey", IntegerType()),
StructField("c_phone", StringType()),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType()),
StructField("c_comment", StringType())
])
# Read from CSV
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=",")
# Write to Hologres
csvDf.write.format("hologres") \
.option("username", "<your-username>") \
.option("password", "<your-password>") \
.option("jdbcurl", "jdbc:postgresql://<host>:<port>/test_db") \
.option("table", "customer_holo_table") \
.mode("append") \
.save()
To run the Scala example:
# Load the connector
spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
# In the Scala REPL, load the file (use the absolute path for local testing)
scala> :load D:/sparktest.scala
You can also paste the code directly after loading the connector.
To run the Java example, package your code with Maven (for example, as spark_test.jar) and submit it:
spark-submit --class SparkTest \
--jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
D:\spark_test.jar
To run the Python example:
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Then paste the code directly into the PySpark shell.
Tip: To confirm write performance, open the Spark UI while the job runs and monitor the number of active tasks. This number reflects the actual write parallelism and helps you tune spark.executor.instances or the shard count accordingly.
Read data from Hologres
Read support requires connector version 1.3.2 or later. Version 1.5.0 introduced predicate pushdown, LIMIT pushdown, column pruning, and the read.query option.
Read using Spark SQL
Connector versions earlier than 1.5.2 do not support Hologres Catalog. Use the temporary view approach instead.
Initialize the Hologres Catalog (same command as in Initialize a Hologres Catalog).
Read data using one of the following approaches:
Read using Catalog:
-- Load the Hologres Catalog
USE hologres_external_test_db;
-- Column pruning and predicate pushdown are applied automatically
SELECT c_custkey, c_name, c_phone
FROM public.customer_holo_table
WHERE c_custkey < 500
LIMIT 10;
Read using CREATE TEMPORARY VIEW (table):
-- Declare a Hologres table as a temporary view
CREATE TEMPORARY VIEW hologresTable
USING hologres OPTIONS (
jdbcurl "jdbc:postgresql://<host>:<port>/test_db",
username "<your-username>",
password "<your-password>",
read.max_task_count "80", -- Maximum concurrent read tasks
table "customer_holo_table"
);
-- Column pruning and predicate pushdown are applied automatically
SELECT c_custkey, c_name, c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;
Read using CREATE TEMPORARY VIEW (query):
-- Pass a Hologres SELECT query directly
CREATE TEMPORARY VIEW hologresTable
USING hologres OPTIONS (
jdbcurl "jdbc:postgresql://<host>:<port>/test_db",
username "<your-username>",
password "<your-password>",
read.query "select c_custkey, c_name, c_phone from customer_holo_table where c_custkey < 500 limit 10"
);
SELECT * FROM hologresTable LIMIT 5;
When using read.query, only a single task is used for reading and predicate pushdown is not supported. Use the table option for concurrent reads.
Read into a DataFrame
The following examples read a Hologres table into a DataFrame for downstream processing.
Scala
val readDf = spark.read
.format("hologres")
.option("username", "<your-username>")
.option("password", "<your-password>")
.option("jdbcurl", "jdbc:postgresql://<host>:<port>/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // Maximum concurrent read tasks
.load()
.filter("c_custkey < 500")
readDf.select("c_custkey", "c_name", "c_phone").show(10)
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSelect {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("Spark Hologres Read Example")
            .master("local[*]")
            .getOrCreate();
        Dataset<Row> readDf = spark.read()
            .format("hologres")
            .option("username", "<your-username>")
            .option("password", "<your-password>")
            .option("jdbcurl", "jdbc:postgresql://<host>:<port>/test_db")
            .option("table", "customer_holo_table")
            .option("read.max_task_count", "80") // Maximum concurrent read tasks
            .load()
            .filter("c_custkey < 500");
        readDf.select("c_custkey", "c_name", "c_phone").show(10);
        // Release the Spark session's resources
        spark.stop();
    }
}
Add the same Maven dependency as in the write example (spark-sql_2.13, version 3.5.4). To submit the job:
spark-submit --class SparkSelect \
--jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
D:\spark_select.jar
Python
readDf = spark.read.format("hologres") \
.option("username", "<your-username>") \
.option("password", "<your-password>") \
.option("jdbcurl", "jdbc:postgresql://<host>:<port>/test_db") \
.option("table", "customer_holo_table") \
.option("read.max_task_count", "80") \
.load()
readDf.select("c_custkey", "c_name", "c_phone").show(10)
To run the Scala example:
# Load the connector
spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
# In the Scala REPL, load the file (use the absolute path for local testing)
scala> :load D:/sparkselect.scala
To run the Python example:
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Then paste the code directly into the PySpark shell.
Tip: To verify that predicate pushdown and column pruning are active, check the Spark UI during the read job. The Stages tab shows how many tasks were created. If pushdown is effective, fewer rows are transferred from Hologres, which you can confirm by reviewing the Hologres query logs in the Hologres Management Console.
Parameter reference
General parameters
| Parameter | Default | Required | Description |
|---|---|---|---|
| username | — | Yes | Your AccessKey ID, or the name of a Hologres database user. To get your AccessKey ID, go to AccessKey Management. |
| password | — | Yes | Your AccessKey secret, or the password of the database user. See Create an AccessKey. |
| table | — | Yes | The Hologres table to read from or write to. When reading, you can use read.query instead. |
| jdbcurl | — | Yes | The JDBC URL for the Hologres real-time data API. Format: jdbc:postgresql://<host>:<port>/<db_name>. Find the host and port under Network Information on the Instance Details page. |
| enable_serverless_computing | false | No | Whether to use Serverless Computing resources. Valid for reads and for writes in bulk_load mode. See Serverless Computing. |
| serverless_computing_query_priority | 3 | No | Execution priority for Serverless Computing. |
| statement_timeout_seconds | 28800 (8 hours) | No | Timeout for query execution, in seconds. |
| retry_count | 3 | No | Number of retries on connection failure. |
| direct_connect | Enabled if supported | No | The connector tests whether the environment supports a direct connection to the Hologres frontend node. If it does, direct connection is used by default, which reduces network bottlenecks. Set to false to disable. |
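Because the four required parameters appear in every example, it can help to build them once and reuse the resulting map. This is a minimal sketch: `hologres_options` is a hypothetical helper, not part of the connector. It assumes that options are passed to Spark as strings, so it renders Python booleans as "true" or "false".

```python
def hologres_options(host, port, database, username, password, table, extra=None):
    """Return a string-valued option map for .format("hologres") reads and writes.

    Hypothetical convenience helper: it fills in the required general parameters
    (jdbcurl, username, password, table) and stringifies optional ones.
    """
    required = {"host": host, "database": database, "username": username,
                "password": password, "table": table}
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise ValueError(f"missing required settings: {', '.join(missing)}")
    opts = {
        "jdbcurl": f"jdbc:postgresql://{host}:{port}/{database}",
        "username": username,
        "password": password,
        "table": table,
    }
    for key, value in (extra or {}).items():
        # Spark options are strings; render Python booleans as "true"/"false".
        opts[key] = str(value).lower() if isinstance(value, bool) else str(value)
    return opts


opts = hologres_options("<host>", "<port>", "test_db", "<your-username>", "<your-password>",
                        "customer_holo_table", extra={"retry_count": 5, "direct_connect": False})
print(opts["jdbcurl"])
```

With PySpark, you could then call `spark.read.format("hologres").options(**opts).load()` or `df.write.format("hologres").options(**opts).mode("append").save()`. Dotted keys such as read.max_task_count pass through dictionary unpacking unchanged.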
Write parameters
The connector supports Spark's SaveMode. In SQL, this maps to INSERT INTO or INSERT OVERWRITE. In DataFrame, set SaveMode to Append or Overwrite. Overwrite creates a temporary table for writing and replaces the original after success — use with caution.
| Parameter | Previous name | Default | Required | Description |
|---|---|---|---|---|
| write.mode | copy_write_mode | auto | No | Write mode. See Write mode selection. |
| write.copy.max_buffer_size | max_cell_buffer_size | 52428800 (50 MB) | No | Maximum local buffer size for COPY mode. Increase this if writes fail due to very large field values. |
| write.copy.dirty_data_check | copy_write_dirty_data_check | false | No | Enable dirty data validation. When enabled, the connector identifies the exact row that failed. This reduces write performance, so enable it only for troubleshooting. |
| write.on_conflict_action | write_mode | INSERT_OR_REPLACE | No | Conflict resolution for tables with a primary key: INSERT_OR_IGNORE (skip conflicting rows), INSERT_OR_UPDATE (update conflicting columns), or INSERT_OR_REPLACE (replace all columns). |
The following parameters apply only when write.mode is insert:
| Parameter | Previous name | Default | Required | Description |
|---|---|---|---|---|
| write.insert.dynamic_partition | dynamic_partition | false | No | Automatically create non-existent partitions when writing to the parent table of a partitioned table. |
| write.insert.batch_size | write_batch_size | 512 | No | Maximum number of PUT operations per write thread before committing a batch. |
| write.insert.batch_byte_size | write_batch_byte_size | 2097152 (2 MB) | No | Maximum batch size per write thread, in bytes. A batch commits when this threshold is reached. |
| write.insert.max_interval_ms | write_max_interval_ms | 10000 | No | A batch commits if this many milliseconds have passed since the last commit. |
| write.insert.thread_size | write_thread_size | 1 | No | Number of concurrent write threads. Each thread uses one JDBC connection. |
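The three insert-mode thresholds combine as "commit when any one is reached." The class below is a simplified model of that rule for a single write thread, intended only to reason about how the defaults interact. It is not the connector's implementation, and `InsertBatchModel` is a hypothetical name. One assumption to note: the model checks the time interval only when a row arrives, whereas a real writer may also commit on a timer.

```python
class InsertBatchModel:
    """Simplified model of when one write thread commits a batch in write.mode=insert.

    A batch commits once it holds write.insert.batch_size rows, reaches
    write.insert.batch_byte_size bytes, or write.insert.max_interval_ms has
    elapsed since the last commit. Illustrative only; not the connector's code.
    """

    def __init__(self, batch_size=512, batch_byte_size=2097152, max_interval_ms=10000, start_ms=0):
        self.batch_size = batch_size
        self.batch_byte_size = batch_byte_size
        self.max_interval_ms = max_interval_ms
        self.rows = 0
        self.bytes = 0
        self.last_commit_ms = start_ms

    def add(self, row_bytes, now_ms):
        """Buffer one row; return True if this row causes the batch to commit."""
        self.rows += 1
        self.bytes += row_bytes
        due = (self.rows >= self.batch_size
               or self.bytes >= self.batch_byte_size
               or now_ms - self.last_commit_ms >= self.max_interval_ms)
        if due:
            # Model a commit: the buffer empties and the interval timer restarts.
            self.rows = 0
            self.bytes = 0
            self.last_commit_ms = now_ms
        return due


# With batch_size=3, every third row triggers a commit.
model = InsertBatchModel(batch_size=3)
print([model.add(row_bytes=100, now_ms=t) for t in range(1, 7)])
```

With the defaults, small rows usually hit the 512-row limit first. Wide rows (around 4 KB or more each) reach the 2 MB byte limit sooner, and slow trickles of data are committed by the 10-second interval.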
Write mode selection
The write.mode=auto setting selects the best mode based on your Hologres instance version and table schema:
Hologres > V2.2.25 and table has a primary key → bulk_load_on_conflict
Hologres > V2.1.0 and table has no primary key → bulk_load
Hologres > V1.3 → stream
Otherwise → insert
| Mode | Description | Requirements |
|---|---|---|
| auto | Automatically selects the optimal mode (recommended) | — |
| stream | Streaming COPY using Fixed Plan. Lower latency and lower memory usage than insert. | Connector 1.3.0+, Hologres V1.3.34+ |
| bulk_load | Batch COPY. Lower instance load than stream at the same throughput. Supports tables without a primary key only. | Connector 1.4.2+, Hologres V2.1.0+ |
| bulk_load_on_conflict | Batch COPY with primary key conflict handling. Redistributes data by distribution key so each Spark task writes to a single shard, reducing lock contention. Tests show that repartitioning data before concurrent writing can reduce instance load by ~67% compared to stream. | Connector 1.4.2+, Hologres V2.2.25+ |
| insert | Standard INSERT. | — |
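To predict which mode auto picks for a given instance, you can encode the selection rules as a first-match decision function. This is a sketch under stated assumptions. It applies the version thresholds exactly as the auto rules state them, using strict "later than" comparisons. It does not model the connector-version requirements in the table. `auto_write_mode` and `_version` are hypothetical helpers.

```python
def _version(text):
    """Parse 'V2.2.25' or '2.1' into a 3-part tuple, padding missing parts with 0."""
    parts = [int(p) for p in text.lstrip("Vv").split(".")]
    return tuple((parts + [0, 0, 0])[:3])


def auto_write_mode(instance_version, has_primary_key):
    """Return the write mode chosen by write.mode=auto, checking the rules in order."""
    v = _version(instance_version)
    if v > (2, 2, 25) and has_primary_key:
        return "bulk_load_on_conflict"
    if v > (2, 1, 0) and not has_primary_key:
        return "bulk_load"
    if v > (1, 3, 0):
        return "stream"
    return "insert"


for version, pk in [("V2.2.26", True), ("V2.1.5", False), ("V2.1.5", True), ("V1.1.0", False)]:
    print(version, "primary key" if pk else "no primary key", "->", auto_write_mode(version, pk))
```

The rule order matters. For example, a V2.1.5 table with a primary key falls through to stream, because bulk_load supports only tables without a primary key.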
Read parameters
| Parameter | Previous name | Default | Required | Description |
|---|---|---|---|---|
| read.mode | bulk_read | auto | No | Read mode. See Read mode selection. |
| read.max_task_count | max_partition_count | 80 | No | Maximum number of concurrent Spark read tasks. The actual task count is min(ShardCount, read.max_task_count). |
| read.copy.max_buffer_size | — | 52428800 (50 MB) | No | Maximum local buffer size for COPY mode. Increase this if reads fail with large fields. |
| read.push_down_predicate | push_down_predicate | true | No | Enable predicate pushdown (filter conditions and column pruning). |
| read.push_down_limit | push_down_limit | true | No | Enable LIMIT pushdown. |
| read.select.batch_size | scan_batch_size | 256 | No | Rows fetched per scan batch. Applies only when read.mode=select. |
| read.select.timeout_seconds | scan_timeout_seconds | 60 | No | Scan timeout in seconds. Applies only when read.mode=select. |
| read.query | query | — | No | Read using a Hologres SELECT query instead of a table. Cannot be used together with table. When set, only a single task is used and predicate pushdown is not supported. |
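Because table and read.query are mutually exclusive and behave differently, a pre-flight check can catch misconfigured read options before the Spark job starts. The function below is a hypothetical sketch, not a connector API. It validates an option map and reports the documented consequences of each choice: a single task and no predicate pushdown for read.query, and pushdown governed by read.push_down_predicate for table reads.

```python
def check_read_source(options):
    """Validate table/read.query usage and summarize the documented read behavior.

    Hypothetical pre-flight helper; option values are assumed to be strings.
    """
    has_table = "table" in options
    has_query = "read.query" in options
    if has_table and has_query:
        raise ValueError("read.query cannot be used together with table")
    if not (has_table or has_query):
        raise ValueError("set either table or read.query")
    if has_query:
        # read.query: one task, no predicate pushdown.
        return {"source": "read.query", "single_task": True, "predicate_pushdown": False}
    pushdown = str(options.get("read.push_down_predicate", "true")).lower() == "true"
    return {"source": "table", "single_task": False, "predicate_pushdown": pushdown}


print(check_read_source({"table": "customer_holo_table", "read.max_task_count": "80"}))
print(check_read_source({"read.query": "select c_custkey from customer_holo_table"}))
```

If a query only filters or projects a single table, expressing it through the table option plus Spark filters keeps concurrent reads and pushdown available.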
Read mode selection
The read.mode=auto setting selects the best mode based on your setup:
Fields include JSONB type → select
Instance version > 3.0.24 → bulk_read_compressed
Otherwise → bulk_read
| Mode | Description |
|---|---|
| auto | Automatically selects the optimal mode (recommended) |
| bulk_read | Reads in Apache Arrow format via COPY OUT. Several times faster than select. Does not support JSONB. |
| bulk_read_compressed | Reads compressed Arrow format via COPY OUT. Saves ~45% bandwidth compared to bulk_read. |
| select | Reads using standard SELECT. Supports all data types including JSONB. |
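The read-mode rules can be checked against a table's column types in the same first-match style. This sketch assumes you already have the Hologres column type names as strings, for example from the table DDL. It applies the version threshold exactly as listed above. `auto_read_mode` is a hypothetical helper, not part of the connector.

```python
def auto_read_mode(instance_version, column_types):
    """Return the read mode chosen by read.mode=auto, checking the rules in order."""
    # Any JSONB column forces the SELECT-based reader.
    if any(t.strip().upper() == "JSONB" for t in column_types):
        return "select"
    parts = [int(p) for p in instance_version.lstrip("Vv").split(".")]
    version = tuple((parts + [0, 0, 0])[:3])
    if version > (3, 0, 24):
        return "bulk_read_compressed"
    return "bulk_read"


print(auto_read_mode("V3.1.0", ["BIGINT", "TEXT", "NUMERIC(15,2)"]))
print(auto_read_mode("V3.1.0", ["BIGINT", "JSONB"]))
```

If a wide table has a single JSONB column that you do not need, reading it through the SELECT path may cost throughput, so it can be worth checking which mode applies before tuning other read parameters.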
Data type mapping
| Spark type | Hologres type |
|---|---|
| ShortType | SMALLINT |
| IntegerType | INT |
| LongType | BIGINT |
| StringType | TEXT, JSON, JSONB |
| DecimalType | NUMERIC(38, 18) |
| BooleanType | BOOL |
| DoubleType | DOUBLE PRECISION |
| FloatType | FLOAT |
| TimestampType | TIMESTAMPTZ |
| DateType | DATE |
| BinaryType | BYTEA, ROARINGBITMAP |
| ArrayType(IntegerType) | INT4[] |
| ArrayType(LongType) | INT8[] |
| ArrayType(FloatType) | FLOAT4[] |
| ArrayType(DoubleType) | FLOAT8[] |
| ArrayType(BooleanType) | BOOLEAN[] |
| ArrayType(StringType) | TEXT[] |
Connection count estimation
The connector uses JDBC connections for reads and writes. The number of connections scales with Spark parallelism (visible on the Spark UI during job execution).
| Operation | Connections used |
|---|---|
| Query metadata from Catalog | 1 |
| Read data | parallelism × 1 + 1 |
| Write in COPY mode | parallelism × 1 + 1 |
| Write in INSERT mode | parallelism × write.insert.thread_size + 1 |
This estimate assumes that the Spark cluster can run all of the tasks generated by the job at the same time. The actual parallelism may be affected by parameters such as spark.executor.instances and Hadoop file splitting policies. For more information, see the Apache Hadoop documentation.
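To check a job against your instance's connection budget, you can turn the table above into a small estimator. This is a minimal sketch: the operation labels and the name `estimate_connections` are hypothetical. The formulas come directly from the table, and parallelism means the number of tasks running at the same time.

```python
def estimate_connections(operation, parallelism, insert_thread_size=1):
    """Estimate JDBC connections using the formulas in the table above.

    operation: "catalog_metadata", "read", "copy_write", or "insert_write" (hypothetical labels).
    """
    if operation == "catalog_metadata":
        return 1
    if operation in ("read", "copy_write"):
        return parallelism * 1 + 1
    if operation == "insert_write":
        # Each task opens write.insert.thread_size connections in INSERT mode.
        return parallelism * insert_thread_size + 1
    raise ValueError(f"unknown operation: {operation}")


print(estimate_connections("copy_write", 100))
print(estimate_connections("insert_write", 20, insert_thread_size=4))
```

For example, a COPY-mode write that runs 100 tasks at once uses about 101 connections. An INSERT-mode write with 20 tasks and write.insert.thread_size set to 4 uses about 81.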