PolarDB for MySQL is a next-generation relational cloud database service developed by Alibaba Cloud. It is fully compatible with MySQL and is suitable for diverse enterprise scenarios. This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access PolarDB for MySQL.

Prerequisites

  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.
  • A PolarDB for MySQL instance is created. For more information, see Databases.
  • A data table is created in the PolarDB for MySQL instance and data is inserted into the table. Sample statements:
    #Execute the following statement to create a table:
    CREATE TABLE `testdb`.`test_table` (
     `name` varchar(32) NULL,
     `age` INT NULL,
     `score` DOUBLE NULL
    );
    #Execute the following statements to insert data into the table:
    INSERT INTO `testdb`.`test_table` VALUES('aliyun01', 1001, 10.1);
    INSERT INTO `testdb`.`test_table` VALUES('aliyun02', 1002, 10.2);
    INSERT INTO `testdb`.`test_table` VALUES('aliyun03', 1003, 10.3);
    INSERT INTO `testdb`.`test_table` VALUES('aliyun04', 1004, 10.4);
    INSERT INTO `testdb`.`test_table` VALUES('aliyun05', 1005, 10.5);
  • You have obtained the security group ID and vSwitch ID that are required for the serverless Spark engine of DLA to access the PolarDB for MySQL instance. For more information, see Access your VPC.
  • The security group ID or vSwitch ID that you obtained is added to a whitelist of the cluster to which the PolarDB for MySQL instance belongs. For more information, see Configure a whitelist for a cluster. An optional connectivity check is sketched after this list.
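
Optional. Before you submit a Spark job, you can verify that the sample table is reachable over JDBC from a client that the whitelist allows. The following code is a minimal sketch that assumes the sample table above. The endpoint, username, and password are placeholders that you must replace with the values of your own instance, and MySQL Connector/J must be on the classpath.

    import java.sql.DriverManager

    object ConnectivityCheck {
      def main(args: Array[String]): Unit = {
        //Placeholders: replace the endpoint and credentials with those of your own instance.
        val url = "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306/testdb"
        val conn = DriverManager.getConnection(url, "xxx1", "xxx2")
        try {
          val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM test_table")
          while (rs.next()) {
            println(s"test_table contains ${rs.getLong(1)} rows")
          }
        } finally {
          conn.close()
        }
      }
    }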

Procedure

  1. Prepare the following test code and the POM file that contains the dependency required to access the PolarDB for MySQL instance. Then, package the test code and dependency into a JAR file and upload the file to OSS. A variant of the read logic that uses the generic DataFrameReader options is sketched after this procedure.
    Sample test code:
    package com.aliyun.spark
    
    import java.util.Properties
    import org.apache.spark.sql.SparkSession
    
    object SparkOnPOLARDB {
    
      def main(args: Array[String]): Unit = {
        //Obtain the URL, database name, and table name of the PolarDB for MySQL instance, and the username and password that are used to log on to the PolarDB for MySQL database.
        val url = args(0)
        val jdbcConnURL = s"jdbc:mysql://$url"
        val database = args(1)
        val tableName = args(2)
        val user = args(3)
        val password = args(4)
    
        //The name of the table on the serverless Spark engine of DLA.
        val sparkTableName = args(5)
    
        val sparkSession = SparkSession
          .builder()
          .appName("scala spark on POLARDB test")
          .getOrCreate()
    
        //The driver class for MySQL Connector/J 8.x. If you use Connector/J 5.x, specify com.mysql.jdbc.Driver instead.
        val driver = "com.mysql.cj.jdbc.Driver"
    
        //If the table exists, delete it.
        sparkSession.sql(s"drop table if exists $sparkTableName")
    
        //Execute SQL statements to create a Spark table that is mapped to the schema of the PolarDB for MySQL table.
        val createCmd =
          s"""CREATE TABLE ${sparkTableName} USING org.apache.spark.sql.jdbc
             |    options (
             |    driver '$driver',
             |    url '$jdbcConnURL',
             |    dbtable '$database.$tableName',
             |    user '$user',
             |    password '$password'
             |    )""".stripMargin
        println(s"createCmd: \n $createCmd")
        sparkSession.sql(createCmd)
        val querySql = s"select * from $sparkTableName limit 1"
        sparkSession.sql(querySql).show
    
        //Call the Dataset API operation.
        val connectionProperties = new Properties()
        connectionProperties.put("driver", driver)
        connectionProperties.put("user", user)
        connectionProperties.put("password", password)
        //Read data from the PolarDB for MySQL database.
        val jdbcDf = sparkSession.read.jdbc(jdbcConnURL,
          s"$database.$tableName",
          connectionProperties)
        jdbcDf.select("name", "age", "score").show()
    
        val data =
          Seq(
            PersonPolardb("bill", 30, 170.5),
            PersonPolardb("gate", 29, 200.3)
          )
        val dfWrite = sparkSession.createDataFrame(data)
    
        //Write data to the PolarDB for MySQL database.
        dfWrite
          .write
          .mode("append")
          .jdbc(jdbcConnURL, s"$database.$tableName", connectionProperties)
        //jdbcDf is evaluated lazily, so this show() re-queries the table and includes the newly written rows.
        jdbcDf.select("name", "age").show()
        sparkSession.stop()
      }
    }
    
    case class PersonPolardb(name: String, age: Int, score: Double)
    POM file that contains the dependency required to access PolarDB for MySQL:
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.22</version>
            </dependency>
  2. Log on to the DLA console.
  3. In the top navigation bar, select the region where the PolarDB for MySQL instance resides.
  4. In the left-side navigation pane, choose Serverless Spark > Submit job.
  5. On the Parameter Configuration page, click Create Job.
  6. In the Create Job dialog box, configure the parameters as prompted and click OK to create a Spark job.
  7. In the Job List navigation tree, click the name of the Spark job that you created. In the code editor, enter the following job content, replace the parameter values based on the following descriptions, and then save and submit the job.
    {
        "args": [
            "pc-xxx.mysql.polardb.rds.aliyuncs.com:3306",  #Internal endpoint and port that are used to access the PolarDB for MySQL instance.
            "testdb",  #The name of the PolarDB for MySQL database.
            "test_table",  #The name of the table in the PolarDB for MySQL database.
            "xxx1", #The username that is used to log on to the PolarDB for MySQL database.
            "xxx2", #The username that is used to log on to the PolarDB for MySQL database.
            "spark_on_polardb_table"  #The name of the Spark table that is mapped to the table in the PolarDB for MySQL database.
        ],
        "file": "oss://spark_test/jars/polardb/spark-examples-0.0.1-SNAPSHOT.jar",  #The OSS directory in which the test code is saved.
        "name": "polardb-test",
        "jars": [
            "oss://spark_test/jars/polardb/mysql-connector-java-8.0.22.jar"  #The OSS directory in which the test code dependency is saved.
        ],
        "className": "com.aliyun.spark.SparkOnPOLARDB",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "small",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.vswitch.id": "vsw-xxx",   #The ID of the vSwitch in the VPC where the PolarDB for MySQL instance resides.
            "spark.dla.eni.security.group.id": "sg-xxx"  #The ID of the security group in the VPC where the PolarDB for MySQL instance resides.
        }
    }
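
For reference, the table mapping in the test code can also be expressed with the generic DataFrameReader options API, which additionally lets you split the read of a large table into parallel tasks. The following code is a minimal sketch that assumes the SparkSession from the test code and the same placeholder endpoint, database, and credentials as the job configuration above.

    //A sketch only: the endpoint and credentials are placeholders.
    val jdbcDf = sparkSession.read
      .format("jdbc")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306")
      .option("dbtable", "testdb.test_table")
      .option("user", "xxx1")
      .option("password", "xxx2")
      //Optional: parallelize the read across executors on a numeric column.
      .option("partitionColumn", "age")
      .option("lowerBound", "1001")
      .option("upperBound", "1005")
      .option("numPartitions", "2")
      .load()
    jdbcDf.show()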

Result

After the job succeeds, find the job in the job list and click Log in the Operation column to view the job logs.
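
If the job runs against the sample data in the prerequisites, the first show() call in the test code prints output similar to the following in the logs (the returned row may vary because no ORDER BY is specified):

    +--------+----+-----+
    |    name| age|score|
    +--------+----+-----+
    |aliyun01|1001| 10.1|
    +--------+----+-----+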