Hologres: Read from and write to Hologres by using Spark

Last Updated: Feb 04, 2026

Spark is a unified analytics engine for large-scale data processing. Hologres integrates tightly with the community edition of Spark and EMR Spark to help you quickly build a data warehouse. The Spark connector provided by Hologres supports creating a Hologres Catalog in a Spark cluster, enabling high-performance batch reads and imports using foreign tables. This approach delivers better performance than native Java Database Connectivity (JDBC).

Limits

Only Hologres instances version 1.3 or later support the Spark connector. You can view your current instance version on the Instance Details page in the Hologres Management Console. If your instance runs a version earlier than 1.3, use Instance Upgrade or join the Hologres communication group by searching for DingTalk group number 32314975 to request an upgrade.

Preparations

  • Install a compatible Spark environment where you can run `spark-sql`, `spark-shell`, or `pyspark` commands. Use Spark 3.3.0 or later to avoid dependency issues and access more features.

    • Use Alibaba Cloud EMR Spark to quickly build a Spark environment and connect to your Hologres instance. For more information, see EMR Spark features.

    • You can also build a Spark environment in your preferred setup. For more information, see Apache Spark.

  • To read from or write to Hologres using Spark, reference the connector JAR package hologres-connector-spark-3.x. This topic uses version 1.5.2 of the Spark connector. You can download it from the Maven Central Repository. All connector resources are open source. For more information, see Hologres-Connectors.

  • If you develop Spark jobs in Java and debug them locally in a tool such as IntelliJ IDEA, add the following Maven dependency to your pom.xml file.

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.5.2</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>

Hologres Catalog

Starting from version 1.5.2, the Spark connector supports Hologres Catalog, allowing you to conveniently read from and write to Hologres using foreign tables.

In Spark, each Hologres Catalog corresponds to a database in Hologres, and each namespace in a Hologres Catalog corresponds to a schema in that database. The following section shows how to use Hologres Catalog in Spark.

Note

Hologres Catalog does not support creating tables.

This topic uses a Hologres instance with the following database and table names:

test_db                    -- database
  public.test_table1       -- table in the public schema
  public.test_table2       -- table in the public schema
  test_schema.test_table3  -- table in the test_schema schema

Initialize a Hologres Catalog

In your Spark cluster, start spark-sql, load the Hologres connector, and specify the Catalog parameters.

spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db

Common commands for Hologres Catalog

  • Load a Hologres Catalog

    A Hologres Catalog in Spark corresponds exactly to a Hologres database and cannot be changed during use.

    USE hologres_external_test_db;
  • Query all namespaces.

    A namespace in Spark corresponds to a schema in Hologres. The default schema is `public`. You can use the USE statement to switch to a different schema.

    -- View all namespaces in the Hologres Catalog. This corresponds to all schemas in Hologres.
    SHOW NAMESPACES;
  • Query tables in a namespace.

    • Query all tables.

      SHOW TABLES;
    • Query tables in a specific namespace.

      USE test_schema;
      SHOW TABLES;
      
      -- Or use 
      SHOW TABLES IN test_schema;
  • Read from and write to tables.

    Use SELECT and INSERT statements to read from and write to Hologres foreign tables in the Catalog.

    -- Read from a table.
    SELECT * FROM public.test_table1;
    
    -- Write to a table.
    INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;

Import data into Hologres

This section uses the customer table from the TPC-H dataset as the source data to import into Hologres. Spark reads the source data in CSV format. Download the customer data. The SQL statement to create the customer table in Hologres is as follows.

CREATE TABLE customer_holo_table
(
  c_custkey    BIGINT ,
  c_name       TEXT   ,
  c_address    TEXT   ,
  c_nationkey  INT    ,
  c_phone      TEXT   ,
  c_acctbal    DECIMAL(15,2) ,
  c_mktsegment TEXT   ,
  c_comment    TEXT
);

Import data using Spark-SQL

When using Spark-SQL, it is more convenient to load the metadata of a Hologres table using a Catalog. You can also declare a Hologres table by creating a temporary table.

Note
  • Hologres-Connector-Spark versions earlier than 1.5.2 do not support catalogs. You must declare a Hologres table by creating a temporary table.

  • For more information about the parameters for Hologres-Connector-Spark, see Parameter description.

  1. Initialize the Hologres Catalog.

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. Import data from the source CSV table to the Hologres foreign table.

    Note

    The Spark INSERT INTO syntax does not support specifying a partial list of columns for writing using column_list. For example, you cannot use INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable to write only the `c_custkey` field.

    If you want to write only specific fields, declare a Hologres temporary table with only those fields using CREATE TEMPORARY VIEW.

    Write using CATALOG

    -- Load the Hologres Catalog.
    USE hologres_external_test_db;
    
    -- Create a CSV data source.
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep "," -- For local testing, use the absolute path of the file.
    );
    
    -- Write data from the CSV table to Hologres.
    INSERT INTO public.customer_holo_table SELECT * FROM csvTable;

    Write using TEMPORARY VIEW

    -- Create a CSV data source.
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep ","
    );
    
    -- Create a Hologres temporary table.
    CREATE TEMPORARY VIEW hologresTable (
        c_custkey BIGINT,
        c_name STRING,
        c_phone STRING)
    USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        table "customer_holo_table"
    );
    
    INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;

Import data using DataFrame

When you develop Spark jobs using spark-shell, pyspark, or other tools, you can also call the DataFrame `write` interface to write data. The following examples show, in different languages, how to convert data read from a CSV file into a DataFrame and write it to a Hologres instance. For more information about the parameters for Hologres-Connector-Spark, see Parameter description.

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode

// Schema of the CSV source.
val schema = StructType(Array(
  StructField("c_custkey", LongType),
  StructField("c_name", StringType),
  StructField("c_address", StringType),
  StructField("c_nationkey", IntegerType),
  StructField("c_phone", StringType),
  StructField("c_acctbal", DecimalType(15, 2)),
  StructField("c_mktsegment", StringType),
  StructField("c_comment", StringType)
))

// Read data from the CSV file into a DataFrame.
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")

// Write the DataFrame to Hologres.
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;

public class SparkTest {
    public static void main(String[] args) {
        // Schema of the CSV source.
        List<StructField> asList =
                Arrays.asList(
                        DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
                        DataTypes.createStructField("c_name", DataTypes.StringType, true),
                        DataTypes.createStructField("c_address", DataTypes.StringType, true),
                        DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
                        DataTypes.createStructField("c_phone", DataTypes.StringType, true),
                        DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
                        DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
                        DataTypes.createStructField("c_comment", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(asList);

        // Run in local mode.
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();

        // Read data from the CSV file into a DataFrame.
        // For local testing, use the absolute path of the customer data.
        Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");

        // Write the DataFrame to Hologres.
        csvDf.write()
                .format("hologres")
                .option("username", "***")
                .option("password", "***")
                .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
                .option("table", "customer_holo_table")
                .mode(SaveMode.Append)
                .save();
    }
}

Add the following dependency to your pom.xml file.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

from pyspark.sql.types import *

# Schema of the CSV source.
schema = StructType([
    StructField("c_custkey", LongType()),
    StructField("c_name", StringType()),
    StructField("c_address", StringType()),
    StructField("c_nationkey", IntegerType()),
    StructField("c_phone", StringType()),
    StructField("c_acctbal", DecimalType(15, 2)),
    StructField("c_mktsegment", StringType()),
    StructField("c_comment", StringType())
])

# Read data from the CSV file into a DataFrame.
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')

# Write the DataFrame to Hologres.
csvDf.write.format("hologres") \
    .option("username", "***") \
    .option("password", "***") \
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db") \
    .option("table", "customer_holo_table") \
    .mode("append") \
    .save()

To execute Spark jobs in different languages, perform the following operations:

  • Scala

    • You can save the sample code as a sparktest.scala file and run the job as follows.

      # Load the connector dependency.
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      # For local testing, use the absolute path to load the file.
      scala> :load D:/sparktest.scala
    • You can also paste the sample code directly after loading the dependencies to run it.

  • Java

    You can import the sample code into your development tool and use Maven to package it, for example as spark_test.jar. Then submit the job with the following command.

    # Use the absolute path for the job JAR package.
    spark-submit --class SparkTest --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_test.jar
  • Python

    After running the following command, you can paste the sample code directly to run it.

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

Read data from Hologres

  • Starting from spark-connector version 1.3.2, you can read data from Hologres. Compared with Spark's default jdbc-connector, the spark-connector reads data concurrently based on the shards of the Hologres table, which provides better performance. The read concurrency depends on the number of shards in the table and can be capped with the read.max_task_count parameter: the job ultimately generates min(shardCount, max_task_count) read tasks. The connector also supports schema inference: if you do not provide a schema, it infers the Spark-side schema from the Hologres table schema.

  • Starting from spark-connector version 1.5.0, reading from a Hologres table supports predicate pushdown, LIMIT pushdown, and column pruning. It also supports reading data by passing a Hologres SELECT QUERY. This version introduces batch mode reading, which improves read performance by three to four times compared with previous versions.
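The task-count rule above can be sketched in a few lines of pure Python (illustrative only; the function name is ours, not part of the connector API):

```python
def read_task_count(shard_count: int, max_task_count: int = 80) -> int:
    """Number of read tasks the connector generates for one table.

    The read is split by Hologres shard and capped by read.max_task_count,
    so the job ultimately generates min(shardCount, max_task_count) tasks.
    """
    return min(shard_count, max_task_count)

# A 40-shard table with the default cap yields 40 tasks;
# a 128-shard table is capped at the default of 80.
print(read_task_count(40), read_task_count(128))
```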

Read data using Spark-SQL

When using Spark-SQL, it is more convenient to load the metadata of a Hologres table using a Catalog. You can also declare a Hologres table by creating a temporary table.

Note
  • Hologres-Connector-Spark versions earlier than 1.5.2 do not support catalogs. You must declare a Hologres table by creating a temporary table.

  • For more information about the parameters for Hologres-Connector-Spark, see Parameter description.

  1. Initialize the Hologres Catalog.

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. Read data from Hologres.

    • Read data using a Catalog.

      -- Load the Hologres Catalog.
      USE hologres_external_test_db;
      
      -- Read from the Hologres table. Column pruning and predicate pushdown are supported.
      SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;
    • Read data by creating a temporary table.

      CREATE TEMPORARY VIEW (table)

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***",
        password "***",
        read.max_task_count "80", -- The maximum number of tasks to use for reading the Hologres table.
        table "customer_holo_table"
      );
      
      -- Column pruning and predicate pushdown are supported.
      SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;

      CREATE TEMPORARY VIEW (query)

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10"
      );
      
      SELECT * FROM hologresTable LIMIT 5;

Read Hologres data into a DataFrame

When you develop Spark jobs using spark-shell, pyspark, or other tools, you can call Spark's Read interface to read data into a DataFrame for subsequent calculations. The following are examples of reading a Hologres table into a DataFrame in different languages. For more information about the parameters for Hologres-Connector-Spark, see Parameter description.

Scala

val readDf = (
  spark.read
    .format("hologres")
    .option("username", "***")
    .option("password", "***")
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
    .option("table", "customer_holo_table")
    .option("read.max_task_count", "80") // The maximum number of tasks to use for reading the Hologres table.
    .load()
    .filter("c_custkey < 500")
)

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSelect {
    public static void main(String[] args) {
        
        // Run in local mode.
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();
                
        Dataset<Row> readDf = spark.read()
                .format("hologres")
                .option("username", "***")
                .option("password", "***")
                .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
                .option("table", "customer_holo_table")
                .option("read.max_task_count", "80") // The maximum number of tasks to use for reading the Hologres table.
                .load()
                .filter("c_custkey < 500");
        readDf.select("c_custkey", "c_name", "c_phone").show(10);
    }
}

Add the following dependency to your pom.xml file.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

readDf = spark.read.format("hologres") \
    .option("username", "***") \
    .option("password", "***") \
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db") \
    .option("table", "customer_holo_table") \
    .option("read.max_task_count", "80") \
    .load()

readDf.select("c_custkey", "c_name", "c_phone").show(10)

To execute Spark jobs in different languages, perform the following operations:

  • Scala

    • You can save the sample code as a sparkselect.scala file and run the job as follows.

      # Load the connector dependency.
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      # For local testing, use the absolute path to load the file.
      scala> :load D:/sparkselect.scala
    • You can also paste the sample code directly after loading the dependencies to run it.

  • Java

    You can import the sample code into your development tool and use Maven to package it, for example as spark_select.jar. Then submit the job with the following command.

    # Use the absolute path for the job JAR package.
    spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_select.jar
  • Python

    After running the following command, you can paste the sample code directly to run it.

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

Parameter description

General parameters

  • username
    Default value: None. Required: Yes.
    The AccessKey ID of your account. For more information, see Create an AccessKey.

  • password
    Default value: None. Required: Yes.
    The AccessKey secret of your account, or the password of the user account you created. For more information, see Create an AccessKey.

  • table
    Default value: None. Required: Yes.
    The name of the Hologres table to read from or write to.
    Note: When reading data, you can use the read.query parameter instead.

  • jdbcurl
    Default value: None. Required: Yes.
    The JDBC URL for the Hologres real-time data API, in the format jdbc:postgresql://<host>:<port>/<db_name>. Go to the Hologres console, click Instances in the left-side navigation pane, select the target instance, and obtain the host and port number from the Network Information section on the Instance Details page.

  • enable_serverless_computing
    Default value: false. Required: No.
    Specifies whether to use Serverless resources. This parameter is valid only for read operations and for write operations in bulk_load mode. For more information, see Serverless Computing Guide.

  • serverless_computing_query_priority
    Default value: 3. Required: No.
    The execution priority for Serverless Computing.

  • statement_timeout_seconds
    Default value: 28800 (8 hours). Required: No.
    The timeout period for query execution, in seconds.

  • retry_count
    Default value: 3. Required: No.
    The number of retries when a connection fails.

  • direct_connect
    Default value: for environments that support direct connection, direct connection is used by default. Required: No.
    The bottleneck for batch reads and writes is often the network throughput of the endpoint. The system therefore tests whether the current environment can directly connect to the Hologres frontend (access node) and, if so, uses a direct connection by default. Set this parameter to false to disable direct connections.

Write parameters

The Hologres connector supports Spark's SaveMode parameter. For SQL, this means INSERT INTO or INSERT OVERWRITE. For DataFrame, this means setting SaveMode to Append or Overwrite when writing. Overwrite creates a temporary table for writing and replaces the original table after the write succeeds. Use this with caution.

  • write.mode (previous name: copy_write_mode)
    Default value: auto. Required: No.
    The write mode. Valid values are as follows. For a comparison of write modes, see Comparison of batch write modes.

    • auto (default): The connector automatically selects the optimal mode based on the instance version and the metadata of the destination table. The selection logic is as follows:

      1. If the Hologres instance version is later than V2.2.25 and the table has a primary key, the bulk_load_on_conflict mode is selected.

      2. If the Hologres instance version is later than V2.1.0 and the table does not have a primary key, the bulk_load mode is selected.

      3. If the Hologres instance version is later than V1.3, the stream mode is selected.

      4. In other cases, the insert mode is selected.

    • stream: streaming COPY, which uses Fixed Plan to accelerate SQL execution. Fixed Plan COPY is a feature introduced in Hologres V1.3. Compared with the INSERT method, the COPY method provides higher throughput, lower data latency, and lower client memory consumption because it streams data instead of accumulating batches.
      Note: Requires Hologres connector 1.3.0 or later and Hologres V1.3.34 or later.

    • bulk_load: batch COPY. Compared with the streaming COPY of Fixed Plan, batch COPY puts a lower load on the Hologres instance at the same records per second (RPS). However, it supports writing only to tables without a primary key.
      Note: Requires Hologres connector 1.4.2 or later and Hologres V2.1.0 or later.

    • bulk_load_on_conflict: batch COPY into a table with a primary key, with handling of primary key conflicts. By default, batch data import into a Hologres table with a primary key triggers a table lock, which limits the ability of multiple connections to write concurrently. The connector supports redistributing data based on the distribution key of the destination Hologres table so that each Spark task writes data to only one shard, reducing the lock level from table to shard. This enables concurrent writing and improves write performance. Because each connection only needs to maintain data for a few shards, this optimization can also significantly reduce the number of small files and lower the memory usage of Hologres. Tests show that repartitioning data before concurrent writing can reduce the system load by about 67% compared with writing in stream mode.
      Note: Requires Hologres connector 1.4.2 or later and Hologres V2.2.25 or later.

    • insert: writes data using the INSERT method.

  • write.copy.max_buffer_size (previous name: max_cell_buffer_size)
    Default value: 52428800 (50 MB). Required: No.
    The maximum length of the local buffer when writing in COPY mode. You usually do not need to adjust this value, but if a buffer overflow occurs when writing large fields, such as very long strings, you can increase it.

  • write.copy.dirty_data_check (previous name: copy_write_dirty_data_check)
    Default value: false. Required: No.
    Specifies whether to perform dirty data validation. If you enable this feature, the system can pinpoint the specific row that failed to be written when dirty data is found. However, this affects write performance. Do not enable this feature unless you are troubleshooting.

  • write.on_conflict_action (previous name: write_mode)
    Default value: INSERT_OR_REPLACE. Required: No.
    The policy to use when inserting into a table with a primary key:
    • INSERT_OR_IGNORE: If a primary key conflict occurs, the data is not written.
    • INSERT_OR_UPDATE: If a primary key conflict occurs, the corresponding columns are updated.
    • INSERT_OR_REPLACE: If a primary key conflict occurs, all columns are updated.
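The selection order that write.mode=auto applies, as described above, can be restated as a small sketch (illustrative Python, not connector code; versions are compared as tuples, e.g. (2, 2, 25) for V2.2.25):

```python
def pick_write_mode(instance_version: tuple, has_primary_key: bool) -> str:
    """Restates the documented selection order for write.mode = auto."""
    if instance_version > (2, 2, 25) and has_primary_key:
        return "bulk_load_on_conflict"  # batch COPY with PK-conflict handling
    if instance_version > (2, 1, 0) and not has_primary_key:
        return "bulk_load"              # batch COPY, no primary key
    if instance_version > (1, 3):
        return "stream"                 # Fixed Plan streaming COPY
    return "insert"                     # plain INSERT

print(pick_write_mode((3, 0, 0), True))   # bulk_load_on_conflict
print(pick_write_mode((2, 0, 0), False))  # stream
```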

The following parameters take effect only when write.mode is set to insert.

  • write.insert.dynamic_partition (previous name: dynamic_partition)
    Default value: false. Required: No.
    If set to true, non-existent partitions are automatically created when you write data to the parent table of a partitioned table.

  • write.insert.batch_size (previous name: write_batch_size)
    Default value: 512. Required: No.
    The maximum batch size for each write thread. A batch is committed when the number of Put operations, after being merged by WriteMode, reaches this value.

  • write.insert.batch_byte_size (previous name: write_batch_byte_size)
    Default value: 2097152 (2 MB). Required: No.
    The maximum batch size for each write thread, in bytes. A batch is committed when the byte size of the Put data, after being merged by WriteMode, reaches this value.

  • write.insert.max_interval_ms (previous name: write_max_interval_ms)
    Default value: 10000. Required: No.
    A batch is committed if the time since the last commit exceeds this value, in milliseconds.

  • write.insert.thread_size (previous name: write_thread_size)
    Default value: 1. Required: No.
    The number of concurrent write threads. Each thread occupies one database connection.
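Taken together, the insert-mode batching parameters mean a batch is committed as soon as any one threshold is reached. A minimal sketch with the documented defaults (illustrative Python, not connector code):

```python
def should_commit(pending_puts: int, pending_bytes: int, ms_since_last_commit: int,
                  batch_size: int = 512,
                  batch_byte_size: int = 2 * 1024 * 1024,
                  max_interval_ms: int = 10_000) -> bool:
    """True when any insert-mode threshold is reached: the merged Put count
    (write.insert.batch_size), the merged byte size
    (write.insert.batch_byte_size), or the time since the last commit
    (write.insert.max_interval_ms)."""
    return (pending_puts >= batch_size
            or pending_bytes >= batch_byte_size
            or ms_since_last_commit >= max_interval_ms)

print(should_commit(512, 1_000, 5))      # True: row-count threshold reached
print(should_commit(10, 1_000, 12_000))  # True: interval threshold reached
```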

Read parameters

  • read.mode (previous name: bulk_read, version 1.5.0 and earlier)
    Default value: auto. Required: No.
    The read mode. Valid values:

    • auto (default): The connector automatically selects the optimal mode based on the instance version and the metadata of the source table. The selection logic is as follows:

      1. If the fields to be read include the JSONB type, the select mode is chosen.

      2. If the instance version is later than 3.0.24, the bulk_read_compressed mode is chosen.

      3. In other cases, the bulk_read mode is chosen.

    • bulk_read: reads data in Arrow format using COPY OUT. Performance is several times higher than that of the select mode. Reading the Hologres JSONB type is not supported.

    • bulk_read_compressed: reads compressed Arrow-format data using COPY OUT. This saves about 45% of bandwidth compared with uncompressed data.

    • select: reads data using the standard SELECT method.

  • read.max_task_count (previous name: max_partition_count)
    Default value: 80. Required: No.
    Divides the Hologres table to be read into multiple partitions, each corresponding to one Spark task. If the ShardCount of the Hologres table is less than this value, the number of partitions is at most ShardCount.

  • read.copy.max_buffer_size (no previous name)
    Default value: 52428800 (50 MB). Required: No.
    The maximum length of the local buffer when reading in COPY mode. If an exception occurs with large fields, increase this value.

  • read.push_down_predicate (previous name: push_down_predicate)
    Default value: true. Required: No.
    Specifies whether to enable predicate pushdown, such as applying filter conditions during a query. Pushdown of common filter conditions and column pruning are supported.

  • read.push_down_limit (previous name: push_down_limit)
    Default value: true. Required: No.
    Specifies whether to enable LIMIT pushdown.

  • read.select.batch_size (previous name: scan_batch_size)
    Default value: 256. Required: No.
    Takes effect only when read.mode is set to select. The number of rows to fetch at a time for a scan operation when reading from Hologres.

  • read.select.timeout_seconds (previous name: scan_timeout_seconds)
    Default value: 60. Required: No.
    Takes effect only when read.mode is set to select. The timeout period for a scan operation when reading from Hologres, in seconds.

  • read.query (previous name: query)
    Default value: None. Required: No.
    Uses the specified query to read data from Hologres. You can set either this parameter or the table parameter, but not both.
    Note:
    • When reading data using the query method, only a single task is used for reading, and predicate pushdown is not supported.
    • When reading data using the table method, the read operation is divided into multiple tasks for concurrent reading based on the ShardCount of the Hologres table.
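The read.mode=auto selection order described above can likewise be sketched (illustrative Python, not connector code; versions compared as tuples):

```python
def pick_read_mode(instance_version: tuple, reads_jsonb: bool) -> str:
    """Restates the documented selection order for read.mode = auto."""
    if reads_jsonb:
        return "select"                # bulk modes cannot read JSONB
    if instance_version > (3, 0, 24):
        return "bulk_read_compressed"  # compressed Arrow COPY OUT
    return "bulk_read"                 # Arrow COPY OUT

print(pick_read_mode((3, 0, 25), False))  # bulk_read_compressed
print(pick_read_mode((2, 1, 0), True))    # select
```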

Data type mapping

Spark type                Hologres type
ShortType                 SMALLINT
IntegerType               INT
LongType                  BIGINT
StringType                TEXT
StringType                JSON
StringType                JSONB
DecimalType               NUMERIC(38, 18)
BooleanType               BOOL
DoubleType                DOUBLE PRECISION
FloatType                 FLOAT
TimestampType             TIMESTAMPTZ
DateType                  DATE
BinaryType                BYTEA
BinaryType                ROARINGBITMAP
ArrayType(IntegerType)    INT4[]
ArrayType(LongType)       INT8[]
ArrayType(FloatType)      FLOAT4[]
ArrayType(DoubleType)     FLOAT8[]
ArrayType(BooleanType)    BOOLEAN[]
ArrayType(StringType)     TEXT[]

Connection calculation

When Hologres-Connector-Spark reads or writes data, it uses a certain number of JDBC connections. This number can be affected by the following factors:

  • Spark concurrency. This is the number of tasks running concurrently, which can be seen on the Spark UI when the job is running.

  • The number of connections used by the connector for each concurrent task:

    • For writing in COPY mode, each concurrent task uses only one JDBC connection.

    • For writing in INSERT mode, each concurrent task uses write_thread_size JDBC connections.

    • For reading, each concurrent task uses one JDBC connection.

  • Other connections: when a job starts, it performs operations such as schema retrieval, which may briefly establish a connection.

Therefore, the total number of connections used by a job can be calculated using the following formulas:

Work item                      Connections used
Query metadata from Catalog    1
Read data                      parallelism × 1 + 1
Write in COPY mode             parallelism × 1 + 1
Write in INSERT mode           parallelism × write_thread_size + 1

The connection calculation above assumes that the number of tasks Spark can run concurrently is greater than the number of tasks generated by the job.

The number of concurrent tasks that Spark can run may be affected by user-set parameters, such as spark.executor.instances, and may also be affected by Hadoop's file splitting policy. For more information, see Apache Hadoop.
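The formulas in the connection table above can be checked with a small helper (illustrative Python; the work-item names are ours, not connector options):

```python
def job_connections(work_item: str, parallelism: int, write_thread_size: int = 1) -> int:
    """Total JDBC connections per the formulas above. Assumes Spark can run
    all generated tasks concurrently; the +1 accounts for the brief
    connection used for schema retrieval at job start."""
    if work_item == "catalog_metadata":
        return 1
    if work_item in ("read", "copy_write"):
        return parallelism + 1
    if work_item == "insert_write":
        return parallelism * write_thread_size + 1
    raise ValueError(f"unknown work item: {work_item}")

# 20 concurrent read tasks use 21 connections; 20 INSERT-mode write tasks
# with write.insert.thread_size = 3 use 61.
print(job_connections("read", 20), job_connections("insert_write", 20, 3))
```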