This topic describes how to set up a Spark on MaxCompute development environment.

Prerequisites

The following software has been installed:
  • JDK 1.8
  • Python 2.7
  • Maven
  • Git

Download the Spark on MaxCompute package

The Spark on MaxCompute package is released with the MaxCompute authorization feature. This allows Spark on MaxCompute to serve as a client that uses the spark-submit script to submit jobs. Packages are available for both Spark 1.x and Spark 2.x applications; download the package that matches the Spark version of your application.
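After the download is complete, decompress the package. The following is a minimal sketch for Linux; it assumes the downloaded archive is a .tar.gz file named spark-x.x.x-public.tar.gz, which matches the decompressed directory name that is used later in this topic.
# Decompress the Spark on MaxCompute client package. The archive name is an assumption.
tar -xzvf spark-x.x.x-public.tar.gz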

Configure environment variables

In the command line window of the operating system that you use, run commands to configure the following environment variables:

  • Configure the JAVA_HOME environment variable.
    # We recommend JDK 1.8. Linux is used in this example.
    export JAVA_HOME=/path/to/jdk
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export PATH=$JAVA_HOME/bin:$PATH
  • Configure the SPARK_HOME environment variable.
    Set the SPARK_HOME environment variable to the path to which the Spark on MaxCompute package is decompressed.
    # Linux is used in this example.
    export SPARK_HOME=/path/to/spark-x.x.x-public
    export PATH=$SPARK_HOME/bin:$PATH
  • To use PySpark, you must install Python 2.7 and configure the PATH environment variable.
    # Linux is used in this example.
    export PATH=/path/to/python/bin/:$PATH
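After the environment variables are configured, you can optionally verify them. The following is a minimal sketch for Linux; the exact output depends on the versions that you installed.
# Verify that the environment variables resolve to the expected installations.
java -version            # Should report JDK 1.8.
python -V                # Should report Python 2.7.x if you plan to use PySpark.
spark-submit --version   # Should resolve to the Spark on MaxCompute client under SPARK_HOME.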

Configure the spark-defaults.conf file

If you use the Spark on MaxCompute client for the first time, you must configure the spark-defaults.conf file in the path to which the client package is decompressed.

The spark-defaults.conf.template file is stored in the $SPARK_HOME/conf path. You can use this file as a template to configure the spark-defaults.conf file.
Notice Rename spark-defaults.conf.template to spark-defaults.conf before you configure the file. If you do not rename the template file, the configurations cannot take effect.
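For example, on Linux you can copy the template to create the configuration file. This is a minimal sketch; adjust the path if you have not set the SPARK_HOME environment variable.
# Create spark-defaults.conf from the template. Copying keeps the template for reference.
cd $SPARK_HOME/conf
cp spark-defaults.conf.template spark-defaults.conf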
The following code shows sample configurations in the spark-defaults.conf file.
# spark-defaults.conf
# Enter the name of the MaxCompute project and account information that is used to access the MaxCompute project. 
spark.hadoop.odps.project.name = XXX  
spark.hadoop.odps.access.id = XXX     
spark.hadoop.odps.access.key = XXX

# Retain the following configurations. 
spark.hadoop.odps.end.point = http://service.<regionid>.maxcompute.aliyun.com/api   # The endpoint that is used to connect the Spark on MaxCompute client to your MaxCompute project. You can configure the endpoint based on your business requirements. For more information, see Endpoints. 
spark.hadoop.odps.runtime.end.point = http://service.<regionid>.maxcompute.aliyun-inc.com/api  # The endpoint of the environment in which the Spark on MaxCompute client runs. Set this parameter to the endpoint of the virtual private cloud (VPC) to which MaxCompute belongs based on your region. You can change the value of this parameter based on your business requirements. 
spark.sql.catalogImplementation = odps   # In Spark 2.3.0, set this parameter to odps. In Spark 2.4.5, set this parameter to hive. 
spark.hadoop.odps.task.major.version = cupid_v2
spark.hadoop.odps.cupid.container.image.enable = true
spark.hadoop.odps.cupid.container.vm.engine.type = hyper

spark.hadoop.odps.cupid.webproxy.endpoint = http://service.cn.maxcompute.aliyun-inc.com/api
spark.hadoop.odps.moye.trackurl.host = http://jobview.odps.aliyun.com

Other configurations are required for special scenarios and features. For more information, see Spark on MaxCompute configuration details.

Make project preparations

The Spark on MaxCompute client provides a demo project template. We recommend that you download and copy the template to develop your applications.
Note In the demo project, the scope of the dependencies is set to provided. Do not change this scope. Otherwise, the job that you submit may not run as expected.
  • Download and compile the Spark-1.x template.
    git clone https://github.com/aliyun/MaxCompute-Spark.git
    cd MaxCompute-Spark/spark-1.x
    mvn clean package
  • Download and compile the Spark-2.x template.
    git clone https://github.com/aliyun/MaxCompute-Spark.git
    cd MaxCompute-Spark/spark-2.x
    mvn clean package

Configure dependencies

  • Configure the dependencies for Spark jobs to access MaxCompute tables.
    Spark jobs use the odps-spark-datasource package to access MaxCompute tables. The following code shows sample dependencies in the Maven POM file. A usage sketch that reads a MaxCompute table follows this list.
    <!-- Dependency for a Spark-2.x job -->
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-spark-datasource_2.11</artifactId>
        <version>3.3.8-public</version>
    </dependency>
    
    <!-- Dependency for a Spark-1.x job -->
    <dependency>
      <groupId>com.aliyun.odps</groupId>
      <artifactId>odps-spark-datasource_2.10</artifactId>
      <version>3.3.8-public</version>
    </dependency>
  • Configure the dependency for Spark jobs to access Object Storage Service (OSS).
    If Spark jobs need to access OSS, add the following dependency:
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>hadoop-fs-oss</artifactId>
        <version>3.3.8-public</version>
    </dependency>
  • Configure the dependency for a Spark-2.x job.

    For more information about the POM file for a Spark-2.x job, see Spark-2.x POM file.

  • Configure the dependency for a Spark-1.x job.

    For more information about the POM file for a Spark-1.x job, see Spark-1.x POM file.
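After the dependencies are configured, Spark SQL can query MaxCompute tables directly. The following Scala sketch shows this for a Spark-2.x job. It assumes that the spark-defaults.conf file is configured as described above and that a table named mc_test_table exists in your MaxCompute project; the table name is a hypothetical example.
import org.apache.spark.sql.SparkSession

object ReadMaxComputeTable {
  def main(args: Array[String]): Unit = {
    // The project, account, and catalog settings are read from spark-defaults.conf
    // when the job is submitted by using the spark-submit script.
    val spark = SparkSession
      .builder()
      .appName("ReadMaxComputeTable")
      .getOrCreate()

    // mc_test_table is a hypothetical table in your MaxCompute project.
    spark.sql("SELECT * FROM mc_test_table LIMIT 10").show()

    spark.stop()
  }
}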

Reference external files

If you develop the following types of Spark jobs, you must reference external files:
  • Spark jobs that need to read data from specific configuration files.
  • Spark jobs that require additional resource packages or third-party libraries, such as JAR packages or Python libraries.
In practice, you must upload external files before you can reference them. You can upload external files by using one of the following methods:
  • Method 1: Upload external files by using Spark parameters.

    Spark on MaxCompute supports the following parameters that are defined by Apache Spark: --jars, --py-files, --files, and --archives. You can configure these parameters to upload external files when you submit jobs. This way, the external files are uploaded to the working directories of the driver and each executor when the jobs run. A sample spark-submit command is shown after this list.

    • Use the spark-submit script to upload files on the Spark on MaxCompute client.
      Note
      • --jars: This parameter specifies the JAR packages that you want to upload to the current working directories of the driver and each executor. Separate multiple package names with commas (,). These JAR packages are added to the classpaths of the driver and each executor. You can use "./your_jar_name" in the configurations of Spark jobs to reference these packages. This behavior is the same as in Apache Spark.
      • --files and --py-files: These parameters specify the common files or Python files that you want to upload to the current working directories of the driver and each executor. Separate multiple file names with commas (,). You can use "./your_file_name" in the configurations of Spark jobs to reference these files. This behavior is the same as in Apache Spark.
      • --archives: This parameter is slightly different from the parameter defined by Apache Spark. Configure this parameter in the xxx#yyy format and separate file names with commas (,). After you configure this parameter, the archive files that you specify, such as ZIP files, are decompressed to the subdirectories of the current working directories of the driver and each executor. For example, if this parameter is set to xx.zip#yy, you can use "./yy/xx/" to reference the content in the archive file. If this parameter is set to xx.zip, you can use "./xx.zip/xx/" to reference the content in the archive file. If you want to use "./xxx/" to decompress the archive file directly to the current working directory, use the spark.hadoop.odps.cupid.resources parameter.
    • Use DataWorks to add the resources required by a job. For more information, see Create a MaxCompute resource.
      Note In the DataWorks console, you can upload files with a size of up to 50 MB. If the size of files that you want to upload exceeds 50 MB, you can use the MaxCompute client to upload these files as MaxCompute resources and add these resources to the Data Analytics tab in the DataWorks console. For more information about MaxCompute resources, see MaxCompute resources.
  • Method 2: Upload files as MaxCompute resources.
    Spark on MaxCompute provides the spark.hadoop.odps.cupid.resources parameter. This parameter allows you to directly reference resources in MaxCompute. When you run a job, resources that are referenced are uploaded to the working directories. To upload files by using this method, perform the following steps:
    1. Log on to the MaxCompute client and upload the files as MaxCompute resources. The maximum size of a single file that you can upload is 500 MB.
    2. Add the spark.hadoop.odps.cupid.resources parameter to the configurations of a Spark job. This parameter specifies the MaxCompute resources that are required for running the Spark job. Configure this parameter in the <projectname>.<resourcename> format. If you want to reference multiple files, separate the file names with commas (,). The following code shows an example:
      spark.hadoop.odps.cupid.resources=public.python-python-2.7-ucs4.zip,public.myjar.jar
      The specified resource file is downloaded to the current working directories of the driver and each executor. By default, the downloaded file is named in the <projectname>.<resourcename> format.
      You can also rename the file in the <projectname>.<resourcename>:<newresourcename> format. The following code shows an example:
      spark.hadoop.odps.cupid.resources=public.myjar.jar:myjar.jar
      Notice The spark.hadoop.odps.cupid.resources parameter takes effect only after you configure this parameter in the spark-defaults.conf file or in the DataWorks console. This parameter does not take effect if this parameter is written in code.
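For Method 1, the parameters are passed to the spark-submit script when you submit the job. The following is a minimal sketch; the main class com.example.YourMainClass and the files data.conf and deps.zip are hypothetical.
# data.conf and deps.zip are hypothetical local files.
# --files uploads data.conf to ./data.conf in the working directories of the driver and each executor.
# --archives decompresses deps.zip to ./deps/ in those working directories.
cd $SPARK_HOME
bin/spark-submit --master yarn-cluster \
    --class com.example.YourMainClass \
    --files data.conf \
    --archives deps.zip#deps \
    /path/to/your_job.jar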
After you upload files by using one of the preceding methods, you can reference external files in code. The following code shows an example:
import scala.io.Source

// targetFile is the name of the uploaded file in the current working directory.
val targetFile = "File name"
val file = Source.fromFile(targetFile)
for (line <- file.getLines())
    println(line)
file.close()

Conduct SparkPi smoke testing

After you perform the preceding operations, conduct SparkPi smoke testing to check whether Spark on MaxCompute can be connected. For example, you can run the following commands to conduct SparkPi smoke testing on Spark-2.x applications:
# Replace /path/to/MaxCompute-Spark with the actual path of the compiled JAR package. 
cd $SPARK_HOME
bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.SparkPi \
/path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

# If the following log information is displayed, smoke testing is successful. 
19/06/11 11:57:30 INFO Client: 
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: 11.222.166.90
         ApplicationMaster RPC port: 38965
         queue: queue
         start time: 1560225401092
         final status: SUCCEEDED

Notes on using IntelliJ IDEA to run code in local mode

In most cases, you run code in cluster mode after local debugging is successful. Spark on MaxCompute allows you to run code in local mode by using IntelliJ IDEA. Before you run code by using IntelliJ IDEA, take note of the following points:
  • Set the spark.master parameter in code.
    val spark = SparkSession
          .builder()
          .appName("SparkPi")
          .config("spark.master", "local[4]") // The code can run after you set spark.master to local[N]. N indicates the parallelism. 
          .getOrCreate()
  • Add the related dependencies of the Spark on MaxCompute client in IntelliJ IDEA.
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope> 
    </dependency>
    In the pom.xml file, the scope parameter must remain set to provided (see the note in the Make project preparations section). However, with this scope, the following "NoClassDefFoundError" error is reported when you run the code in local mode:
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/SparkSession$
        at com.aliyun.odps.spark.examples.SparkPi$.main(SparkPi.scala:27)
        at com.aliyun.odps.spark.examples.SparkPi.main(SparkPi.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession$
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 2 more
    To resolve this error while keeping scope=provided, manually add the directory that stores the JAR packages of Spark on MaxCompute to the IntelliJ IDEA project. No errors are then returned when you run code in local mode in IntelliJ IDEA.
    1. In the main menu bar of IntelliJ IDEA, choose File > Project Structure.
    2. In the Project Structure dialog box, click Modules in the left-side navigation pane. Then, select the resource package, and click the Dependencies tab for the resource package.
    3. On the Dependencies tab, click the plus sign (+) in the lower-left corner and select JARs or directories… to add the directory that stores the JAR packages of Spark on MaxCompute.
  • The spark-defaults.conf file cannot be referenced in local mode. You must manually configure related parameters in code.
    If you submit jobs by using the spark-submit script, the system reads the configurations from the spark-defaults.conf file. If you submit jobs in local mode, you must manually configure related parameters in code. The following code shows how to use Spark SQL to read data from MaxCompute tables in local mode.
    val spark = SparkSession
          .builder()
          .appName("SparkPi")
          .config("spark.master", "local[4]") // The code can run after you set spark.master to local[N]. N indicates the parallelism. 
          .config("spark.hadoop.odps.project.name", "****")
          .config("spark.hadoop.odps.access.id", "****")
          .config("spark.hadoop.odps.access.key", "****")
          .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
          .config("spark.sql.catalogImplementation", "odps")
          .getOrCreate()

Notes on using Spark 2.4.5

  • Use Spark 2.4.5 to submit jobs
    • Submit a job in a Yarn cluster.
    • Set the spark.hadoop.odps.spark.version parameter to spark-2.4.5-odps0.33.0 in the DataWorks console. If the Spark version of exclusive resource groups in DataWorks is not updated to Spark 2.4.5, you can use shared resource groups to schedule jobs or contact the MaxCompute technical support team to update the Spark version.
  • Changes in using Spark 2.4.5
    • If you submit jobs in a Yarn cluster, you must run the export HADOOP_CONF_DIR=$SPARK_HOME/conf command to set the HADOOP_CONF_DIR environment variable.
    • If you perform debugging in local mode, you must create a file named odps.conf in the $SPARK_HOME/conf path and add the following configurations to the file.
      odps.project.name = 
      odps.access.id = 
      odps.access.key =
      odps.end.point =
  • Changes in the parameter settings of Spark 2.4.5
    • spark.sql.catalogImplementation is set to hive.
    • spark.sql.sources.default is set to hive.
    • spark.sql.odps.columnarReaderBatchSize: specifies the number of rows that the vectorized reader reads at a time. Default value: 4096.
    • spark.sql.odps.enableVectorizedReader: specifies whether to enable the vectorized reader. Default value: True.
    • spark.sql.odps.enableVectorizedWriter: specifies whether to enable the vectorized writer. Default value: True.
    • spark.sql.odps.split.size: specifies the split size that is used when data is read from a MaxCompute table. The split size controls the read parallelism. Default value: 256. Unit: MB.
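The following is a minimal sketch of how these Spark 2.4.5 settings might be declared in the spark-defaults.conf file. The values shown are the defaults described above; override them only if your workload requires it.
# Spark 2.4.5 settings. The values shown are the defaults described above.
spark.sql.catalogImplementation = hive
spark.sql.sources.default = hive
spark.sql.odps.enableVectorizedReader = true
spark.sql.odps.enableVectorizedWriter = true
spark.sql.odps.columnarReaderBatchSize = 4096
spark.sql.odps.split.size = 256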