全部产品
Search
文档中心

开源大数据平台E-MapReduce:Spark Shell和RDD基础操作

更新时间:Jul 10, 2025

本文为您介绍如何使用Spark Shell,以及RDD的基础操作。

启动Spark Shell

Spark的Shell作为一个强大的交互式数据分析工具,提供了一个简单的方式学习API。 Spark既可以使用Scala,也可以使用Python。

您可以按照以下操作步骤来启动Spark Shell。

  1. 使用SSH方式登录集群的Master节点,详情请参见登录集群

  2. 执行以下命令,启动Spark Shell。

    spark-shell

    在Spark Shell中,已经在名为sc的变量中为您创建了一个特殊的SparkContext,如果您自己创建SparkContext会不生效。您可以使用--master参数设置SparkContext连接到哪个主节点,并且可以通过--jars参数来设置添加到CLASSPATH的JAR包,多个JAR包时使用逗号(,)分隔。更多参数信息,您可以通过命令spark-shell --help获取。

RDD基础操作

Spark围绕着弹性分布式数据集(RDD)的概念展开,RDD是可以并行操作的元素的容错集合。Spark支持通过集合来创建RDD和通过外部数据集构建RDD两种方式来创建RDD。例如,共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据集。

创建RDD示例:

  • 通过集合来创建RDD

    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
  • 通过外部数据集构建RDD

    val distFile = sc.textFile("data.txt")

通常,Spark RDD的常用操作有两种,分别为Transformation操作和Action操作。Transformation操作并不会立即执行,而是到了Action操作才会被执行。

  • Transformation操作

    操作

    描述

    map()

    参数是函数,函数应用于RDD每一个元素,返回值是新的RDD。

    flatMap()

    参数是函数,函数应用于RDD每一个元素,拆分元素数据,变成迭代器,返回值是新的RDD。

    filter()

    参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD。

    distinct()

    没有参数,将RDD里的元素进行去重操作。

    union()

    参数是RDD,生成包含两个RDD所有元素的新RDD。

    intersection()

    参数是RDD,求出两个RDD的共同元素。

    subtract()

    参数是RDD,去掉原RDD里和参数RDD里相同的元素。

    cartesian()

    参数是RDD,求两个RDD的笛卡尔积。

  • Action操作

    操作

    描述

    collect()

    返回RDD所有元素。

    count()

    返回RDD中的元素个数。

    countByValue()

    返回各元素在RDD中出现的次数。

    reduce()

    并行整合所有RDD数据,例如求和操作。

    fold(0)(func)

    reduce()功能一样,但是fold带有初始值。

    aggregate(0)(seqOp,combop)

    reduce()功能一样,但是返回的RDD数据类型和原RDD不一样。

    foreach(func)

    对RDD每个元素都是使用特定函数。

Spark Shell入门示例

先从外部存储系统读一个文本文件构造了一个RDD,然后通过RDD的Map算子计算得到文本文件中每一行的长度,最后通过Reduce算子计算得到了文本文件中各行长度之和。

  1. 使用SSH方式登录集群的Master节点,详情请参见登录集群

  2. 创建data.txt文件,并将其上传到HDFS。

    本文示例中data.txt文件的内容所示。

    Hello Spark
    This is a test file
    1234567890

    您可以通过以下命令将data.txt上传到HDFS。

    hadoop fs -put data.txt /user/root/
  3. 启动Spark Shell。

    spark-shell
  4. 执行以下命令,统计data.txt文件中所有行的字符长度总和。

    val lines = sc.textFile("data.txt")
    val lineLengths = lines.map(s => s.length)
    val totalLength = lineLengths.reduce((a, b) => a + b)