すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Spark 2.xの例

最終更新日:Jan 07, 2025

このトピックでは、Spark 2.xの依存関係を設定する方法といくつかの例を示します。

Spark 2.xの依存関係の設定

MaxComputeでSparkを使用してSpark 2.xアプリケーションを送信する場合は、次の依存関係をpom.xmlファイルに追加する必要があります。 pom.xmlの詳細については、「pom.xml」をご参照ください。

<properties>
    <spark.version>2.3.0</spark.version>
    <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
    <scala.version>2.11.8</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>cupid-sdk</artifactId>
    <version>${cupid.sdk.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>hadoop-fs-oss</artifactId>
    <version>${cupid.sdk.version}</version>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
    <version>${cupid.sdk.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-actors</artifactId>
    <version>${scala.version}</version>
</dependency>

上記のコードでは、次の手順に基づいてscopeパラメーターを設定します。

  • Spark-coreやspark-sqlなど、Apache sparkコミュニティでリリースされているすべてのパッケージに対して、scopeprovidedに設定します。

  • odps-spark-datasourceモジュールのscopecompileに設定します。

WordCountの例 (Scala)

  • サンプルコード

    WordCount.scala

  • コミットする方法

    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    
    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class \
        com.aliyun.odps.spark.examples.WordCount \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

MaxComputeテーブルからのデータの読み取りまたはMaxComputeテーブルへのデータの書き込み例 (Scala)

  • サンプルコード

    SparkSQL.scala

  • コミットする方法

    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.sparksql.SparkSQL \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

GraphX PageRankの例 (Scala)

  • サンプルコード

    PageRank.scala

  • コミットする方法

    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.graphx.PageRank \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

MLlib KMeans-ON-OSSの例 (Scala)

spark.hadoop.fs.oss.ststoken.roleArnおよびspark.hadoop.fs.oss.endpointの設定方法については、「OSSアクセスノート」をご参照ください。

  • サンプルコード

    KmeansModelSaveToOss.scala

  • コミットする方法

    # Edit the code. 
    val modelOssDir = "oss://bucket/kmeans-model" // Enter the path of the OSS bucket. 
    val spark = SparkSession
      .builder()
      .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider")
      .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole")
      .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
      .appName("KmeansModelSaveToOss")
      .getOrCreate()
    
    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

OSS UnstructuredDataの例 (Scala)

spark.hadoop.fs.oss.ststoken.roleArnおよびspark.hadoop.fs.oss.endpointの設定方法については、「OSSアクセスノート」をご参照ください。

  • サンプルコード

    SparkUnstructuredDataCompute.scala

  • コミットする方法

    # Edit the code. 
    val pathIn = "oss://bucket/inputdata/" // Enter the path of the OSS bucket. 
    val spark = SparkSession
      .builder()
      .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider")
      .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole")
      .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
      .appName("SparkUnstructuredDataCompute")
      .getOrCreate()
    
    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

SparkPiの例 (Scala)

  • サンプルコード

    SparkPi.scala

  • コミットする方法

    cd /path/to/MaxCompute-Spark/spark-2.x
    mvn clean package
    
    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.SparkPi \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

Spark Streaming LogHubの例 (Scala)

  • サンプルコード

    LogHubStreamingDemo.scala

  • コミットする方法

    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.loghub.LogHubStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

Spark Streaming LogHubを使用してMaxCompute (Scala) にデータを書き込む例

  • サンプルコード

    LogHub2OdpsDemo.scala

  • コミットする方法

    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.loghub.LogHub2OdpsDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

Spark Streaming DataHubの例 (Scala)

  • サンプルコード

    DataHubStreamingDemo.scala

  • コミットする方法

    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.datahub.DataHubStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

Spark Streaming DataHubを使用してMaxCompute (Scala) にデータを書き込む例

  • サンプルコード

    DataHub2OdpsDemo.scala

  • コミットする方法

    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.datahub.DataHub2OdpsDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

Spark Streaming Kafkaの例 (Scala)

  • サンプルコード

    KafkaStreamingDemo.scala

  • コミットする方法

    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.kafka.KafkaStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
説明

詳細については、「Spark on MaxCompute」をご参照ください。

Spark StructuredStreaming DataHubの例 (Scala)

  • サンプルコード

    DatahubStructuredStreamingDemo.scala

  • コミットする方法

    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.datahub.DatahubStructuredStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

Spark StructuredStreaming Kafkaの例 (Scala)

  • サンプルコード

    KafkaStructuredStreamingDemo.scala

  • コミットする方法

    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.kafka.KafkaStructuredStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

Spark StructuredStreaming LogHubの例 (Scala)

  • サンプルコード

    LoghubStructuredStreamingDemo.scala

  • コミットする方法

    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.loghub.LoghubStructuredStreamingDemo \
        /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

PySparkを使用してMaxComputeテーブルからデータを読み書きする例 (Python)

  • サンプルコード

    spark_sql.py

  • コミットする方法

    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --jars /path/to/odps-spark-datasource_2.11-3.3.8-public.jar \
        /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_sql.py

PySparkを使用してOSSにデータを書き込む例 (Python)

  • サンプルコード

    spark_oss.py

  • コミットする方法

    # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. 
    # For information about OSS configurations, see OSS access notes. 
    
    cd $SPARK_HOME
    bin/spark-submit --master yarn-cluster --jars /path/to/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar \
        /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_oss.py
    # Compile Spark 2.x to obtain the spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar package.

Spark SQLの例 (Java)

Spark SQL Javaサンプルコードの詳細については、「JavaSparkSQL.java」をご参照ください。

MaxComputeからのデータの読み取りとHBaseへのデータの書き込みの例

IntelliJ IDEAを使用して、MaxComputeからデータを読み取り、HBaseにデータを書き込むコードを記述します。

  • サンプルコード

    object McToHbase {
      def main(args: Array[String]) {
        val spark = SparkSession
          .builder()
          .appName("spark_sql_ddl")
          .config("spark.sql.catalogImplementation", "odps")
          .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
          .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
          .getOrCreate()
          val sc = spark.sparkContext
          val config = HBaseConfiguration.create()
          val zkAddress = ""
          config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
          val jobConf = new JobConf(config)
          jobConf.setOutputFormat(classOf[TableOutputFormat])
          jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")
    
        try{
          import spark._
          spark.sql("select '7', 'long'").rdd.map(row => {
            val id = row(0).asInstanceOf[String]
            val name = row(1).asInstanceOf[String]
            val put = new Put(Bytes.toBytes(id))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(name))
            (new ImmutableBytesWritable, put)
        }).saveAsHadoopDataset(jobConf)
      } finally {
        sc.stop()
      }
    
      }
    }
  • コミットする方法: IntelliJ IDEAを使用して、サンプルコードをコミットして実行します。 詳細については、「MaxComputeでのSparkの実行モード」をご参照ください。

OSSオブジェクトからのデータの読み取りとOSSオブジェクトへのデータの書き込みの例

IntelliJ IDEAまたはDataWorksを使用して、OSSオブジェクトからデータを読み書きします。

  • サンプルコード

    • 例1: ローカルモードのサンプルコード

      package com.aliyun.odps.spark.examples
      import java.io.ByteArrayInputStream
      import org.apache.spark.sql.SparkSession
      
      object SparkOSS {
        def main(args: Array[String]) {
          val spark = SparkSession
            .builder()
            .config("spark.master", "local[4]") // The code can run after you set spark.master to local[N]. N indicates the number of concurrent Spark jobs. 
            .config("spark.hadoop.fs.oss.accessKeyId", "")
            .config("spark.hadoop.fs.oss.accessKeySecret", "")
            .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com")
            .appName("SparkOSS")
            .getOrCreate()
      
          val sc = spark.sparkContext
          try {
            // Read data from OSS objects. 
            val pathIn = "oss://spark-oss/workline.txt"
            val inputData = sc.textFile(pathIn, 5)
                  // Write Resilient Distributed Datasets (RDD). 
            inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3")
      
          } finally {
            sc.stop()
          }
        }
      }
      説明

      サンプルコードを実行する前に、hadoop-fs-oss依存関係が追加されているかどうかを確認します。 依存関係が追加されない場合、エラーが返されます。

    • 例2: ローカルモードのサンプルコード

      package com.aliyun.odps.spark.examples
      import java.io.ByteArrayInputStream
      import com.aliyun.oss.{OSSClientBuilder,OSSClient}
      import org.apache.spark.sql.SparkSession
      
      object SparkOSS {
        def main(args: Array[String]) {
          val spark = SparkSession
            .builder()
            .config("spark.master", "local[4]") // The code can run after you set spark.master to local[N]. N indicates the number of concurrent Spark jobs. 
            .config("spark.hadoop.fs.oss.accessKeyId", "")
            .config("spark.hadoop.fs.oss.accessKeySecret", "")
            .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com")
            .appName("SparkOSS")
            .getOrCreate()
      
          val sc = spark.sparkContext
          try {
            // Read data from OSS objects. 
            val pathIn = "oss://spark-oss/workline.txt"
            val inputData = sc.textFile(pathIn, 5)
            val cnt = inputData.count
            inputData.count()
            println(s"count: $cnt")
      
            // Write data to OSS objects. 
            // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console.
      		  // In this example, the AccessKey ID and AccessKey secret are configured as environment variables. You can also save your AccessKey pair in the configuration file based on your business requirements.
      			// We recommend that you do not directly specify the AccessKey ID and AccessKey secret in code to prevent AccessKey pair leaks.
            val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
            val filePath="user/data"
            ossClient.putObject("spark-oss",filePath , new ByteArrayInputStream(cnt.toString.getBytes()))
            ossClient.shutdown()
          } finally {
            sc.stop()
          }
        }
      }
    • 例3: クラスターモードのサンプルコード

      package com.aliyun.odps.spark.examples
      import java.io.ByteArrayInputStream
      import com.aliyun.oss.{OSSClientBuilder,OSSClient}
      import org.apache.spark.sql.SparkSession
      
      object SparkOSS {
        def main(args: Array[String]) {
          val spark = SparkSession
            .builder()
            .appName("SparkOSS")
            .getOrCreate()
      
          val sc = spark.sparkContext
          try {
            // Read data from OSS objects. 
            val pathIn = "oss://spark-oss/workline.txt"
            val inputData = sc.textFile(pathIn, 5)
            val cnt = inputData.count
            inputData.count()
            println(s"count: $cnt")
      
            // inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3")
            // Write data to OSS objects. 
            // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console.
      			// In this example, the AccessKey ID and AccessKey secret are configured as environment variables. You can also save your AccessKey pair in the configuration file based on your business requirements.
      			// We recommend that you do not directly specify the AccessKey ID and AccessKey secret in code to prevent AccessKey pair leaks.
            val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
            val filePath="user/data"
            ossClient.putObject("spark-oss",filePath , new ByteArrayInputStream(cnt.toString.getBytes()))
            ossClient.shutdown()
          } finally {
            sc.stop()
          }
        }
      }
  • コミットする方法

    • IntelliJ IDEAを使用して、ローカルモードのコードを開発、テスト、およびコミットします。 詳細については、「MaxComputeでのSparkの実行モード」をご参照ください。

    • ODPS Sparkノードを使用して、DataWorksコンソールでコードをコミットして実行します。 詳細については、「ODPS Sparkタスクの開発」をご参照ください。

MaxComputeからデータを読み取り、OSSにデータを書き込む例

IntelliJ IDEAまたはDataWorksを使用して、MaxComputeからデータを読み取り、OSSにデータを書き込みます。

  • サンプルコード

    • ローカルモードのサンプルコード

      package com.aliyun.odps.spark.examples.userpakage
      
      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      object SparkODPS2OSS {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession
            .builder()
            .appName("Spark2OSS")
            .config("spark.master", "local[4]")// The code can run after you set spark.master to local[N]. N indicates the number of concurrent Spark jobs.
            .config("spark.hadoop.odps.project.name", "")
            .config("spark.hadoop.odps.access.id", "")
            .config("spark.hadoop.odps.access.key", "")
            .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
            .config("spark.sql.catalogImplementation", "odps")
            .config("spark.hadoop.fs.oss.accessKeyId","")
            .config("spark.hadoop.fs.oss.accessKeySecret","")
            .config("spark.hadoop.fs.oss.endpoint","oss-cn-beijing.aliyuncs.com")
            .getOrCreate()
      
          try{
      
            // Use Spark SQL to query tables.
            val data = spark.sql("select * from  user_detail")
           // Show the query results.
            data.show(10)
            // Store the query results to an OSS object.
            data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3")
          }finally {
            spark.stop()
          }
      
        }
      }
    • クラスターモードのサンプルコード

      package com.aliyun.odps.spark.examples.userpakage
      import org.apache.spark.sql.{SaveMode, SparkSession}
      
      object SparkODPS2OSS {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession
            .builder()
            .appName("SparkODPS2OSS")
            .getOrCreate()
      
          try{
      
            // Use Spark SQL to query tables.
            val data = spark.sql("select * from  user_detail")
           // Show the query results.
            data.show(10)
            // Store the query results to an OSS object.
            data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3")
      
          }finally {
            spark.stop()
          }
      
        }
      }
  • コミットする方法

    • IntelliJ IDEAを使用して、ローカルモードのコードを開発、テスト、およびコミットします。

    • ODPS Sparkノードを使用して、DataWorksコンソールでコードをコミットして実行します。 詳細については、「ODPS Sparkタスクの開発」をご参照ください。

    説明

    Spark開発環境の設定方法の詳細については、「MaxComputeでのSparkの実行モード」をご参照ください。