All Products
Search
Document Center

Hologres:Access Hologres via Spark

Last Updated:Mar 26, 2026

Apache Spark integrates with Hologres through the Hologres Spark connector, which outperforms native JDBC by reading data concurrently across table shards and writing in batch COPY mode. Use this connector to build high-throughput data pipelines between your Spark cluster and Hologres.

Compatibility

Spark connectorSparkHologres instance
1.5.2 (this guide)3.3.0 or later1.3 or later

To check your instance version, open the Hologres Management Console and go to the Instance Details page. If your instance is earlier than 1.3, upgrade the instance or join the Hologres DingTalk group (group number 32314975) to request an upgrade.

How it works

The connector maps Hologres concepts to Spark concepts as follows:

  • One Hologres Catalog in Spark = one Hologres database

  • One namespace in the Hologres Catalog = one schema in that database

For reads, the connector splits the Hologres table into partitions based on shard count and reads them concurrently. The number of Spark tasks is min(ShardCount, read.max_task_count). Starting from connector version 1.5.0, predicate pushdown, LIMIT pushdown, and column pruning are applied automatically, and batch mode reading improves throughput by 3–4x over older versions.

For writes, the connector uses COPY-based modes by default, which provide higher throughput and lower memory usage than INSERT. The write.mode=auto setting picks the optimal mode based on your Hologres version and table schema — you rarely need to override it.

Understanding these read and write modes helps you interpret parameter choices in the Parameter reference section.

Hologres Catalog does not support creating tables.

Prerequisites

Before you begin, ensure that you have:

  • A Spark environment that supports spark-sql, spark-shell, or pyspark commands (Spark 3.3.0 or later)

  • The hologres-connector-spark-3.x JAR (version 1.5.2): download from Maven Central Repository. The connector is open source: Hologres-Connectors on GitHub

  • Your Hologres JDBC URL, username (AccessKey ID or database user), and password (AccessKey secret or user password). Find the host and port on the Instance Details page under Network Information in the Hologres Management Console

If you develop Spark jobs in Java and debug locally (for example, in IntelliJ IDEA), add the following to your pom.xml:

<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-spark-3.x</artifactId>
    <version>1.5.2</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

Use Hologres Catalog

The Hologres Catalog (supported from connector version 1.5.2) lets you browse and query Hologres tables using standard Spark SQL, without manually declaring each table.

This guide uses a Hologres instance with the following structure:

test_db                      -- database
  public.test_table1         -- table in the public schema
  public.test_table2
  test_schema.test_table3    -- table in the test_schema schema

Initialize a Hologres Catalog

Start spark-sql, load the connector JAR, and specify the Catalog configuration:

spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
  --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
  --conf spark.sql.catalog.hologres_external_test_db.username=<your-username> \
  --conf spark.sql.catalog.hologres_external_test_db.password=<your-password> \
  --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://<host>:<port>/test_db

Common Catalog commands

Load a Catalog — a Catalog maps to exactly one Hologres database:

USE hologres_external_test_db;

List namespaces — each namespace corresponds to a Hologres schema. The default is public:

SHOW NAMESPACES;

List tables in a namespace:

-- List tables in the current namespace
SHOW TABLES;

-- Or list tables in a specific namespace
USE test_schema;
SHOW TABLES;

-- Alternatively, without switching namespaces
SHOW TABLES IN test_schema;

Read and write tables:

-- Read from a table
SELECT * FROM public.test_table1;

-- Write from one table to another
INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;

Write data to Hologres

This section uses the customer table from the TPC-H dataset. Download the sample data, then create the target table in Hologres:

CREATE TABLE customer_holo_table
(
  c_custkey    BIGINT,
  c_name       TEXT,
  c_address    TEXT,
  c_nationkey  INT,
  c_phone      TEXT,
  c_acctbal    DECIMAL(15,2),
  c_mktsegment TEXT,
  c_comment    TEXT
);

Write using Spark SQL

Connector versions earlier than 1.5.2 do not support Hologres Catalog. Use the temporary view approach instead.
  1. Initialize the Hologres Catalog (same command as in Initialize a Hologres Catalog).

  2. Load the Catalog, create a temporary view for the CSV source, and write to Hologres:

