このトピックでは、Spark 2.xの依存関係を設定する方法といくつかの例を示します。
Spark 2.xの依存関係の設定
MaxComputeでSparkを使用してSpark 2.xアプリケーションを送信する場合は、次の依存関係をpom.xmlファイルに追加する必要があります。 pom.xmlの詳細については、「pom.xml」をご参照ください。
<properties>
<spark.version>2.3.0</spark.version>
<cupid.sdk.version>3.3.8-public</cupid.sdk.version>
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>cupid-sdk</artifactId>
<version>${cupid.sdk.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>hadoop-fs-oss</artifactId>
<version>${cupid.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
<version>${cupid.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>${scala.version}</version>
</dependency>上記のコードでは、次の手順に基づいてscopeパラメーターを設定します。
Spark-coreやspark-sqlなど、Apache sparkコミュニティでリリースされているすべてのパッケージに対して、scopeをprovidedに設定します。
odps-spark-datasourceモジュールのscopeをcompileに設定します。
WordCountの例 (Scala)
サンプルコード
コミットする方法
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class \ com.aliyun.odps.spark.examples.WordCount \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
MaxComputeテーブルからのデータの読み取りまたはMaxComputeテーブルへのデータの書き込み例 (Scala)
サンプルコード
コミットする方法
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.sparksql.SparkSQL \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
GraphX PageRankの例 (Scala)
サンプルコード
コミットする方法
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.graphx.PageRank \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
MLlib KMeans-ON-OSSの例 (Scala)
spark.hadoop.fs.oss.ststoken.roleArnおよびspark.hadoop.fs.oss.endpointの設定方法については、「OSSアクセスノート」をご参照ください。
サンプルコード
コミットする方法
# Edit the code. val modelOssDir = "oss://bucket/kmeans-model" // Enter the path of the OSS bucket. val spark = SparkSession .builder() .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider") .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com") .appName("KmeansModelSaveToOss") .getOrCreate() cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
OSS UnstructuredDataの例 (Scala)
spark.hadoop.fs.oss.ststoken.roleArnおよびspark.hadoop.fs.oss.endpointの設定方法については、「OSSアクセスノート」をご参照ください。
サンプルコード
コミットする方法
# Edit the code. val pathIn = "oss://bucket/inputdata/" // Enter the path of the OSS bucket. val spark = SparkSession .builder() .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider") .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com") .appName("SparkUnstructuredDataCompute") .getOrCreate() cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
SparkPiの例 (Scala)
サンプルコード
コミットする方法
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.SparkPi \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Spark Streaming LogHubの例 (Scala)
サンプルコード
コミットする方法
# For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.loghub.LogHubStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Spark Streaming LogHubを使用してMaxCompute (Scala) にデータを書き込む例
サンプルコード
コミットする方法
# For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.loghub.LogHub2OdpsDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Spark Streaming DataHubの例 (Scala)
サンプルコード
コミットする方法
# For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.datahub.DataHubStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Spark Streaming DataHubを使用してMaxCompute (Scala) にデータを書き込む例
サンプルコード
コミットする方法
# For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.datahub.DataHub2OdpsDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Spark Streaming Kafkaの例 (Scala)
サンプルコード
コミットする方法
# For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.kafka.KafkaStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
詳細については、「Spark on MaxCompute」をご参照ください。
Spark StructuredStreaming DataHubの例 (Scala)
サンプルコード
コミットする方法
# For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.datahub.DatahubStructuredStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Spark StructuredStreaming Kafkaの例 (Scala)
サンプルコード
コミットする方法
# For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.kafka.KafkaStructuredStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Spark StructuredStreaming LogHubの例 (Scala)
サンプルコード
コミットする方法
# For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.loghub.LoghubStructuredStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
PySparkを使用してMaxComputeテーブルからデータを読み書きする例 (Python)
サンプルコード
コミットする方法
# For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --jars /path/to/odps-spark-datasource_2.11-3.3.8-public.jar \ /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_sql.py
PySparkを使用してOSSにデータを書き込む例 (Python)
サンプルコード
コミットする方法
# For information about how to configure the environment variables in the spark-defaults.conf file, see Set up a Spark on MaxCompute development environment. # For information about OSS configurations, see OSS access notes. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --jars /path/to/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar \ /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_oss.py # Compile Spark 2.x to obtain the spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar package.
Spark SQLの例 (Java)
Spark SQL Javaサンプルコードの詳細については、「JavaSparkSQL.java」をご参照ください。
MaxComputeからのデータの読み取りとHBaseへのデータの書き込みの例
IntelliJ IDEAを使用して、MaxComputeからデータを読み取り、HBaseにデータを書き込むコードを記述します。
サンプルコード
object McToHbase { def main(args: Array[String]) { val spark = SparkSession .builder() .appName("spark_sql_ddl") .config("spark.sql.catalogImplementation", "odps") .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api") .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api") .getOrCreate() val sc = spark.sparkContext val config = HBaseConfiguration.create() val zkAddress = "" config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress); val jobConf = new JobConf(config) jobConf.setOutputFormat(classOf[TableOutputFormat]) jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test") try{ import spark._ spark.sql("select '7', 'long'").rdd.map(row => { val id = row(0).asInstanceOf[String] val name = row(1).asInstanceOf[String] val put = new Put(Bytes.toBytes(id)) put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(name)) (new ImmutableBytesWritable, put) }).saveAsHadoopDataset(jobConf) } finally { sc.stop() } } }コミットする方法: IntelliJ IDEAを使用して、サンプルコードをコミットして実行します。 詳細については、「MaxComputeでのSparkの実行モード」をご参照ください。
OSSオブジェクトからのデータの読み取りとOSSオブジェクトへのデータの書き込みの例
IntelliJ IDEAまたはDataWorksを使用して、OSSオブジェクトからデータを読み書きします。
サンプルコード
例1: ローカルモードのサンプルコード
package com.aliyun.odps.spark.examples import java.io.ByteArrayInputStream import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]) { val spark = SparkSession .builder() .config("spark.master", "local[4]") // The code can run after you set spark.master to local[N]. N indicates the number of concurrent Spark jobs. .config("spark.hadoop.fs.oss.accessKeyId", "") .config("spark.hadoop.fs.oss.accessKeySecret", "") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com") .appName("SparkOSS") .getOrCreate() val sc = spark.sparkContext try { // Read data from OSS objects. val pathIn = "oss://spark-oss/workline.txt" val inputData = sc.textFile(pathIn, 5) // Write Resilient Distributed Datasets (RDD). inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3") } finally { sc.stop() } } }説明サンプルコードを実行する前に、
hadoop-fs-oss依存関係が追加されているかどうかを確認します。 依存関係が追加されない場合、エラーが返されます。例2: ローカルモードのサンプルコード
package com.aliyun.odps.spark.examples import java.io.ByteArrayInputStream import com.aliyun.oss.{OSSClientBuilder,OSSClient} import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]) { val spark = SparkSession .builder() .config("spark.master", "local[4]") // The code can run after you set spark.master to local[N]. N indicates the number of concurrent Spark jobs. .config("spark.hadoop.fs.oss.accessKeyId", "") .config("spark.hadoop.fs.oss.accessKeySecret", "") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com") .appName("SparkOSS") .getOrCreate() val sc = spark.sparkContext try { // Read data from OSS objects. val pathIn = "oss://spark-oss/workline.txt" val inputData = sc.textFile(pathIn, 5) val cnt = inputData.count inputData.count() println(s"count: $cnt") // Write data to OSS objects. // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console. // In this example, the AccessKey ID and AccessKey secret are configured as environment variables. You can also save your AccessKey pair in the configuration file based on your business requirements. // We recommend that you do not directly specify the AccessKey ID and AccessKey secret in code to prevent AccessKey pair leaks. val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) val filePath="user/data" ossClient.putObject("spark-oss",filePath , new ByteArrayInputStream(cnt.toString.getBytes())) ossClient.shutdown() } finally { sc.stop() } } }例3: クラスターモードのサンプルコード
package com.aliyun.odps.spark.examples import java.io.ByteArrayInputStream import com.aliyun.oss.{OSSClientBuilder,OSSClient} import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]) { val spark = SparkSession .builder() .appName("SparkOSS") .getOrCreate() val sc = spark.sparkContext try { // Read data from OSS objects. val pathIn = "oss://spark-oss/workline.txt" val inputData = sc.textFile(pathIn, 5) val cnt = inputData.count inputData.count() println(s"count: $cnt") // inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3") // Write data to OSS objects. // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console. // In this example, the AccessKey ID and AccessKey secret are configured as environment variables. You can also save your AccessKey pair in the configuration file based on your business requirements. // We recommend that you do not directly specify the AccessKey ID and AccessKey secret in code to prevent AccessKey pair leaks. val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) val filePath="user/data" ossClient.putObject("spark-oss",filePath , new ByteArrayInputStream(cnt.toString.getBytes())) ossClient.shutdown() } finally { sc.stop() } } }
コミットする方法
IntelliJ IDEAを使用して、ローカルモードのコードを開発、テスト、およびコミットします。 詳細については、「MaxComputeでのSparkの実行モード」をご参照ください。
ODPS Sparkノードを使用して、DataWorksコンソールでコードをコミットして実行します。 詳細については、「ODPS Sparkタスクの開発」をご参照ください。
MaxComputeからデータを読み取り、OSSにデータを書き込む例
IntelliJ IDEAまたはDataWorksを使用して、MaxComputeからデータを読み取り、OSSにデータを書き込みます。
サンプルコード
ローカルモードのサンプルコード
package com.aliyun.odps.spark.examples.userpakage import org.apache.spark.sql.{SaveMode, SparkSession} object SparkODPS2OSS { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("Spark2OSS") .config("spark.master", "local[4]")// The code can run after you set spark.master to local[N]. N indicates the number of concurrent Spark jobs. .config("spark.hadoop.odps.project.name", "") .config("spark.hadoop.odps.access.id", "") .config("spark.hadoop.odps.access.key", "") .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api") .config("spark.sql.catalogImplementation", "odps") .config("spark.hadoop.fs.oss.accessKeyId","") .config("spark.hadoop.fs.oss.accessKeySecret","") .config("spark.hadoop.fs.oss.endpoint","oss-cn-beijing.aliyuncs.com") .getOrCreate() try{ // Use Spark SQL to query tables. val data = spark.sql("select * from user_detail") // Show the query results. data.show(10) // Store the query results to an OSS object. data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3") }finally { spark.stop() } } }クラスターモードのサンプルコード
package com.aliyun.odps.spark.examples.userpakage import org.apache.spark.sql.{SaveMode, SparkSession} object SparkODPS2OSS { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("SparkODPS2OSS") .getOrCreate() try{ // Use Spark SQL to query tables. val data = spark.sql("select * from user_detail") // Show the query results. data.show(10) // Store the query results to an OSS object. data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3") }finally { spark.stop() } } }
コミットする方法
IntelliJ IDEAを使用して、ローカルモードのコードを開発、テスト、およびコミットします。
ODPS Sparkノードを使用して、DataWorksコンソールでコードをコミットして実行します。 詳細については、「ODPS Sparkタスクの開発」をご参照ください。
説明Spark開発環境の設定方法の詳細については、「MaxComputeでのSparkの実行モード」をご参照ください。