This topic describes how to configure Spark 2.x dependencies and provides some examples.

Dependency configurations for Spark 2.x

If you want to submit your Spark 2.x application by using Spark on MaxCompute, you must add the following dependencies to the <dependencies> section of the pom.xml file. For a complete example, see the pom.xml file of the Spark-2.x module in the MaxCompute-Spark project.
<properties>
    <spark.version>2.3.0</spark.version>
    <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
    <scala.version>2.11.8</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>cupid-sdk</artifactId>
    <version>${cupid.sdk.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>hadoop-fs-oss</artifactId>
    <version>${cupid.sdk.version}</version>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
    <version>${cupid.sdk.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-actors</artifactId>
    <version>${scala.version}</version>
</dependency>
In the preceding code, set the scope element based on the following instructions:
  • Set scope to provided for all packages that are released by the Apache Spark community, such as spark-core and spark-sql. This way, these packages are not bundled into your job JAR, and the versions that Spark on MaxCompute provides at runtime are used instead.
  • Set scope to compile, which is the default, for odps-spark-datasource.

WordCount example

  • WordCount.scala (see the sketch after this example)
  • How to submit
    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    
    # For more information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment.
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class \
        com.aliyun.odps.spark.examples.WordCount \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
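
The linked WordCount.scala implements the classic word count. The following minimal sketch, based on the standard Spark 2.x API, shows the general shape of such a job; the in-memory sample input is illustrative and may differ from the linked file:

    package com.aliyun.odps.spark.examples

    import org.apache.spark.sql.SparkSession

    object WordCount {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("WordCount").getOrCreate()
        val sc = spark.sparkContext

        try {
          // Split each line into words and count the occurrences of each word.
          sc.parallelize(Seq("apple banana", "apple"), 2)
            .flatMap(_.split(" "))
            .map(word => (word, 1))
            .reduceByKey(_ + _)
            .collect()
            .foreach(println)
        } finally {
          sc.stop()
        }
      }
    }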

GraphX PageRank example

  • PageRank.scala (see the sketch after this example)
  • How to submit
    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    # For more information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment.
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class \
        com.aliyun.odps.spark.examples.graphx.PageRank \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
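
The linked PageRank.scala runs PageRank with GraphX. A minimal sketch, assuming the standard GraphX API; the three-vertex graph is illustrative and may differ from the linked file:

    package com.aliyun.odps.spark.examples.graphx

    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.sql.SparkSession

    object PageRank {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("PageRank").getOrCreate()
        val sc = spark.sparkContext

        try {
          // Build a small directed graph from vertex and edge RDDs.
          val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
          val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)))
          val graph = Graph(vertices, edges)

          // Run PageRank until the ranks converge within the given tolerance.
          graph.pageRank(0.001).vertices.collect().foreach(println)
        } finally {
          sc.stop()
        }
      }
    }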

MLlib KMeans-on-OSS example

For more information about how to configure spark.hadoop.fs.oss.ststoken.roleArn and spark.hadoop.fs.oss.endpoint, see OSS access notes.

  • KmeansModelSaveToOss.scala (see the training sketch after this example)
  • How to submit
    # Edit code.
    val modelOssDir = "oss://bucket/kmeans-model" // Enter the path of the OSS bucket.
    val spark = SparkSession
      .builder()
      .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider")
      .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole")
      .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
      .appName("KmeansModelSaveToOss")
      .getOrCreate()
    
    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    # For more information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment.
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class \
        com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
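
The linked KmeansModelSaveToOss.scala then trains a model and writes it to modelOssDir. A minimal sketch of that step, assuming the RDD-based MLlib API and reusing spark and modelOssDir from the snippet above; the sample points and parameters are illustrative:

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    // Train a small KMeans model and persist it to the OSS path configured above.
    val points = spark.sparkContext.parallelize(Seq(
      Vectors.dense(0.0, 0.0),
      Vectors.dense(9.0, 9.0),
      Vectors.dense(9.1, 9.2)
    ))
    val model = KMeans.train(points, 2, 20) // k = 2 clusters, at most 20 iterations.
    model.save(spark.sparkContext, modelOssDir)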

OSS UnstructuredData example

For more information about how to configure spark.hadoop.fs.oss.ststoken.roleArn and spark.hadoop.fs.oss.endpoint, see OSS access notes.

  • SparkUnstructuredDataCompute.scala (see the sketch after this example)
  • How to submit
    # Edit code.
    val pathIn = "oss://bucket/inputdata/" // Enter the path of the OSS bucket.
    val spark = SparkSession
      .builder()
      .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider")
      .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole")
      .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
      .appName("SparkUnstructuredDataCompute")
      .getOrCreate()
    
    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    # For more information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment.
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class \
        com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
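
The linked SparkUnstructuredDataCompute.scala treats the OSS objects under pathIn as unstructured text. A minimal sketch of the compute step, reusing spark and pathIn from the snippet above; counting lines is an illustrative computation:

    val sc = spark.sparkContext
    try {
      // Read the OSS objects under pathIn as text and count the lines.
      val inputData = sc.textFile(pathIn, 5)
      println(s"count: ${inputData.count()}")
    } finally {
      sc.stop()
    }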

Example of reading data from or writing data to a MaxCompute table

  • SparkSQL.scala (see the sketch after this example)
  • How to submit
    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    # For more information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment.
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class \
        com.aliyun.odps.spark.examples.sparksql.SparkSQL \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
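
The linked SparkSQL.scala reads data from and writes data to MaxCompute tables through Spark SQL. A minimal sketch: when spark.sql.catalogImplementation is set to odps, table names resolve against your MaxCompute project. The table name mc_test_table is illustrative:

    package com.aliyun.odps.spark.examples.sparksql

    import org.apache.spark.sql.SparkSession

    object SparkSQL {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("SparkSQL")
          .config("spark.sql.catalogImplementation", "odps") // Resolve tables against MaxCompute.
          .getOrCreate()

        try {
          // Create a MaxCompute table, write one row to it, and read it back.
          spark.sql("CREATE TABLE IF NOT EXISTS mc_test_table (name STRING, num BIGINT)")
          spark.sql("INSERT INTO mc_test_table SELECT 'abc', 100L")
          spark.sql("SELECT * FROM mc_test_table").show()
        } finally {
          spark.stop()
        }
      }
    }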

Example of using PySpark to read data from or write data to a MaxCompute table

  • spark_sql.py
  • How to submit
    # For more information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment.
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --jars /path/to/odps-spark-datasource_2.11-3.3.8-public.jar \
        /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_sql.py

Example of using PySpark to write data to OSS

  • spark_oss.py
  • How to submit
    # For more information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment.
    # For more information about OSS configurations, see OSS access notes.
    # Build the spark-2.x module first (mvn clean package) to obtain the spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar package.
    
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --jars /path/to/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar \
        /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_oss.py

Spark Streaming LogHub example

  • LogHubStreamingDemo.scala
  • How to submit
    # For more information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment.
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class \
        com.aliyun.odps.spark.examples.streaming.loghub.LogHubStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

Spark Streaming DataHub example

  • DataHubStreamingDemo.scala
  • How to submit
    # For more information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment.
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class \
        com.aliyun.odps.spark.examples.streaming.datahub.DataHubStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

Spark Streaming Kafka example

  • KafkaStreamingDemo.scala (see the sketch after the note below)
  • How to submit
    # For more information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment.
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class \
        com.aliyun.odps.spark.examples.streaming.kafka.KafkaStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Note For more information, see Spark on MaxCompute.
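
The linked KafkaStreamingDemo.scala consumes a Kafka topic by using the standard spark-streaming-kafka-0-10 integration. A minimal sketch of that pattern; the broker address, group ID, and topic name are placeholders and may differ from the linked file:

    package com.aliyun.odps.spark.examples.streaming.kafka

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    object KafkaStreamingDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("KafkaStreamingDemo")
        val ssc = new StreamingContext(conf, Seconds(5)) // 5-second micro-batches.

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "broker1:9092", // Enter your Kafka broker addresses.
          "group.id" -> "demo-group",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer]
        )

        // Subscribe to the topic and print the message values of each batch.
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc, PreferConsistent, Subscribe[String, String](Seq("demo-topic"), kafkaParams))
        stream.map(_.value()).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }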

Example of reading data from MaxCompute and writing the data to HBase

  • Sample code
    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.sql.SparkSession

    object McToHbase {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("spark_sql_ddl")
          .config("spark.sql.catalogImplementation", "odps")
          .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
          .config("spark.hadoop.odps.runtime.end.point", "http://service.cn.maxcompute.aliyun-inc.com/api")
          .getOrCreate()
        val sc = spark.sparkContext

        // Configure the HBase connection and the output table.
        val config = HBaseConfiguration.create()
        val zkAddress = "" // Enter the ZooKeeper address of your HBase cluster.
        config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress)
        val jobConf = new JobConf(config)
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "test")

        try {
          // Query MaxCompute and write each row to HBase as a Put operation.
          spark.sql("select '7', 'long'").rdd.map(row => {
            val id = row(0).asInstanceOf[String]
            val name = row(1).asInstanceOf[String]
            val put = new Put(Bytes.toBytes(id))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(name))
            (new ImmutableBytesWritable, put)
          }).saveAsHadoopDataset(jobConf)
        } finally {
          sc.stop()
        }
      }
    }
  • How to submit: Use IntelliJ IDEA to run the sample code.
    Note For more information about how to configure the Spark development environment, see Running modes of Spark on MaxCompute.

Examples of reading data from and writing data to OSS objects

  • Sample code
    • Example 1: Sample code for the local mode
      package com.aliyun.odps.spark.examples

      import org.apache.spark.sql.SparkSession

      object SparkOSS {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession
            .builder()
            .config("spark.master", "local[4]") // Set spark.master to local[N] to run locally. N indicates the number of local worker threads.
            .config("spark.hadoop.fs.oss.accessKeyId", "") // Enter your AccessKey ID.
            .config("spark.hadoop.fs.oss.accessKeySecret", "") // Enter your AccessKey secret.
            .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com")
            .appName("SparkOSS")
            .getOrCreate()

          val sc = spark.sparkContext
          try {
            // Read data from OSS objects.
            val pathIn = "oss://spark-oss/workline.txt"
            val inputData = sc.textFile(pathIn, 5)
            // Write the RDD back to OSS as a single text file.
            inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3")
          } finally {
            sc.stop()
          }
        }
      }
      Note Before you run the sample code, check whether the hadoop-fs-oss dependency is added. If the dependency is not added, an error is returned.
    • Example 2: Sample code for the local mode
      package com.aliyun.odps.spark.examples

      import java.io.ByteArrayInputStream
      import com.aliyun.oss.OSSClientBuilder
      import org.apache.spark.sql.SparkSession

      object SparkOSS {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession
            .builder()
            .config("spark.master", "local[4]") // Set spark.master to local[N] to run locally. N indicates the number of local worker threads.
            .config("spark.hadoop.fs.oss.accessKeyId", "") // Enter your AccessKey ID.
            .config("spark.hadoop.fs.oss.accessKeySecret", "") // Enter your AccessKey secret.
            .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com")
            .appName("SparkOSS")
            .getOrCreate()

          val sc = spark.sparkContext
          try {
            // Read data from OSS objects and count the lines.
            val pathIn = "oss://spark-oss/workline.txt"
            val inputData = sc.textFile(pathIn, 5)
            val cnt = inputData.count()
            println(s"count: $cnt")

            // Write the line count to an OSS object by using the OSS SDK.
            val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", "<accessKeyId>", "<accessKeySecret>")
            val filePath = "user/data"
            ossClient.putObject("spark-oss", filePath, new ByteArrayInputStream(cnt.toString.getBytes()))
            ossClient.shutdown()
          } finally {
            sc.stop()
          }
        }
      }
    • Example 3: Sample code for the cluster mode
      package com.aliyun.odps.spark.examples

      import java.io.ByteArrayInputStream
      import com.aliyun.oss.OSSClientBuilder
      import org.apache.spark.sql.SparkSession

      object SparkOSS {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession
            .builder()
            .appName("SparkOSS")
            .getOrCreate()

          val sc = spark.sparkContext
          try {
            // Read data from OSS objects and count the lines.
            val pathIn = "oss://spark-oss/workline.txt"
            val inputData = sc.textFile(pathIn, 5)
            val cnt = inputData.count()
            println(s"count: $cnt")

            // Alternatively, write the RDD back to OSS as text files:
            // inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3")

            // Write the line count to an OSS object by using the OSS SDK.
            val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", "<accessKeyId>", "<accessKeySecret>")
            val filePath = "user/data"
            ossClient.putObject("spark-oss", filePath, new ByteArrayInputStream(cnt.toString.getBytes()))
            ossClient.shutdown()
          } finally {
            sc.stop()
          }
        }
      }
  • How to submit
    • Use IntelliJ IDEA to develop, test, and run the code in local mode.
    • Use an ODPS Spark node to submit and run the code in the DataWorks console. For more information, see Create an ODPS Spark node.
    Note For more information about how to configure the Spark development environment, see Running modes of Spark on MaxCompute.

Example of reading data from MaxCompute and writing the data to OSS

  • Sample code
    • Sample code for the local mode
      package com.aliyun.odps.spark.examples.userpakage

      import org.apache.spark.sql.{SaveMode, SparkSession}

      object SparkODPS2OSS {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession
            .builder()
            .appName("Spark2OSS")
            .config("spark.master", "local[4]") // Set spark.master to local[N] to run locally. N indicates the number of local worker threads.
            .config("spark.hadoop.odps.project.name", "") // Enter the name of your MaxCompute project.
            .config("spark.hadoop.odps.access.id", "") // Enter your AccessKey ID.
            .config("spark.hadoop.odps.access.key", "") // Enter your AccessKey secret.
            .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
            .config("spark.sql.catalogImplementation", "odps")
            .config("spark.hadoop.fs.oss.accessKeyId", "")
            .config("spark.hadoop.fs.oss.accessKeySecret", "")
            .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com")
            .getOrCreate()

          try {
            // Use Spark SQL to query the table.
            val data = spark.sql("select * from user_detail")
            // Show the first 10 rows of the query results.
            data.show(10)
            // Store the query results in OSS as a single CSV file.
            data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3")
          } finally {
            spark.stop()
          }
        }
      }
    • Sample code for the cluster mode
      package com.aliyun.odps.spark.examples.userpakage

      import org.apache.spark.sql.{SaveMode, SparkSession}

      object SparkODPS2OSS {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession
            .builder()
            .appName("SparkODPS2OSS")
            .getOrCreate()

          try {
            // Use Spark SQL to query the table.
            val data = spark.sql("select * from user_detail")
            // Show the first 10 rows of the query results.
            data.show(10)
            // Store the query results in OSS as a single CSV file.
            data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3")
          } finally {
            spark.stop()
          }
        }
      }
  • How to submit
    • Use IntelliJ IDEA to develop, test, and run the code in local mode.
    • Use an ODPS Spark node to submit and run the code in the DataWorks console. For more information, see Create an ODPS Spark node.
    Note For more information about how to configure the Spark development environment, see Running modes of Spark on MaxCompute.