E-MapReduce: Connect Spark to Hologres

Last Updated: Mar 27, 2026

Spark can read data from Hologres tables in two modes:

  • Full data read — reads all records from a table using JDBC. Use this for batch processing and point-in-time snapshots.

  • Incremental data read — subscribes to Hologres binary logs and streams new records in real time using Spark Structured Streaming. Use this for change data capture (CDC) and event-driven pipelines.

Prerequisites

Before you begin, ensure that you have:

  • An E-MapReduce (EMR) cluster with Spark installed

  • A running Hologres instance

  • Your AccessKey ID and AccessKey secret

Important

Store your AccessKey ID and AccessKey secret as environment variables or in a secrets manager. Do not hardcode them in your scripts.
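
As a minimal sketch of this advice, the following Scala helper reads a value from an environment variable and fails fast when it is missing. The helper name requiredEnv and the variable names ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are assumptions for illustration; substitute whatever names your environment defines.

```scala
// Look up a required environment variable and fail with a clear message
// when it is missing. The `env` parameter defaults to the process environment
// and exists only so the lookup can be exercised without real secrets.
def requiredEnv(name: String, env: Map[String, String] = sys.env): String =
  env.getOrElse(
    name,
    throw new IllegalStateException(s"Environment variable $name is not set")
  )

// Assumed variable names (not defined by this document):
// val accessKeyId     = requiredEnv("ALIBABA_CLOUD_ACCESS_KEY_ID")
// val accessKeySecret = requiredEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
```

The resulting values can then be passed to the user and password options of a reader in place of literal strings. The variables must be set in the environment of the process that builds the reader, which is typically the Spark driver.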

Read full data from Hologres tables

Full data reads use Java Database Connectivity (JDBC) with the PostgreSQL driver. Download the driver from Maven Central. The driver version must be 42.2.25 or later.

When running spark-submit, spark-shell, or spark-sql, add the driver to the classpath using both --driver-class-path and --jars:

# spark-submit
spark-submit --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar --class <your-main-class> <your-app.jar>

# spark-shell
spark-shell --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar

# spark-sql
spark-sql --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar

Replace /home/hadoop/postgresql-42.6.0.jar with the path where you placed the downloaded driver.
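
To confirm that the driver was picked up, a quick check from spark-shell can help. The sketch below tries to load the driver class by name; the helper name classAvailable is hypothetical, and the check only covers the JVM it runs in (the driver process), not the executors.

```scala
import scala.util.Try

// Returns true when the named class can be loaded from the current classpath.
def classAvailable(className: String): Boolean =
  Try(Class.forName(className)).isSuccess

// org.postgresql.Driver is the driver class of the PostgreSQL JDBC driver.
val driverLoaded: Boolean = classAvailable("org.postgresql.Driver")
println(s"PostgreSQL JDBC driver on classpath: $driverLoaded")
```

If the check reports false, the --driver-class-path value most likely does not point at the downloaded JAR.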

Read full data using the Spark DataFrame API (Scala)

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db")
  .option("dbtable", "tablename")
  .option("user", "<your-accesskey-id>")
  .option("password", "<your-accesskey-secret>")
  .load()

jdbcDF.show(1000)

| Parameter | Description | Example |
| --- | --- | --- |
| url | JDBC connection string. Get the hostname and port from the endpoint on your Hologres instance details page. | jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db |
| dbtable | The Hologres table to read | tablename |
| user | Your AccessKey ID | |
| password | Your AccessKey secret | |

For all available JDBC options, see JDBC To Other Databases.
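
Without partitioning options, Spark's JDBC source reads a table through a single query in one partition. For large tables, the standard Spark JDBC options partitionColumn, lowerBound, upperBound, and numPartitions split the read into parallel range scans. The sketch below builds those options; the helper name partitionedReadOptions, the id column, and the bounds are hypothetical, and how well Hologres serves the parallel queries is not covered by this document.

```scala
// Build the extra JDBC options that split a full read into parallel range scans.
// partitionColumn, lowerBound, upperBound and numPartitions are standard Spark
// JDBC options and must be specified together.
def partitionedReadOptions(
    column: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int
): Map[String, String] = {
  require(numPartitions > 0, "numPartitions must be positive")
  require(lowerBound < upperBound, "lowerBound must be less than upperBound")
  Map(
    "partitionColumn" -> column,
    "lowerBound"      -> lowerBound.toString,
    "upperBound"      -> upperBound.toString,
    "numPartitions"   -> numPartitions.toString
  )
}

// Hypothetical values: a numeric `id` column whose values span roughly 1 to 1,000,000.
val parallelOpts = partitionedReadOptions("id", 1L, 1000000L, 8)

// In spark-shell, merge them into a JDBC reader:
// val jdbcDF = spark.read.format("jdbc")
//   .option("url", "jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db")
//   .option("dbtable", "tablename")
//   .option("user", "<your-accesskey-id>")
//   .option("password", "<your-accesskey-secret>")
//   .options(parallelOpts)
//   .load()
```

The bounds only determine how the column range is divided among partitions; they do not filter rows, so values outside the range are still read by the first and last partitions.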

Read full data using Spark SQL

CREATE TABLE holo_test
USING jdbc2
OPTIONS(
  url='jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db',
  driver='org.postgresql.Driver',
  dbtable='test_table',
  user='<your-accesskey-id>',
  password='<your-accesskey-secret>'
);

DESC holo_test;

SELECT * FROM holo_test;

Read incremental data from Hologres tables

Incremental reads subscribe to Hologres binary logs and deliver new records as a stream. For background on binary logs, see Subscribe to Hologres binary logs.

This mode requires the EMR SDK JARs in addition to the PostgreSQL driver. Add them to the classpath based on your Spark version.

Spark 2:

--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar \
--jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar,/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar

Spark 3:

--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar \
--jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar,/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar
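
When a launcher script has to support both Spark versions, the choice can be derived from the version string. The following sketch assembles the two flag values from the paths listed above; the helper name emrSdkClasspath and the sample version 3.3.1 are hypothetical.

```scala
// Directory and JAR names are copied from the Spark 2 and Spark 3 flags above.
val extensionRoot = "/opt/apps/SPARK-EXTENSION/spark-extension-current"

// Returns (value for --driver-class-path, value for --jars) for a Spark version string.
def emrSdkClasspath(sparkVersion: String): (String, String) = {
  val (dir, datasourcesJar) = sparkVersion.takeWhile(_ != '.') match {
    case "2" => ("spark2-emrsdk", "emr-datasources_shaded_2.11-2.3.1.jar")
    case "3" => ("spark3-emrsdk", "emr-datasources_shaded_2.12-3.0.1.jar")
    case other =>
      throw new IllegalArgumentException(s"No EMR SDK path listed for Spark major version '$other'")
  }
  val jars = Seq(datasourcesJar, "postgresql-42.2.23.jar").map(j => s"$extensionRoot/$dir/$j")
  // --driver-class-path is colon-separated; --jars is comma-separated.
  (jars.mkString(":"), jars.mkString(","))
}

// Hypothetical version string; in spark-shell, spark.version returns the real one.
val (driverClassPath, jarsArg) = emrSdkClasspath("3.3.1")
println(s"--driver-class-path $driverClassPath --jars $jarsArg")
```

The two flags take the same JAR list but use different separators, which is an easy detail to get wrong when the values are written by hand.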

Read incremental data using Spark Structured Streaming

The following example reads incremental records from a Hologres table starting from a given timestamp and writes them to Delta Lake in append mode.

// Read incremental data from a Hologres table
val df = spark
  .readStream
  .format("hologres")
  .option("url", "jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db")
  .option("username", "<your-accesskey-id>")
  .option("password", "<your-accesskey-secret>")
  .option("tablename", "test_1")
  .option("starttime", "2022-04-19 10:00:00")
  .load()

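// Write records to Delta Lake.
// The Delta sink needs a target path and a checkpoint location; both values below are placeholders.
df.writeStream
  .outputMode("append")
  .format("delta")
  .option("checkpointLocation", "<your-checkpoint-path>")
  .start("<your-delta-table-path>")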

| Parameter | Description | Example |
| --- | --- | --- |
| url | JDBC connection string for your Hologres instance | jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db |
| username | Your AccessKey ID | |
| password | Your AccessKey secret | |
| tablename | The Hologres table to subscribe to | test_1 |
| starttime | Timestamp from which to start reading binary logs | 2022-04-19 10:00:00 |
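
The starttime value is passed as a string, so it can be convenient to build it from a timestamp rather than typing it by hand. The sketch below assumes the pattern yyyy-MM-dd HH:mm:ss, inferred from the example value; the helper names are hypothetical, and the time zone in which the connector interprets the value is not stated in this document.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Pattern inferred from the example value "2022-04-19 10:00:00" (an assumption).
val startTimeFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

// Format a timestamp for the starttime option.
def formatStartTime(t: LocalDateTime): String = t.format(startTimeFormat)

// Parse a starttime string, returning None when it does not match the pattern.
def parseStartTime(s: String): Option[LocalDateTime] =
  scala.util.Try(LocalDateTime.parse(s, startTimeFormat)).toOption

// Example: start reading from one hour before now (system default time zone).
val oneHourAgo: String = formatStartTime(LocalDateTime.now().minusHours(1))
```

The string can then be supplied as .option("starttime", oneHourAgo) on the stream reader. Validating user-supplied values with parseStartTime before starting the query surfaces a malformed timestamp early.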

Read incremental data using Spark Streaming SQL

The following example uses the EMR Spark Streaming SQL extension to read from a Hologres table and write to a Delta Lake sink table.

-- Source table: subscribe to Hologres binary logs
DROP TABLE IF EXISTS holo;
CREATE TABLE IF NOT EXISTS holo
USING hologres
OPTIONS(
  url='jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db',
  username='<your-accesskey-id>',
  password='<your-accesskey-secret>',
  tablename='test_1',
  starttime='2022-04-19 10:00:00',
  max.offset.per.trigger='1'
);

DESC holo;

-- Sink table: Delta Lake
DROP TABLE IF EXISTS holo_sink;
CREATE TABLE IF NOT EXISTS holo_sink(id INT, name STRING) USING delta;

-- Scan definition
CREATE SCAN holo_scan
ON holo
USING stream;

-- Stream job: read from the scan, write to the sink
CREATE STREAM holo_test
OPTIONS(
  checkpointLocation='file:///tmp/',
  outputMode='Append',
  triggerType='ProcessingTime',
  triggerIntervalMs='3000'
)
INSERT INTO holo_sink
SELECT id, name FROM holo_scan;

Key options for the source table (max.offset.per.trigger) and the stream job (all others):

| Option | Description | Example |
| --- | --- | --- |
| max.offset.per.trigger | Maximum number of records to read per trigger interval. Lower values reduce latency; higher values improve throughput. | 1 |
| checkpointLocation | Path for storing checkpoint data. Use a persistent location in production (for example, an OSS path). | file:///tmp/ |
| outputMode | How records are written to the sink. Append writes new records only. | Append |
| triggerType | Trigger policy for the streaming job. ProcessingTime fires at fixed intervals. | ProcessingTime |
| triggerIntervalMs | Interval in milliseconds between triggers when triggerType is ProcessingTime. | 3000 |
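
Taken together, max.offset.per.trigger and triggerIntervalMs put a ceiling on how fast the job can consume records. The arithmetic can be sketched as follows, taking the option descriptions above at face value (a limit counted in records, one batch per interval); the helper name maxRecordsPerSecond is hypothetical.

```scala
// Upper bound on records per second when each trigger reads at most
// `maxOffsetPerTrigger` records and triggers fire every `triggerIntervalMs` ms.
def maxRecordsPerSecond(maxOffsetPerTrigger: Long, triggerIntervalMs: Long): Double = {
  require(maxOffsetPerTrigger > 0 && triggerIntervalMs > 0, "both arguments must be positive")
  maxOffsetPerTrigger * 1000.0 / triggerIntervalMs
}

// Values from the example: 1 record per trigger, one trigger every 3000 ms.
val exampleRate = maxRecordsPerSecond(1L, 3000L)
println(f"at most $exampleRate%.2f records/s")
```

With the example settings the job reads at most one record every three seconds, which suits a demo but not a production workload. The bound is a ceiling only: a batch that takes longer than the interval lowers the actual rate.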