Write using CATALOG

-- Load the Hologres Catalog
USE hologres_external_test_db;

-- Create a CSV data source
CREATE TEMPORARY VIEW csvTable (
    c_custkey    BIGINT,
    c_name       STRING,
    c_address    STRING,
    c_nationkey  INT,
    c_phone      STRING,
    c_acctbal    DECIMAL(15, 2),
    c_mktsegment STRING,
    c_comment    STRING)
USING csv OPTIONS (
    path "resources/customer", sep ","  -- Use the absolute path for local testing
);

-- Write data from the CSV source to Hologres
INSERT INTO public.customer_holo_table SELECT * FROM csvTable;

Write using TEMPORARY VIEW

-- Create a CSV data source
CREATE TEMPORARY VIEW csvTable (
    c_custkey    BIGINT,
    c_name       STRING,
    c_address    STRING,
    c_nationkey  INT,
    c_phone      STRING,
    c_acctbal    DECIMAL(15, 2),
    c_mktsegment STRING,
    c_comment    STRING)
USING csv OPTIONS (
    path "resources/customer", sep ","
);

-- Declare a Hologres temporary view with only the target columns
CREATE TEMPORARY VIEW hologresTable (
    c_custkey BIGINT,
    c_name    STRING,
    c_phone   STRING)
USING hologres OPTIONS (
    jdbcurl  "jdbc:postgresql://<host>:<port>/test_db",
    username "<your-username>",
    password "<your-password>",
    table    "customer_holo_table"
);

-- Write only the selected columns
INSERT INTO hologresTable SELECT c_custkey, c_name, c_phone FROM csvTable;

If you want to write only specific columns, declare a Hologres temporary view with just those columns:

The Spark INSERT INTO syntax does not support specifying a column list directly (for example, INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable). Declaring a temporary view with only the target columns is the standard workaround.

Write using the DataFrame API

Use the DataFrame write interface when developing Spark jobs with spark-shell, PySpark, or other tools.

All three examples below read CSV data into a DataFrame and write it to Hologres using .format("hologres").

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode

// Define the CSV schema
val schema = StructType(Array(
  StructField("c_custkey",    LongType),
  StructField("c_name",       StringType),
  StructField("c_address",    StringType),
  StructField("c_nationkey",  IntegerType),
  StructField("c_phone",      StringType),
  StructField("c_acctbal",    DecimalType(15, 2)),
  StructField("c_mktsegment", StringType),
  StructField("c_comment",    StringType)
))

// Read from CSV
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")

// Write to Hologres
csvDf.write
  .format("hologres")
  .option("username", "<your-username>")
  .option("password", "<your-password>")
  .option("jdbcurl",  "jdbc:postgresql://<host>:<port>/test_db")
  .option("table",    "customer_holo_table")
  .mode(SaveMode.Append)
  .save()

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;

public class SparkTest {
    public static void main(String[] args) {
        // Define the CSV schema
        List<StructField> fields = Arrays.asList(
            DataTypes.createStructField("c_custkey",    DataTypes.LongType,            true),
            DataTypes.createStructField("c_name",       DataTypes.StringType,          true),
            DataTypes.createStructField("c_address",    DataTypes.StringType,          true),
            DataTypes.createStructField("c_nationkey",  DataTypes.IntegerType,         true),
            DataTypes.createStructField("c_phone",      DataTypes.StringType,          true),
            DataTypes.createStructField("c_acctbal",    new DecimalType(15, 2),        true),
            DataTypes.createStructField("c_mktsegment", DataTypes.StringType,          true),
            DataTypes.createStructField("c_comment",    DataTypes.StringType,          true)
        );
        StructType schema = DataTypes.createStructType(fields);

        SparkSession spark = SparkSession.builder()
            .appName("Spark Hologres Write Example")
            .master("local[*]")
            .getOrCreate();

        // Read from CSV (use the absolute path for local testing)
        Dataset<Row> csvDf = spark.read()
            .format("csv").schema(schema).option("sep", ",")
            .load("resources/customer");

        // Write to Hologres
        csvDf.write()
            .format("hologres")
            .option("username", "<your-username>")
            .option("password", "<your-password>")
            .option("jdbcurl",  "jdbc:postgresql://<host>:<port>/test_db")
            .option("table",    "customer_holo_table")
            .mode("append")
            .save();
    }
}

