This topic explains how to read data from Hologres tables by using Spark.
Read full data
Spark reads full data from a Hologres table through JDBC, which requires the PostgreSQL JDBC driver. Download PostgreSQL JDBC Driver 42.2.25 or later from the official PostgreSQL website. For more information, see JDBC.
When you use the spark-submit, spark-shell, or spark-sql command to access Hologres, add the PostgreSQL driver to the classpath by including the --driver-class-path <postgresql-**.jar> --jars <postgresql-**.jar> options in the command, where <postgresql-**.jar> is the path to the downloaded PostgreSQL JDBC driver.
For example, if the PostgreSQL driver is located at /home/hadoop/postgresql-42.6.0.jar, use the following commands:
spark-submit
spark-submit --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar --class ***
spark-shell
spark-shell --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar
spark-sql
spark-sql --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar
Read full data with a Spark Scala DataFrame
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db")
  .option("dbtable", "tablename")
  .option("user", "ram ak")
  .option("password", "ram ak secret")
  .load()
jdbcDF.show(1000)
url: The JDBC connection URL, for example jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db, where hgpostcn****.hologres.aliyuncs.com:80 is the endpoint of your Hologres instance and test_db is the database name. To obtain the endpoint, go to the Instance Details page in the Hologres console, find the network type you want to use (public network, classic network, VPC, or specified VPC) in the Network Information section, and click Copy next to its domain name.
user: The AccessKey ID of your account.
password: The AccessKey secret of your account.
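Instead of writing the AccessKey pair into the code, you can read it from the environment and pass it to the jdbc method mentioned in the first comment of the example. The following is a minimal sketch under stated assumptions: the environment variable names ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET and the helper object HologresJdbc are hypothetical choices for illustration. Only plain Scala and JDK classes are used, so the Spark call itself is shown as a comment.

```scala
import java.util.Properties

// Hypothetical helper: builds the arguments for DataFrameReader.jdbc(url, table, properties).
object HologresJdbc {
  // Assumed environment variable names; adjust them to your own setup.
  val IdVar = "ALIBABA_CLOUD_ACCESS_KEY_ID"
  val SecretVar = "ALIBABA_CLOUD_ACCESS_KEY_SECRET"

  // endpoint is host:port as copied from the Hologres console,
  // for example "hgpostcn****.hologres.aliyuncs.com:80".
  def url(endpoint: String, database: String): String =
    s"jdbc:postgresql://$endpoint/$database"

  // Reads the AccessKey pair from the given environment map and fails fast if either value is missing.
  def properties(env: Map[String, String]): Properties = {
    def required(name: String): String =
      env.getOrElse(name, throw new IllegalArgumentException(s"environment variable $name is not set"))
    val props = new Properties()
    props.setProperty("user", required(IdVar))
    props.setProperty("password", required(SecretVar))
    props.setProperty("driver", "org.postgresql.Driver")
    props
  }
}

// In spark-shell or an application (not executed here):
// val jdbcDF = spark.read.jdbc(
//   HologresJdbc.url("hgpostcn****.hologres.aliyuncs.com:80", "test_db"),
//   "tablename",
//   HologresJdbc.properties(sys.env))
```

Taking the environment as a Map parameter, rather than reading sys.env inside the helper, keeps the function easy to check without touching real credentials.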
For more information about option settings, see JDBC To Other Databases.
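By default, Spark reads a JDBC table over a single connection into one partition. For larger tables, the standard Spark JDBC options partitionColumn, lowerBound, upperBound, and numPartitions split the read into parallel queries on a numeric column. According to the Spark documentation, lowerBound and upperBound only decide the partition stride and do not filter rows, so all rows in the table are still returned. The sketch below only assembles the option map; the helper name PartitionedRead, the column id, and the bound values are assumptions for illustration.

```scala
// Hypothetical helper that assembles Spark JDBC options for a parallel read.
object PartitionedRead {
  def options(
      url: String,
      table: String,
      column: String,
      lower: Long,
      upper: Long,
      partitions: Int): Map[String, String] = {
    require(partitions > 0, "numPartitions must be positive")
    require(lower < upper, "lowerBound must be less than upperBound")
    Map(
      "url" -> url,
      "dbtable" -> table,
      "driver" -> "org.postgresql.Driver",
      // Standard Spark JDBC options; the bounds set the stride, they do not filter rows.
      "partitionColumn" -> column,
      "lowerBound" -> lower.toString,
      "upperBound" -> upper.toString,
      "numPartitions" -> partitions.toString
    )
  }
}

// Usage in spark-shell (not executed here); user and password are set as in the example above:
// val opts = PartitionedRead.options(
//   "jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db",
//   "tablename", "id", 0L, 1000000L, 8)
// val df = spark.read.format("jdbc").options(opts)
//   .option("user", "ram ak").option("password", "ram ak secret").load()
```

Each partition opens its own connection to Hologres, so a moderate numPartitions avoids overloading the instance.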
Read full data with Spark SQL
CREATE TABLE holo_test
USING jdbc2
OPTIONS(url='jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db',
driver='org.postgresql.Driver',
dbtable='test_table',
user='ram ak',
password='ram ak secret'
);
desc holo_test;
select * from holo_test;
Read incremental data
For more information about incremental data in Hologres, see Subscribe to Hologres binary logs.
To read incremental data from Hologres with Spark streaming, add the following Hologres dependencies to the classpath of your spark-submit, spark-shell, or spark-sql command.
Spark 2 dependencies
--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar,/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar
Spark 3 dependencies
--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar,/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar
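The two options use different separators: --driver-class-path takes a JVM classpath, whose entries are separated by a colon on Linux, while --jars takes a comma-separated list that must not contain spaces. The following minimal sketch builds both flags from a list of JAR paths; the helper name ClasspathFlags is hypothetical.

```scala
// Hypothetical helper: builds the dependency flags for spark-submit, spark-shell, or spark-sql.
object ClasspathFlags {
  def apply(jars: Seq[String]): Seq[String] = {
    require(jars.nonEmpty, "at least one JAR is required")
    val trimmed = jars.map(_.trim) // stray spaces would split the --jars value into separate arguments
    Seq(
      "--driver-class-path", trimmed.mkString(java.io.File.pathSeparator), // ":" on Linux
      "--jars", trimmed.mkString(",") // comma-separated, no spaces
    )
  }
}

// Example (Linux): joining the result with spaces gives the Spark 3 flags listed above.
// val dir = "/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk"
// ClasspathFlags(Seq(s"$dir/emr-datasources_shaded_2.12-3.0.1.jar", s"$dir/postgresql-42.2.23.jar")).mkString(" ")
```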
Read incremental data with Spark Structured Streaming
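// Read incremental data from Hologres.
val df = spark
  .readStream
  .format("hologres")
  .option("url", "jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db")
  .option("username", "ram ak")
  .option("password", "ram ak secret")
  .option("table", "test_1")
  .option("starttime", "2022-04-19 10:00:00")
  .load()
// Write the data to a Delta Lake table.
// A streaming Delta sink needs a checkpoint location and an output path;
// the two paths below are examples, replace them with your own.
val query = df.writeStream
  .outputMode("append")
  .format("delta")
  .option("checkpointLocation", "/tmp/holo_checkpoint")
  .start("/tmp/delta/holo_sink")
// Block until the query stops (required in a spark-submit application).
query.awaitTermination()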
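Both streaming examples pass starttime as a literal timestamp in the form yyyy-MM-dd HH:mm:ss. If the start point should be relative, for example one hour before the job starts, a small helper can produce a string in the same form. This is a sketch that assumes the connector accepts exactly the format shown in the examples; the helper name StartTime and the Asia/Shanghai time zone in the usage comment are assumptions, so confirm which time zone the connector interprets the value in.

```scala
import java.time.{Duration, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

// Hypothetical helper: formats a start time in the same pattern as the starttime option above.
object StartTime {
  private val Pattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  def format(t: LocalDateTime): String = t.format(Pattern)

  // Start time a given duration before "now" in the given zone
  // (the zone is assumed to match how the connector interprets starttime).
  def ago(d: Duration, zone: ZoneId): String = format(LocalDateTime.now(zone).minus(d))
}

// Usage with the streaming reader (not executed here):
// .option("starttime", StartTime.ago(Duration.ofHours(1), ZoneId.of("Asia/Shanghai")))
```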
Read incremental data with Spark Streaming SQL
drop table if exists holo;
CREATE TABLE if not exists holo
USING hologres
OPTIONS(url='jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db',
username='ram ak',
password='ram ak secret',
tablename='test_1',
starttime='2022-04-19 10:00:00',
max.offset.per.trigger='1');
desc holo;
drop table if exists holo_sink;
create table if not exists holo_sink(id int, name string) using delta;
create scan holo_scan
on holo
using stream;
create stream holo_test
options(
checkpointLocation='file:///tmp/',
outputMode='Append',
triggerType='ProcessingTime',
triggerIntervalMs='3000')
insert into holo_sink
select id, name from holo_scan;