The Spark shell is an interactive environment for exploring data and learning the Spark API. This topic shows you how to start the Spark shell on an E-MapReduce (EMR) cluster and work with resilient distributed datasets (RDDs).
Prerequisites
Before you begin, make sure that you have:
- An EMR cluster with Spark installed.
- SSH access to the master node of the cluster. For more information, see Log on to a cluster.
Start the Spark shell
The Spark shell supports both Scala and Python. Run `spark-shell` for the Scala shell or `pyspark` for the Python shell. The examples in this topic use Scala.
- Log on to the master node via SSH.
- Start the shell:

  ```shell
  spark-shell
  ```

  After the shell starts, a SparkContext is available as `sc`. A SparkContext that you create yourself in the shell does not work, so use the pre-created `sc` instead. Use `--master` to specify the master URL that the shell connects to, and `--jars` to add JAR packages to the classpath. Separate multiple JAR paths with commas. For the full list of options, run `spark-shell --help`.
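To check what the pre-created `sc` is connected to, you can inspect a few of its properties at the `scala>` prompt. This is a minimal sketch; the printed values depend on your EMR version and on the options you passed to `spark-shell`, and the `spark.jars` lookup assumes JARs were added with `--jars`.

```scala
// Run at the scala> prompt; `sc` is the SparkContext that spark-shell created.
println(sc.version)   // Spark version running on the cluster
println(sc.master)    // master URL, for example "yarn" when started with --master yarn
println(sc.appName)   // application name registered by the shell
// JARs passed with --jars are recorded in the spark.jars property ("" if none were passed).
println(sc.getConf.get("spark.jars", ""))
```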
RDD basics
An RDD is a fault-tolerant collection of elements, partitioned across the nodes of the cluster, that Spark can process in parallel. You can create an RDD in two ways:
- Parallelized collection: wrap an existing in-memory collection in the driver program with `sc.parallelize()`:

  ```scala
  val data = Array(1, 2, 3, 4, 5)
  val distData = sc.parallelize(data)
  ```

- External dataset: read from any storage source supported by Hadoop, such as Hadoop Distributed File System (HDFS), HBase, a shared file system, or any other source that provides a Hadoop InputFormat:

  ```scala
  val distFile = sc.textFile("data.txt")
  ```
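`sc.parallelize()` also accepts an optional second argument that sets how many partitions the collection is split into; Spark runs one task per partition. The sketch below, pasted into the Scala shell, creates a two-partition RDD and checks it with an action. The partition count of 2 is an arbitrary choice for illustration.

```scala
val data = Array(1, 2, 3, 4, 5)
// Split the local array into 2 partitions; each partition is processed by one task.
val distData = sc.parallelize(data, 2)
val numPartitions = distData.getNumPartitions   // 2
// Run an action to confirm the distributed data matches the local array.
val total = distData.reduce((a, b) => a + b)     // 15
```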
Transformations and actions
RDD operations fall into two categories:
- Transformations: create a new RDD from an existing one. Transformations are lazy: Spark records what to compute but does not compute anything until an action is called.
- Actions: trigger computation and either return a result to the driver or write data to storage.
This distinction matters in practice. When you chain transformations, Spark only records them as a lineage of dependencies. When you call an action, Spark turns that plan into stages and tasks and runs them across the cluster.
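One way to observe lazy evaluation is to count how many times a transformation's function actually runs. This sketch uses a `LongAccumulator` (the name `mapCalls` is arbitrary): the counter stays at 0 after `map()` is defined and only increases when the `count()` action executes. Spark may apply accumulator updates made inside transformations more than once if a task is retried, so treat the final number as illustrative.

```scala
// Accumulator that tasks can add to and the driver can read.
val mapCalls = sc.longAccumulator("mapCalls")
// Transformation only: defines the computation but runs no tasks.
val doubled = sc.parallelize(1 to 5).map { x => mapCalls.add(1); x * 2 }
val before: Long = mapCalls.value   // 0: nothing has run yet
// Action: Spark now schedules tasks that run the map function.
val elementCount = doubled.count()  // 5
val after: Long = mapCalls.value    // 5, unless a task was retried
```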
Transformations
| Operation | Description |
|---|---|
| `map()` | Applies a function to each element and returns a new RDD of the results. |
| `flatMap()` | Applies a function that returns a sequence to each element, flattens the results, and returns a new RDD. |
| `filter()` | Returns a new RDD that contains only the elements for which the function returns true. |
| `distinct()` | Returns a new RDD with duplicate elements removed. |
| `union()` | Returns a new RDD that contains all elements from both RDDs. Duplicates are kept. |
| `intersection()` | Returns a new RDD that contains only the elements present in both RDDs, without duplicates. |
| `subtract()` | Returns a new RDD that contains the elements of the first RDD that do not appear in the second RDD. |
| `cartesian()` | Returns a new RDD of all pairs (a, b), where a comes from the first RDD and b from the second. |
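The following sketch applies each transformation in the table to two small RDDs in the Scala shell. The sample data is made up for illustration, and `collect()` is called only so that the results can be inspected on the driver.

```scala
val a = sc.parallelize(Seq(1, 2, 2, 3))
val b = sc.parallelize(Seq(3, 4))
val sentences = sc.parallelize(Seq("hello spark", "hello world"))

val squares = a.map(x => x * x).collect()                   // Array(1, 4, 4, 9)
val words = sentences.flatMap(s => s.split(" ")).collect()  // Array(hello, spark, hello, world)
val evens = a.filter(x => x % 2 == 0).collect()             // Array(2, 2)
// distinct() and subtract() shuffle data, so sort their results before comparing.
val unique = a.distinct().collect().sorted                  // Array(1, 2, 3)
val both = a.union(b).collect()                             // Array(1, 2, 2, 3, 3, 4)
val common = a.intersection(b).collect()                    // Array(3)
val onlyInA = a.subtract(b).collect().sorted                // Array(1, 2, 2)
val pairCount = a.distinct().cartesian(b).count()           // 3 x 2 = 6
```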
Actions
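| Operation | Description |
|---|---|
| `collect()` | Returns all elements of the RDD to the driver as an array. Use it only when the result is small enough to fit in driver memory. |
| `count()` | Returns the number of elements in the RDD. |
| `countByValue()` | Returns a map from each distinct element to the number of times it appears in the RDD. |
| `reduce()` | Aggregates all elements of the RDD with a commutative and associative function, for example to calculate a sum. |
| `fold(0)(func)` | Like `reduce()`, but takes a zero value that is used as the initial value for each partition and for the final merge. The zero value should be the identity element of `func`, such as 0 for addition. |
| `aggregate(0)(seqOp, combOp)` | Like `fold()`, but the result type can differ from the element type. `seqOp` merges an element into a per-partition accumulator, and `combOp` merges the accumulators of different partitions. |
| `foreach(func)` | Applies a function to each element of the RDD. The function runs on the executors, so on a cluster its output (for example, from `println`) appears in the executor logs rather than in the shell. |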
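The sketch below runs each action in the table on one small RDD. The `aggregate()` call shows why the result type can differ from the element type: it folds `Int` elements into a `(sum, count)` tuple, from which the mean is computed on the driver. The sample numbers are made up for illustration.

```scala
val nums = sc.parallelize(Seq(1, 2, 3, 4, 4))

val all = nums.collect()                     // Array(1, 2, 3, 4, 4)
val elementCount = nums.count()              // 5
val freq = nums.countByValue()               // 4 -> 2, every other value -> 1
val total = nums.reduce((a, b) => a + b)     // 14
val folded = nums.fold(0)((a, b) => a + b)   // 14; 0 is the identity for +
// Accumulator type (Int, Int) = (running sum, running count), unlike the Int elements.
val (sum, cnt) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),      // seqOp: add one element to a partition's accumulator
  (p, q) => (p._1 + q._1, p._2 + q._2))      // combOp: merge two partitions' accumulators
val mean = sum.toDouble / cnt                // 2.8
// Runs on the executors; on a cluster the output goes to executor logs.
nums.foreach(x => println(x))
```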
Example: calculate total line length
This example reads a text file from HDFS, uses a map transformation to get the length of each line, and uses a reduce action to sum those lengths.
Step 1: Upload the data file
- Log on to the master node via SSH.
- Create a file named `data.txt` with the following content:

  ```text
  Hello Spark
  This is a test file
  1234567890
  ```

- Upload the file to HDFS:

  ```shell
  hadoop fs -put data.txt /user/root/
  ```
Step 2: Start the Spark shell
```shell
spark-shell
```
Step 3: Calculate total line length
Run the following in the shell:
```scala
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
```
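Here is what each line does:
- `val lines = sc.textFile("data.txt")` creates a base RDD from the file. The file is not read yet; `lines` only points to it. Assuming HDFS is the cluster's default file system, the relative path `data.txt` resolves against your HDFS home directory, which is `/user/root` when you run the shell as root.
- `val lineLengths = lines.map(s => s.length)` defines a transformation that computes the length of each line. Because transformations are lazy, nothing runs at this point.
- `val totalLength = lineLengths.reduce((a, b) => a + b)` is an action. Spark now reads the file, applies the `map`, and runs the reduction in parallel across the cluster, returning only the final sum to the driver.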
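By default, Spark recomputes `lineLengths` (and rereads the file) for every action that uses it. The Spark programming guide suggests calling `persist()` before the first action when an RDD is reused. The sketch below applies that to this example; the second action, `max()`, is added only for illustration.

```scala
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
// Keep lineLengths in memory after the first action computes it.
lineLengths.persist()
// First action: reads data.txt, computes the lengths, and caches the partitions.
val totalLength = lineLengths.reduce((a, b) => a + b)
// Second action: reuses the cached lengths instead of rereading the file (if they fit in memory).
val longestLine = lineLengths.max()
println(s"Total length: $totalLength, longest line: $longestLine")
```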