全部產品
Search
文件中心

E-MapReduce:讀寫MySQL

更新時間:Sep 19, 2025

Spark原生支援通過JDBC Connector訪問MySQL。Serverless Spark在啟動時將自動載入MySQL JDBC驅動(版本 8.0.33)。您可以通過SQL會話、批處理任務或Notebook等方式串連MySQL,從而實現資料的讀取與寫入操作。

前提條件

  • 已建立Serverless Spark工作空間,詳情請參見建立工作空間

  • 已建立MySQL執行個體。

    您可以選擇自建的MySQL執行個體,或選擇阿里雲提供的RDS MySQL與PolarDB MySQL資料庫。

    本文以阿里雲的RDS MySQL為例,詳情請參見建立RDS MySQL執行個體與設定資料庫

注意事項

確保Serverless Spark能夠與MySQL之間的網路互連。具體配置請參見EMR Serverless Spark與其他VPC間網路互連

說明

配置安全性群組規則時,請根據實際需求選擇性開放必要的連接埠範圍(1~65535)。本文樣本需開啟TCP 3306連接埠。

操作步驟

方式一:使用SQL會話

  1. 建立SQL會話。 在會話管理中建立SQL會話,並選擇預先配置的網路連接。具體配置請參見建立SQL會話

  2. 建立SQL任務。 在資料開發中建立SparkSQL類型的任務,使用以下SQL進行測試。

    CREATE TEMPORARY VIEW test
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url 'jdbc:mysql://<jdbc_url>/',
      dbtable '<db>.<table>',
      user '<username>',
      password '<password>'
    );
    
    SELECT * FROM test;

    涉及參數說明如下所示。

    參數

    說明

    url

    JDBC連接字串。填寫格式為jdbc:mysql://<jdbc_url>/,需替換<jdbc_url>為實際值。

    dbtable

    待讀取的資料庫表名,格式為<db>.<table> 。本文樣本為test_mysql_db.test

    user

    MySQL資料庫使用者名稱。

    說明

    需具備目標表的讀取許可權。

    password

    MySQL資料庫密碼。

    如果能夠正確輸出表的內容,則說明串連成功。

    image

  3. 插入資料。 請使用以下命令向MySQL表中插入資料。

    INSERT INTO test VALUES(4, 'd'),(5, 'e');
    SELECT * FROM test;

    如果能正確查詢到插入的資料,說明寫入功能正常。

    image

方式二:使用Notebook會話

  1. 建立Notebook會話。 在會話管理中建立Notebook會話,並選擇預先配置的網路連接。具體配置請參見建立Notebook會話

  2. 建立Notebook任務。 在資料開發中建立互動式開發 > Notebook類型的任務,使用以下Python代碼進行測試。

    df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:mysql://<jdbc_url>") \
      .option("dbtable", "<db>.<table>") \
      .option("user", "<username>") \
      .option("password", "<password>") \
      .load()
    df.show()

    如果能夠正確輸出表的內容,說明串連成功。

    image

  3. 插入資料。 請使用以下代碼向MySQL表中插入資料。

    df = spark.createDataFrame([(6, 'f'), (7, 'g')], ["id", "name"])
    df.write \
      .format("jdbc") \
      .mode("append") \
      .option("url", "jdbc:mysql://<jdbc_url>") \
      .option("dbtable", "<db>.<table>") \
      .option("user", "<username>") \
      .option("password", "<password>") \
      .save()
    df.show()
    

    涉及參數mode("append"),該參數指定寫入模式為追加模式,確保新資料被追加至目標表中,而不會覆蓋或刪除已存資料。

    如果能正確返回插入的資料,說明寫入功能正常。

    image

方式三:使用Spark批任務

  1. 編寫測試代碼。 使用以下Scala代碼編譯並打包為JAR檔案。

    package spark.test
    
    import org.apache.spark.sql.SparkSession
    
    object Main {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("test")
          .getOrCreate()
    
        val newRows = spark.createDataFrame(Seq((6, "f"), (7, "g"))).toDF("id", "name")
        newRows.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://<jdbc_url>")
          .option("dbtable", "<db>.<table>")
          .option("user", "<username>")
          .option("password", "<password>")
          .save()
        
        spark.read.format("jdbc")
          .option("url", "jdbc:mysql://<jdbc_url>")
          .option("dbtable", "<db>.<table>")
          .option("user", "<username>")
          .option("password", "<password>")
          .load()
          .show()
        
        spark.stop()
      }
    }
  2. 建立批任務。 在資料開發中建立批任務 > JAR類型的任務,然後配置以下參數進行測試。具體配置請參見批任務或流任務開發

    • 主jar資源:選擇或者填寫打包好的JAR檔案地址。

    • Main Classspark.test.Main

    • 網路連接:選擇預先配置的網路連接。

  3. 查看驗證結果。 任務執行後,您可以單擊下方運行記錄地區中的日誌探查,在Driver日誌Stdout頁簽,查看到MySQL對應庫表中的內容。