PolarDB-X is a cloud-native distributed database developed by Alibaba Cloud. It integrates Distributed Relational Database Service (DRDS) with the self-developed distributed storage X-DB. Based on this integrated cloud-native architecture, PolarDB-X supports up to tens of millions of concurrent connections and hundreds of petabytes of data storage. This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access PolarDB-X.

Prerequisites

  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.
  • A PolarDB-X database is created.
  • A table is created in the PolarDB-X database and data is inserted into the table. Sample statements:
    # Statement for creating a table:
    CREATE TABLE `testdb_drds`.`test_table` (
     `name` varchar(32) NULL,
     `age` INT  NULL,
     `score` DOUBLE NULL
    );
    # Statements for inserting data into the table:
    INSERT INTO `testdb_drds`.`test_table` VALUES('aliyun01', 1001, 10.1);
    INSERT INTO `testdb_drds`.`test_table` VALUES('aliyun02', 1002, 10.2);
    INSERT INTO `testdb_drds`.`test_table` VALUES('aliyun03', 1003, 10.3);
    INSERT INTO `testdb_drds`.`test_table` VALUES('aliyun04', 1004, 10.4);
    INSERT INTO `testdb_drds`.`test_table` VALUES('aliyun05', 1005, 10.5);
  • The security group ID and vSwitch ID that are used by the serverless Spark engine of DLA to access the PolarDB-X database are obtained. For more information, see Access your VPC.
  • The vSwitch IP address that is used by the serverless Spark engine of DLA to access the PolarDB-X database is added to a whitelist of the PolarDB-X database. For more information, see Set an IP address whitelist.
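
Before you submit a Spark job, it can help to confirm that the sample table is readable over JDBC. The following is a minimal sketch and not part of the original procedure. It assumes that it runs on a host in the same VPC as the PolarDB-X database, that the MySQL JDBC driver is on the classpath, and that the endpoint, username, and password are passed as arguments. The object name PolarDbxPrecheck and the helper countSql are hypothetical names used only for illustration.

```scala
import java.sql.DriverManager

object PolarDbxPrecheck {

  // Builds the row-count query for a table (hypothetical helper, for illustration only).
  def countSql(database: String, table: String): String =
    s"SELECT COUNT(*) FROM `$database`.`$table`"

  def main(args: Array[String]): Unit = {
    // Assumed arguments: endpoint in host:port form, username, password.
    require(args.length >= 3, "usage: <host:port> <user> <password>")
    val endpoint = args(0)
    val user = args(1)
    val password = args(2)

    val conn = DriverManager.getConnection(s"jdbc:mysql://$endpoint", user, password)
    try {
      val stmt = conn.createStatement()
      // Count the rows of the sample table created in the prerequisites.
      val rs = stmt.executeQuery(countSql("testdb_drds", "test_table"))
      if (rs.next()) println(s"rows in test_table: ${rs.getLong(1)}")
    } finally {
      conn.close()
    }
  }
}
```

If the connection cannot be established, the whitelist and the security group settings are likely the first things to check.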

Procedure

  1. Write the following test code and add the dependency that is required to access the PolarDB-X database to the pom.xml file. Then, package the test code and dependency into a JAR file and upload this file to OSS.
    Sample test code:
    package com.aliyun.spark
    
    import java.util.Properties
    import org.apache.spark.sql.SparkSession
    
    // The object name must match the className value in the job configuration.
    object SparkOnPolarDBX {
    
      def main(args: Array[String]): Unit = {
        // The URL and database of the PolarDB-X cluster, the name of the table in the PolarDB-X database, and the username and password that are used to log on to the PolarDB-X database.
        val url = args(0)
        val jdbcConnURL = s"jdbc:mysql://$url"
        val database = args(1)
        val tableName = args(2)
        val user = args(3)
        val password = args(4)
    
        // The name of the table in the serverless Spark engine of DLA.
        val sparkTableName = args(5)
    
        val sparkSession = SparkSession
          .builder()
          .appName("scala spark on POLARDB test")
          .getOrCreate()
    
        val driver = "com.mysql.cj.jdbc.Driver"
    
        // If the table exists, delete it.
        sparkSession.sql(s"drop table if exists $sparkTableName")
    
        // Execute the following SQL statement to create a table. The schema of the table is mapped to that in the PolarDB-X database.
        val createCmd =
          s"""CREATE TABLE ${sparkTableName} USING org.apache.spark.sql.jdbc
             |    options (
             |    driver '$driver',
             |    url '$jdbcConnURL',
             |    dbtable '$database.$tableName',
             |    user '$user',
             |    password '$password'
             |    )""".stripMargin
        println(s"createCmd: \n $createCmd")
        sparkSession.sql(createCmd)
        val querySql = "select * from " + sparkTableName + " limit 1"
        sparkSession.sql(querySql).show
    
        // Call the Dataset API operation.
        val connectionProperties = new Properties()
        connectionProperties.put("driver", driver)
        connectionProperties.put("user", user)
        connectionProperties.put("password", password)
        // Read data from the table in the PolarDB-X database.
        val jdbcDf = sparkSession.read.jdbc(jdbcConnURL,
          s"$database.$tableName",
          connectionProperties)
        jdbcDf.select("name", "age", "score").show()
    
        val data =
          Seq(
            PersonPolardb("bill", 30, 170.5),
            PersonPolardb("gate", 29, 200.3)
          )
        val dfWrite = sparkSession.createDataFrame(data)
    
        // Write data to the table in the PolarDB-X database.
        dfWrite
          .write
          .mode("append")
          .jdbc(jdbcConnURL, s"$database.$tableName", connectionProperties)
        jdbcDf.select("name", "age").show()
        sparkSession.stop()
      }
    }
    
    case class PersonPolardb(name: String, age: Int, score: Double)
    Dependency in the pom.xml file that is required to access the PolarDB-X database:
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.22</version>
            </dependency>
  2. Log on to the DLA console.
  3. In the top navigation bar, select the region where the PolarDB-X cluster resides.
  4. In the left-side navigation pane, choose Serverless Spark > Submit job.
  5. On the Parameter Configuration page, click Create Job.
  6. In the Create Job dialog box, configure the parameters and click OK to create a Spark job.
  7. In the Job List navigation tree, click the Spark job that you created and enter the following content of the job in the code editor. Replace the parameter values based on the following parameter descriptions. Then, click Save and Execute.
    {
        "args": [
            "xxx.drds.aliyuncs.com:3306",  # The internal endpoint and port number of the PolarDB-X cluster, in the host:port format without spaces.
            "testdb_drds",  # The name of the database in the PolarDB-X cluster.
            "test_table",  # The name of the table in the PolarDB-X database.
            "xxx1",  # The username that is used to log on to the PolarDB-X database.
            "xxx2",  # The password that is used to log on to the PolarDB-X database.
            "spark_on_polardbx_table"  # The name of the Spark table that is mapped to the table in the PolarDB-X database.
        ],
        "file": "oss://spark_test/jars/polardbx/spark-examples-0.0.1-SNAPSHOT.jar",  # The OSS path of the JAR file that contains the test code.
        "name": "polardbx-test",
        "jars": [
            "oss://spark_test/jars/polardbx/mysql-connector-java-8.0.22.jar"  # The OSS directory where the JAR file that contains the dependency of the test code is saved.
        ],
        "className": "com.aliyun.spark.SparkOnPolarDBX",
        "conf": {
            "spark.driver.resourceSpec": "small", # The specifications of the Spark driver, which can be small, medium, large, or xlarge.
            "spark.executor.instances": 2,  # The number of Spark executors.
            "spark.executor.resourceSpec": "small",  # The specifications of Spark executors, which can be small, medium, large, or xlarge.
            "spark.dla.eni.enable": "true",  # Specifies whether to enable an elastic network interface (ENI) for the VPC. To access data in the VPC, set spark.dla.eni.enable to true.
            "spark.dla.eni.vswitch.id": "vsw-xxx",  # The ID of the vSwitch to which the PolarDB-X cluster belongs.
            "spark.dla.eni.security.group.id": "sg-xxx"  # The ID of the security group to which the PolarDB-X cluster belongs.
        }
    }
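
The sample job reads its six arguments by position, so a missing value or a stray space in the endpoint only surfaces as a JDBC error at run time. The following is a minimal sketch of a check that could run at the start of main. It is an illustration and not part of the original sample. The names JobArgsCheck, JobArgs, and parse are hypothetical, and the host:port pattern is an assumption about what a usable endpoint looks like.

```scala
object JobArgsCheck {

  // The six positional arguments that the sample job reads, in order.
  final case class JobArgs(endpoint: String, database: String, table: String,
                           user: String, password: String, sparkTable: String) {
    // Same URL form that the sample code builds from args(0).
    def jdbcUrl: String = s"jdbc:mysql://$endpoint"
  }

  // Returns Right(JobArgs) if the arguments look usable, or Left(reason) otherwise.
  def parse(args: Seq[String]): Either[String, JobArgs] =
    args match {
      case Seq(endpoint, database, table, user, password, sparkTable) =>
        if (args.exists(_.trim.isEmpty))
          Left("arguments must not be empty")
        else if (!endpoint.matches("""[^\s:/]+:\d+"""))
          Left(s"endpoint must be host:port without spaces, got '$endpoint'")
        else
          Right(JobArgs(endpoint, database, table, user, password, sparkTable))
      case _ =>
        Left(s"expected 6 arguments, got ${args.length}")
    }
}
```

In main, a call such as JobArgsCheck.parse(args.toSeq) could be matched on Left to fail fast with a readable message before the Spark session is created.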

Result

After the job succeeds, find the job and click Log in the Operation column to view the logs of the job.