全部產品
Search
文件中心

E-MapReduce:讀寫PostgreSQL

更新時間:Sep 17, 2025

Spark原生支援通過JDBC連接器訪問PostgreSQL。Serverless Spark的部分版本在啟動時會自動載入PostgreSQL JDBC驅動,因此您可以直接通過Serverless Spark的SQL會話、Spark批處理任務或Notebook等方式進行串連,以便進行資料的讀取與寫入操作。

前提條件

  • 已建立Serverless Spark工作空間,詳情請參見建立工作空間

  • 已建立PostgreSQL執行個體。

    您可以選擇自建的PostgreSQL執行個體,或選擇阿里雲提供的RDS PostgreSQL與PolarDB PostgreSQL資料庫。

    本文以阿里雲的RDS PostgreSQL為例,詳情請參見快速建立RDS PostgreSQL執行個體

注意事項

  • JDBC驅動版本要求:

    • 如果您使用的Serverless Spark引擎是以下版本,則無需準備PostgreSQL JDBC Driver,因為Serverless Spark已內建該驅動(版本號碼為42.7.6)。

      • esr-4.4.0及後續版本

      • esr-3.4.0及後續版本

      • esr-2.8.0及後續版本

    • 如果使用的是低於上述版本的引擎,則需要手動下載PostgreSQL JDBC Driver並上傳至OSS,同時在會話管理Spark配置中填寫以下參數。

      spark.emr.serverless.user.defined.jars oss://<bucket>/path/to/postgresql-<version>.jar
  • 網路設定:確保Serverless Spark能夠與PostgreSQL服務之間的網路互連。具體配置請參見EMR Serverless Spark與其他VPC間網路互連

    說明

    配置安全性群組規則時,請根據實際需求選擇性開放必要的連接埠範圍(1~65535)。本文樣本需開啟TCP 5432連接埠。

操作步驟

方式一:使用SQL會話

  1. 建立SQL會話。 在會話管理中建立SQL會話,並選擇預先配置的網路連接。具體配置請參見建立SQL會話

  2. 建立SQL任務。 在資料開發中建立SQL > SparkSQL類型的任務,使用以下SQL進行測試。

    CREATE TEMPORARY VIEW test
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url 'jdbc:postgresql://<jdbc_url>/<database>',
      dbtable '<schema>.<table>',
      user '<username>',
      password '<password>'
    );
    
    SELECT * FROM test;

    本文涉及參數說明如下所示。

    參數

    說明

    url

    JDBC連接字串,包含PostgreSQL主機地址、連接埠和資料庫名。

    填寫格式為jdbc:postgresql://<jdbc_url>/<database>,需替換為實際值。

    dbtable

    待讀取的資料庫表名,格式為<schema>.<table> 。

    user

    PostgreSQL資料庫使用者名稱。

    說明

    需具備目標表的讀取許可權。

    password

    PostgreSQL資料庫密碼。

    如果能夠正確輸出表的內容,則說明串連成功。

    image

  3. 插入資料。 請使用以下命令向表中插入資料。

    INSERT INTO test VALUES(4, 'd'),(5, 'e');
    SELECT * FROM test;

    如果能正確查詢到插入的資料,說明寫入功能正常。

    image

方式二:使用Notebook會話

  1. 建立Notebook會話。 在會話管理中建立Notebook會話,並選擇預先配置的網路連接。具體配置請參見建立Notebook會話

  2. 建立Notebook任務。 在資料開發中建立Python > Notebook類型的任務,使用以下Python代碼進行測試。

    df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:postgresql://<jdbc_url>/<database>") \
      .option("dbtable", "<schema>.<table>") \
      .option("user", "<username>") \
      .option("password", "<password>") \
      .load()
    df.show()

    如果能夠正確輸出表的內容,說明串連成功。

    image

  3. 插入資料。 請使用以下代碼向表中插入資料。

    df = spark.createDataFrame([(6, 'f'), (7, 'g')], ["id", "name"])
    df.write \
      .format("jdbc") \
      .mode("append") \
      .option("url", "jdbc:postgresql://<jdbc_url>/<database>") \
      .option("dbtable", "<schema>.<table>") \
      .option("user", "<username>") \
      .option("password", "<password>") \
      .save()
    df.show()

    涉及參數mode("append"),該參數指定寫入模式為追加模式,確保新資料被追加至目標表中,而不會覆蓋或刪除已存資料。

    如果能正確返回插入的資料,說明寫入功能正常。

    image

方式三:使用Spark批任務

  1. 編寫測試代碼。 使用以下Scala代碼編譯並打包為JAR檔案。

    package spark.test
    
    import org.apache.spark.sql.SparkSession
    
    object Main {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("test")
          .getOrCreate()
          
        val newRows = spark.createDataFrame(Seq((6, "f"), (7, "g"))).toDF("id", "name")
        newRows.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:postgresql://<jdbc_url>/<database>")
          .option("dbtable", "<schema>.<table>")
          .option("user", "<username>")
          .option("password", "<password>")
          .save()
    
        spark.read.format("jdbc")
          .option("url", "jdbc:postgresql://<jdbc_url>/<database>")
          .option("dbtable", "<schema>.<table>")
          .option("user", "<username>")
          .option("password", "<password>")
          .load()
          .show()
    
        spark.stop()
      }
    }
  2. 建立批任務。 在資料開發中建立批任務 > JAR類型的任務,然後配置以下參數進行測試。

    • 主jar資源:選擇或者填寫打包好的JAR檔案地址。

    • Main Classspark.test.Main

    • 網路連接:選擇預先配置的網路連接。

  3. 查看驗證結果 任務執行後,您可以單擊下方運行記錄地區中的日誌探查,在Driver日誌Stdout頁簽,查看到PostgreSQL對應表中的內容。