Spark原生支援通過JDBC連接器訪問PostgreSQL。Serverless Spark的部分版本在啟動時會自動載入PostgreSQL JDBC驅動,因此您可以直接通過Serverless Spark的SQL會話、Spark批處理任務或Notebook等方式進行串連,以便進行資料的讀取與寫入操作。
前提條件
已建立Serverless Spark工作空間,詳情請參見建立工作空間。
已建立PostgreSQL執行個體。
您可以選擇自建的PostgreSQL執行個體,或選擇阿里雲提供的RDS PostgreSQL與PolarDB PostgreSQL資料庫。
本文以阿里雲的RDS PostgreSQL為例,詳情請參見快速建立RDS PostgreSQL執行個體。
注意事項
JDBC驅動版本要求:
如果您使用的Serverless Spark引擎是以下版本,則無需準備PostgreSQL JDBC Driver,因為Serverless Spark已內建該驅動(版本號碼為42.7.6)。
esr-4.4.0及後續版本
esr-3.4.0及後續版本
esr-2.8.0及後續版本
如果使用的是低於上述版本的引擎,則需要手動下載PostgreSQL JDBC Driver並上傳至OSS,同時在會話管理的Spark配置中填寫以下參數。
spark.emr.serverless.user.defined.jars oss://<bucket>/path/to/postgresql-<version>.jar
網路設定:確保Serverless Spark能夠與PostgreSQL服務之間的網路互連。具體配置請參見EMR Serverless Spark與其他VPC間網路互連。
說明配置安全性群組規則時,請根據實際需求選擇性開放必要的連接埠範圍(1~65535)。本文樣本需開啟TCP
5432連接埠。
操作步驟
方式一:使用SQL會話
建立SQL會話。 在會話管理中建立SQL會話,並選擇預先配置的網路連接。具體配置請參見建立SQL會話。
建立SQL任務。 在資料開發中建立類型的任務,使用以下SQL進行測試。
CREATE TEMPORARY VIEW test USING org.apache.spark.sql.jdbc OPTIONS ( url 'jdbc:postgresql://<jdbc_url>/<database>', dbtable '<schema>.<table>', user '<username>', password '<password>' ); SELECT * FROM test;本文涉及參數說明如下所示。
參數
說明
urlJDBC連接字串,包含PostgreSQL主機地址、連接埠和資料庫名。
填寫格式為
jdbc:postgresql://<jdbc_url>/<database>,需替換為實際值。dbtable待讀取的資料庫表名,格式為
<schema>.<table>。userPostgreSQL資料庫使用者名稱。
說明需具備目標表的讀取許可權。
passwordPostgreSQL資料庫密碼。
如果能夠正確輸出表的內容,則說明串連成功。

插入資料。 請使用以下命令向表中插入資料。
INSERT INTO test VALUES(4, 'd'),(5, 'e'); SELECT * FROM test;如果能正確查詢到插入的資料,說明寫入功能正常。

方式二:使用Notebook會話
建立Notebook會話。 在會話管理中建立Notebook會話,並選擇預先配置的網路連接。具體配置請參見建立Notebook會話。
建立Notebook任務。 在資料開發中建立類型的任務,使用以下Python代碼進行測試。
df = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql://<jdbc_url>/<database>") \ .option("dbtable", "<schema>.<table>") \ .option("user", "<username>") \ .option("password", "<password>") \ .load() df.show()如果能夠正確輸出表的內容,說明串連成功。

插入資料。 請使用以下代碼向表中插入資料。
df = spark.createDataFrame([(6, 'f'), (7, 'g')], ["id", "name"]) df.write \ .format("jdbc") \ .mode("append") \ .option("url", "jdbc:postgresql://<jdbc_url>/<database>") \ .option("dbtable", "<schema>.<table>") \ .option("user", "<username>") \ .option("password", "<password>") \ .save() df.show()涉及參數
mode("append"),該參數指定寫入模式為追加模式,確保新資料被追加至目標表中,而不會覆蓋或刪除已存資料。如果能正確返回插入的資料,說明寫入功能正常。

方式三:使用Spark批任務
編寫測試代碼。 使用以下Scala代碼編譯並打包為JAR檔案。
package spark.test import org.apache.spark.sql.SparkSession object Main { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("test") .getOrCreate() val newRows = spark.createDataFrame(Seq((6, "f"), (7, "g"))).toDF("id", "name") newRows.write.format("jdbc") .mode("append") .option("url", "jdbc:postgresql://<jdbc_url>/<database>") .option("dbtable", "<schema>.<table>") .option("user", "<username>") .option("password", "<password>") .save() spark.read.format("jdbc") .option("url", "jdbc:postgresql://<jdbc_url>/<database>") .option("dbtable", "<schema>.<table>") .option("user", "<username>") .option("password", "<password>") .load() .show() spark.stop() } }建立批任務。 在資料開發中建立類型的任務,然後配置以下參數進行測試。
主jar資源:選擇或者填寫打包好的JAR檔案地址。
Main Class:
spark.test.Main。網路連接:選擇預先配置的網路連接。
查看驗證結果 任務執行後,您可以單擊下方運行記錄地區中的日誌探查,在Driver日誌的Stdout頁簽,查看到PostgreSQL對應表中的內容。