MaxCompute (previously known as ODPS) is a fast, fully managed data warehousing service that can process terabytes or petabytes of data. This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access MaxCompute.

Prerequisites

  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.
  • A MaxCompute project is created. For more information, see Create a project. In this example, the project name is spark_on_maxcompute and the mode of the workspace that is associated with the project is Basic Mode (Production Environment Only).
  • A table is created in the MaxCompute project. In this example, the table name is sparktest. The following statement is used to create the table:
    CREATE TABLE `sparktest` (
      `a` INT,
      `b` STRING
    )
    PARTITIONED BY (pt STRING);
  • A RAM user is added for the MaxCompute project and roles are assigned to the RAM user. For more information, see Add project members and configure roles.

Procedure

  1. Write the following test code and add the dependencies that are required to access MaxCompute to the POM file. Then, package the test code and dependencies into a JAR file and upload the file to OSS.
    Sample test code:
    package com.aliyun.spark
    
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    object MaxComputeDataSourcePartitionSample {
      def main(args: Array[String]): Unit = {
        val accessKeyId = args(0)
        val accessKeySecret = args(1)
        val odpsUrl = args(2)
        val tunnelUrl = args(3)
        val project = args(4)
        val table = args(5)
        var numPartitions = 1
        if(args.length > 6)
          numPartitions = args(6).toInt
    
        val ss = SparkSession.builder().appName("Test Odps Read").getOrCreate()
    
        import ss.implicits._
    
        val dataSeq = (1 to 1000000).map {
          index => (index, (index-3).toString)
        }.toSeq
    
    
        val df = ss.sparkContext.makeRDD(dataSeq).toDF("a", "b")
    
        System.out.println("*****" + table + ",before overwrite table")
        df.write.format("org.apache.spark.aliyun.odps.datasource")
          .option("odpsUrl", odpsUrl)
          .option("tunnelUrl", tunnelUrl)
          .option("table", table)
          .option("project", project)
          .option("accessKeySecret", accessKeySecret)
          .option("partitionSpec", "pt='2018-04-01'")
          .option("allowCreateNewPartition", true)
          .option("accessKeyId", accessKeyId).mode(SaveMode.Overwrite).save()
        System.out.println("*****" + table + ",after overwrite table, before read table")
    
        val readDF = ss.read
          .format("org.apache.spark.aliyun.odps.datasource")
          .option("odpsUrl", odpsUrl)
          .option("tunnelUrl", tunnelUrl)
          .option("table", table)
          .option("project", project)
          .option("accessKeySecret", accessKeySecret)
          .option("accessKeyId", accessKeyId)
          .option("partitionSpec", "pt='2018-04-01'")
          .option("numPartitions",numPartitions).load()
    
        readDF.collect().foreach(println)
      }
    }
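    In the sample, readDF.collect() pulls every row back to the driver and prints it, which can make the job log very large for a table of this size. The following variant of the read-back step is a sketch only: it reuses the variables and data source options defined in the sample above and is not part of the original sample. It counts the rows in the written partition instead of printing them:
    // Sketch of an alternative read-back step: count the rows in the written
    // partition instead of collecting and printing all of them. Reuses the
    // variables (ss, odpsUrl, tunnelUrl, project, table, accessKeyId,
    // accessKeySecret, numPartitions) defined in the sample above.
    val readCount = ss.read
      .format("org.apache.spark.aliyun.odps.datasource")
      .option("odpsUrl", odpsUrl)
      .option("tunnelUrl", tunnelUrl)
      .option("table", table)
      .option("project", project)
      .option("accessKeyId", accessKeyId)
      .option("accessKeySecret", accessKeySecret)
      .option("partitionSpec", "pt='2018-04-01'")
      .option("numPartitions", numPartitions)
      .load()
      .count()
    println("*****" + table + ", rows in partition pt='2018-04-01': " + readCount)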
    Dependencies that must be added to the POM file:
            <dependency>
                <groupId>com.aliyun.odps</groupId>
                <artifactId>odps-sdk-commons</artifactId>
                <version>0.28.4-public</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun.apsaradb</groupId>
                <artifactId>maxcompute-spark</artifactId>
                <version>0.28.4-public_2.4.3-1.0-SNAPSHOT</version>    
            </dependency>
  2. Log on to the DLA console.
  3. In the top navigation bar, select the region where the MaxCompute project resides.
  4. In the left-side navigation pane, choose Serverless Spark > Submit job.
  5. On the Parameter Configuration page, click Create Job.
  6. In the Create Job dialog box, configure the parameters as prompted and click OK to create a Spark job.
  7. In the Job List navigation tree, click the name of the Spark job that you created. In the code editor, enter the following job configuration. Replace the parameter values based on the inline comments. A sketch of how the args entries map to the arguments of the sample code is provided after the configuration. Then, save and submit the job.
    {
        "args": [
            "xxx1",  #The AccessKey ID that is used to access the MaxCompute project.
            "xxx2",  #The AccessKey secret that is used to access the MaxCompute project.
            "http://service.cn.maxcompute.aliyun-inc.com/api",  #The endpoint of the VPC where the MaxCompute project resides.
            "http://dt.cn-shenzhen.maxcompute.aliyun-inc.com",  #The Tunnel endpoint of the VPC where the MaxCompute project resides.
            "spark_on_maxcompute",  #The name of the workspace that is associated with the MaxCompute project.
            "sparktest",  #The name of the MaxCompute data table.
            "2"  #Then number of partitions in the MaxCompute data table.
        ],
        "file": "oss://spark_test/jars/maxcompute/spark-examples-0.0.1-SNAPSHOT.jar", #The OSS directory in which the test code package is saved.
        "name": "Maxcompute-test",
        "jars": [
            #The OSS paths where the JAR packages of the test code dependencies are stored.
            "oss://spark_test/jars/maxcompute/maxcompute-spark-0.28.4-public_2.4.3-1.0-SNAPSHOT.jar",
            "oss://spark_test/jars/maxcompute/odps-sdk-commons-0.28.4-public.jar",
            "oss://spark_test/jars/maxcompute/odps-sdk-core-0.28.4-public.jar",
            "oss://spark_test/jars/maxcompute/mail-1.4.7.jar"
        ],
        "className": "com.aliyun.spark.MaxComputeDataSourcePartitionSample",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "small"
        }
    }
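    The entries in the args array are positional and must follow the order in which the main method of the sample code reads them. The following sketch is for illustration only: the object, case class, and field names are not part of the sample code. It shows how each position maps to an argument:
    // Illustrative mapping between the positional "args" entries in the job
    // configuration and the arguments consumed by
    // MaxComputeDataSourcePartitionSample.main. Names here are examples only.
    object JobArgsMapping {
      final case class JobArgs(
        accessKeyId: String,     // args(0): AccessKey ID
        accessKeySecret: String, // args(1): AccessKey secret
        odpsUrl: String,         // args(2): MaxCompute endpoint
        tunnelUrl: String,       // args(3): MaxCompute Tunnel endpoint
        project: String,         // args(4): MaxCompute project name
        table: String,           // args(5): MaxCompute table name
        numPartitions: Int       // args(6): optional; the sample defaults to 1
      )

      def parse(args: Array[String]): JobArgs =
        JobArgs(
          accessKeyId = args(0),
          accessKeySecret = args(1),
          odpsUrl = args(2),
          tunnelUrl = args(3),
          project = args(4),
          table = args(5),
          numPartitions = if (args.length > 6) args(6).toInt else 1
        )
    }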

Result

After the job succeeds, find the job in the job list and click Log in the Operation column to view the job logs.