AnalyticDB for PostgreSQL, previously known as HybridDB for PostgreSQL, is a fast, easy-to-use, and cost-effective data warehousing service that can process petabytes of data. This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access AnalyticDB for PostgreSQL.

Prerequisites

  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.
  • An AnalyticDB for PostgreSQL instance is created. For more information, see Create an instance.
  • A database and a table are created in the AnalyticDB for PostgreSQL instance, and data is inserted into the table. Sample statements (a verification query is provided after this list):
    -- Statement for creating a database:
    create database testdb;
    -- Statement for creating a table:
    CREATE TABLE "test_table"
    (
     "name" varchar(32) ,
     "age" smallint ,
     "score" double precision 
    )
    WITH (
        FILLFACTOR = 100,
        OIDS = FALSE
    )
    ;
    ALTER TABLE "test_table" OWNER TO testuser;
    -- Statements for inserting data into the table:
    INSERT INTO "test_table" VALUES('aliyun01', 101, 10.0);
    INSERT INTO "test_table" VALUES('aliyun02', 102, 10.0);
    INSERT INTO "test_table" VALUES('aliyun03', 103, 10.0);
    INSERT INTO "test_table" VALUES('aliyun04', 104, 10.0);
    INSERT INTO "test_table" VALUES('aliyun05', 105, 10.0);
  • The security group ID and vSwitch ID required for the serverless Spark engine of DLA to access the AnalyticDB for PostgreSQL instance are obtained. For more information, see Access your VPC.
  • The vSwitch IP address required for the serverless Spark engine of DLA to access the AnalyticDB for PostgreSQL instance is added to a whitelist of this instance. For more information, see Configure an IP address whitelist for an AnalyticDB for PostgreSQL instance.
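
To confirm that the sample data is in place before you submit the Spark job, you can query the table from a SQL client that is connected to the testdb database. The following statements are a minimal check that assumes the sample database, schema, and table names used in the preceding statements (testdb, public, and test_table).

    -- Optional check: count the sample rows and preview the data.
    SELECT count(*) FROM public.test_table;
    SELECT "name", "age", "score" FROM public.test_table ORDER BY "age";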

Procedure

  1. Write the following test code and the pom.xml file that contains the dependency required to access the AnalyticDB for PostgreSQL instance. Then, package the test code into a JAR file and upload the JAR file, together with the dependency JAR file, to OSS.
    Sample test code:
    package com.aliyun.spark
    
    import java.util.Properties
    import org.apache.spark.sql.SparkSession
    
    object SparkOnADBPostgreSQL {
      def main(args: Array[String]): Unit = {
        val url = args(0)
        val database = args(1)
        val jdbcConnURL = s"jdbc:postgresql://$url/$database"
        val schemaName = args(2)
        val tableName = args(3)
        val user = args(4)
        val password = args(5)
    
        // The name of the table in the serverless Spark engine of DLA.
        val sparkTableName = args(6)
    
        val sparkSession = SparkSession
          .builder()
          .appName("scala spark on adb test")
          .getOrCreate()
    
        val driver = "org.postgresql.Driver"
    
        // If the table exists, delete it.
        sparkSession.sql(s"drop table if exists $sparkTableName")
    
        // Execute the following SQL statement to create a table. The schema of the table is mapped to that in the AnalyticDB for PostgreSQL database.
        val createCmd =
          s"""CREATE TABLE ${sparkTableName} USING org.apache.spark.sql.jdbc
             |    options (
             |    driver '$driver',
             |    url '$jdbcConnURL',
             |    dbtable '$schemaName.$tableName',
             |    user '$user',
             |    password '$password'
             |    )""".stripMargin
        println(s"createCmd: \n $createCmd")
        sparkSession.sql(createCmd)
        val querySql = "select * from " + sparkTableName + " limit 1"
        sparkSession.sql(querySql).show
    
    
        // Call the Dataset API operation.
        val connectionProperties = new Properties()
        connectionProperties.put("driver", driver)
        connectionProperties.put("user", user)
        connectionProperties.put("password", password)
        // Read data from the AnalyticDB for PostgreSQL database.
        val jdbcDf = sparkSession.read.jdbc(jdbcConnURL,
          s"$database.$schemaName.$tableName",
          connectionProperties)
        jdbcDf.select("name", "age", "score").show()
    
        val data =
          Seq(
            PersonADBPG("bill", 30, 170.5D),
            PersonADBPG("gate", 29, 200.3D)
          )
        val dfWrite = sparkSession.createDataFrame(data)
    
        // Write data to the AnalyticDB for PostgreSQL database.
        dfWrite
          .write
          .mode("append")
          .jdbc(jdbcConnURL, s"$database.$schemaName.$tableName", connectionProperties)
        jdbcDf.select("name", "age").show()
        sparkSession.stop()
      }
    }
    
    case class PersonADBPG(name: String, age: Int, score: Double)
    Dependency that must be added to the pom.xml file to access the AnalyticDB for PostgreSQL instance:
            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>42.2.5</version>
            </dependency>
  2. Log on to the DLA console.
  3. In the top navigation bar, select the region where the AnalyticDB for PostgreSQL instance resides.
  4. In the left-side navigation pane, choose Serverless Spark > Submit job.
  5. On the Parameter Configuration page, click Create Job.
  6. In the Create Job dialog box, configure the parameters and click OK to create a Spark job.
  7. In the Job List navigation tree, click the Spark job that you created and enter the following job content in the code editor. Replace the parameter values as described in the comments. Then, click Save and Execute.
    {
        "args": [
            "gp-xxx-master.gpdbmaster.rds.aliyuncs.com:5432",  # The internal endpoint and port number of the AnalyticDB for PostgreSQL instance.
            "testdb",  # The name of the database in the AnalyticDB for PostgreSQL instance.
            "public",  # The name of the schema in the database of the AnalyticDB for PostgreSQL instance.
            "test_table",  # The name of the table in the database of the AnalyticDB for PostgreSQL instance.
            "xxx1",  # The username that is used to log on to the database in the AnalyticDB for PostgreSQL instance.
            "xxx2",  # The password that is used to log on to the database in the AnalyticDB for PostgreSQL instance.
            "spark_on_adbpg_table"  # The name of the table in the serverless Spark engine of DLA. This table is mapped to the table in the database of the AnalyticDB for PostgreSQL instance.
        ],
        "file": "oss://spark_test/jars/adbpg/spark-examples-0.0.1-SNAPSHOT.jar",  # The OSS directory where the test code is saved.
        "name": "adbpg-test",
        "jars": [
            "oss://spark_test/jars/adbpg/postgresql-42.2.5.jar"  # The OSS directory where the JAR file that contains the dependency of the test code is saved.
        ],
        "className": "com.aliyun.spark.SparkOnADBPostgreSQL",
        "conf": {
            "spark.driver.resourceSpec": "small",  # The specifications of the Spark driver, which can be small, medium, large, or xlarge.
            "spark.executor.instances": 2,  # The number of Spark executors.
            "spark.executor.resourceSpec": "small",  # The specifications of Spark executors, which can be small, medium, large, or xlarge.
            "spark.dla.eni.enable": "true",  # Specifies whether to enable an elastic network interface (ENI) for the VPC. If you want to access data of the VPC, set spark.dla.eni.enable to true.
            "spark.dla.eni.vswitch.id": "vsw-xxx",  # The ID of the vSwitch to which the AnalyticDB for PostgreSQL instance belongs.
            "spark.dla.eni.security.group.id": "sg-xxx"  # The ID of the security group to which the AnalyticDB for PostgreSQL instance belongs.
        }
    }

Result

After the job succeeds, find the job and click Log in the Operation column to view the logs of the job.
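
You can also confirm the write in the AnalyticDB for PostgreSQL database itself: the sample code appends two rows (bill and gate) to the table, so a query similar to the following should return them. This is a minimal sketch that assumes the sample schema and table names used in this topic.

    -- Check for the rows that the Spark job appended to public.test_table.
    SELECT "name", "age", "score" FROM public.test_table WHERE "name" IN ('bill', 'gate');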