AnalyticDB for MySQL is an Alibaba Cloud service for real-time online analytical processing of large amounts of data at high concurrency. AnalyticDB for MySQL can analyze petabytes of data across multiple dimensions in milliseconds to provide you with data-driven insights into your business. This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access AnalyticDB for MySQL.

Prerequisites

  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.
  • An AnalyticDB for MySQL cluster is created. For more information, see Create a cluster.
  • A database and table are created in the AnalyticDB for MySQL cluster, and data is inserted into the table. Sample statements:
    # Statement for creating a database:
    CREATE DATABASE testdb;

    # Statement for creating a table:
    CREATE TABLE testdb.test_table (
        name VARCHAR(32) NULL,
        age INT NULL,
        score DOUBLE NULL,
        PRIMARY KEY (name)
    )
    PARTITION BY HASH KEY (name) PARTITION NUM 10;

    # Statements for inserting data into the table:
    INSERT INTO testdb.test_table VALUES('aliyun01', 1001, 10.1);
    INSERT INTO testdb.test_table VALUES('aliyun02', 1002, 10.2);
    INSERT INTO testdb.test_table VALUES('aliyun03', 1003, 10.3);
    INSERT INTO testdb.test_table VALUES('aliyun04', 1004, 10.4);
    INSERT INTO testdb.test_table VALUES('aliyun05', 1005, 10.5);
  • The security group ID and vSwitch ID that are used by the serverless Spark engine of DLA to access the AnalyticDB for MySQL cluster are obtained. For more information, see Access your VPC.
  • The vSwitch IP address that is used by the serverless Spark engine of DLA to access the AnalyticDB for MySQL cluster is added to a whitelist of the AnalyticDB for MySQL cluster. For more information, see Configure a whitelist. A minimal connectivity check that you can use to verify these settings is sketched after this list.
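If you want to confirm that the whitelist and network settings are in place before you run the full job, you can use a minimal connectivity check such as the following sketch. The object name ADBMySQLConnectivityCheck, the endpoint, the database name, and the credentials are placeholders that match the examples in this topic; replace them with your own values. Submit the check as a Spark job with the same spark.dla.eni.* settings that are described in the procedure below.

    package com.aliyun.spark

    import java.sql.DriverManager

    object ADBMySQLConnectivityCheck {

      def main(args: Array[String]): Unit = {
        // Placeholder endpoint and credentials. Replace them with your own values.
        val url = "jdbc:mysql://am-xxx.ads.aliyuncs.com:3306/testdb"
        val conn = DriverManager.getConnection(url, "xxx1", "xxx2")
        try {
          // Count the rows in the test table to verify connectivity.
          val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM test_table")
          while (rs.next()) {
            println(s"test_table row count: ${rs.getInt(1)}")
          }
        } finally {
          conn.close()
        }
      }
    }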

Procedure

  1. Write the following test code and add the dependency that is required for accessing the AnalyticDB for MySQL cluster to the pom.xml file. Then, package the test code into a JAR file and upload the JAR file, together with the MySQL JDBC driver JAR file, to OSS.
    Sample test code:
    package com.aliyun.spark
    
    import java.util.Properties
    import org.apache.spark.sql.SparkSession
    
    object SparkOnADBMySQL {
    
      def main(args: Array[String]): Unit = {
        val url = args(0)
        val database = args(1)
        val tableName = args(2)
        val user = args(3)
        val password = args(4)
        val jdbcConnURL = s"jdbc:mysql://$url/$database"
    
        // The name of the table in the serverless Spark engine of DLA.
        val sparkTableName = args(5)
    
        val sparkSession = SparkSession
          .builder()
          .appName("scala spark on adb test")
          .getOrCreate()
    
        val driver = "com.mysql.cj.jdbc.Driver"
    
        // If the table exists, delete it.
        sparkSession.sql(s"drop table if exists $sparkTableName")
    
        // Execute the following SQL statement to create a table. The schema of the table is mapped to that in the AnalyticDB for MySQL database.
        val createCmd =
          s"""CREATE TABLE ${sparkTableName} USING org.apache.spark.sql.jdbc
             |    options (
             |    driver '$driver',
             |    url '$jdbcConnURL',
             |    dbtable '$tableName',
             |    user '$user',
             |    password '$password'
             |    )""".stripMargin
        println(s"createCmd: \n $createCmd")
        sparkSession.sql(createCmd)
        val querySql = s"select * from $sparkTableName limit 1"
        sparkSession.sql(querySql).show
    
        // Call the Dataset API operation.
        val connectionProperties = new Properties()
        connectionProperties.put("driver", driver)
        connectionProperties.put("user", user)
        connectionProperties.put("password", password)
        // Read data from the table in the AnalyticDB for MySQL database.
        val jdbcDf = sparkSession.read.jdbc(jdbcConnURL,
          s"$database.$tableName",
          connectionProperties)
        jdbcDf.select("name", "age", "score").show()
    
        val data =
          Seq(
            PersonADBMysql("bill", 30, 170.5D),
            PersonADBMysql("gate", 29, 200.3D)
          )
        val dfWrite = sparkSession.createDataFrame(data)
    
        // Write data to the table in the AnalyticDB for MySQL database.
        dfWrite
          .write
          .mode("append")
          .jdbc(jdbcConnURL, s"$database.$tableName", connectionProperties)
        // Read the table again to verify that the new rows were appended.
        jdbcDf.select("name", "age").show()
        sparkSession.stop()
      }
    }
    
    case class PersonADBMysql(name: String, age: Int, score: Double)
    Dependency to declare in the pom.xml file for accessing the AnalyticDB for MySQL cluster:
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.22</version>
            </dependency>
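    The sample code uses a java.util.Properties object with the jdbc() methods of DataFrameReader and DataFrameWriter. As a sketch of an equivalent alternative, assuming the variables defined in the sample code above, the same append can be expressed with the option-based JDBC writer API. The batchsize option is a standard Spark JDBC writer option; the value below is only illustrative.

    // Equivalent write using DataFrameWriter JDBC options instead of
    // java.util.Properties. All variables come from the sample code above.
    dfWrite
      .write
      .format("jdbc")
      .option("driver", driver)
      .option("url", jdbcConnURL)
      .option("dbtable", s"$database.$tableName")
      .option("user", user)
      .option("password", password)
      // batchsize controls how many rows are sent per JDBC batch; 1000 is illustrative.
      .option("batchsize", "1000")
      .mode("append")
      .save()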
  2. Log on to the DLA console.
  3. In the top navigation bar, select the region where the AnalyticDB for MySQL cluster resides.
  4. In the left-side navigation pane, choose Serverless Spark > Submit job.
  5. On the Parameter Configuration page, click Create Job.
  6. In the Create Job dialog box, configure the parameters and click OK to create a Spark job.
  7. In the Job List navigation tree, click the Spark job that you created and enter the following job configuration in the code editor. Replace the parameter values as described in the comments. Then, click Save and Execute.
    {
        "args": [
            "am-xxx.ads.aliyuncs.com:3306",  # The VPC endpoint and port number of the AnalyticDB for MySQL cluster.
            "testdb",  # The name of the AnalyticDB for MySQL database.
            "test_table",  # The name of the table in the AnalyticDB for MySQL database.
            "xxx1",  # The username that is used to log on to the AnalyticDB for MySQL database.
            "xxx2",  # The password that is used to log on to the AnalyticDB for MySQL database.
            "spark_on_adbmysql_table"  # The name of the Spark table that is mapped to the table in the AnalyticDB for MySQL database.
        ],
        "file": "oss://spark_test/jars/adbmysql/spark-examples-0.0.1-SNAPSHOT.jar",   # The OSS directory where the test code is saved.
        "name": "adbmysql-test",
        "jars": [
            "oss://spark_test/jars/adbmysql/mysql-connector-java-8.0.22.jar"  # The OSS directory where the JAR file that contains the dependency of the test code is saved.
        ],
        "className": "com.aliyun.spark.SparkOnADBMySQL",
        "conf": {
            "spark.driver.resourceSpec": "small",  # The specifications of the Spark driver, which can be small, medium, large, or xlarge.
            "spark.executor.instances": 2,  # The number of Spark executors.
            "spark.executor.resourceSpec": "small", # The specifications of Spark executors, which can be small, medium, large, or xlarge.
            "spark.dla.eni.enable": "true",  # Specifies whether to enable an elastic network interface (ENI) for the VPC. If you want to access data of the VPC, set spark.dla.eni.enable to true.
            "spark.dla.eni.vswitch.id": "vsw-xxx",  # The ID of the vSwitch to which the AnalyticDB for MySQL cluster belongs.
            "spark.dla.eni.security.group.id": "sg-xxx"  # The ID of the security group to which the AnalyticDB for MySQL cluster belongs.
        }
    }
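    The six values in the args array are passed positionally to args(0) through args(5) in the main method of the sample code. As an optional sketch that is not part of the original sample, you can add a guard at the top of main to fail fast when the job is submitted with too few arguments:

    // Optional guard at the top of main: fail fast with a usage hint
    // if the job is submitted with fewer than six positional arguments.
    if (args.length < 6) {
      System.err.println(
        "usage: <url:port> <database> <table> <user> <password> <sparkTableName>")
      sys.exit(1)
    }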

Result

After the job succeeds, find the job in the job list and click Log in the Operation column to view the job logs.