Spark can read data from Hologres tables in two modes:
Full data read — reads all records from a table using JDBC. Use this for batch processing and point-in-time snapshots.
Incremental data read — subscribes to Hologres binary logs and streams new records in real time using Spark Structured Streaming. Use this for change data capture (CDC) and event-driven pipelines.
Prerequisites
Before you begin, ensure that you have:
An E-MapReduce (EMR) cluster with Spark installed
A running Hologres instance
Your AccessKey ID and AccessKey secret
Store your AccessKey ID and AccessKey secret as environment variables or in a secrets manager. Do not hardcode them in your scripts.
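One way to follow this advice in Scala is to resolve the credentials from the process environment when the job starts. The following is a minimal sketch under stated assumptions: the environment variable names HOLO_ACCESS_KEY_ID and HOLO_ACCESS_KEY_SECRET and the HoloCredentials helper are hypothetical and are not part of any EMR or Hologres API.

```scala
// Hypothetical helper: resolves Hologres credentials from environment variables.
// The variable names are assumptions; use the names your deployment defines.
object HoloCredentials {
  val IdVar = "HOLO_ACCESS_KEY_ID"
  val SecretVar = "HOLO_ACCESS_KEY_SECRET"

  // Returns the value of `name`, or fails fast with a message naming the missing variable.
  def requireEnv(name: String, env: Map[String, String] = sys.env): String =
    env.get(name).filter(_.nonEmpty).getOrElse(
      throw new IllegalStateException(s"Environment variable $name is not set")
    )

  // Returns the pair (AccessKey ID, AccessKey secret).
  def load(env: Map[String, String] = sys.env): (String, String) =
    (requireEnv(IdVar, env), requireEnv(SecretVar, env))
}
```

After `val (id, secret) = HoloCredentials.load()`, the two values can be passed to the user (or username) and password options in place of literal strings.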
Read full data from Hologres tables
Full data reads use Java Database Connectivity (JDBC) with the PostgreSQL driver. Download the driver from Maven Central. The driver version must be 42.2.25 or later.
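The version requirement can be checked before a job is submitted. The sketch below is illustrative only: PgDriverVersion is a hypothetical helper, and it assumes the JAR keeps the postgresql-<version>.jar file name and that the version consists of dot-separated integers.

```scala
// Hypothetical helper: checks a PostgreSQL driver JAR name against a minimum version.
object PgDriverVersion {
  // Assumes a file name of the form postgresql-<major>.<minor>[.<patch>].jar.
  private val JarName = """postgresql-(\d+(?:\.\d+)+)\.jar""".r

  // Extracts the version string from a JAR path, or None if the name is not recognized.
  def versionOf(jarPath: String): Option[String] =
    JarName.findFirstMatchIn(jarPath).map(_.group(1))

  // True if `version` is greater than or equal to `minimum`, compared part by part.
  def atLeast(version: String, minimum: String): Boolean = {
    def parts(v: String): Seq[Int] = v.split('.').toSeq.map(_.toInt)
    // Pad the shorter version with zeros, then decide on the first differing part.
    parts(version).zipAll(parts(minimum), 0, 0)
      .find { case (x, y) => x != y }
      .forall { case (x, y) => x > y }
  }
}
```

For example, `PgDriverVersion.versionOf(path).exists(PgDriverVersion.atLeast(_, "42.2.25"))` evaluates to true only when the file name is recognized and the version meets the minimum.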
When running spark-submit, spark-shell, or spark-sql, add the driver to the classpath using both --driver-class-path and --jars:
# spark-submit
spark-submit --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar --class <your-main-class> <your-app.jar>
# spark-shell
spark-shell --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar
# spark-sql
spark-sql --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar
Replace /home/hadoop/postgresql-42.6.0.jar with the path where you placed the downloaded driver.
Read full data using the Spark DataFrame API (Scala)
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db")
  .option("dbtable", "tablename")
  .option("user", "<your-accesskey-id>")
  .option("password", "<your-accesskey-secret>")
  .load()
jdbcDF.show(1000)
| Parameter | Description | Example |
| url | JDBC connection string. Get the hostname and port from the endpoint on your Hologres instance details page. | jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db |
| dbtable | The Hologres table to read | tablename |
| user | Your AccessKey ID | - |
| password | Your AccessKey secret | - |
For all available JDBC options, see JDBC To Other Databases.
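Among the standard Spark JDBC options, partitionColumn, lowerBound, upperBound, and numPartitions split a full read across several parallel connections. The sketch below only assembles the option map so that it can be checked without a cluster. The HoloJdbcOptions helper is hypothetical, and any partition column passed to it (for example, a numeric id column) is an assumption about your table.

```scala
// Sketch: assemble Spark JDBC options for a partitioned (parallel) full read.
// HoloJdbcOptions is a hypothetical helper, not part of Spark or EMR.
object HoloJdbcOptions {
  def partitionedRead(
      url: String,
      table: String,
      user: String,
      password: String,
      partitionColumn: String, // must be a numeric, date, or timestamp column
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int
  ): Map[String, String] = {
    require(numPartitions > 0, "numPartitions must be positive")
    require(lowerBound < upperBound, "lowerBound must be less than upperBound")
    Map(
      "url" -> url,
      "driver" -> "org.postgresql.Driver",
      "dbtable" -> table,
      "user" -> user,
      "password" -> password,
      "partitionColumn" -> partitionColumn,
      // The bounds only decide the partition stride; they do not filter rows.
      "lowerBound" -> lowerBound.toString,
      "upperBound" -> upperBound.toString,
      "numPartitions" -> numPartitions.toString
    )
  }
}

// In a Spark session the map would be used as:
//   val df = spark.read.format("jdbc").options(opts).load()
```

Each partition opens its own connection to Hologres, so numPartitions is best kept modest relative to the connection limit of the instance.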
Read full data using Spark SQL
CREATE TABLE holo_test
USING jdbc2
OPTIONS(
  url='jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db',
  driver='org.postgresql.Driver',
  dbtable='test_table',
  user='<your-accesskey-id>',
  password='<your-accesskey-secret>'
);
DESC holo_test;
SELECT * FROM holo_test;
Read incremental data from Hologres tables
Incremental reads subscribe to Hologres binary logs and deliver new records as a stream. For background on binary logs, see Subscribe to Hologres binary logs.
This mode requires the EMR SDK JARs in addition to the PostgreSQL driver. Add them to the classpath based on your Spark version.
Spark 2:
--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar \
--jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar,/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar
Spark 3:
--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar \
--jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar,/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar
Read incremental data using Spark Structured Streaming
The following example reads incremental records from a Hologres table starting from a given timestamp and writes them to Delta Lake in append mode.
// Read incremental data from a Hologres table
val df = spark
  .readStream
  .format("hologres")
  .option("url", "jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db")
  .option("username", "<your-accesskey-id>")
  .option("password", "<your-accesskey-secret>")
  .option("tablename", "test_1")
  .option("starttime", "2022-04-19 10:00:00")
  .load()
// Write records to Delta Lake. The Delta sink needs a checkpoint location
// and an output path; replace both placeholders with your own paths.
df.writeStream
  .outputMode("append")
  .format("delta")
  .option("checkpointLocation", "<your-checkpoint-path>")
  .start("<your-delta-table-path>")
| Parameter | Description | Example |
| url | JDBC connection string for your Hologres instance | jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db |
| username | Your AccessKey ID | - |
| password | Your AccessKey secret | - |
| tablename | The Hologres table to subscribe to | test_1 |
| starttime | Timestamp from which to start reading binary logs | 2022-04-19 10:00:00 |
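The starttime value is a plain string, so building it from a date-time object avoids formatting mistakes. The sketch below assumes the yyyy-MM-dd HH:mm:ss pattern, which is inferred from the example value 2022-04-19 10:00:00 rather than taken from a specification. HoloStartTime is a hypothetical helper. How the connector interprets the time zone of this value is not stated here, so verify it against your instance.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper: renders a starttime option value.
object HoloStartTime {
  // Pattern inferred from the example value "2022-04-19 10:00:00" (an assumption).
  private val Format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Formats a local date-time as a starttime string.
  def format(t: LocalDateTime): String = t.format(Format)

  // Returns the starttime string for the instant `minutes` before `now`.
  def minutesBefore(now: LocalDateTime, minutes: Long): String =
    format(now.minusMinutes(minutes))
}
```

The result can be passed directly, for example `.option("starttime", HoloStartTime.minutesBefore(LocalDateTime.now(), 30))`.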
Read incremental data using Spark Streaming SQL
The following example uses the EMR Spark Streaming SQL extension to read from a Hologres table and write to a Delta Lake sink table.
-- Source table: subscribe to Hologres binary logs
DROP TABLE IF EXISTS holo;
CREATE TABLE IF NOT EXISTS holo
USING hologres
OPTIONS(
  url='jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db',
  username='<your-accesskey-id>',
  password='<your-accesskey-secret>',
  tablename='test_1',
  starttime='2022-04-19 10:00:00',
  max.offset.per.trigger='1'
);
DESC holo;
-- Sink table: Delta Lake
DROP TABLE IF EXISTS holo_sink;
CREATE TABLE IF NOT EXISTS holo_sink(id INT, name STRING) USING delta;
-- Scan definition
CREATE SCAN holo_scan
ON holo
USING stream;
-- Stream job: read from the scan, write to the sink
CREATE STREAM holo_test
OPTIONS(
  checkpointLocation='file:///tmp/',
  outputMode='Append',
  triggerType='ProcessingTime',
  triggerIntervalMs='3000'
)
INSERT INTO holo_sink
SELECT id, name FROM holo_scan;
Key options for the stream job:
| Option | Description | Example |
| max.offset.per.trigger | Maximum number of records to read per trigger interval. Lower values reduce latency; higher values improve throughput. | 1 |
| checkpointLocation | Path for storing checkpoint data. Use a persistent location in production (for example, an OSS path). | file:///tmp/ |
| outputMode | How records are written to the sink. | Append |
| triggerType | Trigger policy for the streaming job. | ProcessingTime |
| triggerIntervalMs | Interval in milliseconds between triggers when triggerType is ProcessingTime. | 3000 |
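Together, max.offset.per.trigger and triggerIntervalMs cap how fast the stream can consume binary logs. The sketch below computes that ceiling. It assumes one micro-batch per trigger interval and that the record limit applies to the whole source table, as the option description suggests. StreamRate is a hypothetical helper.

```scala
// Hypothetical helper: upper bound on ingest rate implied by the two options.
object StreamRate {
  // Maximum records read per minute if every trigger reads a full batch.
  // Actual throughput is lower when a batch takes longer than the interval.
  def maxRecordsPerMinute(maxOffsetPerTrigger: Long, triggerIntervalMs: Long): Double = {
    require(maxOffsetPerTrigger > 0 && triggerIntervalMs > 0, "both values must be positive")
    maxOffsetPerTrigger * 60000.0 / triggerIntervalMs
  }
}
```

With the values from the example (1 record per trigger, 3000 ms interval), the ceiling is 20 records per minute, which suits a demonstration but is likely too low for a production table.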