Add the following Maven dependency for Spark SQL:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.13</artifactId>
    <version>3.5.4</version>
    <scope>provided</scope>
</dependency>

Python

from pyspark.sql.types import *

# Define the CSV schema
schema = StructType([
    StructField("c_custkey",    LongType()),
    StructField("c_name",       StringType()),
    StructField("c_address",    StringType()),
    StructField("c_nationkey",  IntegerType()),
    StructField("c_phone",      StringType()),
    StructField("c_acctbal",    DecimalType(15, 2)),
    StructField("c_mktsegment", StringType()),
    StructField("c_comment",    StringType())
])

# Read from CSV
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=",")

# Write to Hologres
csvDf.write.format("hologres") \
    .option("username", "<your-username>") \
    .option("password", "<your-password>") \
    .option("jdbcurl",  "jdbc:postgresql://<host>:<port>/test_db") \
    .option("table",    "customer_holo_table") \
    .mode("append") \
    .save()

To run this example:

# Load the connector
spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

# In the Scala REPL, load the file (use the absolute path for local testing)
scala> :load D:/sparktest.scala

You can also paste the code directly after loading the connector.

To run this example, package your code with Maven (for example, as spark_test.jar) and submit it:

spark-submit --class SparkTest \
  --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
  D:\spark_test.jar

To run this example:

pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

Then paste the code directly into the PySpark shell.

Tip: To confirm write performance, open the Spark UI while the job runs. Monitor the number of active tasks — this reflects the actual write parallelism and helps you tune spark.executor.instances or shard count accordingly.

Read data from Hologres

Read support requires connector version 1.3.2 or later. Version 1.5.0 introduced predicate pushdown, LIMIT pushdown, column pruning, and the read.query option.

Read using Spark SQL

Connector versions earlier than 1.5.2 do not support Hologres Catalog. Use the temporary view approach instead.
  1. Initialize the Hologres Catalog (same command as in Initialize a Hologres Catalog).

  2. Read data using one of the following approaches:

Read using Catalog:

-- Load the Hologres Catalog
USE hologres_external_test_db;

-- Column pruning and predicate pushdown are applied automatically
SELECT c_custkey, c_name, c_phone
FROM public.customer_holo_table
WHERE c_custkey < 500
LIMIT 10;

CREATE TEMPORARY VIEW(table)

-- Declare a Hologres table as a temporary view
CREATE TEMPORARY VIEW hologresTable
USING hologres OPTIONS (
  jdbcurl           "jdbc:postgresql://<host>:<port>/test_db",
  username          "<your-username>",
  password          "<your-password>",
  read.max_task_count "80",  -- Maximum concurrent read tasks
  table             "customer_holo_table"
);

-- Column pruning and predicate pushdown are applied automatically
SELECT c_custkey, c_name, c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;

CREATE TEMPORARY VIEW(query)

-- Pass a Hologres SELECT query directly
CREATE TEMPORARY VIEW hologresTable
USING hologres OPTIONS (
  jdbcurl    "jdbc:postgresql://<host>:<port>/test_db",
  username   "<your-username>",
  password   "<your-password>",
  read.query "select c_custkey, c_name, c_phone from customer_holo_table where c_custkey < 500 limit 10"
);

SELECT * FROM hologresTable LIMIT 5;
When using read.query, only a single task is used for reading. Predicate pushdown is not supported. Use the table option for concurrent reads.

Read into a DataFrame

The following examples read a Hologres table into a DataFrame for downstream processing.

Scala

val readDf = spark.read
  .format("hologres")
  .option("username",           "<your-username>")
  .option("password",           "<your-password>")
  .option("jdbcurl",            "jdbc:postgresql://<host>:<port>/test_db")
  .option("table",              "customer_holo_table")
  .option("read.max_task_count", "80")  // Maximum concurrent read tasks
  .load()
  .filter("c_custkey < 500")

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSelect {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("Spark Hologres Read Example")
            .master("local[*]")
            .getOrCreate();

        Dataset<Row> readDf = spark.read()
            .format("hologres")
            .option("username",            "<your-username>")
            .option("password",            "<your-password>")
            .option("jdbcurl",             "jdbc:postgresql://<host>:<port>/test_db")
            .option("table",              "customer_holo_table")
            .option("read.max_task_count", "80")  // Maximum concurrent read tasks
            .load()
            .filter("c_custkey < 500");

        readDf.select("c_custkey", "c_name", "c_phone").show(10);
    }
}

