Spark RDD, Transformations and Actions example

Updated: Sep 16, 2020


In this Apache Spark RDD tutorial, you will learn about:

  • Spark RDD with example

  • What is RDD in Spark?

  • Spark transformations

  • Spark actions

  • Spark actions and transformations example

  • Spark RDD operations


What is an RDD in Spark?

According to the Apache Spark documentation: "Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat."

Example (for easy understanding) - Not a practical case

Honestly, I didn't understand anything when I read the above definition for the first time, except that RDD is an acronym for Resilient Distributed Dataset.

Let's try to understand RDDs with a simple example. Assume that you have a collection of 100 movies stored on your personal laptop. This way, the complete data resides on a single machine (you can call it a node), i.e. your personal laptop.

Now, instead of keeping all the movies on a single machine, let's say you distributed them: 50 movies on laptop A and 50 movies on laptop B. This is where the term Distributed comes into the picture: 50% of your data resides on one machine and 50% on another.

Now let's say you were worried that either laptop could malfunction and you would lose your movies, so you took a backup. You backed up the 50 movies on laptop A to laptop B and, similarly, the 50 movies on laptop B to laptop A. This is where the term Resilient or Fault-tolerant comes into the picture. The dictionary meaning of resilient is to withstand or recover quickly from difficult conditions, and the backup of your movies makes sure that you can recover the data at any time from another machine (a so-called node) if a system malfunctions.

The number of times you back up or replicate the data onto another machine for recovery is called the replication factor. In the above case the replication factor was one, as you replicated the data once.

In real-life scenarios you will encounter huge amounts of data (like the movie data in the above example) distributed across thousands of worker nodes (like the laptops in the above example). The combination of these nodes is called a cluster, and it uses higher replication factors (in the above example it was just 1) in order to maintain a fault-tolerant system.


Basic facts about Spark RDDs

  • A Resilient Distributed Dataset (RDD) is an immutable collection of elements and the fundamental data structure in Apache Spark.

  • You can create RDDs in two ways: parallelizing an existing collection or referencing an external dataset.

  • RDDs are immutable, i.e. read-only data structures, so you can't change the original RDD. But you can always create a new one.

  • RDDs support two types of Spark operations: Transformations and Actions.
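The points above can be sketched in a few lines. This is only a minimal illustration, not the one right way to set things up: it assumes Spark is on the classpath, the application name and the variable names are made up for this example, and SparkContext.getOrCreate simply reuses the existing context if you run it inside spark-shell.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context inside spark-shell; otherwise starts a local one.
// The app name "rdd-basics-sketch" is a hypothetical name for this example.
val conf = new SparkConf().setAppName("rdd-basics-sketch").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

// Create an RDD by parallelizing a local collection.
val original = sc.parallelize(Seq(1, 2, 3))

// A transformation does not modify `original`; it describes a new RDD.
val doubled = original.map(x => x * 2)

// collect() is an action: it runs the job and returns the elements to the driver.
val originalValues = original.collect()   // still 1, 2, 3
val doubledValues = doubled.collect()     // 2, 4, 6
```

Note that the original RDD is untouched after the transformation, which is what immutability means in practice.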

Parallelize collection

scala> sc.parallelize(1 to 10 by 2)

res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:25

Referencing a dataset

scala> val dataFile = sc.textFile("/testdata/MountEverest.txt")

dataFile: org.apache.spark.rdd.RDD[String] = /testdata/MountEverest.txt

See - How to create a RDD?


Spark Transformations & Actions

In Spark, Transformations are functions that produce a new RDD from an existing RDD. When you need the actual data from an RDD, you apply an action. Below is the list of common transformations supported by Spark.

But before that, a note for those who are new to programming.

You will be passing lambda functions, sometimes called anonymous functions, to these Spark transformations, so you should have a basic understanding of lambda functions.

In short, lambda functions are a convenient way to write a function when you need it in just one place. For example, if you want to double a number you can simply write x => x + x, much like lambda expressions in Python and other languages. The syntax in Scala looks like this:

scala> val lfunc = (x:Int) => x + x

lfunc: Int => Int = <function1> // The function takes an Int and returns an Int

scala> lfunc(3)

res0: Int = 6
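To connect this with what follows, here is a small sketch of passing a lambda to a higher-order function. It uses a plain Scala List rather than an RDD so that it runs without Spark; the variable names are illustrative only.

```scala
// A named lambda, equivalent to lfunc in the example above.
val double = (x: Int) => x + x

// Passing the named lambda to a higher-order function such as map.
val viaNamed = List(1, 2, 3).map(double)

// The same lambda written inline; this is the form you will pass to Spark transformations.
val viaInline = List(1, 2, 3).map(x => x + x)

// Scala's placeholder syntax is a shorter way to write an equivalent function.
val viaPlaceholder = List(1, 2, 3).map(_ * 2)
```

All three calls produce List(2, 4, 6).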


Sample Data

I will be using the "Where is Mount Everest?" text as sample data. I just picked some random text to go through these examples.

Where is Mount Everest? (MountEverest.txt)
Mount Everest (Nepali: Sagarmatha सगरमाथा; Tibetan: Chomolungma ཇོ་མོ་གླང་མ; Chinese Zhumulangma 珠穆朗玛) is Earth's highest mountain above sea level, located in the Mahalangur Himal sub-range of the Himalayas. The international border between Nepal (Province No. 1) and China (Tibet Autonomous Region) runs across its summit point. - Reference Wikipedia

scala> val mountEverest = sc.textFile("/testdata/MountEverest.txt")

mountEverest: org.apache.spark.rdd.RDD[String] = /testdata/MountEverest.txt MapPartitionsRDD[1] at textFile at <console>:24


Spark Transformations

I encourage you to run these examples in spark-shell side by side. Don't just read through them; typing them out yourself will help you learn.


map(func)

This transformation returns a new distributed dataset formed by passing each element of the source RDD through func.

1. For example, if you want to split the Mount Everest text into individual words, you just need to pass the lambda function x => x.split(" ") to map, and it will create a new RDD as shown below.

scala> val words = mountEverest.map(x => x.split(" "))

words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at map at <console>:25

Did you spot the difference between the mountEverest and words RDDs? Exactly: the first one is an RDD of String, and after applying the map transformation we have an RDD of Array[String].

scala> words.collect()

res1: Array[Array[String]] = Array(Array(Mount, Everest, (Nepali:, Sagarmatha, सगरमाथा;, Tibetan:, Chomolungma, ཇོ་མོ་གླང་མ;, Chinese, Zhumulangma, 珠穆朗玛), is, Earth's, highest, mountain, above, sea, level,, located, in, the, Mahalangur, Himal, sub-range, of, the, Himalayas., The, international, border, between, Nepal, (Province, No., 1), and, China, (Tibet, Autonomous, Region), runs, across, its, summit, point.))

To return all the elements of the words RDD we called the collect() action. It's a very basic Spark action.

2. Now, suppose you want to get the word count of this text file. You can split each line and then take the length or size of the resulting collection. Note that map works line by line, so the result holds one count per line; this file has a single line, hence a single number.

scala> mountEverest.map(x => x.split(" ").length).collect()

res6: Array[Int] = Array(45) // Mount Everest file has 45 words

scala> mountEverest.map(x => x.split(" ").size).collect()

res7: Array[Int] = Array(45)

3. Let's say you want to get the total number of characters in the file. You can do it like this.

scala> mountEverest.map(x => x.length).collect()

res5: Array[Int] = Array(329) // Mount Everest file has 329 characters

4. Suppose you want to convert all the text to lower case (use toUpperCase() instead for upper case). You can do it like this.

scala> mountEverest.map(x => x.toLowerCase()).collect()



res35: Array[String] = Array(mount everest (nepali: sagarmatha सगरमाथा; tibetan: chomolungma ཇོ་མོ་གླང་མ; chinese zhumulangma 珠穆朗玛) is earth's highest mountain above sea level, located in the mahalangur himal sub-range of the himalayas.the international border between nepal (province no. 1) and china (tibet autonomous region) runs across its summit point.)
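Because map produces one result per line, a file with several lines gives several counts in example 2 above. One possible way to combine them into a single total is the reduce action, sketched below. This is an assumption-laden illustration: two in-memory strings stand in for a multi-line file, the application name is made up, and Spark is assumed to be on the classpath (inside spark-shell the existing context is reused).

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context inside spark-shell; otherwise starts a local one.
val conf = new SparkConf().setAppName("word-count-sketch").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

// Two in-memory lines stand in for a multi-line text file (illustrative data).
val lines = sc.parallelize(Seq("Where is Mount Everest", "Himalayas India"))

// map: one word count per line.
val wordsPerLine = lines.map(line => line.split(" ").length)
val perLine = wordsPerLine.collect()

// reduce is an action that combines all elements pairwise; here it sums the counts.
val totalWords = wordsPerLine.reduce((a, b) => a + b)
```

Here perLine holds 4 and 2, and totalWords is 6.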



flatMap(func)

As the name says, this is a flattened map. It is similar to map, except that it gives you a flattened output. For example,

scala> val rdd = sc.parallelize(Seq("Where is Mount Everest","Himalayas India"))

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:24

scala> rdd.collect

res26: Array[String] = Array(Where is Mount Everest, Himalayas India)

1. We have two items in the parallel collection RDD: "Where is Mount Everest" and "Himalayas India".

scala> rdd.map(x => x.split(" ")).collect

res21: Array[Array[String]] = Array(Array(Where, is, Mount, Everest), Array(Himalayas, India))

2. When the map() transformation is applied, it results in two separate arrays of strings (1st element => (Where, is, Mount, Everest) and 2nd element => (Himalayas, India)).

scala> rdd.flatMap(x => x.split(" ")).collect

res23: Array[String] = Array(Where, is, Mount, Everest, Himalayas, India)

3. For flatMap(), the output is flattened into a single array of strings, Array[String]. So flatMap() is similar to map, except that each input item can be mapped to 0 or more output items (1st item => 4 elements, 2nd item => 2 elements). This will give you a clear picture:

scala> rdd.map(x => x.split(" ")).count()

res24: Long = 2 // as map gives one to one output hence 2=>2

scala> rdd.flatMap(x => x.split(" ")).count()

res25: Long = 6 // as flatMap gives one to zero or more output hence 2=>6

map() => [Where is Mount Everest, Himalayas India] => [[Where, is, Mount, Everest],[Himalayas, India]]

flatMap() => [Where is Mount Everest, Himalayas India] => [Where, is, Mount, Everest, Himalayas, India]

4. Getting back to mountEverest RDD, suppose you want to get the length of each individual word.

scala> mountEverest.flatMap(x=>x.split(" ")).map(x=>(x, x.length)).collect

res82: Array[(String, Int)] = Array((Mount,5), (Everest,7), ((Nepali:,8), (Sagarmatha,10), (सगरमाथा;,8), (Tibetan:,8), (Chomolungma,11), (ཇོ་མོ་གླང་མ;,12), (Chinese,7), (Zhumulangma,11), (珠穆朗玛),5), (is,2), (Earth's,7), (highest,7), (mountain,8), (above,5), (sea,3), (level,,6), (located,7), (in,2), (the,3), (Mahalangur,10), (Himal,5), (sub-range,9), (of,2), (the,3), (Himalayas.The,13), (international,13), (border,6), (between,7), (Nepal,5), ((Province,9), (No.,3), (1),2), (and,3), (China,5), ((Tibet,6), (Autonomous,10), (Region),7), (runs,4), (across,6), (its,3), (summit,6), (point.,6))
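The "0 or more output items" part of flatMap is easy to miss, because every line in the examples above produces at least one word. The sketch below adds an empty line to show the zero case. It uses a plain Scala Seq instead of an RDD so it runs without Spark; the empty-string filter is needed because "".split(" ") returns an array holding one empty string, not an empty array.

```scala
// An empty line is added to the data used above to show the "zero outputs" case.
val lines = Seq("Where is Mount Everest", "", "Himalayas India")

// Helper (hypothetical name): split a line into words and drop empty tokens.
def wordsOf(line: String): Seq[String] = line.split(" ").filter(_.nonEmpty).toSeq

// map: exactly one output element per input element (3 in, 3 out),
// so the empty line becomes an empty inner collection.
val mapped = lines.map(wordsOf)

// flatMap: each input contributes zero or more elements to one flat result,
// so the empty line contributes nothing.
val flattened = lines.flatMap(wordsOf)
```

With this data, mapped has 3 elements (the middle one empty), while flattened has the 6 words only.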



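filter(func)

As the name suggests, this transformation keeps only the elements for which func returns true, much like a WHERE clause in SQL. Keep in mind that string comparisons such as contains are case sensitive. For example,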

scala> rdd.collect

res26: Array[String] = Array(Where is Mount Everest, Himalayas India)

// Returns one match

scala> rdd.filter(x=>x.contains("Himalayas")).collect

res31: Array[String] = Array(Himalayas India)

// Contains is case sensitive

scala> rdd.filter(x=>x.contains("himalayas")).collect

res33: Array[String] = Array()

scala> rdd.filter(x=>x.toLowerCase.contains("himalayas")).collect

res37: Array[String] = Array(Himalayas India)
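filter is often chained with the other transformations. As a rough sketch, the snippet below splits the same two lines into words with flatMap and keeps only the words longer than five characters. The length threshold and the application name are arbitrary choices for illustration, and Spark is assumed to be on the classpath (inside spark-shell the existing context is reused).

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context inside spark-shell; otherwise starts a local one.
val conf = new SparkConf().setAppName("filter-sketch").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(Seq("Where is Mount Everest", "Himalayas India"))

// flatMap splits the lines into words; filter keeps words with more than 5 characters.
val longWords = rdd
  .flatMap(line => line.split(" "))
  .filter(word => word.length > 5)
  .collect()
```

Only Everest and Himalayas pass the filter, since every other word has five characters or fewer.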