本文為您介紹如何使用Spark Shell,以及RDD的基礎操作。
啟動Spark Shell
Spark的Shell作為一個強大的互動式資料分析工具,提供了一個簡單的方式學習API。 Spark既可以使用Scala,也可以使用Python。
您可以按照以下操作步驟來啟動Spark Shell。
使用SSH方式登入叢集的Master節點,詳情請參見登入叢集。
執行以下命令,啟動Spark Shell。
spark-shell在Spark Shell中,已經在名為sc的變數中為您建立了一個特殊的SparkContext,如果您自己建立SparkContext會不生效。您可以使用
--master參數設定SparkContext串連到哪個主節點,並且可以通過--jars參數來設定添加到CLASSPATH的JAR包,多個JAR包時使用逗號(,)分隔。更多參數資訊,您可以通過命令spark-shell --help擷取。
RDD基礎操作
Spark圍繞著彈性分布式資料集(RDD)的概念展開,RDD是可以並行操作的元素的容錯集合。Spark支援通過集合來建立RDD和通過外部資料集構建RDD兩種方式來建立RDD。例如,共用檔案系統、HDFS、HBase或任何提供Hadoop InputFormat的資料集。
建立RDD樣本:
通過集合來建立RDD
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)通過外部資料集構建RDD
val distFile = sc.textFile("data.txt")
通常,Spark RDD的常用操作有兩種,分別為Transformation操作和Action操作。Transformation操作並不會立即執行,而是到了Action操作才會被執行。
Transformation操作
操作
描述
map()參數是函數,函數應用於RDD每一個元素,傳回值是新的RDD。
flatMap()參數是函數,函數應用於RDD每一個元素,拆分元素資料,變成迭代器,傳回值是新的RDD。
filter()參數是函數,函數會過濾掉不合格元素,傳回值是新的RDD。
distinct()沒有參數,將RDD裡的元素進行去重操作。
union()參數是RDD,產生包含兩個RDD所有元素的新RDD。
intersection()參數是RDD,求出兩個RDD的共同元素。
subtract()參數是RDD,去掉原RDD裡和參數RDD裡相同的元素。
cartesian()參數是RDD,求兩個RDD的笛卡爾積。
Action操作
操作
描述
collect()返回RDD所有元素。
count()返回RDD中的元素個數。
countByValue()返回各元素在RDD中出現的次數。
reduce()並行整合所有RDD資料,例如求和操作。
fold(0)(func)和
reduce()功能一樣,但是fold帶有初始值。aggregate(0)(seqOp,combop)和
reduce()功能一樣,但是返回的RDD資料類型和原RDD不一樣。foreach(func)對RDD每個元素都是使用特定函數。
Spark Shell入門樣本
先從外部儲存系統讀一個文字檔構造了一個RDD,然後通過RDD的Map運算元計算得到文字檔中每一行的長度,最後通過Reduce運算元計算得到了文字檔中各行長度之和。
使用SSH方式登入叢集的Master節點,詳情請參見登入叢集。
建立data.txt檔案,並將其上傳到HDFS。
本文樣本中data.txt檔案的內容所示。
Hello Spark This is a test file 1234567890您可以通過以下命令將data.txt上傳到HDFS。
hadoop fs -put data.txt /user/root/啟動Spark Shell。
spark-shell執行以下命令,統計data.txt檔案中所有行的字元長度總和。
val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)