Add the same Maven dependency as in the write example (Spark SQL spark-sql_2.13 version 3.5.4). To submit the job:

spark-submit --class SparkSelect \
  --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
  D:\spark_select.jar

Python

readDf = spark.read.format("hologres") \
    .option("username",            "<your-username>") \
    .option("password",            "<your-password>") \
    .option("jdbcurl",             "jdbc:postgresql://<host>:<port>/test_db") \
    .option("table",               "customer_holo_table") \
    .option("read.max_task_count", "80") \
    .load()

readDf.select("c_custkey", "c_name", "c_phone").show(10)

To run this example:

# Load the connector
spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

# In the Scala REPL, load the file (use the absolute path for local testing)
scala> :load D:/sparkselect.scala

To run this example:

pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Tip: To verify that predicate pushdown and column pruning are active, check the Spark UI during the read job. The Stages tab shows the number of tasks created — if pushdown is effective, fewer rows are transferred from Hologres. You can also confirm this by reviewing the Hologres query logs in the Hologres Management Console.

Parameter reference

General parameters

ParameterDefaultRequiredDescription
usernameYesYour AccessKey ID, or the name of a Hologres database user. To get your AccessKey ID, go to AccessKey Management.
passwordYesYour AccessKey secret, or the password of the database user. See Create an AccessKey.
tableYesThe Hologres table to read from or write to. When reading, you can use read.query instead.
jdbcurlYesThe JDBC URL for the Hologres real-time data API. Format: jdbc:postgresql://<host>:<port>/<db_name>. Find the host and port under Network Information on the Instance Details page.
enable_serverless_computingfalseNoUse Serverless Computing resources. Valid for reads and for writes in bulk_load mode. See Serverless Computing.
serverless_computing_query_priority3NoExecution priority for Serverless Computing.
statement_timeout_seconds28800 (8 hours)NoTimeout for query execution, in seconds.
retry_count3NoNumber of retries on connection failure.
direct_connectEnabled if supportedNoThe connector tests whether the environment supports a direct connection to the Hologres frontend node. If supported, direct connection is used by default (reduces network bottleneck). Set to false to disable.

Write parameters

The connector supports Spark's SaveMode. In SQL, this maps to INSERT INTO or INSERT OVERWRITE. In DataFrame, set SaveMode to Append or Overwrite. Overwrite creates a temporary table for writing and replaces the original after success — use with caution.

ParameterPrevious nameDefaultRequiredDescription
write.modecopy_write_modeautoNoWrite mode. See Write mode selection.
write.copy.max_buffer_sizemax_cell_buffer_size52428800 (50 MB)NoMaximum local buffer size for COPY mode. Increase this if writes fail due to very large field values.
write.copy.dirty_data_checkcopy_write_dirty_data_checkfalseNoEnable dirty data validation. When enabled, the connector identifies the exact row that failed. This reduces write performance — enable only for troubleshooting.
write.on_conflict_actionwrite_modeINSERT_OR_REPLACENoConflict resolution for tables with a primary key: INSERT_OR_IGNORE (skip conflicting rows), INSERT_OR_UPDATE (update conflicting columns), or INSERT_OR_REPLACE (replace all columns).

The following parameters apply only when write.mode is insert:

ParameterPrevious nameDefaultRequiredDescription
write.insert.dynamic_partitiondynamic_partitionfalseNoAutomatically create non-existent partitions when writing to the parent table of a partitioned table.
write.insert.batch_sizewrite_batch_size512NoMaximum number of PUT operations per write thread before committing a batch.
write.insert.batch_byte_sizewrite_batch_byte_size2097152 (2 MB)NoMaximum batch size per write thread in bytes. A batch commits when this threshold is reached.
write.insert.max_interval_mswrite_max_interval_ms10000NoA batch commits if this many milliseconds have passed since the last commit.
write.insert.thread_sizewrite_thread_size1NoNumber of concurrent write threads. Each thread uses one JDBC connection.

