
Spark RDD, Transformations and Actions example

Updated: Nov 26, 2022

In this Apache Spark RDD tutorial you will learn about,

  • Spark RDD with example

  • What is an RDD in Spark?

  • Spark transformations

  • Spark actions

  • Spark actions and transformations example

  • Spark RDD operations


What is an RDD in Spark?

According to Apache Spark documentation - "Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat".

Example (for easy understanding) - Not a practical case

I seriously didn't understand anything when I read the above definition for the first time, except the fact that RDD is an acronym for Resilient Distributed Dataset.

Let's try to understand RDD with a simple example. Assume that you have a collection of 100 movies stored on your personal laptop. This way the complete data resides on a single machine (you can call it a node), i.e. your personal laptop.

Now, instead of having all the movies on a single machine, let's say you distributed them - 50 movies on laptop A and 50 movies on laptop B. This is where the term Distributed comes into the picture: 50% of your data resides on one machine and 50% on another.

Now let's say you were worried that either laptop could malfunction and you would lose your movies, so you took backups: the 50 movies on laptop A were backed up to laptop B, and the 50 movies on laptop B were backed up to laptop A. This is where the term Resilient or Fault-tolerant comes into the picture. The dictionary meaning of resilient is to withstand or recover quickly from difficult conditions, and the backup of your movies makes sure you can recover the data anytime from the other machine (the so-called node) if a system malfunctions.

The number of times you replicate the data onto another machine for recovery is called the replication factor. In the above case the replication factor was one, as you replicated the data once.

In real-life scenarios you will encounter huge amounts of data (like the movies in the above example) distributed across thousands of worker nodes (like the laptops in the above example), the combination of which is called a cluster, with higher replication factors (in the above example it was just 1) in order to maintain a fault-tolerant system.


Basic facts about Spark RDDs

  • Resilient Distributed Datasets (RDDs) are an immutable collection of elements, used as the fundamental data structure in Apache Spark.

  • You can create RDDs in two ways - parallelizing a collection & referencing external datasets.

  • RDDs are immutable, i.e. read-only data structures, so you can't change the original RDD. But you can always create a new one.

  • RDDs support two types of Spark operations - Transformations & Actions.

Parallelize collection

scala> sc.parallelize(1 to 10 by 2)

res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:25

Referencing a dataset

scala> val dataFile = sc.textFile("/testdata/MountEverest.txt")

dataFile: org.apache.spark.rdd.RDD[String] = /testdata/MountEverest.txt


Spark Transformations & Actions

In Spark, transformations are functions that produce a new RDD from an existing RDD. When you need actual data from an RDD, you need to apply actions. Below is a list of common transformations supported by Spark.

But before that, a quick note for those who are new to programming.

You will be using lambda functions, sometimes called anonymous functions, to pass to these Spark transformations, so you should have a basic understanding of lambda functions.

In short, lambda functions are a convenient way to write a function when you have to use it in just one place. For example, if you want to double a number you can simply write x => x + x, much like in Python and other languages. The syntax in Scala looks like this,

scala> val lfunc = (x:Int) => x + x

lfunc: Int => Int = <function1> // This tells that function takes integer and returns integer

scala> lfunc(3)

res0: Int = 6


Sample Data

I will be using "Where is the Mount Everest?" text data. I just picked some random data to go through these examples.

Where is Mount Everest? (MountEverest.txt)
Mount Everest (Nepali: Sagarmatha सगरमाथा; Tibetan: Chomolungma ཇོ་མོ་གླང་མ; Chinese Zhumulangma 珠穆朗玛) is Earth's highest mountain above sea level, located in the Mahalangur Himal sub-range of the Himalayas. The international border between Nepal (Province No. 1) and China (Tibet Autonomous Region) runs across its summit point. - Reference Wikipedia

scala> val mountEverest = sc.textFile("/testdata/MountEverest.txt")

mountEverest: org.apache.spark.rdd.RDD[String] = /testdata/MountEverest.txt MapPartitionsRDD[1] at textFile at <console>:24


Spark Transformations

I encourage you all to run these examples on spark-shell side by side. Don't just read through them; type them on your keyboard, it will help you learn.


map(func)

This transformation returns a new RDD formed by passing each element of the source RDD through the function func.

1. For example, if you want to split the Mount Everest text into individual words, you just need to pass the lambda func x => x.split(" ") and it will create a new RDD as shown below.

scala> val words = => x.split(" "))

words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at map at <console>:25

Did you spot the difference between the mountEverest and words RDDs? Yeah, exactly - one is of String type, and after applying the map transformation it's now an Array of String.

scala> words.collect()

res1: Array[Array[String]] = Array(Array(Mount, Everest, (Nepali:, Sagarmatha, सगरमाथा;, Tibetan:, Chomolungma, ཇོ་མོ་གླང་མ;, Chinese, Zhumulangma, 珠穆朗玛), is, Earth's, highest, mountain, above, sea, level,, located, in, the, Mahalangur, Himal, sub-range, of, the, Himalayas., The, international, border, between, Nepal, (Province, No., 1), and, China, (Tibet, Autonomous, Region), runs, across, its, summit, point.))

To return all the elements of the words RDD we called the collect() action. It's a very basic Spark action.

2. Now, suppose you want to get the word count of this text file. You can do something like this - first split the file, then get the length or size of the collection.

scala> => x.split(" ").length).collect()

res6: Array[Int] = Array(45) // Mount Everest file has 45 words

scala> => x.split(" ").size).collect()

res7: Array[Int] = Array(45)

3. Let's say you want to get the total number of characters in the file; you can do it like this.

scala> => x.length).collect()

res5: Array[Int] = Array(329) // Mount Everest file has 329 characters

4. Suppose you want to make all the text upper or lower case; you can do it like this.

scala> => x.toLowerCase()).collect()



res35: Array[String] = Array(mount everest (nepali: sagarmatha सगरमाथा; tibetan: chomolungma ཇོ་མོ་གླང་མ; chinese zhumulangma 珠穆朗玛) is earth's highest mountain above sea level, located in the mahalangur himal sub-range of the himalayas.the international border between nepal (province no. 1) and china (tibet autonomous region) runs across its summit point.)



flatMap(func)

As the name says, it's a flattened map. It's similar to map(), except that it gives you a more flattened output. For example,

scala> val rdd = sc.parallelize(Seq("Where is Mount Everest","Himalayas India"))

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:24

scala> rdd.collect

res26: Array[String] = Array(Where is Mount Everest, Himalayas India)

1. We have two items in Parallel Collection RDD - "Where is Mount Everest" and "Himalayas India".

scala> => x.split(" ")).collect

res21: Array[Array[String]] = Array(Array(Where, is, Mount, Everest), Array(Himalayas, India))

2. When the map() transformation is applied, it results in two separate arrays of strings (1st element => (Where, is, Mount, Everest), 2nd element => (Himalayas, India)).

scala> rdd.flatMap(x => x.split(" ")).collect

res23: Array[String] = Array(Where, is, Mount, Everest, Himalayas, India)

3. For flatMap(), the output is flattened to a single array of strings, Array[String]. Thus flatMap() is similar to map(), where each input item is mapped to 0 or more output items (1st item => 4 elements, 2nd item => 2 elements). This will give you a clearer picture,

scala> => x.split(" ")).count()

res24: Long = 2 // as map gives one to one output hence 2=>2

scala> rdd.flatMap(x => x.split(" ")).count()

res25: Long = 6 // as flatMap gives one to zero or more output hence 2=>6

map() => [Where is Mount Everest, Himalayas India] => [[Where, is, Mount, Everest],[Himalayas, India]]

flatMap() => [Where is Mount Everest, Himalayas India] => [Where, is, Mount, Everest, Himalayas, India]

4. Getting back to mountEverest RDD, suppose you want to get the length of each individual word.

scala> mountEverest.flatMap(x=>x.split(" ")).map(x=>(x, x.length)).collect

res82: Array[(String, Int)] = Array((Mount,5), (Everest,7), ((Nepali:,8), (Sagarmatha,10), (सगरमाथा;,8), (Tibetan:,8), (Chomolungma,11), (ཇོ་མོ་གླང་མ;,12), (Chinese,7), (Zhumulangma,11), (珠穆朗玛),5), (is,2), (Earth's,7), (highest,7), (mountain,8), (above,5), (sea,3), (level,,6), (located,7), (in,2), (the,3), (Mahalangur,10), (Himal,5), (sub-range,9), (of,2), (the,3), (Himalayas.The,13), (international,13), (border,6), (between,7), (Nepal,5), ((Province,9), (No.,3), (1),2), (and,3), (China,5), ((Tibet,6), (Autonomous,10), (Region),7), (runs,4), (across,6), (its,3), (summit,6), (point.,6))



filter(func)

As the name tells, it's used to filter elements, much like the WHERE clause in SQL, and it's case sensitive. For example,

scala> rdd.collect

res26: Array[String] = Array(Where is Mount Everest, Himalayas India)

// Returns one match

scala> rdd.filter(x=>x.contains("Himalayas")).collect

res31: Array[String] = Array(Himalayas India)

// Contains is case sensitive

scala> rdd.filter(x=>x.contains("himalayas")).collect

res33: Array[String] = Array()

scala> rdd.filter(x=>x.toLowerCase.contains("himalayas")).collect

res37: Array[String] = Array(Himalayas India)

Filtering even numbers,

scala> sc.parallelize(1 to 15).filter(x=>(x%2==0)).collect

res57: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14)

scala> sc.parallelize(1 to 15).filter(_%5==0).collect

res59: Array[Int] = Array(5, 10, 15)


mapPartitions(func type Iterator)

Similar to the map() transformation, but in this case the function runs separately on each partition (block) of the RDD, unlike map() where it runs on each element. Hence mapPartitions() is also useful when you are looking for a performance gain (it calls your function once per partition, not once per element).

  • Suppose you have elements from 1 to 100 distributed among 10 partitions, i.e. 10 elements per partition. The map() transformation will call func 100 times to process these 100 elements, but with mapPartitions(), func will be called once per partition, i.e. 10 times.

  • Secondly, mapPartitions() holds the data in memory, i.e. it stores the result in memory until all the elements of the partition have been processed.

  • mapPartitions() returns the result only after it finishes processing the whole partition.

  • mapPartitions() requires an Iterator input, unlike the map() transformation.

What is an iterator? - An iterator is a way to access a collection of elements one by one. It's similar to collections like List() or Array() in a few ways, but the difference is that an iterator doesn't load the whole collection into memory at once; instead it loads elements one after another. In Scala you access these elements with the hasNext and next operations.
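Since hasNext and next are plain Scala iterator operations, you can try them without Spark at all; a minimal sketch:

```scala
// A plain Scala iterator over a small collection -- no Spark involved.
// hasNext tells you whether elements remain; next returns the next element
// and moves the iterator forward (you can't step back).
val it = List(10, 20, 30).iterator

println(it.hasNext)  // true -- elements remain
println(   // 10
println(   // 20 -- the iterator only moves forward
println(   // 30
println(it.hasNext)  // false -- the collection has ended
```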

For example,

scala> sc.parallelize(1 to 9, 3).map(x=>(x, "Hello")).collect

res3: Array[(Int, String)] = Array((1,Hello), (2,Hello), (3,Hello), (4,Hello), (5,Hello), (6,Hello), (7,Hello), (8,Hello), (9,Hello))

scala> sc.parallelize(1 to 9, 3).partitions.size

res95: Int = 3

scala> sc.parallelize(1 to 9, 3).mapPartitions(x=>(Array("Hello").iterator)).collect

res7: Array[String] = Array(Hello, Hello, Hello)

scala> sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(

res11: Array[Int] = Array(1, 4, 7)

In the first example, I applied the map() transformation on a dataset distributed between 3 partitions, so you can see the function is called 9 times. In the second example, when we applied mapPartitions(), you will notice it ran 3 times, i.e. once for each partition. We had to convert the string "Hello" into an iterator because mapPartitions() takes an iterator as input. In the third example, I called next on the iterator to show you the element. Note that next always moves forward, so you can't step back.

See this,

scala> sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(,, "|").iterator)).collect

res18: Array[Any] = Array(1, 2, |, 4, 5, |, 7, 8, |)

In the first call, next for partition 1 changed from 1 => 2, for partition 2 it changed from 4 => 5, and similarly for partition 3 it changed from 7 => 8. You can keep advancing like this until hasNext is false (hasNext is a method of the iterator which tells you whether the collection has ended; it returns true or false based on the items left in the collection). For example,

scala> sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(, x.hasNext).iterator)).collect

res19: Array[AnyVal] = Array(1, true, 4, true, 7, true)

You can see hasNext is true because there are elements left in each partition. Now suppose we access all three elements of each partition; then hasNext will return false. For example,

scala> sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(,,, x.hasNext).iterator)).collect

res20: Array[AnyVal] = Array(1, 2, 3, false, 4, 5, 6, false, 7, 8, 9, false)

Just for our understanding, if you try to access next a 4th time, you will get an error, which is expected.

scala> sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(,,,, x.hasNext).iterator)).collect

19/07/31 11:14:42 ERROR Executor: Exception in task 1.0 in stage 18.0 (TID 56)

java.util.NoSuchElementException: next on empty iterator

Think of the map() transformation as a special case of mapPartitions() where each partition has just one element. Isn't it?
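You can see this equivalence with plain Scala iterators (a sketch of the idea, not Spark API code): mapping a partition's iterator element by element produces exactly what a per-element map would.

```scala
// One "partition" of data, represented as a plain Scala iterator
val partition = List(1, 2, 3).iterator

// mapPartitions-style: the function receives the whole iterator at once...
def perPartition(iter: Iterator[Int]): Iterator[Int] = => x * 2)

// ...yet the result is the same as mapping each element one by one
println(perPartition(partition).toList)  // List(2, 4, 6)
```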



mapPartitionsWithIndex(func)

Similar to mapPartitions(), but the good part is that you also get an index to see the partition position. For example,

scala> val mp = sc.parallelize(List("One","Two","Three","Four","Five","Six","Seven","Eight","Nine"), 3)

mp: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> mp.collect

res23: Array[String] = Array(One, Two, Three, Four, Five, Six, Seven, Eight, Nine)

scala> mp.mapPartitionsWithIndex((index, iterator) => { => x + "=>" + index).iterator}).collect

res26: Array[String] = Array(One=>0, Two=>0, Three=>0, Four=>1, Five=>1, Six=>1, Seven=>2, Eight=>2, Nine=>2)

Index 0 (the first partition) has three values as expected, and similarly for the other 2 partitions. If you have any question, please mention it in the comments section at the end of this blog.


sample(withReplacement, fraction, seed)

Generates a sampled subset of an input RDD. Note that the second argument, fraction, doesn't represent the fraction of the actual RDD; it tells the probability of each element in the dataset getting selected for the sample. The seed is optional. The first boolean argument decides whether sampling is done with replacement. For example,

scala> sc.parallelize(1 to 10).sample(true, .4).collect

res103: Array[Int] = Array(4)

scala> sc.parallelize(1 to 10).sample(true, .4).collect

res104: Array[Int] = Array(1, 4, 6, 6, 6, 9)

// Here you can see fraction 0.2 doesn't represent a fraction of the rdd; 4 elements were selected out of 10.

scala> sc.parallelize(1 to 10).sample(true, .2).collect

res109: Array[Int] = Array(2, 4, 7, 10)

// Fraction set to 1 which is the max probability (0 to 1), so each element got selected.

scala> sc.parallelize(1 to 10).sample(false, 1).collect

res111: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)



union(dataset)

Similar to SQL union, except that it keeps duplicate data (like UNION ALL).

scala> val rdd1 = sc.parallelize(List("apple","orange","grapes","mango","orange"))

rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[159] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List("red","green","yellow"))

rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[160] at parallelize at <console>:24

scala> rdd1.union(rdd2).collect

res116: Array[String] = Array(apple, orange, grapes, mango, orange, red, green, yellow)

scala> rdd2.union(rdd1).collect

res117: Array[String] = Array(red, green, yellow, apple, orange, grapes, mango, orange)



intersection(dataset)

Returns the intersection of two datasets. For example,

scala> val rdd1 = sc.parallelize(-5 to 5)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[171] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(1 to 10)

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[172] at parallelize at <console>:24

scala> rdd1.intersection(rdd2).collect

res119: Array[Int] = Array(4, 1, 5, 2, 3)



distinct()

Returns a new dataset with distinct elements. For example, we don't have the duplicate orange now.

scala> val rdd = sc.parallelize(List("apple","orange","grapes","mango","orange"))

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[186] at parallelize at <console>:24

scala> rdd.distinct.collect

res121: Array[String] = Array(grapes, orange, apple, mango)

Due to some technical issues I had to move some content of this page to another area. Please refer there for the remaining list of transformations. Sorry for the inconvenience, guys.

  • groupByKey()

  • reduceByKey()

  • aggregateByKey()

  • sortByKey()

  • join()

  • cartesian()

  • coalesce()

  • repartition()


Now, as said earlier, RDDs are immutable, so you can't change the original RDD, but you can always create a new RDD with Spark transformations like map, flatMap, filter, groupByKey, reduceByKey, mapValues, sample, union, intersection, distinct, sortByKey, etc.

RDD transformations are broadly classified into two categories - narrow & wide transformations.

  • In a narrow transformation like map & filter, all the elements required to compute the records in a single partition live in a single partition of the parent RDD. A limited subset of partitions is used to calculate the result.

  • In a wide transformation like groupByKey and reduceByKey, all the elements required to compute the records in a single partition may live in many partitions of the parent RDD.
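A rough plain-Scala sketch of the difference, modeling two partitions as two lists (illustrative only; real Spark partitions live on different nodes):

```scala
// Two "partitions" of key-value pairs
val partition1 = List(("a", 1), ("b", 2))
val partition2 = List(("a", 3))

// Narrow (map-like): each partition is processed independently,
// no data has to move between partitions
val mapped1 = { case (k, v) => (k, v * 10) }
val mapped2 = { case (k, v) => (k, v * 10) }

// Wide (groupByKey-like): values for key "a" live in BOTH partitions,
// so data must first be combined (shuffled) across partitions
val grouped = (partition1 ++ partition2)

println(mapped1)             // List((a,10), (b,20))
println(grouped("a").sorted) // List(1, 3) -- needed both partitions
```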


Spark Actions

When you want to work on the actual dataset, you need to perform Spark actions on RDDs, like count, reduce, collect, first, takeSample, saveAsTextFile, etc.

  • Transformations are lazy in nature, i.e. nothing happens when a transformation is defined; actual execution happens only when an action is called. RDDs are computed only when an action is applied to them. This is called lazy evaluation: Spark evaluates the expression only when its value is needed by an action.

  • When you call an action, it triggers the transformations to act upon the RDD, dataset or dataframe, which is then computed in memory. In short, transformations actually occur only when you apply an action. Before that, it's just lines of un-evaluated code :)
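Scala's own lazy views give a feel for this behavior (just an analogy, not Spark itself; Spark's laziness is implemented at the RDD/DAG level):

```scala
var calls = 0

// Like a transformation: defining the view computes nothing yet
val doubled = (1 to 5).view.map { x => calls += 1; x * 2 }
println(calls)   // 0 -- no element has been evaluated

// Like an action: forcing the view actually runs the computation
val result = doubled.toList
println(calls)   // 5 -- every element evaluated now
println(result)  // List(2, 4, 6, 8, 10)
```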

Below is the list of Spark actions.


reduce(func)

It aggregates the elements of the dataset with the function func. For example,

scala> val rdd = sc.parallelize(1 to 15).collect

rdd: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)

scala> val rdd = sc.parallelize(1 to 15).reduce(_ + _)

rdd: Int = 120

scala> val rdd = sc.parallelize(Array("Hello", "Dataneb", "Spark")).reduce(_ + _)

rdd: String = SparkHelloDataneb

scala> val rdd = sc.parallelize(Array("Hello", "Dataneb", "Spark")).map(x =>(x, x.length)).flatMap(l=> List(l._2)).collect

rdd: Array[Int] = Array(5, 7, 5)

scala> rdd.reduce(_ + _)

res96: Int = 17

scala> rdd.reduce((x, y)=>x+y)

res99: Int = 17


collect(), count(), first(), take()

Collect returns all the elements of the dataset as an array. For example,

scala> sc.parallelize(1 to 20, 4).collect

res100: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

Count returns the number of elements.

scala> sc.parallelize(1 to 20, 4).count

res101: Long = 20
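The heading above also mentions first() and take(). In Spark, first() returns the first element of the dataset and take(n) returns the first n elements; plain Scala collections behave the same way, so here is a sketch without a SparkContext (the equivalent Spark calls are shown in comments):

```scala
val nums = (1 to 20).toList

// Spark equivalent: sc.parallelize(1 to 20, 4).first
println(nums.head)     // 1

// Spark equivalent: sc.parallelize(1 to 20, 4).take(5)
println(nums.take(5))  // List(1, 2, 3, 4, 5)
```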