Topik ini menjelaskan cara mengonfigurasi dependensi Spark 2.x dan menyediakan beberapa contoh.
Mengonfigurasi dependensi untuk Spark 2.x
Untuk mengirimkan aplikasi Spark 2.x menggunakan Spark di MaxCompute, tambahkan dependensi berikut ke file pom.xml. Untuk informasi lebih lanjut tentang pom.xml, lihat pom.xml.
<properties>
<spark.version>2.3.0</spark.version>
<cupid.sdk.version>3.3.8-public</cupid.sdk.version>
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>cupid-sdk</artifactId>
<version>${cupid.sdk.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>hadoop-fs-oss</artifactId>
<version>${cupid.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
<version>${cupid.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>${scala.version}</version>
</dependency>Dalam kode sebelumnya, atur parameter scope sesuai dengan instruksi berikut:
Atur scope ke provided untuk semua paket yang dirilis di komunitas Apache Spark, seperti spark-core dan spark-sql.
Atur scope ke compile untuk modul odps-spark-datasource.
Contoh WordCount (Scala)
Kode Sampel
Cara Melakukan Commit
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class \ com.aliyun.odps.spark.examples.WordCount \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh membaca data dari atau menulis data ke tabel MaxCompute (Scala)
Kode Sampel
Cara Melakukan Commit
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.sparksql.SparkSQL \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh GraphX PageRank (Scala)
Kode Sampel
Cara Melakukan Commit
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.graphx.PageRank \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh MLlib KMeans-ON-OSS (Scala)
Untuk informasi tentang cara mengonfigurasi spark.hadoop.fs.oss.ststoken.roleArn dan spark.hadoop.fs.oss.endpoint, lihat Catatan Akses OSS.
Kode Sampel
Cara Melakukan Commit
# Edit kode. val modelOssDir = "oss://bucket/kmeans-model" // Masukkan jalur bucket OSS. val spark = SparkSession .builder() .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider") .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com") .appName("KmeansModelSaveToOss") .getOrCreate() cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh Data Tidak Terstruktur OSS (Scala)
Untuk informasi tentang cara mengonfigurasi spark.hadoop.fs.oss.ststoken.roleArn dan spark.hadoop.fs.oss.endpoint, lihat Catatan Akses OSS.
Kode Sampel
Cara Mengirim
# Edit kode. val pathIn = "oss://bucket/inputdata/" // Masukkan jalur bucket OSS. val spark = SparkSession .builder() .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider") .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com") .appName("SparkUnstructuredDataCompute") .getOrCreate() cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh SparkPi (Scala)
Kode Sampel
Cara Melakukan Commit
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.SparkPi \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh Spark Streaming LogHub (Scala)
Kode Sampel
Cara Melakukan Commit
# Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.loghub.LogHubStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh menggunakan Spark Streaming LogHub untuk menulis data ke MaxCompute (Scala)
Kode Sampel
Cara Melakukan Commit
# Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.loghub.LogHub2OdpsDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh Spark Streaming DataHub (Scala)
Kode Sampel
Cara Melakukan Commit
# Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.datahub.DataHubStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh menggunakan Spark Streaming DataHub untuk menulis data ke MaxCompute (Scala)
Kode Sampel
Cara Melakukan Commit
# Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.datahub.DataHub2OdpsDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh Spark Streaming Kafka (Scala)
Kode Sampel
Cara Melakukan Commit
# Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.kafka.KafkaStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Untuk informasi lebih lanjut, lihat Spark on MaxCompute.
Contoh Spark StructuredStreaming DataHub (Scala)
Kode Sampel
Cara Melakukan Commit
# Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.datahub.DatahubStructuredStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh Spark StructuredStreaming Kafka (Scala)
Kode Sampel
Cara Melakukan Commit
# Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.kafka.KafkaStructuredStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh Spark StructuredStreaming LogHub (Scala)
Kode Sampel
Cara Melakukan Commit
# Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.loghub.LoghubStructuredStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Contoh menggunakan PySpark untuk membaca data dari atau menulis data ke tabel MaxCompute (Python)
Kode Sampel
Cara Mengirim
# Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --jars /path/to/odps-spark-datasource_2.11-3.3.8-public.jar \ /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_sql.py
Contoh menggunakan PySpark untuk menulis data ke OSS (Python)
Kode Sampel
Cara Melakukan Commit
# Untuk informasi tentang cara mengonfigurasi variabel lingkungan dalam file spark-defaults.conf, lihat Set up a Spark on MaxCompute development environment. # Untuk informasi konfigurasi OSS, lihat Catatan Akses OSS. cd $SPARK_HOME bin/spark-submit --master yarn-cluster --jars /path/to/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar \ /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_oss.py # Kompilasi Spark 2.x untuk mendapatkan paket spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar.
Contoh Spark SQL (Java)
Untuk informasi lebih lanjut tentang kode sampel Spark SQL Java, lihat JavaSparkSQL.java.
Contoh membaca data dari MaxCompute dan menulis data ke HBase
Gunakan IntelliJ IDEA untuk menulis kode guna membaca data dari MaxCompute dan menulis data ke HBase.
Kode Sampel
object McToHbase { def main(args: Array[String]) { val spark = SparkSession .builder() .appName("spark_sql_ddl") .config("spark.sql.catalogImplementation", "odps") .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api") .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api") .getOrCreate() val sc = spark.sparkContext val config = HBaseConfiguration.create() val zkAddress = "" config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress); val jobConf = new JobConf(config) jobConf.setOutputFormat(classOf[TableOutputFormat]) jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test") try{ import spark._ spark.sql("select '7', 'long'").rdd.map(row => { val id = row(0).asInstanceOf[String] val name = row(1).asInstanceOf[String] val put = new Put(Bytes.toBytes(id)) put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(name)) (new ImmutableBytesWritable, put) }).saveAsHadoopDataset(jobConf) } finally { sc.stop() } } }Cara Mengirim: Gunakan IntelliJ IDEA untuk mengirim dan menjalankan kode sampel. Untuk informasi lebih lanjut, lihat Mode Operasi Spark pada MaxCompute.
Contoh membaca data dari dan menulis data ke objek OSS
Gunakan IntelliJ IDEA atau DataWorks untuk membaca data dari dan menulis data ke objek OSS.
Kode Sampel
Contoh 1: Kode Sampel untuk Mode Lokal
package com.aliyun.odps.spark.examples import java.io.ByteArrayInputStream import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]) { val spark = SparkSession .builder() .config("spark.master", "local[4]") // Kode dapat dijalankan setelah Anda mengatur spark.master ke local[N]. N menunjukkan jumlah pekerjaan Spark yang berjalan secara bersamaan. .config("spark.hadoop.fs.oss.accessKeyId", "") .config("spark.hadoop.fs.oss.accessKeySecret", "") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com") .appName("SparkOSS") .getOrCreate() val sc = spark.sparkContext try { // Membaca data dari objek OSS. val pathIn = "oss://spark-oss/workline.txt" val inputData = sc.textFile(pathIn, 5) // Menulis Resilient Distributed Datasets (RDD). inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3") } finally { sc.stop() } } }nullSebelum menjalankan kode sampel, periksa apakah dependensi
hadoop-fs-osstelah ditambahkan. Jika dependensi tidak ditambahkan, kesalahan akan dikembalikan.Contoh 2: Kode Sampel untuk Mode Lokal
package com.aliyun.odps.spark.examples import java.io.ByteArrayInputStream import com.aliyun.oss.{OSSClientBuilder,OSSClient} import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]) { val spark = SparkSession .builder() .config("spark.master", "local[4]") // Kode dapat dijalankan setelah Anda mengatur spark.master ke local[N]. N menunjukkan jumlah pekerjaan Spark yang berjalan secara bersamaan. .config("spark.hadoop.fs.oss.accessKeyId", "") .config("spark.hadoop.fs.oss.accessKeySecret", "") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com") .appName("SparkOSS") .getOrCreate() val sc = spark.sparkContext try { // Membaca data dari objek OSS. val pathIn = "oss://spark-oss/workline.txt" val inputData = sc.textFile(pathIn, 5) val cnt = inputData.count inputData.count() println(s"count: $cnt") // Menulis data ke objek OSS. // Pasangan AccessKey akun Alibaba Cloud memiliki izin pada semua operasi API. Menggunakan kredensial ini untuk melakukan operasi adalah operasi berisiko tinggi. Kami merekomendasikan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan pemeliharaan rutin. Untuk membuat pengguna RAM, masuk ke konsol RAM. // Dalam contoh ini, ID AccessKey dan Rahasia AccessKey dikonfigurasi sebagai variabel lingkungan. Anda juga dapat menyimpan pasangan AccessKey Anda dalam file konfigurasi sesuai dengan kebutuhan bisnis Anda. // Kami merekomendasikan Anda untuk tidak langsung menentukan ID AccessKey dan Rahasia AccessKey dalam kode untuk mencegah kebocoran pasangan AccessKey. val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) val filePath="user/data" ossClient.putObject("spark-oss",filePath , new ByteArrayInputStream(cnt.toString.getBytes())) ossClient.shutdown() } finally { sc.stop() } } }Contoh 3: Kode Sampel untuk Mode Kluster
package com.aliyun.odps.spark.examples import java.io.ByteArrayInputStream import com.aliyun.oss.{OSSClientBuilder,OSSClient} import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]) { val spark = SparkSession .builder() .appName("SparkOSS") .getOrCreate() val sc = spark.sparkContext try { // Membaca data dari objek OSS. val pathIn = "oss://spark-oss/workline.txt" val inputData = sc.textFile(pathIn, 5) val cnt = inputData.count inputData.count() println(s"count: $cnt") // inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3") // Menulis data ke objek OSS. // Pasangan AccessKey akun Alibaba Cloud memiliki izin pada semua operasi API. Menggunakan kredensial ini untuk melakukan operasi adalah operasi berisiko tinggi. Kami merekomendasikan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan pemeliharaan rutin. Untuk membuat pengguna RAM, masuk ke konsol RAM. // Dalam contoh ini, ID AccessKey dan Rahasia AccessKey dikonfigurasi sebagai variabel lingkungan. Anda juga dapat menyimpan pasangan AccessKey Anda dalam file konfigurasi sesuai dengan kebutuhan bisnis Anda. // Kami merekomendasikan Anda untuk tidak langsung menentukan ID AccessKey dan Rahasia AccessKey dalam kode untuk mencegah kebocoran pasangan AccessKey. val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) val filePath="user/data" ossClient.putObject("spark-oss",filePath , new ByteArrayInputStream(cnt.toString.getBytes())) ossClient.shutdown() } finally { sc.stop() } } }
Cara Melakukan Commit
Gunakan IntelliJ IDEA untuk mengembangkan, menguji, dan mengirim kode untuk mode lokal. Untuk informasi lebih lanjut, lihat Mode Operasi Spark pada MaxCompute.
Gunakan node ODPS Spark untuk mengirim dan menjalankan kode di konsol DataWorks. Untuk informasi lebih lanjut, lihat Mengembangkan Tugas ODPS Spark.
Contoh membaca data dari MaxCompute dan menulis data ke OSS
Gunakan IntelliJ IDEA atau DataWorks untuk membaca data dari MaxCompute dan menulis data ke OSS.
Kode Sampel
Kode Sampel untuk Mode Lokal
package com.aliyun.odps.spark.examples.userpakage import org.apache.spark.sql.{SaveMode, SparkSession} object SparkODPS2OSS { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("Spark2OSS") .config("spark.master", "local[4]")// Kode dapat dijalankan setelah Anda mengatur spark.master ke local[N]. N menunjukkan jumlah pekerjaan Spark yang berjalan secara bersamaan. .config("spark.hadoop.odps.project.name", "") .config("spark.hadoop.odps.access.id", "") .config("spark.hadoop.odps.access.key", "") .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api") .config("spark.sql.catalogImplementation", "odps") .config("spark.hadoop.fs.oss.accessKeyId","") .config("spark.hadoop.fs.oss.accessKeySecret","") .config("spark.hadoop.fs.oss.endpoint","oss-cn-beijing.aliyuncs.com") .getOrCreate() try{ // Gunakan Spark SQL untuk menanyakan tabel. val data = spark.sql("select * from user_detail") // Tampilkan hasil query. data.show(10) // Simpan hasil query ke objek OSS. data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3") }finally { spark.stop() } } }Kode Sampel untuk Mode Kluster
package com.aliyun.odps.spark.examples.userpakage import org.apache.spark.sql.{SaveMode, SparkSession} object SparkODPS2OSS { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("SparkODPS2OSS") .getOrCreate() try{ // Gunakan Spark SQL untuk menanyakan tabel. val data = spark.sql("select * from user_detail") // Tampilkan hasil query. data.show(10) // Simpan hasil query ke objek OSS. data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3") }finally { spark.stop() } } }
Cara Melakukan Commit
Gunakan IntelliJ IDEA untuk mengembangkan, menguji, dan mengirim kode untuk mode lokal.
Gunakan node ODPS Spark untuk mengirim dan menjalankan kode di konsol DataWorks. Untuk informasi lebih lanjut, lihat Mengembangkan Tugas ODPS Spark.
nullUntuk informasi lebih lanjut tentang cara mengonfigurasi lingkungan pengembangan Spark, lihat Mode Operasi Spark pada MaxCompute.