EMR Serverless Spark can connect to Hologres using specific configurations and the Spark Connector provided by Hologres. This topic describes how to read data from and write data to Hologres in an EMR Serverless Spark environment.
Limits
Only Hologres instances of version 1.3 or later support the Spark connector. You can view the current version of your instance on the Instance Details page in the Hologres Management Console. If your instance runs a version earlier than 1.3, upgrade it by using the Instance Upgrade feature, or join the Hologres DingTalk group (group number 32314975) to request an upgrade.
Procedure
Step 1: Obtain the hologres-connector-spark JAR and upload it to OSS
To read from and write to Hologres, Spark requires a connector JAR package. You can download the package from the Maven Central Repository. This topic uses version 1.5.6 as an example. Click the attachment hologres-connector-spark-3.x-1.5.6-jar-with-dependencies.jar to download it.
Upload the downloaded hologres-connector-spark JAR package to Alibaba Cloud OSS. For more information about uploading files, see Simple upload.
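If you prefer to script the upload, the OSS Python SDK (oss2) can perform it. The sketch below is illustrative only: the bucket name, endpoint, credentials, and object-key layout are placeholders of our choosing, not values required by the connector.

```python
def connector_key(version):
    """Object key under which the connector JAR is stored (layout is an assumption)."""
    return f"jars/hologres-connector-spark-3.x-{version}-jar-with-dependencies.jar"

def upload_connector(local_path, version):
    """Upload the connector JAR to OSS and return its oss:// path.

    Replace the placeholder credentials, endpoint, and bucket with your own.
    """
    import oss2  # pip install oss2
    auth = oss2.Auth("<access-key-id>", "<access-key-secret>")
    bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", "<bucket>")
    key = connector_key(version)
    bucket.put_object_from_file(key, local_path)
    return f"oss://<bucket>/{key}"
```

The returned oss:// path is what you later pass to spark.emr.serverless.user.defined.jars.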
Step 2: Add a network connection
Obtain network information.
Navigate to the Hologres page and open the instance details page of the target Hologres instance to obtain its VPC and vSwitch information.
Add a new network connection.
Serverless Spark requires network connectivity with the Hologres cluster to access the Hologres service. For more information about network connectivity, see Network connectivity between EMR Serverless Spark and other VPCs.
Step 3: Create a database and table in Hologres
Connect to the Hologres instance. For more information, see Connect to an instance.
On the SQL editor tab, enter and execute the following SQL statements in a new temporary query.
```sql
-- Create a database.
CREATE DATABASE testdb;

-- Create a table.
CREATE TABLE "public"."test" (
    "id" text NULL,
    "name" text NULL
);

-- Insert data.
INSERT INTO public.test VALUES ('1001','jack'),('1002','tony'),('1003','mike');

-- Query data.
SELECT * FROM public.test;
```
Step 4: Read from and write to Hologres using Serverless Spark
Example 1: SQL session
This example shows how to read from and write to Hologres in an SQL session.
Create an SQL session. For more information, see Manage SQL sessions.
When you create the session, select the network connection that you created in the previous step. Then, add the following parameters in the Spark Configurations section to load the hologres-connector-spark.
```
# Add the hologres-connector JAR.
spark.emr.serverless.user.defined.jars oss://<bucket>/hologres-connector-spark-3.x-<version>.jar

# Configure the Hologres catalog.
spark.sql.catalog.hologres_external_test_db com.alibaba.hologres.spark3.HoloTableCatalog
spark.sql.catalog.hologres_external_test_db.username ***
spark.sql.catalog.hologres_external_test_db.password ***
spark.sql.catalog.hologres_external_test_db.jdbcurl jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb
```

The parameters are described as follows:
| Parameter | Example | Description |
| --- | --- | --- |
| spark.emr.serverless.user.defined.jars | oss://&lt;bucket&gt;/hologres-connector-spark-3.x-&lt;version&gt;.jar | The OSS path of the user-defined JAR package. |
| spark.sql.catalog.hologres_external_test_db | com.alibaba.hologres.spark3.HoloTableCatalog | Configures the Hologres data source as an external catalog in Spark 3.x. The value is a fixed class name. |
| spark.sql.catalog.hologres_external_test_db.username | LTAI****** | The AccessKey ID of your Alibaba Cloud account. We recommend managing sensitive information as ciphertext. For more information, see Manage sensitive information as ciphertext. |
| spark.sql.catalog.hologres_external_test_db.password | mXYV****** | The AccessKey secret of your Alibaba Cloud account. We recommend managing sensitive information as ciphertext. For more information, see Manage sensitive information as ciphertext. |
| spark.sql.catalog.hologres_external_test_db.jdbcurl | jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb | The Java Database Connectivity (JDBC) connection URL of the Hologres instance. |
Note: The hologres_external_test_db part of the parameter name can be customized.

On the Data Development page, create a SparkSQL job. Then, in the upper-right corner, select the SQL session that you created.
For more information, see Develop a SparkSQL job.
Copy the following code to the new SparkSQL tab and click Run.
```sql
-- Use the catalog for the testdb database.
USE hologres_external_test_db;

-- Write data.
INSERT INTO `public`.test VALUES ('1004','tom');

-- Query data.
SELECT * FROM `public`.test;
```
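Because every catalog property shares the prefix spark.sql.catalog.&lt;catalog_name&gt;, the configuration block can also be generated programmatically. The helper below is a hypothetical convenience for assembling these key-value pairs, not part of the connector:

```python
def holo_catalog_conf(catalog, username, password, jdbcurl):
    """Build the Spark configuration entries for one Hologres catalog.

    The catalog name is customizable; the class name is fixed by the connector.
    """
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "com.alibaba.hologres.spark3.HoloTableCatalog",
        f"{prefix}.username": username,
        f"{prefix}.password": password,
        f"{prefix}.jdbcurl": jdbcurl,
    }

# Example with the placeholder values used in this topic.
conf = holo_catalog_conf(
    "hologres_external_test_db",
    "LTAI******",
    "mXYV******",
    "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb",
)
```

Each key-value pair in the returned dict corresponds to one line of the Spark Configurations block shown earlier.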
Example 2: Streaming job
This example shows how to use a PySpark streaming job to read data from Kafka and write it to Hologres.
Ensure that Kafka and Hologres can connect over the network. To do this, deploy Kafka and Hologres in the same VPC and vSwitch.
The following is a code sample. Replace the Kafka information and Hologres table name with your actual information.
```python
from pyspark.sql import SparkSession

# Configure your Kafka information.
servers = "alikafka-serverless-cn-xxxxx-vpc.alikafka.aliyuncs.com:9092"  # Replace with your Kafka bootstrap servers.
topic = "topic-name"  # Replace with your Kafka topic.

# Create a SparkSession.
spark = SparkSession.builder \
    .appName("test read kafka") \
    .getOrCreate()

# Read the Kafka stream.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", servers) \
    .option("subscribe", topic) \
    .load()

# Define a function to write to Hologres (invoked for each micro-batch).
def write_to_hologres(batch_df, batch_id):
    print(f"Writing batch {batch_id} to Hologres...")
    batch_df.write \
        .format("hologres") \
        .mode("append") \
        .insertInto("hologres_external_test_db.public.test")  # Replace with your Hologres table.

# Convert the key and value to strings and write the stream.
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .foreachBatch(write_to_hologres) \
    .outputMode("append") \
    .trigger(processingTime='30 seconds') \
    .start()

# Wait for the streaming query to terminate (this blocks cell execution in a notebook).
query.awaitTermination()
```

Upload the file.
On the Files page, click Upload File.
In the Upload File dialog box, click the upload area to select the Python code file that you created, or drag the file to the upload area.
Create and run the streaming job.
On the Data Development page, click the (Create) icon.

In the dialog box that appears, enter a Name, select PySpark for Streaming Job, and then click OK.
On the new development tab, configure the following parameters and leave the others at their default settings. Then, click Publish.
| Parameter | Description |
| --- | --- |
| Main Python Resources | Select the Python file that you uploaded in the previous step. |
| Engine Version | Select a suitable Spark version. This example uses esr-4.6.0. |
| Network Connection | Select the network connection created in Step 2. |
| Spark Configurations | Add the configurations shown below. |

```
# Add the hologres-connector JAR.
spark.emr.serverless.user.defined.jars oss://shulang-emr/test_script/hologres-connector-spark-3.x-1.5.6-jar-with-dependencies.jar

# Configure the Hologres catalog.
spark.sql.catalog.hologres_external_test_db com.alibaba.hologres.spark3.HoloTableCatalog
spark.sql.catalog.hologres_external_test_db.username ***
spark.sql.catalog.hologres_external_test_db.password ***
spark.sql.catalog.hologres_external_test_db.jdbcurl jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb
```

For a detailed description of the parameters, see Example 1: SQL session.
After the job is published, click Go to O&M. On the page that appears, click Start.
Verify the result.
Send a message from Kafka.

Run a SparkSQL query.

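For intuition, the selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") step in the streaming job simply decodes Kafka's binary key and value as UTF-8 text (NULL stays NULL). A plain-Python equivalent of that transformation, for illustration only:

```python
def cast_record(key, value):
    """Mimic CAST(x AS STRING) on Kafka's binary key/value columns.

    Spark casts BinaryType to StringType via UTF-8 decoding; NULL remains NULL.
    """
    to_str = lambda b: b.decode("utf-8") if b is not None else None
    return to_str(key), to_str(value)

print(cast_record(b"1004", b"tom"))  # -> ('1004', 'tom')
```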
Example 3: Notebook session
Create a Notebook session. For more information, see Manage SQL sessions.
When you create the session, select the network connection that you created in the previous step. Then, add the following parameter in the Spark Configurations section to load the hologres-connector-spark.
```
# Add the hologres-connector JAR.
spark.emr.serverless.user.defined.jars oss://<bucket>/hologres-connector-spark-3.x-<version>.jar
```

On the Data Development page, create a Notebook job. Then, in the upper-right corner, select the Notebook session that you created.
Copy the following code to the new Notebook tab and click the run icon.

```python
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType

# 1. Prepare a Pandas DataFrame.
pdf = pd.DataFrame({
    "id": ["1006"],   # The ID is a string.
    "name": ["sl"]    # The name is a string.
})

# 2. Convert to a PySpark DataFrame (optional: explicitly define the schema to ensure correct data types).
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True)
])
df = spark.createDataFrame(pdf, schema=schema)

# Write to Hologres.
df.write \
    .format("hologres") \
    .option("username", "LTAI******") \
    .option("password", "mXYV******") \
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test") \
    .option("table", "test") \
    .mode("append") \
    .save()

# Read data.
readDf = spark.read \
    .format("hologres") \
    .option("username", "LTAI******") \
    .option("password", "mXYV******") \
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test") \
    .option("table", "test") \
    .load()

readDf.select("id", "name").show(10)
```

The parameters are described as follows:
| Parameter | Example | Description |
| --- | --- | --- |
| username | LTAI****** | The AccessKey ID of your Alibaba Cloud account. We recommend managing sensitive information as ciphertext. For more information, see Manage sensitive information as ciphertext. |
| password | mXYV****** | The AccessKey secret of your Alibaba Cloud account. We recommend managing sensitive information as ciphertext. For more information, see Manage sensitive information as ciphertext. |
| jdbcurl | jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test | The JDBC connection URL of the Hologres instance. |
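A malformed jdbcurl is a common cause of connection failures. Since the value follows the standard PostgreSQL JDBC form, Python's standard library can sanity-check it before you submit a job. The checker below is an illustrative helper, not part of the connector:

```python
from urllib.parse import urlparse

def check_jdbcurl(jdbcurl):
    """Split a Hologres JDBC URL into (host, port, database), failing fast on malformed input."""
    prefix = "jdbc:postgresql://"
    if not jdbcurl.startswith(prefix):
        raise ValueError("Hologres expects the PostgreSQL JDBC scheme")
    # Drop the leading 'jdbc:' so urlparse sees a normal postgresql:// URL.
    parsed = urlparse(jdbcurl[len("jdbc:"):])
    database = parsed.path.lstrip("/")
    if not (parsed.hostname and parsed.port and database):
        raise ValueError("expected jdbc:postgresql://<host>:<port>/<database>")
    return parsed.hostname, parsed.port, database
```

For example, passing a URL that lacks the database name raises an error instead of failing later inside the Spark job.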
Verify the result.

Common Hologres Catalog commands
In Serverless Spark, you can use a Hologres Catalog to connect a Hologres database to Spark SQL as an external Catalog. Each catalog is attached to a single Hologres database, and cross-database access is not supported. The internal logical organization of the catalog is consistent with Hologres:
| Spark concept | Corresponding Hologres concept | Description |
| --- | --- | --- |
| Catalog | Database | For example, the catalog hologres_external_test_db maps to the testdb database. |
| Namespace | Schema | Examples include public and test_schema. |
| Table | Table | You must explicitly specify the schema when you reference a table, for example, `public`.test. |
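Putting the three levels together, a fully qualified table reference takes the form catalog.schema.table. The helper below is an illustrative convenience (not part of the connector) that builds such references with backquotes, matching the `public`.test style used in the SQL examples above:

```python
def qualified_name(catalog, schema, table):
    """Build a backquoted catalog.schema.table reference for Spark SQL."""
    return ".".join(f"`{part}`" for part in (catalog, schema, table))

print(qualified_name("hologres_external_test_db", "public", "test"))
# -> `hologres_external_test_db`.`public`.`test`
```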
Load a Hologres Catalog
A Hologres Catalog in Spark corresponds to a single Hologres database and cannot be changed during use.
```sql
USE hologres_external_test_db;
```

Query all namespaces
A namespace in Spark corresponds to a schema in Hologres. The default schema is public. You can use the USE statement to switch to a different schema.
```sql
-- View all namespaces in the Hologres Catalog, which correspond to all schemas in Hologres.
SHOW NAMESPACES;
```

Query tables in a namespace
Query all tables.
```sql
SHOW TABLES;
```

Query tables in a specific namespace.

```sql
USE test_schema;
SHOW TABLES;
-- Or: SHOW TABLES IN test_schema;
```
References
For more information about reading from and writing to Hologres using Spark, see Read from and write to Hologres using Spark.