本文为您介绍如何使用Spark Shell,以及RDD的基础操作。
启动Spark Shell
Spark的Shell作为一个强大的交互式数据分析工具,提供了一个简单的方式学习API。 Spark既可以使用Scala,也可以使用Python。
您可以按照以下操作步骤来启动Spark Shell。
使用SSH方式登录集群的Master节点,详情请参见登录集群。
执行以下命令,启动Spark Shell。
spark-shell
在Spark Shell中,已经在名为sc的变量中为您创建了一个特殊的SparkContext,如果您自己创建SparkContext会不生效。您可以使用
--master
参数设置SparkContext连接到哪个主节点,并且可以通过--jars
参数来设置添加到CLASSPATH的JAR包,多个JAR包时使用逗号(,)分隔。更多参数信息,您可以通过命令spark-shell --help
获取。
RDD基础操作
Spark围绕着弹性分布式数据集(RDD)的概念展开,RDD是可以并行操作的元素的容错集合。Spark支持通过集合来创建RDD和通过外部数据集构建RDD两种方式来创建RDD。例如,共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据集。
创建RDD示例:
通过集合来创建RDD
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
通过外部数据集构建RDD
val distFile = sc.textFile("data.txt")
通常,Spark RDD的常用操作有两种,分别为Transformation操作和Action操作。Transformation操作并不会立即执行,而是到了Action操作才会被执行。
Transformation操作
操作
描述
map()
参数是函数,函数应用于RDD每一个元素,返回值是新的RDD。
flatMap()
参数是函数,函数应用于RDD每一个元素,拆分元素数据,变成迭代器,返回值是新的RDD。
filter()
参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD。
distinct()
没有参数,将RDD里的元素进行去重操作。
union()
参数是RDD,生成包含两个RDD所有元素的新RDD。
intersection()
参数是RDD,求出两个RDD的共同元素。
subtract()
参数是RDD,去掉原RDD里和参数RDD里相同的元素。
cartesian()
参数是RDD,求两个RDD的笛卡尔积。
Action操作
操作
描述
collect()
返回RDD所有元素。
count()
返回RDD中的元素个数。
countByValue()
返回各元素在RDD中出现的次数。
reduce()
并行整合所有RDD数据,例如求和操作。
fold(0)(func)
和
reduce()
功能一样,但是fold带有初始值。aggregate(0)(seqOp,combop)
和
reduce()
功能一样,但是返回的RDD数据类型和原RDD不一样。foreach(func)
对RDD每个元素都是使用特定函数。
Spark Shell入门示例
先从外部存储系统读一个文本文件构造了一个RDD,然后通过RDD的Map算子计算得到文本文件中每一行的长度,最后通过Reduce算子计算得到了文本文件中各行长度之和。
使用SSH方式登录集群的Master节点,详情请参见登录集群。
创建data.txt文件,并将其上传到HDFS。
本文示例中data.txt文件的内容所示。
Hello Spark This is a test file 1234567890
您可以通过以下命令将data.txt上传到HDFS。
hadoop fs -put data.txt /user/root/
启动Spark Shell。
spark-shell
执行以下命令,统计data.txt文件中所有行的字符长度总和。
val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)