Spark原生支援通過JDBC Connector訪問MySQL。Serverless Spark在啟動時將自動載入MySQL JDBC驅動(版本 8.0.33)。您可以通過SQL會話、批處理任務或Notebook等方式串連MySQL,從而實現資料的讀取與寫入操作。
前提條件
已建立Serverless Spark工作空間,詳情請參見建立工作空間。
已建立MySQL執行個體。
您可以選擇自建的MySQL執行個體,或選擇阿里雲提供的RDS MySQL與PolarDB MySQL資料庫。
本文以阿里雲的RDS MySQL為例,詳情請參見建立RDS MySQL執行個體與設定資料庫。
注意事項
確保Serverless Spark能夠與MySQL之間的網路互連。具體配置請參見EMR Serverless Spark與其他VPC間網路互連。
配置安全性群組規則時,請根據實際需求選擇性開放必要的連接埠範圍(1~65535)。本文樣本需開啟TCP 3306連接埠。
操作步驟
方式一:使用SQL會話
建立SQL會話。 在會話管理中建立SQL會話,並選擇預先配置的網路連接。具體配置請參見建立SQL會話。
建立SQL任務。 在資料開發中建立SparkSQL類型的任務,使用以下SQL進行測試。
CREATE TEMPORARY VIEW test USING org.apache.spark.sql.jdbc OPTIONS ( url 'jdbc:mysql://<jdbc_url>/', dbtable '<db>.<table>', user '<username>', password '<password>' ); SELECT * FROM test;涉及參數說明如下所示。
參數
說明
urlJDBC連接字串。填寫格式為
jdbc:mysql://<jdbc_url>/,需替換<jdbc_url>為實際值。dbtable待讀取的資料庫表名,格式為
<db>.<table>。本文樣本為test_mysql_db.test。userMySQL資料庫使用者名稱。
說明需具備目標表的讀取許可權。
passwordMySQL資料庫密碼。
如果能夠正確輸出表的內容,則說明串連成功。

插入資料。 請使用以下命令向MySQL表中插入資料。
INSERT INTO test VALUES(4, 'd'),(5, 'e'); SELECT * FROM test;如果能正確查詢到插入的資料,說明寫入功能正常。

方式二:使用Notebook會話
建立Notebook會話。 在會話管理中建立Notebook會話,並選擇預先配置的網路連接。具體配置請參見建立Notebook會話。
建立Notebook任務。 在資料開發中建立類型的任務,使用以下Python代碼進行測試。
df = spark.read \ .format("jdbc") \ .option("url", "jdbc:mysql://<jdbc_url>") \ .option("dbtable", "<db>.<table>") \ .option("user", "<username>") \ .option("password", "<password>") \ .load() df.show()如果能夠正確輸出表的內容,說明串連成功。

插入資料。 請使用以下代碼向MySQL表中插入資料。
df = spark.createDataFrame([(6, 'f'), (7, 'g')], ["id", "name"]) df.write \ .format("jdbc") \ .mode("append") \ .option("url", "jdbc:mysql://<jdbc_url>") \ .option("dbtable", "<db>.<table>") \ .option("user", "<username>") \ .option("password", "<password>") \ .save() df.show()涉及參數
mode("append"),該參數指定寫入模式為追加模式,確保新資料被追加至目標表中,而不會覆蓋或刪除已存資料。如果能正確返回插入的資料,說明寫入功能正常。

方式三:使用Spark批任務
編寫測試代碼。 使用以下Scala代碼編譯並打包為JAR檔案。
package spark.test import org.apache.spark.sql.SparkSession object Main { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("test") .getOrCreate() val newRows = spark.createDataFrame(Seq((6, "f"), (7, "g"))).toDF("id", "name") newRows.write.format("jdbc") .mode("append") .option("url", "jdbc:mysql://<jdbc_url>") .option("dbtable", "<db>.<table>") .option("user", "<username>") .option("password", "<password>") .save() spark.read.format("jdbc") .option("url", "jdbc:mysql://<jdbc_url>") .option("dbtable", "<db>.<table>") .option("user", "<username>") .option("password", "<password>") .load() .show() spark.stop() } }建立批任務。 在資料開發中建立類型的任務,然後配置以下參數進行測試。具體配置請參見批任務或流任務開發。
主jar資源:選擇或者填寫打包好的JAR檔案地址。
Main Class:
spark.test.Main。網路連接:選擇預先配置的網路連接。
查看驗證結果。 任務執行後,您可以單擊下方運行記錄地區中的日誌探查,在Driver日誌的Stdout頁簽,查看到MySQL對應庫表中的內容。