Write mode selection

The write.mode=auto setting selects the best mode based on your Hologres instance version and table schema:

  1. Hologres > V2.2.25 and table has a primary key → bulk_load_on_conflict

  2. Hologres > V2.1.0 and table has no primary key → bulk_load

  3. Hologres > V1.3 → stream

  4. Otherwise → insert

ModeDescriptionRequirements
autoAutomatically selects the optimal mode (recommended)
streamStreaming COPY using Fixed Plan. Lower latency and lower memory usage than insert.Connector 1.3.0+, Hologres V1.3.34+
bulk_loadBatch COPY. Lower instance load than stream at the same throughput. Supports tables without a primary key only.Connector 1.4.2+, Hologres V2.1.0+
bulk_load_on_conflictBatch COPY with primary key conflict handling. Redistributes data by distribution key so each Spark task writes to a single shard, reducing lock contention. Tests show that repartitioning data before concurrent writing can reduce instance load by ~67% compared to stream.Connector 1.4.2+, Hologres V2.2.25+
insertStandard INSERT.

Read parameters

ParameterPrevious nameDefaultRequiredDescription
read.modebulk_readautoNoRead mode. See Read mode selection.
read.max_task_countmax_partition_count80NoMaximum number of concurrent Spark read tasks. The actual task count is min(ShardCount, read.max_task_count).
read.copy.max_buffer_size52428800 (50 MB)NoMaximum local buffer size for COPY mode. Increase this if reads fail with large fields.
read.push_down_predicatepush_down_predicatetrueNoEnable predicate pushdown (filter conditions and column pruning).
read.push_down_limitpush_down_limittrueNoEnable LIMIT pushdown.
read.select.batch_sizescan_batch_size256NoRows fetched per scan batch. Applies only when read.mode=select.
read.select.timeout_secondsscan_timeout_seconds60NoScan timeout in seconds. Applies only when read.mode=select.
read.queryqueryNoRead using a Hologres SELECT query instead of a table. Cannot be used together with table. When set, only a single task is used and predicate pushdown is not supported.

Read mode selection

The read.mode=auto setting selects the best mode based on your setup:

  1. Fields include JSONB type → select

  2. Instance version > 3.0.24 → bulk_read_compressed

  3. Otherwise → bulk_read

ModeDescription
autoAutomatically selects the optimal mode (recommended)
bulk_readReads in Apache Arrow format via COPY OUT. Several times faster than select. Does not support JSONB.
bulk_read_compressedReads compressed Arrow format via COPY OUT. Saves ~45% bandwidth compared to bulk_read.
selectReads using standard SELECT. Supports all data types including JSONB.

Data type mapping

Spark typeHologres type
ShortTypeSMALLINT
IntegerTypeINT
LongTypeBIGINT
StringTypeTEXT, JSON, JSONB
DecimalTypeNUMERIC(38, 18)
BooleanTypeBOOL
DoubleTypeDOUBLE PRECISION
FloatTypeFLOAT
TimestampTypeTIMESTAMPTZ
DateTypeDATE
BinaryTypeBYTEA, ROARINGBITMAP
ArrayType(IntegerType)INT4[]
ArrayType(LongType)INT8[]
ArrayType(FloatType)FLOAT4[]
ArrayType(DoubleType)FLOAT8[]
ArrayType(BooleanType)BOOLEAN[]
ArrayType(StringType)TEXT[]

Connection count estimation

The connector uses JDBC connections for reads and writes. The number of connections scales with Spark parallelism (visible on the Spark UI during job execution).

OperationConnections used
Query metadata from Catalog1
Read dataparallelism × 1 + 1
Write in COPY modeparallelism × 1 + 1
Write in INSERT modeparallelism × write.insert.thread_size + 1

This calculation assumes the number of concurrently running Spark tasks exceeds the number of tasks generated by the job. The actual parallelism may be affected by parameters such as spark.executor.instances and Hadoop file splitting policies. For more information, see Apache Hadoop documentation.

See also