全部產品
Search
文件中心

E-MapReduce:Spark Shell和RDD基礎操作

更新時間:Jul 11, 2025

本文為您介紹如何使用Spark Shell,以及RDD的基礎操作。

啟動Spark Shell

Spark的Shell作為一個強大的互動式資料分析工具,提供了一個簡單的方式學習API。 Spark既可以使用Scala,也可以使用Python。

您可以按照以下操作步驟來啟動Spark Shell。

  1. 使用SSH方式登入叢集的Master節點,詳情請參見登入叢集

  2. 執行以下命令,啟動Spark Shell。

    spark-shell

    在Spark Shell中,已經在名為sc的變數中為您建立了一個特殊的SparkContext,如果您自己建立SparkContext會不生效。您可以使用--master參數設定SparkContext串連到哪個主節點,並且可以通過--jars參數來設定添加到CLASSPATH的JAR包,多個JAR包時使用逗號(,)分隔。更多參數資訊,您可以通過命令spark-shell --help擷取。

RDD基礎操作

Spark圍繞著彈性分布式資料集(RDD)的概念展開,RDD是可以並行操作的元素的容錯集合。Spark支援通過集合來建立RDD和通過外部資料集構建RDD兩種方式來建立RDD。例如,共用檔案系統、HDFS、HBase或任何提供Hadoop InputFormat的資料集。

建立RDD樣本:

  • 通過集合來建立RDD

    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
  • 通過外部資料集構建RDD

    val distFile = sc.textFile("data.txt")

通常,Spark RDD的常用操作有兩種,分別為Transformation操作和Action操作。Transformation操作並不會立即執行,而是到了Action操作才會被執行。

  • Transformation操作

    操作

    描述

    map()

    參數是函數,函數應用於RDD每一個元素,傳回值是新的RDD。

    flatMap()

    參數是函數,函數應用於RDD每一個元素,拆分元素資料,變成迭代器,傳回值是新的RDD。

    filter()

    參數是函數,函數會過濾掉不合格元素,傳回值是新的RDD。

    distinct()

    沒有參數,將RDD裡的元素進行去重操作。

    union()

    參數是RDD,產生包含兩個RDD所有元素的新RDD。

    intersection()

    參數是RDD,求出兩個RDD的共同元素。

    subtract()

    參數是RDD,去掉原RDD裡和參數RDD裡相同的元素。

    cartesian()

    參數是RDD,求兩個RDD的笛卡爾積。

  • Action操作

    操作

    描述

    collect()

    返回RDD所有元素。

    count()

    返回RDD中的元素個數。

    countByValue()

    返回各元素在RDD中出現的次數。

    reduce()

    並行整合所有RDD資料,例如求和操作。

    fold(0)(func)

    reduce()功能一樣,但是fold帶有初始值。

    aggregate(0)(seqOp,combop)

    reduce()功能一樣,但是返回的RDD資料類型和原RDD不一樣。

    foreach(func)

    對RDD每個元素都是使用特定函數。

Spark Shell入門樣本

先從外部儲存系統讀一個文字檔構造了一個RDD,然後通過RDD的Map運算元計算得到文字檔中每一行的長度,最後通過Reduce運算元計算得到了文字檔中各行長度之和。

  1. 使用SSH方式登入叢集的Master節點,詳情請參見登入叢集

  2. 建立data.txt檔案,並將其上傳到HDFS。

    本文樣本中data.txt檔案的內容所示。

    Hello Spark
    This is a test file
    1234567890

    您可以通過以下命令將data.txt上傳到HDFS。

    hadoop fs -put data.txt /user/root/
  3. 啟動Spark Shell。

    spark-shell
  4. 執行以下命令,統計data.txt檔案中所有行的字元長度總和。

    val lines = sc.textFile("data.txt")
    val lineLengths = lines.map(s => s.length)
    val totalLength = lineLengths.reduce((a, b) => a + b)