
E-MapReduce:Read from and write to Hologres

Last Updated:Feb 04, 2026

EMR Serverless Spark can connect to Hologres by using the Spark connector provided by Hologres together with a small set of configurations. This topic describes how to read data from and write data to Hologres in an EMR Serverless Spark environment.

Limits

Only Hologres instances version 1.3 or later support the Spark connector. You can view your current instance version on the Instance Details page in the Hologres Management Console. If your instance runs a version earlier than 1.3, use Instance Upgrade or join the Hologres communication group by searching for DingTalk group number 32314975 to request an upgrade.
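
You can also check the version in SQL. In Hologres, the hg_version() function (available in recent instance versions; treat this as an assumption if your instance is older) returns the instance version:

```sql
-- Returns the Hologres instance version string.
SELECT hg_version();
```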

Procedure

Step 1: Obtain the hologres-connector-spark JAR and upload it to OSS

  1. To read from and write to Hologres, Spark requires a connector JAR package. You can download the package from the Maven Central Repository. This topic uses version 1.5.6 as an example. Click the attachment hologres-connector-spark-3.x-1.5.6-jar-with-dependencies.jar to download it.

  2. Upload the downloaded hologres-connector-spark JAR package to Alibaba Cloud OSS. For more information about uploading files, see Simple upload.

Step 2: Add a network connection

  1. Obtain network information.

    Navigate to the Hologres page and open the instance details page of the target Hologres instance to obtain its VPC and vSwitch information.

  2. Add a new network connection.

    Serverless Spark requires network connectivity with the Hologres cluster to access the Hologres service. For more information about network connectivity, see Network connectivity between EMR Serverless Spark and other VPCs.

Step 3: Create a database and table in Hologres

  1. Connect to the Hologres instance. For more information, see Connect to an instance.

  2. On the SQL editor tab, enter and execute the following SQL statements in a new temporary query.

    -- Create a database.
    CREATE DATABASE testdb;
    
    -- Connect to the testdb database, and then run the following statements.
    -- Create a table.
    CREATE TABLE "public"."test" (
        "id" text NULL,
        "name" text NULL
    );
    -- Insert data.
    INSERT INTO public.test VALUES ('1001','jack'),('1002','tony'),('1003','mike');
    -- Query data.
    SELECT * FROM public.test;


Step 4: Read from and write to Hologres using Serverless Spark

Example 1: SQL session

This example shows how to read from and write to Hologres in an SQL session.

  1. Create an SQL session. For more information, see Manage SQL sessions.

    When you create the session, select the network connection that you created in the previous step. Then, add the following parameters in the Spark Configurations section to load the hologres-connector-spark.

    # Add the hologres-connector JAR.
    spark.emr.serverless.user.defined.jars oss://<bucket>/hologres-connector-spark-3.x-<version>.jar
    
    # Configure the Hologres catalog.
    spark.sql.catalog.hologres_external_test_db com.alibaba.hologres.spark3.HoloTableCatalog
    spark.sql.catalog.hologres_external_test_db.username ***
    spark.sql.catalog.hologres_external_test_db.password ***
    spark.sql.catalog.hologres_external_test_db.jdbcurl jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb

    The parameters are described as follows:

    Parameter: spark.emr.serverless.user.defined.jars
    Example: oss://<bucket>/hologres-connector-spark-3.x-<version>.jar
    Description: The OSS path of the user-defined JAR package.

    Parameter: spark.sql.catalog.hologres_external_test_db
    Example: com.alibaba.hologres.spark3.HoloTableCatalog
    Description: Registers the Hologres data source as an external catalog in Spark 3.x. The value is fixed.

    Parameter: spark.sql.catalog.hologres_external_test_db.username
    Example: LTAI******
    Description: The AccessKey ID of your Alibaba Cloud account. We recommend managing sensitive information as ciphertext. For more information, see Manage sensitive information as ciphertext.

    Parameter: spark.sql.catalog.hologres_external_test_db.password
    Example: mXYV******
    Description: The AccessKey secret of your Alibaba Cloud account. We recommend managing sensitive information as ciphertext. For more information, see Manage sensitive information as ciphertext.

    Parameter: spark.sql.catalog.hologres_external_test_db.jdbcurl
    Example: jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb
    Description: The Java Database Connectivity (JDBC) connection URL of the Hologres instance.

    The hologres_external_test_db part of the parameter name can be customized.
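
The jdbcurl value follows the standard PostgreSQL JDBC format. As a quick sanity check, a small helper (hypothetical; not part of the connector) can assemble it from the instance endpoint and database name:

```python
# Hypothetical helper: builds a Hologres JDBC URL in the standard
# PostgreSQL format used by the connector configuration above.
def hologres_jdbc_url(endpoint: str, database: str, port: int = 80) -> str:
    """Build a Hologres JDBC URL from an instance endpoint and database name."""
    return f"jdbc:postgresql://{endpoint}:{port}/{database}"

# Example: the VPC endpoint of the instance plus the testdb database.
print(hologres_jdbc_url("hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com", "testdb"))
```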

  2. On the Data Development page, create a SparkSQL job. Then, in the upper-right corner, select the SQL session that you created.

    For more information, see Develop a SparkSQL job.

  3. Copy the following code to the new SparkSQL tab and click Run.

    -- Switch to the Hologres catalog, which maps to the testdb database.
    USE hologres_external_test_db;
    -- Write data.
    INSERT INTO `public`.test VALUES ('1004','tom');
    -- Query data.
    SELECT * FROM `public`.test;


Example 2: Streaming job

This example shows how to use a PySpark streaming job to read data from Kafka and write it to Hologres.

Note

Ensure that the Serverless Spark job can reach both Kafka and Hologres over the network. The simplest setup is to deploy Kafka and Hologres in the same VPC and vSwitch so that a single network connection covers both.

  1. The following is a code sample. Replace the Kafka information and Hologres table name with your actual information.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    
    # Configure your Kafka information.
    servers = "alikafka-serverless-cn-xxxxx-vpc.alikafka.aliyuncs.com:9092"  # Replace with your Kafka bootstrap servers.
    topic = "topic-name"  # Replace with your Kafka topic.
    
    # Create a SparkSession.
    spark = SparkSession.builder \
        .appName("test read kafka") \
        .getOrCreate()
    
    # Read the Kafka stream.
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", servers) \
        .option("subscribe", topic) \
        .load()
    
    # Define a function that writes each micro-batch to Hologres through the catalog table.
    def write_to_hologres(batch_df, batch_id):
        print(f"Writing batch {batch_id} to Hologres...")
        # insertInto writes through the configured Hologres catalog and matches
        # columns by position: key -> id, value -> name.
        batch_df.write \
            .mode("append") \
            .insertInto("hologres_external_test_db.public.test")  # Replace with your Hologres table.
    
    # Convert the key and value to strings and write the stream.
    query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream \
        .foreachBatch(write_to_hologres) \
        .outputMode("append") \
        .trigger(processingTime='30 seconds') \
        .start()
    
    # Wait for the streaming query to terminate (this blocks cell execution in a notebook).
    query.awaitTermination()
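
A note on insertInto: Spark resolves columns by position, not by name, so the streamed key and value columns land in the id and name columns of public.test, respectively. A plain-Python illustration of this positional pairing (no Spark required):

```python
# insertInto matches source columns to table columns by position, not by name.
stream_columns = ["key", "value"]  # produced by the Kafka source after the CAST
table_columns = ["id", "name"]     # columns of the Hologres table public.test

# Each stream column is written to the table column at the same index.
positional_mapping = list(zip(stream_columns, table_columns))
print(positional_mapping)  # [('key', 'id'), ('value', 'name')]
```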
  2. Upload the file.

    1. On the Files page, click Upload File.

    2. In the Upload File dialog box, click the upload area to select the Python code file that you created, or drag the file to the upload area.

  3. Create and run the streaming job.

    1. On the Data Development page, click the Create icon.

    2. In the dialog box that appears, enter a Name, select PySpark for Streaming Job, and then click OK.

    3. On the new development tab, configure the following parameters and leave the others at their default settings. Then, click Publish.

      Parameter

      Description

      Main Python Resources

      Select the Python file that you uploaded in the previous step.

      Engine Version

      Select a suitable Spark version. This example uses esr-4.6.0.

      Network Connection

      Select the network created in Step 2.

      Spark Configurations

      # Add the hologres-connector JAR.
      spark.emr.serverless.user.defined.jars oss://<bucket>/hologres-connector-spark-3.x-1.5.6-jar-with-dependencies.jar
      # Configure the Hologres catalog.
      spark.sql.catalog.hologres_external_test_db com.alibaba.hologres.spark3.HoloTableCatalog
      spark.sql.catalog.hologres_external_test_db.username ***
      spark.sql.catalog.hologres_external_test_db.password ***
      spark.sql.catalog.hologres_external_test_db.jdbcurl jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb

      For a detailed description of the parameters, see Example 1: SQL session.

    4. After the job is published, click Go to O&M. On the page that appears, click Start.

  4. Verify the result.

    1. Send a message from Kafka.


    2. Run a SparkSQL query to confirm that the message was written to Hologres.

Example 3: Notebook session

  1. Create a Notebook session. For more information, see Manage notebook sessions.

    When you create the session, select the network connection that you created in the previous step. Then, add the following parameter in the Spark Configurations section to load the hologres-connector-spark.

    # Add the hologres-connector JAR.
    spark.emr.serverless.user.defined.jars oss://<bucket>/hologres-connector-spark-3.x-<version>.jar
  2. On the Data Development page, create a Notebook job. Then, in the upper-right corner, select the Notebook session that you created.

  3. Copy the following code to the new Notebook tab and click Run.

    import pandas as pd
    from pyspark.sql.types import StructType, StructField, StringType
    
    # 1. Prepare a Pandas DataFrame.
    pdf = pd.DataFrame({
        "id": ["1006"],    # The ID is a string.
        "name": ["sl"]     # The name is a string.
    })
    
    # 2. Convert to a PySpark DataFrame. Explicitly define the schema to ensure correct data types.
    schema = StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True)
    ])
    
    df = spark.createDataFrame(pdf, schema=schema)
    
    # Write to Hologres.
    df.write \
      .format("hologres") \
      .option("username", "LTAI******") \
      .option("password", "mXYV******") \
      .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb") \
      .option("table", "test") \
      .mode("append") \
      .save()
    
    # Read data.
    readDf = spark.read\
      .format("hologres") \
      .option("username", "LTAI******") \
      .option("password", "mXYV******") \
      .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/testdb") \
      .option("table", "test") \
      .load()
    
    readDf.select("id", "name").show(10)

    The parameters are described as follows:

    Parameter

    Example

    Description

    spark.sql.catalog.hologres_external_test_db.username

    LTAI******

    The AccessKey ID of your Alibaba Cloud account. We recommend managing sensitive information as ciphertext. For more information, see Manage sensitive information as ciphertext.

    spark.sql.catalog.hologres_external_test_db.password

    mXYV******

    The AccessKey secret of your Alibaba Cloud account. We recommend managing sensitive information as ciphertext. For more information, see Manage sensitive information as ciphertext.

    spark.sql.catalog.hologres_external_test_db.jdbcurl

    jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db

    The JDBC connection URL of the Hologres instance.

  4. Verify the result.

Common Hologres Catalog commands

In Serverless Spark, you can use a Hologres Catalog to connect a Hologres database to Spark SQL as an external Catalog. Each catalog is attached to a single Hologres database, and cross-database access is not supported. The internal logical organization of the catalog is consistent with Hologres:

Spark concept: Catalog
Hologres concept: Database
Description: For example, hologres_external_test_db maps to the testdb database in Hologres.

Spark concept: Namespace
Hologres concept: Schema
Description: Examples include public and test_schema. The default is public. You can use the USE statement to switch the current default namespace.

Spark concept: Table
Hologres concept: Table
Description: You must explicitly specify namespace.table_name, such as public.test, or run USE namespace first and then reference the table name directly.

Load a Hologres Catalog

A Hologres Catalog in Spark corresponds to a single Hologres database and cannot be changed during use.

USE hologres_external_test_db;
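
Because a table is addressed by the three-level identifier catalog.namespace.table, you can also query it without USE by fully qualifying the name:

```sql
-- Fully qualified access: catalog.namespace.table.
SELECT * FROM hologres_external_test_db.public.test;
```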

Query all namespaces

A namespace in Spark corresponds to a schema in Hologres. The default schema is public. You can use the USE statement to change the default schema.

-- View all namespaces in the Hologres Catalog, which correspond to all schemas in Hologres.
SHOW NAMESPACES;

Query tables in a namespace

  • Query all tables.

    SHOW TABLES;
  • Query tables in a specific namespace.

    USE test_schema;
    SHOW TABLES;
    
    -- Or use 
    SHOW TABLES IN test_schema;
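
To inspect a table's columns through the catalog, you can use the standard Spark SQL DESCRIBE statement (assuming the table exists in the current namespace):

```sql
-- View the schema of a table in the current namespace.
DESCRIBE TABLE public.test;
```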

References

For more information about reading from and writing to Hologres using Spark, see Read from and write to Hologres using Spark.