Data Lake Analytics - Deprecated: MaxCompute

Last Updated: Feb 19, 2024

MaxCompute (previously known as ODPS) is a rapid, fully managed data warehousing solution that can process terabytes or petabytes of data. This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access MaxCompute.

Important

DLA is discontinued. AnalyticDB for MySQL supports the features of DLA and provides additional features and enhanced performance. For more information about how to use AnalyticDB for MySQL to access MaxCompute, see Access MaxCompute.

Prerequisites

  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.

  • A MaxCompute project is created. For more information, see Create a MaxCompute project. In this example, the project is named spark_on_maxcompute and the mode of the workspace that is associated with the project is Basic Mode (Production Environment Only).

  • A table is created in the MaxCompute project. In this example, the table is named sparktest. The following sample code provides an example on how to create a table:

    CREATE TABLE `sparktest` (
      `a` INT,
      `b` STRING
    )
    PARTITIONED BY (pt STRING);
  • A Resource Access Management (RAM) user is added for the MaxCompute project and roles are assigned to the RAM user. For more information, see Grant permissions to a RAM user.

Procedure

  1. Compile the following test code and download the POM file that contains the dependencies required to access MaxCompute. Then, package the test code and dependencies into separate JAR files and upload the JAR files to OSS.

    Sample test code:

    package com.aliyun.spark
    
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    object MaxComputeDataSourcePartitionSample {
      def main(args: Array[String]): Unit = {
        val accessKeyId = args(0)
        val accessKeySecret = args(1)
        val odpsUrl = args(2)
        val tunnelUrl = args(3)
        val project = args(4)
        val table = args(5)
        var numPartitions = 1
        if(args.length > 6)
          numPartitions = args(6).toInt
    
        val ss = SparkSession.builder().appName("Test Odps Read").getOrCreate()
    
        import ss.implicits._
    
        val dataSeq = (1 to 1000000).map {
          index => (index, (index-3).toString)
        }.toSeq
    
    
        val df = ss.sparkContext.makeRDD(dataSeq).toDF("a", "b")
    
        System.out.println("*****" + table + ",before overwrite table")
        df.write.format("org.apache.spark.aliyun.odps.datasource")
          .option("odpsUrl", odpsUrl)
          .option("tunnelUrl", tunnelUrl)
          .option("table", table)
          .option("project", project)
          .option("accessKeySecret", accessKeySecret)
          .option("partitionSpec", "pt='2018-04-01'")
          .option("allowCreateNewPartition", true)
          .option("accessKeyId", accessKeyId).mode(SaveMode.Overwrite).save()
        System.out.println("*****" + table + ",after overwrite table, before read table")
    
        val readDF = ss.read
          .format("org.apache.spark.aliyun.odps.datasource")
          .option("odpsUrl", odpsUrl)
          .option("tunnelUrl", tunnelUrl)
          .option("table", table)
          .option("project", project)
          .option("accessKeySecret", accessKeySecret)
          .option("accessKeyId", accessKeyId)
          .option("partitionSpec", "pt='2018-04-01'")
          .option("numPartitions",numPartitions).load()
    
        readDF.collect().foreach(println)
      }
    }

    POM file that contains the dependencies of MaxCompute:

            <dependency>
                <groupId>com.aliyun.odps</groupId>
                <artifactId>odps-sdk-commons</artifactId>
                <version>0.28.4-public</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun.apsaradb</groupId>
                <artifactId>maxcompute-spark</artifactId>
                <version>0.28.4-public_2.4.3-1.0-SNAPSHOT</version>    
            </dependency>
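
    If you only want to verify that the serverless Spark engine can read the table without overwriting its data, you can package a read-only variant of the test code instead. The following sketch is a minimal example that reuses the datasource and option names from the sample above. The object name MaxComputeReadOnlySample and the placeholder values are only for illustration; if you use this variant, replace the placeholders with your own values and set className in the job configuration to com.aliyun.spark.MaxComputeReadOnlySample. Because the settings are hardcoded in this sketch, the args array is not read.

    package com.aliyun.spark
    
    import org.apache.spark.sql.SparkSession
    
    object MaxComputeReadOnlySample {
      def main(args: Array[String]): Unit = {
        val ss = SparkSession.builder().appName("Odps Read Only").getOrCreate()
    
        // Read a single partition of the MaxCompute table without writing any data.
        val readDF = ss.read
          .format("org.apache.spark.aliyun.odps.datasource")
          .option("odpsUrl", "<MaxCompute endpoint>")
          .option("tunnelUrl", "<Tunnel endpoint>")
          .option("project", "spark_on_maxcompute")
          .option("table", "sparktest")
          .option("accessKeyId", "<AccessKey ID>")
          .option("accessKeySecret", "<AccessKey secret>")
          .option("partitionSpec", "pt='2018-04-01'")
          .load()
    
        // Print the number of rows in the partition as a quick connectivity check.
        println(readDF.count())
      }
    }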
  2. Log on to the DLA console.

  3. In the top navigation bar, select the region in which the MaxCompute project resides.

  4. In the left-side navigation pane, choose Serverless Spark > Submit job.

  5. On the Parameter Configuration page, click Create a job Template.

  6. In the Create a job Template dialog box, configure the parameters as prompted and click OK.

  7. On the Job list tab, click the name of the Spark job that you created and enter the following job content in the code editor. Then, save and submit the Spark job.

    {
        "args": [
            "xxx1",  #The AccessKey ID that is used to access the MaxCompute project. 
            "xxx2",  #The AccessKey secret that is used to access the MaxCompute project. 
            "http://service.cn.maxcompute.aliyun-inc.com/api",  #The endpoint of the virtual private cloud (VPC) in which the MaxCompute project resides. 
            "http://dt.cn-shenzhen.maxcompute.aliyun-inc.com",  #The Tunnel endpoint of the VPC in which the MaxCompute project resides. 
            "spark_on_maxcompute",  #The name of the workspace that is associated with the MaxCompute project. 
            "sparktest",  #The name of the MaxCompute data table. 
            "2"  #The number of partitions in the MaxCompute data table. 
        ],
        "file": "oss://spark_test/jars/maxcompute/spark-examples-0.0.1-SNAPSHOT.jar", #The OSS directory in which the test code package is saved. 
        "name": "Maxcompute-test",
        "jars": [
            #The OSS paths of the JAR packages on which the test code depends. 
            "oss://spark_test/jars/maxcompute/maxcompute-spark-0.28.4-public_2.4.3-1.0-SNAPSHOT.jar",
            "oss://spark_test/jars/maxcompute/odps-sdk-commons-0.28.4-public.jar",
            "oss://spark_test/jars/maxcompute/odps-sdk-core-0.28.4-public.jar",
            "oss://spark_test/jars/maxcompute/mail-1.4.7.jar"
        ],
        "className": "com.aliyun.spark.MaxComputeDataSourcePartitionSample",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "small"
        }
    }
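
    The values in the args array are passed to the test code positionally: args(0) through args(5) provide the AccessKey ID, AccessKey secret, MaxCompute endpoint, Tunnel endpoint, project name, and table name, and the optional seventh value sets the number of partitions that are used when the table is read. Replace the sample values with your own before you submit the job.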

Result

After the job is complete, find the job and click Log in the Operation column to view the logs of the job. If the log contains the rows that are printed by the sample code, the job is successful.