This topic describes how to use Spark to read data from Hologres tables.

Use Spark to read full data from Hologres tables

You can use Spark to read full data from a Hologres table by calling Java Database Connectivity (JDBC) APIs. The JDBC driver must be a PostgreSQL driver. You can download the PostgreSQL driver from the official website. The version of the PostgreSQL driver must be 42.2.25 or later. For more information, see Use JDBC to connect to Hologres.

When you run the spark-submit, spark-shell, or spark-sql command to access Hologres, you must add the dependency on the PostgreSQL driver to the classpath by running the --driver-class-path command. Example: --driver-class-path <postgresql-**.jar>--jars <postgresql-**.jar>. <postgresql-**.jar> indicates the path where the downloaded PostgreSQL driver is placed.

In this example, the PostgreSQL driver is in the /home/hadoop/postgresql-42.6.0.jar path.

spark-submit

spark-submit --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar --class ***

spark-shell

spark-shell --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar

spark-sql

spark-sql --driver-class-path /home/hadoop/postgresql-42.6.0.jar --jars /home/hadoop/postgresql-42.6.0.jar

The following examples show how to read full data from Hologres tables.

Read full data from a Hologres table by using spark-scala-dataframe

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db")
  .option("dbtable", "tablename")
  .option("user", "ram ak")
  .option("password", "ram ak secret")
  .load()
jdbcDF.show(1000)
Parameters:
  • url: In this example, jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db is used. hgpostcn****.hologres.aliyuncs.com:80 is the endpoint of your Hologres instance. You can obtain the endpoint of the Hologres instance on the instance details page in the Hologres console. Net IP
  • user: the AccessKey ID of your account that is used to access the Hologres instance.
  • password: the AccessKey secret of your account that is used to access the Hologres instance.

For more information about the parameters for option, see JDBC To Other Databases.

Read full data from a Hologres table by using Spark SQL statements

CREATE TABLE holo_test
USING jdbc2
OPTIONS(url='jdbc:postgresql://hgpostcn****.hologres.aliyuncs.com:80/test_db',
driver='org.postgresql.Driver',
dbtable='test_table',
user='ram ak',
password='ram ak secret'
);

desc holo_test;

select * from holo_test;

Use Spark to read incremental data from Hologres tables

For information about incremental data of Hologres tables, see Subscribe to Hologres binary logs.

When you use spark-streaming to read incremental data from Hologres tables, you must add the related dependencies of Hologres to the classpath by running the --driver-class-path command.

Dependencies for Spark 2

--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar  --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar, /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/postgresql-42.2.23.jar

Dependencies for Spark 3

--driver-class-path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar:/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar,/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/postgresql-42.2.23.jar

The following examples show how to read incremental data from Hologres tables.

Read incremental data from a Hologres table by using spark-structured-streaming

// Read incremental data from a Hologres table. 
val df = spark
  .readStream
  .format("hologres")
  .option(url, 'jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db')
  .option(username, 'ram ak')
  .option(password, 'ram ak secret')
  .option(tablename, 'test_1')
  .option(starttime, '2022-04-19 10:00:00')
  .load()

// Write data to Delta. 
df.writeStream
    .outputMode("append")
  .format("delta")
  .start()
                

Read incremental data from a Hologres table by using spark-streaming-sql

drop table if exists holo;
CREATE TABLE if not exists holo
USING hologres
OPTIONS(url='jdbc:postgresql://hgpostcn-****.hologres.aliyuncs.com:80/test_db',
    username='ram ak',
    password='ram ak secret',
    tablename='test_1',
    starttime='2022-04-19 10:00:00',
    max.offset.per.trigger="1");

desc holo;

drop table if exists holo_sink;
create table if not exists holo_sink(id int, name string) using delta;


create scan holo_scan
on holo
using stream
;

create stream holo_test
options(
checkpointLocation='file:///tmp/',
outputMode='Append',
triggerType='ProcessingTime',
triggerIntervalMs='3000')
insert into holo_sink
select  id,  name  from holo_scan;