Hologres: Read/Write Hologres data with Spark

Last Updated: Dec 05, 2025

Spark is a unified analytics engine for large-scale data processing. Hologres is integrated with Apache Spark and Alibaba Cloud EMR Serverless Spark to help enterprises quickly build data warehouses. The Spark connector provided by Hologres lets you create Hologres catalogs in a Spark cluster and read data from and write data to Hologres in batches through external tables. The connector provides better performance than Spark's native JDBC data source.

Version requirement

The Spark connector requires Hologres V1.3+. Check your instance version on the Instance Details page in the Hologres console. If your instance version is earlier than V1.3, you can upgrade your instance or join the Hologres DingTalk group (ID: 32314975) to request an instance upgrade.

Preparations

  • Install a Spark environment that can run spark-sql, spark-shell, or pyspark commands. We recommend using Spark 3.3.0 or later to avoid dependency issues and access more features.

    • You can use EMR Serverless Spark to quickly build a Spark environment and connect it to a Hologres instance. For more information, see Enhanced features of Spark in EMR.

    • You can also set up an independent Spark environment. For more information, see Apache Spark.

  • To read data from and write data to Hologres using Spark, you need the hologres-connector-spark-3.x connector JAR package. This topic uses version 1.5.2 as an example. You can download the package from the Maven Central Repository. The connector resources are open-source. For more information, see Hologres-Connectors.

  • To develop Spark jobs in Java and perform local debugging in a tool such as IntelliJ IDEA, add the following Maven dependency to the pom.xml file.

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.5.2</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>

Hologres catalog

Hologres catalogs are supported in Spark connector 1.5.2 and later. You can use external tables to easily read data from and write data to Hologres.

Each Hologres catalog in Spark maps to a database in Hologres. Each namespace in a Hologres catalog maps to a schema in the mapped database. The following sections describe how to use Hologres catalogs in Spark.

Note

Hologres catalogs do not support table creation.

This topic uses the following database and tables in a Hologres instance:

test_db -- Database
  public.test_table1 -- Table in the public schema
  public.test_table2
  test_schema.test_table3  -- Table in the test_schema schema 

Initialize a Hologres catalog

Start spark-sql in the Spark cluster, load the Hologres connector, and specify the catalog parameters.

spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
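
If you build the SparkSession yourself, for example in a standalone Scala job, you can register the same catalog through the session builder instead of passing --conf flags. The following is a minimal sketch that assumes the connector JAR is already on the classpath and reuses the placeholder endpoint, account, and catalog name from the command above.

import org.apache.spark.sql.SparkSession

// Minimal sketch: register the Hologres catalog programmatically.
// Replace the placeholder credentials and endpoint with your own values.
val spark = SparkSession.builder()
  .appName("HoloCatalogExample")
  .config("spark.sql.catalog.hologres_external_test_db", "com.alibaba.hologres.spark3.HoloTableCatalog")
  .config("spark.sql.catalog.hologres_external_test_db.username", "***")
  .config("spark.sql.catalog.hologres_external_test_db.password", "***")
  .config("spark.sql.catalog.hologres_external_test_db.jdbcurl",
    "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .getOrCreate()

// Switch to the catalog and list its namespaces (Hologres schemas).
spark.sql("USE hologres_external_test_db")
spark.sql("SHOW NAMESPACES").show()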

Common Hologres catalog commands

  • Load a Hologres catalog

    A Hologres catalog in Spark maps to a Hologres database. This mapping cannot be changed during the session.

    USE hologres_external_test_db;
  • Query all namespaces.

    A namespace in Spark maps to a schema in Hologres. The default schema is public. Run the USE command to change the default schema.

    -- View all namespaces in the Hologres catalog, which correspond to all schemas in the Hologres database.
    SHOW NAMESPACES;
  • Query tables in a namespace.

    • Query all tables.

      SHOW TABLES;
    • Query tables in a specific namespace.

      USE test_schema;
      SHOW TABLES;
      
      -- Or, use the following statement. 
      SHOW TABLES IN test_schema;
  • Read data from and write data to a table.

    Use SELECT and INSERT statements to read data from and write data to Hologres external tables in the catalog.

    -- Read data from the table.
    SELECT * FROM public.test_table1;
    
    -- Write data to the table.
    INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;

Import data to Hologres

The test data in this section comes from the customer table of a TPC-H dataset. You can download the sample customer data in CSV format. The following statement creates the destination table, customer_holo_table, in Hologres.

CREATE TABLE customer_holo_table
(
  c_custkey    BIGINT ,
  c_name       TEXT   ,
  c_address    TEXT   ,
  c_nationkey  INT    ,
  c_phone      TEXT   ,
  c_acctbal    DECIMAL(15,2) ,
  c_mktsegment TEXT   ,
  c_comment    TEXT
);

Import data using Spark SQL

When you use Spark SQL, it is more convenient to load the metadata of a Hologres table by using a catalog. You can also declare a Hologres table by creating a temporary table.

Note
  • Spark connectors earlier than 1.5.2 do not support catalogs. You can only declare a Hologres table by creating a temporary table.

  • For more information about the parameters of the Hologres Spark connector, see Parameters.

  1. Initialize a Hologres catalog.

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. Import data from a CSV source table to a Hologres external table.

    Note

    The INSERT INTO syntax of Spark does not support using column_list to specify a subset of columns to write to. For example, you cannot use INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable to write data only to the c_custkey field.

    If you want to write data to specific fields, use the CREATE TEMPORARY VIEW statement to declare a Hologres temporary table that contains only the required fields.

    Write data using a catalog

    -- Load a Hologres catalog.
    USE hologres_external_test_db;
    
    -- Create a CSV data source.
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep "," -- For local testing, use the absolute path of the file.
    );
    
    -- Write data from the CSV table to Hologres.
    INSERT INTO public.customer_holo_table SELECT * FROM csvTable;

    Write data using a temporary view

    -- Create a CSV data source.
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep ","
    );
    
    -- Create a Hologres temporary table.
    CREATE TEMPORARY VIEW hologresTable (
        c_custkey BIGINT,
        c_name STRING,
        c_phone STRING)
    USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        table "customer_holo_table"
    );
    
    INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;

Import data using DataFrame

You can use tools such as spark-shell or pyspark to develop Spark jobs and call the write method of a DataFrame to write data. Data read from a CSV file is converted to a DataFrame and then written to a Hologres instance. The following sections provide sample code for different programming languages. For more information about the parameters for the Hologres connector for Spark, see Parameters.

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode

// The schema of the CSV source.
val schema = StructType(Array(
  StructField("c_custkey", LongType),
  StructField("c_name", StringType),
  StructField("c_address", StringType),
  StructField("c_nationkey", IntegerType),
  StructField("c_phone", StringType),
  StructField("c_acctbal", DecimalType(15, 2)),
  StructField("c_mktsegment", StringType),
  StructField("c_comment", StringType)
))

// Read data from a CSV file as a DataFrame.
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")

// Write the DataFrame to Hologres.
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;

public class SparkTest {
    public static void main(String[] args) {
        // The schema of the CSV source.
        List<StructField> asList =
                Arrays.asList(
                        DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
                        DataTypes.createStructField("c_name", DataTypes.StringType, true),
                        DataTypes.createStructField("c_address", DataTypes.StringType, true),
                        DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
                        DataTypes.createStructField("c_phone", DataTypes.StringType, true),
                        DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
                        DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
                        DataTypes.createStructField("c_comment", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(asList);

        // Run in local mode.
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();

        // Read data from a CSV file as a DataFrame.
        // For local testing, use the absolute path of the customer data.
        Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");

        // Write the DataFrame to Hologres.
        csvDf.write().format("hologres").option(
                "username", "***").option(
                "password", "***").option(
                "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
                "table", "customer_holo_table").mode(
                "append").save();
    }
}

The following dependency is required in the Maven pom.xml file.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

from pyspark.sql.types import *

# The schema of the CSV source.
schema = StructType([
    StructField("c_custkey", LongType()),
    StructField("c_name", StringType()),
    StructField("c_address", StringType()),
    StructField("c_nationkey", IntegerType()),
    StructField("c_phone", StringType()),
    StructField("c_acctbal", DecimalType(15, 2)),
    StructField("c_mktsegment", StringType()),
    StructField("c_comment", StringType())
])

# Read data from a CSV file as a DataFrame.
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')

# Write the DataFrame to Hologres.
csvDf.write.format("hologres").option(
    "username", "***").option(
    "password", "***").option(
    "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
    "table", "customer_holo_table").mode(
    "append").save()

To use Spark to run jobs in different programming languages, perform the following steps:

  • Scala

    • Save the sample code as a sparktest.scala file and run the following commands to execute the job.

      # Load dependencies.
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      # For local testing, use the absolute path to load the file.
      scala> :load D:/sparktest.scala
    • Alternatively, copy and paste the sample code to execute it after the dependencies are loaded.

  • Java

    Use a development tool to add the sample code and use a Maven tool to package it. For example, if the package name is spark_test.jar, run the following command to execute the job.

    # Use the absolute path of the job JAR package.
    spark-submit --class SparkTest  --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_test.jar
  • Python

    After the following command is executed, paste the sample code to execute it.

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

Read data from Hologres

  • Starting from Spark connector 1.3.2, you can read data from Hologres. Compared with the default JDBC data source of Spark, the Spark connector reads a Hologres table concurrently based on the table's shards, which improves read performance. The read concurrency is determined by the number of shards in the table, and you can use the read.max_task_count parameter to limit it: a job generates Min(shardCount, max_task_count) read tasks. For example, a table with 40 shards and read.max_task_count set to 80 generates 40 read tasks. Schema inference is also supported: if you do not specify a schema, the Spark schema is inferred from the schema of the Hologres table.

  • Starting from Spark connector 1.5.0, predicate pushdown, LIMIT pushdown, and field pruning are supported when you read data from Hologres tables. You can also use a Hologres SELECT QUERY to read data. This version supports batch read mode, which improves read performance by three to four times compared to previous versions.

Read data using Spark SQL

When you use Spark SQL, it is more convenient to load the metadata of a Hologres table using a catalog. You can also declare a Hologres table by creating a temporary table.

Note
  • Spark connectors earlier than 1.5.2 do not support catalogs. You can only declare a Hologres table by creating a temporary table.

  • For more information about the parameters of the Hologres Spark connector, see Parameters.

  1. Initialize a Hologres catalog.

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. Read Hologres data.

    • Read data using a catalog.

      -- Load a Hologres catalog.
      USE hologres_external_test_db;
      
      -- Read data from a Hologres table. Field pruning and predicate pushdown are supported.
      SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;
    • Read data by creating a temporary table.

      CREATE TEMPORARY VIEW (table)

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.max_task_count "80", // The maximum number of tasks into which the Hologres table can be split for reading.
        table "customer_holo_table"
      );
      
      -- Field pruning and predicate pushdown are supported.
      SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;

      CREATE TEMPORARY VIEW (query)

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10"
      );
      
      SELECT * FROM hologresTable LIMIT 5;

Read Hologres data as a DataFrame

You can use tools such as spark-shell or pyspark to develop Spark jobs and call Spark's read method to read data as a DataFrame for subsequent processing. The following sections provide sample code that shows how to read data from a Hologres table as a DataFrame in different programming languages. For more information about the connector parameters, see Parameters.

Scala

val readDf = (
  spark.read
    .format("hologres")
    .option("username", "***")
    .option("password", "***")
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
    .option("table", "customer_holo_table")
    .option("read.max_task_count", "80") // The maximum number of tasks into which the Hologres table can be split for reading.
    .load()
    .filter("c_custkey < 500")
)

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSelect {
    public static void main(String[] args) {
        
        // Run in local mode.
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();
                
        Dataset<Row> readDf = (
           spark.read
                .format("hologres")
                .option("username", "***")
                .option("password", "***")
                .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
                .option("table", "customer_holo_table")
                .option("read.max_task_count", "80") // The maximum number of tasks into which the Hologres table can be split for reading.
                .load()
                .filter("c_custkey < 500")
        );
        readDf.select("c_custkey", "c_name", "c_phone").show(10);
    }
}

The following dependency is required in the Maven pom.xml file.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()

readDf.select("c_custkey", "c_name", "c_phone").show(10)

To use Spark to run jobs in different programming languages, perform the following steps:

  • Scala

    • Save the sample code as a sparkselect.scala file and run the following commands to execute the job.

      # Load dependencies.
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      # For local testing, use the absolute path to load the file.
      scala> :load D:/sparkselect.scala
    • Alternatively, you can paste the sample code to execute it after the dependencies are loaded.

  • Java

    You can use a development tool to add the sample code and use a Maven tool to package it. For example, if the package name is spark_select.jar, run the following command to execute the job.

    # Use the absolute path of the job JAR package.
    spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_select.jar
  • Python

    After the following command is executed, you can paste the sample code to execute it.

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

Parameters

General parameters

  • username
    Default value: (none). Required: Yes.
    • The AccessKey ID for your account.
    • The name of the created account.

  • password
    Default value: (none). Required: Yes.
    • The AccessKey secret for your account. For more information, see Create an AccessKey pair.
    • The password of the created account.

  • table
    Default value: (none). Required: Yes.
    The name of the Hologres table to read from or write to.
    Note: When you read data, you can replace this parameter with the read.query parameter.

  • jdbcurl
    Default value: (none). Required: Yes.
    The JDBC URL of the Hologres real-time data API, in the format jdbc:postgresql://<host>:<port>/<db_name>. To obtain the host and port number, go to the Hologres console, click Instances in the left-side navigation pane, find the target instance, open the Instance Details page, and check the Network Information section.

  • enable_serverless_computing
    Default value: false. Required: No.
    Specifies whether to use serverless computing resources. This parameter takes effect only for reading data and for writing data in bulk_load mode. For more information, see the Serverless Computing guide.

  • serverless_computing_query_priority
    Default value: 3. Required: No.
    The execution priority of serverless computing queries.

  • statement_timeout_seconds
    Default value: 28800 (8 hours). Required: No.
    The timeout period for a query, in seconds.

  • retry_count
    Default value: 3. Required: No.
    The number of retries when the connection fails.

  • direct_connect
    Default value: determined automatically (see description). Required: No.
    The bottleneck of batch reads and writes is usually the network throughput of the endpoint, so the connector tests whether the current environment can directly connect to the Hologres FrontEnd (FE) node. If a direct connection is possible, it is used by default. Set this parameter to false to disable direct connections.

Write parameters

The Hologres connector supports the SaveMode parameter of Spark. For SQL, this corresponds to INSERT INTO or INSERT OVERWRITE. For a DataFrame, you can set SaveMode to Append or Overwrite when you write data. If you use the Overwrite mode, a temporary table is created for writing data. After the write operation is successful, the temporary table replaces the original table. Use the Overwrite mode only when necessary.
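
For example, with the DataFrame API the only difference between appending and overwriting is the SaveMode value. The following minimal sketch assumes csvDf is the DataFrame from the import example above and reuses the placeholder connection settings.

import org.apache.spark.sql.SaveMode

// Minimal sketch: overwrite the destination table instead of appending to it.
// The connector writes to a temporary table and swaps it in after the job succeeds.
csvDf.write
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("table", "customer_holo_table")
  .mode(SaveMode.Overwrite)
  .save()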

  • write.mode (previously copy_write_mode)
    Default value: auto. Required: No.
    The write mode. For more information about the differences between write modes, see Comparison of batch write modes. Valid values:

      • auto: The connector automatically selects an optimal mode based on the Hologres instance version and the metadata of the destination table. The following selection logic applies:
        1. If the Hologres instance version is later than V2.2.25 and the table has a primary key, the bulk_load_on_conflict mode is used.
        2. If the Hologres instance version is later than V2.1.0 and the table does not have a primary key, the bulk_load mode is used.
        3. If the Hologres instance version is later than V1.3, the stream mode is used.
        4. In other cases, the insert mode is used.

      • stream: COPY in fixed plans. The COPY statement is supported in fixed plans in Hologres V1.3 and later. Compared with the INSERT statement, the COPY statement provides higher throughput because it streams data, and it features lower data latency and lower client memory consumption because it does not batch data.
        Note: This mode requires Hologres connector 1.3.0 or later and Hologres V1.3.34 or later.

      • bulk_load: Batch COPY. Compared with streaming COPY in fixed plans, batch COPY imposes a lower load on the Hologres instance under high records-per-second (RPS) conditions. However, this mode supports writing only to tables without primary keys.
        Note: This mode requires Hologres connector 1.4.2 or later and Hologres V2.1.0 or later.

      • bulk_load_on_conflict: Batch COPY for tables with primary keys, with handling of duplicate primary keys. By default, importing data in batches into a Hologres table with a primary key triggers a table lock, which limits concurrent writes over multiple connections. The connector can redistribute data based on the distribution key of the destination Hologres table so that each Spark task writes to only one shard. This reduces the table-level lock to a shard-level lock, enables concurrent writes, and improves write performance. Because each connection needs to maintain data for only a few shards, this optimization also significantly reduces the number of small files and the memory usage of Hologres. Tests show that repartitioning data before concurrent writing reduces the system load by about 67% compared with writing in stream mode.
        Note: This mode requires Hologres connector 1.4.2 or later and Hologres V2.2.25 or later.

      • insert: Data is written using INSERT statements.

  • write.copy.max_buffer_size (previously max_cell_buffer_size)
    Default value: 52428800 (50 MB). Required: No.
    The maximum size of the local buffer when you write data in COPY mode. You do not usually need to adjust this value, but if a buffer overflow occurs when you write large fields, such as extra-long strings, increase it.

  • write.copy.dirty_data_check (previously copy_write_dirty_data_check)
    Default value: false. Required: No.
    Specifies whether to check for dirty data. If you enable this feature, the specific row that fails to be written can be precisely located when dirty data is found. However, this affects write performance, so enable it only for troubleshooting.

  • write.on_conflict_action (previously write_mode)
    Default value: INSERT_OR_REPLACE. Required: No.
    The policy used when the destination table of an INSERT statement has a primary key:
      • INSERT_OR_IGNORE: If a primary key conflict occurs, the data is not written.
      • INSERT_OR_UPDATE: If a primary key conflict occurs, the corresponding columns are updated.
      • INSERT_OR_REPLACE: If a primary key conflict occurs, all columns are updated.

The following parameters take effect only when write.mode is set to insert.

  • write.insert.dynamic_partition (previously dynamic_partition)
    Default value: false. Required: No.
    If this parameter is set to true, partitions that do not exist are automatically created when you write data to a parent partitioned table.

  • write.insert.batch_size (previously write_batch_size)
    Default value: 512. Required: No.
    The maximum batch size for each write thread. A batch commit is performed when the number of Put operations after merging reaches this value.

  • write.insert.batch_byte_size (previously write_batch_byte_size)
    Default value: 2097152 (2 MB). Required: No.
    The maximum batch size for each write thread, in bytes. A batch commit is performed when the byte size of the Put data after merging reaches this value.

  • write.insert.max_interval_ms (previously write_max_interval_ms)
    Default value: 10000. Required: No.
    The maximum interval between batch commits, in milliseconds. A batch commit is triggered if the time elapsed since the last commit exceeds this value.

  • write.insert.thread_size (previously write_thread_size)
    Default value: 1. Required: No.
    The number of concurrent write threads. Each thread occupies one database connection.
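
To show how these write parameters are passed, the following minimal sketch writes a DataFrame in insert mode with an explicit conflict policy and two write threads per task. It assumes csvDf and the placeholder connection settings from the earlier examples; adjust the values to your workload.

// Minimal sketch: insert-mode write with INSERT_OR_UPDATE conflict handling
// and two write threads (and therefore two connections) per Spark task.
csvDf.write
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("table", "customer_holo_table")
  .option("write.mode", "insert")
  .option("write.on_conflict_action", "INSERT_OR_UPDATE")
  .option("write.insert.thread_size", "2")
  .mode("append")
  .save()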

Read parameters

The previous parameter names listed below apply to connector versions 1.5.0 and earlier.

  • read.mode (previously bulk_read)
    Default value: auto. Required: No.
    The read mode. Valid values:

      • auto (default): The connector automatically selects an optimal mode based on the Hologres instance version and the metadata of the destination table. The following selection logic applies:
        1. If the fields to be read include the JSONB type, the select mode is used.
        2. If the instance version is later than V3.0.24, the bulk_read_compressed mode is used.
        3. In other cases, the bulk_read mode is used.

      • bulk_read: This mode uses COPY OUT to read data in the Arrow format. Its performance is several times higher than that of the select mode. This mode does not support reading data of the JSONB type from Hologres.

      • bulk_read_compressed: This mode uses COPY OUT to read compressed data in the Arrow format. Compared with reading uncompressed data, this mode saves about 45% of the bandwidth.

      • select: This mode uses common SELECT statements to read data.

  • read.max_task_count (previously max_partition_count)
    Default value: 80. Required: No.
    The Hologres table to be read is split into multiple partitions, and each partition corresponds to a Spark task. If the shard count of the Hologres table is less than this value, the number of partitions is at most the shard count.

  • read.copy.max_buffer_size (no previous name)
    Default value: 52428800 (50 MB). Required: No.
    The maximum size of the local buffer when you read data in COPY mode. If an exception occurs because a field is too large, increase this value.

  • read.push_down_predicate (previously push_down_predicate)
    Default value: true. Required: No.
    Specifies whether to push down predicates, such as the filter conditions applied during queries. The pushdown of common filter conditions and column pruning are supported.

  • read.push_down_limit (previously push_down_limit)
    Default value: true. Required: No.
    Specifies whether to push down LIMIT clauses.

  • read.select.batch_size (previously scan_batch_size)
    Default value: 256. Required: No.
    Takes effect when read.mode is set to select. Specifies the number of rows to fetch in a single scan operation when you read data from Hologres.

  • read.select.timeout_seconds (previously scan_timeout_seconds)
    Default value: 60. Required: No.
    Takes effect when read.mode is set to select. Specifies the timeout period of a scan operation, in seconds, when you read data from Hologres.

  • read.query (previously query)
    Default value: (none). Required: No.
    The query used to read data from Hologres. You can specify only one of the table and read.query parameters.
    Note:
      • If you use the read.query parameter, data is read by a single task, and predicate pushdown is not supported.
      • If you use the table parameter, multiple tasks read data concurrently based on the shard count of the Hologres table.
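
The read.query parameter can also be used from the DataFrame API. The following minimal sketch is based on the customer table used earlier and the placeholder connection settings; note that a query is read by a single task and cannot be combined with the table parameter.

// Minimal sketch: read the result of a Hologres query instead of a whole table.
val queryDf = spark.read
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("read.query", "select c_custkey, c_name, c_phone from customer_holo_table where c_custkey < 500")
  .load()

queryDf.show(10)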

Data type mappings

Spark type                  Hologres type
ShortType                   SMALLINT
IntegerType                 INT
LongType                    BIGINT
StringType                  TEXT
StringType                  JSON
StringType                  JSONB
DecimalType                 NUMERIC(38, 18)
BooleanType                 BOOL
DoubleType                  DOUBLE PRECISION
FloatType                   FLOAT
TimestampType               TIMESTAMPTZ
DateType                    DATE
BinaryType                  BYTEA
BinaryType                  ROARINGBITMAP
ArrayType(IntegerType)      INT4[]
ArrayType(LongType)         INT8[]
ArrayType(FloatType)        FLOAT4[]
ArrayType(DoubleType)       FLOAT8[]
ArrayType(BooleanType)      BOOLEAN[]
ArrayType(StringType)       TEXT[]

Connection count calculation

The Hologres connector for Spark uses a specific number of JDBC connections for read and write operations. The number of connections is determined by the following factors:

  • The parallelism of Spark, which is the number of concurrent tasks displayed on the Spark UI when your job is running.

  • The number of connections used by each concurrent task of the connector:

    • If you write data in COPY mode, each concurrent task uses one JDBC connection.

    • If you write data in INSERT mode, the number of JDBC connections used by each concurrent task is the same as the value of the write_thread_size parameter.

    • If you read data, each concurrent task uses one JDBC connection.

  • The number of connections used for other operations. When a job starts, operations such as schema acquisition are performed, and a connection may be established for a short period.

The total number of JDBC connections used by a job can be calculated using the following formulas.

Item                              Number of connections used
Query metadata using a catalog    1
Read data                         parallelism × 1 + 1
Write data in COPY mode           parallelism × 1 + 1
Write data in INSERT mode         parallelism × write_thread_size + 1
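
For example, a job that writes data in INSERT mode with a parallelism of 20 and write_thread_size set to 2 uses about 20 × 2 + 1 = 41 connections.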

The preceding connection count calculation assumes that the number of tasks that Spark can run concurrently is greater than the number of tasks generated by the job.

The number of tasks Spark can run concurrently may be affected by parameter settings, such as spark.executor.instances. It may also be affected by the file block splitting policy of Hadoop. For more information, see Apache Hadoop.
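
If the resulting connection count is too high for your instance, one option is to lower the write parallelism on the Spark side before writing, for example by repartitioning the DataFrame. The following minimal sketch assumes csvDf and the placeholder connection settings from the earlier examples.

// Minimal sketch: cap write parallelism, and therefore the connection count,
// by repartitioning. With 10 partitions in COPY mode, the job uses about 10 + 1 connections.
csvDf.repartition(10)
  .write
  .format("hologres")
  .option("username", "***")
  .option("password", "***")
  .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
  .option("table", "customer_holo_table")
  .mode("append")
  .save()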