This topic describes the basic operations on the Spark shell and resilient distributed datasets (RDDs).
Start the Spark shell
The Spark shell is a powerful tool for interactive data analysis and provides a simple way to learn APIs. The Spark shell supports both Scala and Python.
To start the Spark shell, perform the following steps:
Log on to the master node of the cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to start the Spark shell:
spark-shell
In the Spark shell, a special SparkContext is already created for you in a variable named sc. A SparkContext that you create yourself does not take effect. You can use the --master parameter to specify the master node to which the SparkContext connects, and the --jars parameter to specify the JAR packages that are added to the classpath. Separate multiple JAR packages with commas (,). For more information about the parameters, run the spark-shell --help command.
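For example, the following command is a minimal sketch that starts the Spark shell on YARN and adds two JAR packages to the classpath. The master setting and JAR paths are placeholders for illustration; replace them with values that match your cluster.
spark-shell --master yarn --jars /path/to/lib1.jar,/path/to/lib2.jar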
Basic RDD operations
Spark provides RDDs. An RDD is a collection of elements that can be operated on in parallel. You can create an RDD from a parallelized collection or from an external dataset, such as a shared file system, Hadoop Distributed File System (HDFS), HBase, or any data source that provides a Hadoop InputFormat.
Sample code:
Create an RDD by using a parallelized collection
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
Create an RDD by using an external dataset
val distFile = sc.textFile("data.txt")
RDD operations can be classified into two types: transformations and actions. Transformations are lazily evaluated: they define a new RDD but are not executed immediately. Actions trigger the computation and return a result.
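For example, the following snippet shows this behavior in the Spark shell with a small sample collection: the map() transformation only records the computation, and nothing runs until the count() action is called.
val numbers = sc.parallelize(1 to 5)
val doubled = numbers.map(n => n * 2)   // only recorded, not computed yet
doubled.count()                         // the action triggers the computation and returns 5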
Transformations
map(): Applies a function to each element of an RDD and returns the result as a new RDD.
flatMap(): Applies a function to each element of an RDD, flattens the results, and returns them as a new RDD.
filter(): Selects the elements of an RDD for which a function returns true and returns them as a new RDD.
distinct(): Returns the distinct elements of an RDD. No function is required.
union(): Joins the elements of two RDDs and returns the result as a new RDD.
intersection(): Returns the elements that appear in both RDDs.
subtract(): Removes the elements of one RDD that also appear in another RDD and returns the remaining elements.
cartesian(): Returns the Cartesian product of two RDDs.
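The following snippet is a small example, with sample values made up for illustration, that shows several of these transformations in the Spark shell. The transformations only define new RDDs; collect() is called at the end to materialize one of them.
val nums = sc.parallelize(Seq(1, 2, 2, 3, 4))
val squares = nums.map(n => n * n)                      // 1, 4, 4, 9, 16
val evens = nums.filter(n => n % 2 == 0)                // 2, 2, 4
val uniques = nums.distinct()                           // 1, 2, 3, 4 (order may vary)
val sentences = sc.parallelize(Seq("hello spark", "hello rdd"))
val words = sentences.flatMap(line => line.split(" "))  // "hello", "spark", "hello", "rdd"
val combined = evens.union(uniques)                     // 2, 2, 4, 1, 2, 3, 4
combined.collect()                                      // returns the elements of combined to the driver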
Actions
collect(): Returns all elements of an RDD.
count(): Returns the number of elements in an RDD.
countByValue(): Returns the number of times that each element appears in an RDD.
reduce(func): Aggregates all elements of an RDD, for example to calculate their sum.
fold(0)(func): Similar to reduce(), except that it takes a zero value as the initial value.
aggregate(0)(seqOp, combOp): Similar to reduce(), but the data type of the result can be different from that of the elements of the original RDD.
foreach(func): Applies a specific function to each element of an RDD.
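The following snippet is a small example, again with made-up sample values, that shows several of these actions in the Spark shell.
val nums = sc.parallelize(Seq(1, 2, 2, 3))
nums.collect()                               // Array(1, 2, 2, 3)
nums.count()                                 // 4
nums.countByValue()                          // Map(1 -> 1, 2 -> 2, 3 -> 1)
nums.reduce((a, b) => a + b)                 // 8
nums.fold(0)((a, b) => a + b)                // 8, same as reduce() but with 0 as the initial value
nums.aggregate(0.0)((acc, n) => acc + n, (a, b) => a + b)   // 8.0: the result type (Double) differs from the element type (Int)
nums.foreach(n => println(n))                // runs on the executors; the output may not appear in the shell on a cluster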
Spark shell example
First, an RDD is created by reading a text file from an external storage system. A map operator is used to calculate the length of each row in the text file. A reduce operator is used to sum up the lengths. This way, you can obtain the total length of all rows in the text file.
Log on to the master node of the cluster in SSH mode. For more information, see Log on to a cluster.
Create a data.txt file and upload it to HDFS.
The content of the data.txt file in this example is shown below.
Hello Spark
This is a test file
1234567890
You can upload data.txt to HDFS using the following command.
hadoop fs -put data.txt /user/root/
Start the Spark shell.
spark-shell
Run the following commands to calculate the total length of all lines in the data.txt file.
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
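If data.txt contains the three lines shown above, the line lengths are 11, 19, and 10 characters, so totalLength evaluates to 40. You can print the value in the same shell session to verify the result.
println(totalLength)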