
Spark RDD, Transformations and Actions example

Updated: Nov 9, 2019

In this Apache Spark RDD tutorial you will learn about:

  • Spark RDD with example

  • What is RDD in Spark?

  • Spark transformations

  • Spark actions

  • Spark actions and transformations example

  • Spark RDD operations


What is an RDD in Spark?


According to the Apache Spark documentation: "Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat."



Example (for easy understanding) - Not a practical case


I honestly didn't understand much when I first read the above definition, except that RDD is an acronym for Resilient Distributed Dataset.


Let's try to understand RDDs with a simple example. Assume that you have a collection of 100 movies stored on your personal laptop. All of the data then resides on a single machine (you can call it a node), i.e. your personal laptop.


Now, instead of keeping all the movies on a single machine, let's say you distribute them: 50 movies on laptop A and 50 movies on laptop B. This is where the term Distributed comes into the picture: 50% of your data resides on one machine and 50% on another.


Now let's say you are worried that either laptop could malfunction and you would lose your movies, so you take a backup: the 50 movies on laptop A are also copied to laptop B, and the 50 movies on laptop B are also copied to laptop A. This is where the term Resilient, or fault-tolerant, comes into the picture. The dictionary meaning of resilient is to withstand or recover quickly from difficult conditions, and the backup makes sure that you can recover the data from the other machine (node) if one of them malfunctions.


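The number of times you back up, or replicate, the data onto another machine for recovery is called the replication factor. In the above case the replication factor was one, as you replicated the data once. (Storage systems such as HDFS count the original copy as well, so they would call this a replication factor of 2.)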

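In real-life scenarios you will encounter huge amounts of data (like the movies in the above example) distributed across thousands of worker nodes (like the laptops in the above example). Together these nodes are called a cluster, and the data is stored with a higher replication factor (in the above example it was just 1) in order to keep the system fault tolerant. Keep in mind that this analogy is a simplification: Spark itself recovers a lost RDD partition by recomputing it from its lineage (the recorded chain of transformations), while replication is how the underlying storage layer, such as HDFS, protects the data.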



Basic facts about Spark RDD

  • Resilient Distributed Datasets (RDDs) are immutable collections of elements and the fundamental data structure in Apache Spark.

  • You can create RDDs in two ways: parallelizing an existing collection, or referencing an external dataset.

  • RDDs are immutable, i.e. read-only data structures, so you can't change the original RDD, but you can always create a new one from it.

  • RDDs support two types of Spark operations: transformations and actions.



Parallelize collection


scala> sc.parallelize(1 to 10 by 2)

res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:25


Referencing a dataset


scala> val dataFile = sc.textFile("/testdata/MountEverest.txt")

dataFile: org.apache.spark.rdd.RDD[String] = /testdata/MountEverest.txt


See - How to create a RDD?


Spark Transformations & Actions


In Spark, transformations are functions that produce a new RDD from an existing RDD. When you need the actual data from an RDD, you apply an action. Below is a list of common transformations supported by Spark.


But before that, a note for those who are new to programming.


You will be passing lambda functions, sometimes called anonymous functions, to these Spark transformations, so you should have a basic understanding of them.


In short, lambda functions are a convenient way to write a function that you use in just one place. For example, to double a number you can simply write x => x + x (the Python equivalent is lambda x: x + x). The syntax in Scala looks like this:


scala> val lfunc = (x:Int) => x + x

lfunc: Int => Int = <function1> // The function takes an Int and returns an Int

scala> lfunc(3)

res0: Int = 6
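
To see how such a lambda is passed to another function, here is a small sketch. It uses a plain Scala List instead of an RDD so that it runs without Spark; the names double and numbers are made up for illustration.

```scala
// A named function value, equivalent to lfunc in the text above
val double: Int => Int = (x: Int) => x + x

// A plain Scala List standing in for an RDD (assumption: no Spark needed here)
val numbers = List(1, 2, 3)

// Passing a lambda directly to a higher-order method
val doubled = numbers.map(x => x + x)

// The underscore shorthand does the same thing
val doubledShort = numbers.map(_ * 2)

// A function value can be passed by name as well
val doubledByName = numbers.map(double)
```

All three calls give List(2, 4, 6). The same lambdas can be passed unchanged to the map() transformation of an RDD[Int].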



Sample Data


I will be using a short "Where is Mount Everest?" text as sample data. I just picked some random data to go through these examples.


Where is Mount Everest? (MountEverest.txt)

Mount Everest (Nepali: Sagarmatha सगरमाथा; Tibetan: Chomolungma ཇོ་མོ་གླང་མ; Chinese Zhumulangma 珠穆朗玛) is Earth's highest mountain above sea level, located in the Mahalangur Himal sub-range of the Himalayas. The international border between Nepal (Province No. 1) and China (Tibet Autonomous Region) runs across its summit point. - Reference Wikipedia


scala> val mountEverest = sc.textFile("/testdata/MountEverest.txt")

mountEverest: org.apache.spark.rdd.RDD[String] = /testdata/MountEverest.txt MapPartitionsRDD[1] at textFile at <console>:24


Spark Transformations


I encourage you all to run these examples in spark-shell side by side. Don't just read through them; typing them out yourself will help you learn.


map(func)

This transformation returns a new RDD formed by passing each element of the source RDD through func.


1. For example, to split the Mount Everest text into individual words, you just pass the lambda x => x.split(" ") and map will create a new RDD as shown below.


scala> val words = mountEverest.map(x => x.split(" "))

words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at map at <console>:25


Did you spot the difference between the mountEverest and words RDDs? Exactly: the first is an RDD of String, and after applying the map transformation it is an RDD of Array[String].


scala> words.collect()

res1: Array[Array[String]] = Array(Array(Mount, Everest, (Nepali:, Sagarmatha, सगरमाथा;, Tibetan:, Chomolungma, ཇོ་མོ་གླང་མ;, Chinese, Zhumulangma, 珠穆朗玛), is, Earth's, highest, mountain, above, sea, level,, located, in, the, Mahalangur, Himal, sub-range, of, the, Himalayas., The, international, border, between, Nepal, (Province, No., 1), and, China, (Tibet, Autonomous, Region), runs, across, its, summit, point.))


To return all the elements of the words RDD we called the collect() action. It's a very basic Spark action.


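2. Now suppose you want the word count of this text file. You can first split each line and then take the length or size of the resulting array. Note that map works line by line, so you get one count per line; this file has a single line, so the result has a single element.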



scala> mountEverest.map(x => x.split(" ").length).collect()

res6: Array[Int] = Array(45) // Mount Everest file has 45 words


scala> mountEverest.map(x => x.split(" ").size).collect()

res7: Array[Int] = Array(45)


3. Let's say you want the total number of characters in the file. Again map returns one length per line, which for this single-line file is the total.


scala> mountEverest.map(x => x.length).collect()

res5: Array[Int] = Array(329) // Mount Everest file has 329 characters


4. Suppose you want to convert all the text to upper or lower case. You can do it like this.


scala> mountEverest.map(x => x.toUpperCase()).collect()

res9: Array[String] = Array(MOUNT EVEREST (NEPALI: SAGARMATHA सगरमाथा; TIBETAN: CHOMOLUNGMA ཇོ་མོ་གླང་མ; CHINESE ZHUMULANGMA 珠穆朗玛) IS EARTH'S HIGHEST MOUNTAIN ABOVE SEA LEVEL, LOCATED IN THE MAHALANGUR HIMAL SUB-RANGE OF THE HIMALAYAS. THE INTERNATIONAL BORDER BETWEEN NEPAL (PROVINCE NO. 1) AND CHINA (TIBET AUTONOMOUS REGION) RUNS ACROSS ITS SUMMIT POINT.)


scala> mountEverest.map(x=>x.toLowerCase()).collect()

res35: Array[String] = Array(mount everest (nepali: sagarmatha सगरमाथा; tibetan: chomolungma ཇོ་མོ་གླང་མ; chinese zhumulangma 珠穆朗玛) is earth's highest mountain above sea level, located in the mahalangur himal sub-range of the himalayas.the international border between nepal (province no. 1) and china (tibet autonomous region) runs across its summit point.)



flatMap(func)

As the name says, it is a flattened map. It is similar to map, except that func returns a collection for each input element and the results are flattened into a single RDD. For example,


scala> val rdd = sc.parallelize(Seq("Where is Mount Everest","Himalayas India"))

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:24


scala> rdd.collect

res26: Array[String] = Array(Where is Mount Everest, Himalayas India)


1. We have two items in the ParallelCollectionRDD: "Where is Mount Everest" and "Himalayas India".


scala> rdd.map(x => x.split(" ")).collect

res21: Array[Array[String]] = Array(Array(Where, is, Mount, Everest), Array(Himalayas, India))


2. When the map() transformation is applied, the result is two separate arrays of strings (1st element => (Where, is, Mount, Everest) and 2nd element => (Himalayas, India)).


scala> rdd.flatMap(x => x.split(" ")).collect

res23: Array[String] = Array(Where, is, Mount, Everest, Himalayas, India)


3. With flatMap(), the output is flattened into a single Array[String]. Thus flatMap() is similar to map, but each input item can be mapped to 0 or more output items (1st item => 4 elements, 2nd item => 2 elements). The counts below make this clear:


scala> rdd.map(x => x.split(" ")).count()

res24: Long = 2 // as map gives one to one output hence 2=>2

scala> rdd.flatMap(x => x.split(" ")).count()

res25: Long = 6 // as flatMap gives one to zero or more output hence 2=>6


map() => [Where is Mount Everest, Himalayas India] => [[Where, is, Mount, Everest],[Himalayas, India]]

flatMap() => [Where is Mount Everest, Himalayas India] => [Where, is, Mount, Everest, Himalayas, India]
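
The "0 or more output items" part is easiest to see when one input produces nothing at all. The following is a minimal sketch on a plain Scala List (not an RDD, so it runs without Spark); the empty string in the middle is a made-up input added for illustration.

```scala
// Three input items; the middle one is empty (hypothetical input)
val lines = List("Where is Mount Everest", "", "Himalayas India")

// map: exactly one output element per input element (3 in, 3 out)
val mapped: List[List[String]] =
  lines.map(line => line.split(" ").filter(_.nonEmpty).toList)

// flatMap: zero or more output elements per input element, flattened
// (the empty line contributes nothing, so 3 in, 6 out)
val flattened: List[String] =
  lines.flatMap(line => line.split(" ").filter(_.nonEmpty))
```

Here mapped still has three elements, one of them an empty list, while flattened contains just the six words.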


4. Getting back to the mountEverest RDD, suppose you want the length of each individual word.


scala> mountEverest.flatMap(x=>x.split(" ")).map(x=>(x, x.length)).collect

res82: Array[(String, Int)] = Array((Mount,5), (Everest,7), ((Nepali:,8), (Sagarmatha,10), (सगरमाथा;,8), (Tibetan:,8), (Chomolungma,11), (ཇོ་མོ་གླང་མ;,12), (Chinese,7), (Zhumulangma,11), (珠穆朗玛),5), (is,2), (Earth's,7), (highest,7), (mountain,8), (above,5), (sea,3), (level,,6), (located,7), (in,2), (the,3), (Mahalangur,10), (Himal,5), (sub-range,9), (of,2), (the,3), (Himalayas.The,13), (international,13), (border,6), (between,7), (Nepal,5), ((Province,9), (No.,3), (1),2), (and,3), (China,5), ((Tibet,6), (Autonomous,10), (Region),7), (runs,4), (across,6), (its,3), (summit,6), (point.,6))


filter(func)

As the name suggests, filter keeps only the elements for which func returns true, much like a WHERE clause in SQL. The String.contains check used below is case sensitive. For example,



scala> rdd.collect

res26: Array[String] = Array(Where is Mount Everest, Himalayas India)


// Returns one match

scala> rdd.filter(x=>x.contains("Himalayas")).collect

res31: Array[String] = Array(Himalayas India)


// Contains is case sensitive

scala> rdd.filter(x=>x.contains("himalayas")).collect

res33: Array[String] = Array()


scala> rdd.filter(x=>x.toLowerCase.contains("himalayas")).collect

res37: Array[String] = Array(Himalayas India)


Filtering even numbers, and then multiples of 5,

scala> sc.parallelize(1 to 15).filter(x=>(x%2==0)).collect

res57: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14)


scala> sc.parallelize(1 to 15).filter(_%5==0).collect

res59: Array[Int] = Array(5, 10, 15)



mapPartitions(func)

Similar to the map() transformation, but func runs once on each partition (block) of the RDD instead of once on each element. Hence mapPartitions() is useful when you are looking for a performance gain, for example when func has an expensive setup step (it is called once per partition, not once per element).

  • Suppose you have the elements 1 to 100 distributed among 10 partitions, i.e. 10 elements per partition. map() will call func 100 times to process these 100 elements, but with mapPartitions() func is called once per partition, i.e. 10 times.

  • func receives the elements of a partition as an iterator. If func materializes that iterator (for example with toList), the whole partition is held in memory until it has been processed; a func that only chains iterator operations such as iter.map(...) handles the elements one at a time.

  • The results of a partition are produced through the iterator that func returns, so func can emit any number of results per partition.

  • Unlike with map(), the func passed to mapPartitions() takes an iterator as input and must return an iterator.
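
The difference in call counts can be modelled without Spark. In this rough sketch a List of Lists stands in for an RDD with 3 partitions; the counters and the helper perPartition are hypothetical names used only for illustration, and real Spark runs the partitions on executors rather than in a local loop.

```scala
// Hypothetical stand-in for sc.parallelize(1 to 9, 3): three partitions of three elements
val partitions: List[List[Int]] = (1 to 9).toList.grouped(3).toList

// map-style: the function is called once per element
var mapCalls = 0
val mapped = partitions.map(_.map { x => mapCalls += 1; x * 10 })

// mapPartitions-style: the function is called once per partition,
// takes an Iterator and returns an Iterator
var mapPartitionsCalls = 0
def perPartition(iter: Iterator[Int]): Iterator[Int] = {
  mapPartitionsCalls += 1   // e.g. expensive setup would happen here, once
  iter.map(_ * 10)
}
val mappedByPartition = partitions.map(p => perPartition(p.iterator).toList)
```

Both approaches produce the same values, but mapCalls ends at 9 while mapPartitionsCalls ends at 3.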

What is an Iterator? An iterator is a way to access a collection of elements one by one. It is similar to collections like List() or Array() in a few ways, but the difference is that an iterator doesn't need to load the whole collection into memory at once. Instead, it hands out elements one after another. In Scala you access these elements with the hasNext and next methods.
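
Before using iterators inside Spark, it may help to try them on their own. This is a minimal plain-Scala sketch; the values are made up for illustration.

```scala
// An iterator hands out elements one at a time and only moves forward
val it: Iterator[String] = List("One", "Two", "Three").iterator

val hasFirst = it.hasNext   // true: nothing has been consumed yet
val first = it.next()       // "One"
val second = it.next()      // "Two"
val third = it.next()       // "Three"
val hasMore = it.hasNext    // false: the iterator is exhausted

// Calling next() on an exhausted iterator throws an exception,
// which Try captures here as a Failure
val failed = scala.util.Try(it.next()).isFailure
```

Checking hasNext before calling next() is the usual way to avoid that exception.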


For example,


scala> sc.parallelize(1 to 9, 3).map(x=>(x, "Hello")).collect

res3: Array[(Int, String)] = Array((1,Hello), (2,Hello), (3,Hello), (4,Hello), (5,Hello), (6,Hello), (7,Hello), (8,Hello), (9,Hello))


scala> sc.parallelize(1 to 9, 3).partitions.size

res95: Int = 3


scala> sc.parallelize(1 to 9, 3).mapPartitions(x=>(Array("Hello").iterator)).collect

res7: Array[String] = Array(Hello, Hello, Hello)


scala> sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next).iterator)).collect

res11: Array[Int] = Array(1, 4, 7)


In the first example, I applied the map() transformation to a dataset distributed across 3 partitions, and you can see that the function was called 9 times, once per element. In the second example, with mapPartitions(), the function ran 3 times, i.e. once for each partition. We had to wrap the string "Hello" in an iterator because the function passed to mapPartitions() must return an iterator. In the third example, I called next on the iterator to show you the first element of each partition. Note that an iterator only moves forward, so you can't step back.


See this,


scala> sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next,x.next, "|").iterator)).collect

res18: Array[Any] = Array(1, 2, |, 4, 5, |, 7, 8, |)


In each partition the two calls to next moved the iterator forward: from 1 to 2 in partition 1, from 4 to 5 in partition 2, and from 7 to 8 in partition 3. You can keep advancing until hasNext is false (hasNext is an iterator method that tells you whether any elements are left; it returns true or false). For example,


scala> sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next, x.hasNext).iterator)).collect

res19: Array[AnyVal] = Array(1, true, 4, true, 7, true)


You can see hasNext is true because there are elements left in each partition. Now suppose we consume all three elements of each partition; then hasNext returns false. For example,


scala> sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next, x.next, x.next, x.hasNext).iterator)).collect

res20: Array[AnyVal] = Array(1, 2, 3, false, 4, 5, 6, false, 7, 8, 9, false)



Just for our understanding, if you call next a 4th time you will get an error, which is expected.


scala> sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next, x.next, x.next, x.next,x.hasNext).iterator)).collect

19/07/31 11:14:42 ERROR Executor: Exception in task 1.0 in stage 18.0 (TID 56)

java.util.NoSuchElementException: next on empty iterator


You can think of the map() transformation as a special case of mapPartitions(): map(f) gives the same result as mapPartitions(iter => iter.map(f)).



mapPartitionsWithIndex(func)

Similar to mapPartitions(), but func also receives the index of the partition, so you can see which partition each element belongs to. For example,


scala> val mp = sc.parallelize(List("One","Two","Three","Four","Five","Six","Seven","Eight","Nine"), 3)

mp: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> mp.collect

res23: Array[String] = Array(One, Two, Three, Four, Five, Six, Seven, Eight, Nine)


scala> mp.mapPartitionsWithIndex((index, iterator) => {iterator.toList.map(x => x + "=>" + index ).iterator} ).collect

res26: Array[String] = Array(One=>0, Two=>0, Three=>0, Four=>1, Five=>1, Six=>1, Seven=>2, Eight=>2, Nine=>2)


Index 0 (the first partition) has three values as expected, and so do the other 2 partitions. If you have any questions, please mention them in the comments section at the end of this blog.



sample(withReplacement, fraction, seed)

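Returns a random sample of the input RDD. Note that the second argument, fraction, doesn't fix the exact size of the sample. Without replacement, it is the probability of each element being selected; with replacement, it is the expected number of times each element is chosen. The sample size is therefore only approximately fraction times the size of the RDD. The seed is optional. The first boolean argument, withReplacement, decides whether an element can be selected more than once. For example,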


scala> sc.parallelize(1 to 10).sample(true, .4).collect

res103: Array[Int] = Array(4)


scala> sc.parallelize(1 to 10).sample(true, .4).collect

res104: Array[Int] = Array(1, 4, 6, 6, 6, 9)


// Here you can see that a fraction of 0.2 doesn't guarantee 20% of the RDD: 4 elements were selected out of 10.

scala> sc.parallelize(1 to 10).sample(true, .2).collect

res109: Array[Int] = Array(2, 4, 7, 10)


// Fraction set to 1, the maximum probability (0 to 1) when sampling without replacement, so every element was selected.

scala> sc.parallelize(1 to 10).sample(false, 1).collect

res111: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
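
Why the sample size varies from run to run becomes clearer with a toy version of sampling without replacement. The sketch below is a plain-Scala model, not Spark's actual sampler; bernoulliSample is a hypothetical helper that keeps each element independently with probability fraction.

```scala
import scala.util.Random

// Hypothetical helper: keep each element independently with probability `fraction`
def bernoulliSample[A](data: Seq[A], fraction: Double, seed: Long): Seq[A] = {
  val rng = new Random(seed)
  data.filter(_ => rng.nextDouble() < fraction)
}

val data = 1 to 10

// fraction = 1.0 keeps everything, fraction = 0.0 keeps nothing
val all = bernoulliSample(data, 1.0, seed = 42L)
val none = bernoulliSample(data, 0.0, seed = 42L)

// For anything in between the size is random, but a fixed seed makes it repeatable
val some = bernoulliSample(data, 0.4, seed = 42L)
val sameSeedAgain = bernoulliSample(data, 0.4, seed = 42L)
```

Because every element is decided independently, a fraction of 0.4 gives about 4 elements out of 10 on average, not exactly 4 every time.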



union(otherDataset)

Similar to SQL UNION ALL: it combines the two datasets and keeps duplicate data.


scala> val rdd1 = sc.parallelize(List("apple","orange","grapes","mango","orange"))

rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[159] at parallelize at <console>:24


scala> val rdd2 = sc.parallelize(List("red","green","yellow"))

rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[160] at parallelize at <console>:24


scala> rdd1.union(rdd2).collect

res116: Array[String] = Array(apple, orange, grapes, mango, orange, red, green, yellow)


scala> rdd2.union(rdd1).collect

res117: Array[String] = Array(red, green, yellow, apple, orange, grapes, mango, orange)



intersection(otherDataset)

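Returns the intersection of two datasets, i.e. the elements present in both. The output contains no duplicates, and because the operation shuffles data, the order of the result is not guaranteed. For example,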


scala> val rdd1 = sc.parallelize(-5 to 5)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[171] at parallelize at <console>:24



scala> val rdd2 = sc.parallelize(1 to 10)

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[172] at parallelize at <console>:24


scala> rdd1.intersection(rdd2).collect

res119: Array[Int] = Array(4, 1, 5, 2, 3)



distinct()

Returns a new dataset with the distinct elements of the source. In the example below, the duplicate orange is gone.


scala> val rdd = sc.parallelize(List("apple","orange","grapes","mango","orange"))

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[186] at parallelize at <console>:24


scala> rdd.distinct.collect

res121: Array[String] = Array(grapes, orange, apple, mango)



Due to some technical issues I had to move some of the content of this page to another post. Please refer to it for the remaining list of transformations. Sorry for the inconvenience, guys.

  • groupByKey()

  • reduceByKey()

  • aggregateByKey()

  • sortByKey()

  • join()

  • cartesian()

  • coalesce()

  • repartition()


Now, as said earlier, RDDs are immutable, so you can't change the original RDD, but you can always create a new RDD with Spark transformations like map, flatMap, filter, groupByKey, reduceByKey, mapValues, sample, union, intersection, distinct, sortByKey etc.


RDD transformations are broadly classified into two categories: narrow and wide transformations.

  • In a narrow transformation like map or filter, all the elements required to compute the records of a single partition live in a single partition of the parent RDD, so each output partition depends on only a limited subset of the parent's partitions and no shuffle is needed.

  • In a wide transformation like groupByKey or reduceByKey, the elements required to compute the records of a single partition may live in many partitions of the parent RDD, so the data has to be shuffled across the cluster.
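
The two categories can be modelled on plain Scala collections. This is only a rough sketch under the assumption that a Vector of Vectors stands in for an RDD with 2 partitions; the data and names are invented, and a real shuffle moves records over the network instead of calling flatten.

```scala
// Hypothetical stand-in for an RDD of words split over 2 partitions
val partitions: Vector[Vector[String]] =
  Vector(Vector("apple", "banana", "apple"), Vector("banana", "cherry", "apple"))

// Narrow (like map): each output partition is computed from exactly one input partition
val upper: Vector[Vector[String]] = partitions.map(_.map(_.toUpperCase))

// Wide (like reduceByKey): the count for "apple" needs records from both partitions,
// so the data is first brought together (the "shuffle") and then grouped by key
val shuffled: Vector[String] = partitions.flatten
val counts: Map[String, Int] =
  shuffled.groupBy(identity).map { case (word, ws) => (word, ws.size) }
```

The narrow step keeps the partition layout untouched, while the wide step cannot produce a correct count without looking at every partition.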



Spark Actions


When you want to work with the actual data, you need to perform Spark actions on RDDs, like count, reduce, collect, first, takeSample, saveAsTextFile etc.

  • Transformations are lazy in nature, i.e. nothing is computed when a transformation is declared; Spark only records it. RDDs are computed only when an action is applied to them. This is called lazy evaluation: Spark evaluates an expression only when its value is needed by an action.

  • When you call an action, it triggers the recorded transformations on the RDD, Dataset or DataFrame, and the result is then computed in memory. In short, transformations actually run only when you apply an action. Before that they are just lines of declared code :)
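
Lazy evaluation is not unique to Spark; Scala iterators behave in a similar way, which makes for a quick experiment without a cluster. In this sketch (an analogy, not Spark code) the counter evaluated is a made-up name that records how many times the mapping function has actually run.

```scala
var evaluated = 0

// "Transformation": describes the work but does not run it yet
val pending: Iterator[Int] = Iterator(1, 2, 3, 4, 5).map { x =>
  evaluated += 1
  x * x
}
val evaluatedBeforeAction = evaluated   // nothing has been computed so far

// "Action": asking for a result forces the pipeline to run
val total = pending.sum
val evaluatedAfterAction = evaluated    // now every element has been processed
```

The counter stays at 0 until sum is called, and only then jumps to 5, just as an RDD transformation does no work until an action needs its result.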



Below is the list of Spark actions.


reduce()

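It aggregates the elements of the dataset using a function that takes two arguments and returns one. The function should be commutative and associative so that it can be computed correctly in parallel. For example,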


scala> val rdd = sc.parallelize(1 to 15).collect

rdd: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)


scala> val rdd = sc.parallelize(1 to 15).reduce(_ + _)

rdd: Int = 120


scala> val rdd = sc.parallelize(Array("Hello", "Dataneb", "Spark")).reduce(_ + _)

rdd: String = SparkHelloDataneb
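
The string result above is not in the original order, most likely because string concatenation is not commutative and the partial results of the partitions are combined in whatever order they arrive. The following plain-Scala sketch models that effect; the partition layout and the helper reduceInOrder are assumptions made for illustration.

```scala
// Hypothetical stand-in for an RDD split over 3 partitions
val partitions = List(List("Hello"), List("Dataneb"), List("Spark"))

// Step 1: reduce inside each partition; step 2: combine the partial results.
// `order` models the order in which the partial results arrive.
def reduceInOrder(order: List[Int]): String =
  order.map(i => partitions(i).reduce(_ + _)).reduce(_ + _)

val arrivalA = reduceInOrder(List(0, 1, 2))   // "HelloDatanebSpark"
val arrivalB = reduceInOrder(List(2, 0, 1))   // "SparkHelloDataneb"

// Integer addition is commutative and associative, so the order is irrelevant
val numbers = List(List(1, 2, 3), List(4, 5), List(6))
val sumA = numbers.map(_.sum).sum
val sumB = numbers.reverse.map(_.sum).sum
```

With addition both orders give 21, which is why reduce(_ + _) on numbers is safe while on strings the result can change between runs.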


scala> val rdd = sc.parallelize(Array("Hello", "Dataneb", "Spark")).map(x =>(x, x.length)).flatMap(l=> List(l._2)).collect

rdd: Array[Int] = Array(5, 7, 5)


scala> rdd.reduce(_ + _)

res96: Int = 17


scala> rdd.reduce((x, y)=>x+y)

res99: Int = 17

Note that in the last two calls rdd is a local Array[Int] (the result of collect), so reduce there is the Scala collection method rather than the Spark action. Calling reduce directly on the RDD, before collect, gives the same sum.



collect(), count(), first(), take()

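collect() returns all the elements of the dataset as an array to the driver program, so use it only when the result is small enough to fit in the driver's memory. For example,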

scala> sc.parallelize(1 to 20, 4).collect

res100: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)


count() returns the number of elements.

scala> sc.parallelize(1 to 20, 4).count

res101: Long = 20


first() returns the first element.

scala> sc.parallelize(1 to 20, 4).first

res102: Int = 1


take(n) returns the first n elements, where n is the number you pass as the argument.

scala> sc.parallelize(1 to 20, 4).take(5)

res104: Array[Int] = Array(1, 2, 3, 4, 5)



takeSample()

It returns a random sample of n elements as an array. The boolean input selects sampling with or without replacement. For example,

scala> sc.parallelize(1 to 20, 4).takeSample(false,4)

res107: Array[Int] = Array(15, 2, 5, 17)


scala> sc.parallelize(1 to 20, 4).takeSample(false,4)

res108: Array[Int] = Array(12, 5, 4, 11)


scala> sc.parallelize(1 to 20, 4).takeSample(true,4)

res109: Array[Int] = Array(18, 4, 1, 18)



takeOrdered()

It returns the first n elements in ascending order (or according to a custom ordering). For example,

scala> sc.parallelize(1 to 20, 4).takeOrdered(7)

res117: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7)



It is just the opposite of the top() action, which returns the n largest elements in descending order.

scala> sc.parallelize(1 to 20, 4).top(7)

res118: Array[Int] = Array(20, 19, 18, 17, 16, 15, 14)



countByKey()

It takes an RDD of (key, value) pairs and returns a map of (key, count of that key). For example,

scala> sc.parallelize(Array("Apple","Banana","Grapes","Oranges","Grapes","Banana")).map(k=>(k,1)).countByKey()

res121: scala.collection.Map[String,Long] = Map(Grapes -> 2, Oranges -> 1, Banana -> 2, Apple -> 1)



saveAsTextFile()

It saves the dataset as text files in a local directory, HDFS etc. The path you pass names a directory, which Spark fills with one part file per partition. You can reduce the number of files with the coalesce transformation.


scala> sc.parallelize(Array("Apple","Banana","Grapes","Oranges","Grapes","Banana")).saveAsTextFile("sampleFruits.txt")


// Just one partition file with coalesce

scala> sc.parallelize(Array("Apple","Banana","Grapes","Oranges","Grapes","Banana")).coalesce(1).saveAsTextFile("newsampleFruits.txt")



saveAsObjectFile()



It writes the data in a simple format using Java serialization, and you can load it again using sc.objectFile().

scala> sc.parallelize(List(1,2)).saveAsObjectFile("/Users/Rajput/sample")



foreach()

It is generally used when you want to carry out some operation for each element, such as loading each element into a database.


scala> sc.parallelize("Hello").collect

res139: Array[Char] = Array(H, e, l, l, o)


scala> sc.parallelize("Hello").foreach(x=>println(x))

l

H

e

l

o


// The output order of the elements is not the same every time, because the partitions are processed in parallel

scala> sc.parallelize("Hello").foreach(x=>println(x))

H

e

l

o

l



Spark Workflow

In this section you will understand how a Spark program flows: how you create intermediate RDDs and apply transformations and actions.

  1. You first create RDDs with the parallelize method or by referencing an external dataset.

  2. Apply transformations to create new RDDs based on your requirements.

  3. You will have a chain of RDDs, called the lineage.

  4. Apply actions on the RDDs.

  5. Get your result.




Transformations & Actions example


Let's implement the above facts with a basic example, which will give you a clearer picture. Open spark-shell with the command below in your terminal (refer to the Mac/Windows installation posts if you don't have Spark installed yet).


./bin/spark-shell



In the above screenshot you can see that a SparkContext is automatically created for you with all (*) local resources, along with an app id. You can also check the Spark context by running the sc command. res0 is nothing but result zero, for the command sc.



We already read about SparkContext in the previous blog.


1. Create an RDD, let's say with the parallelize method and 2 partitions. The RDD below is basically a list of characters distributed across 2 partitions.


2. Now you can either apply a transformation to create a new RDD (extending the lineage) or simply apply an action to show the result. Let's first apply a few actions.


res1 to res5 show the result of each action: collect, first, count, take, reduce, saveAsTextFile. Note (lazy evaluation) that Spark does the actual evaluation to produce the result only when you execute an action.


Now let's look at the sample.csv output, which is the result of the last action. Remember that we created 2 partitions in the first step; that's the reason we have 2 files, part-00000 and part-00001, each holding one partition's share of the data.



3. Now let's apply a few transformations in order to create an RDD lineage.


Refer to the image shown above.

  • In the first step we applied the filter transformation on the character 'a', creating a new RDD called MapPartitionsRDD[2] from our initial RDD ParallelCollectionRDD[0].

  • Similarly, in the third step we filtered on the letter 'x' to create another RDD, MapPartitionsRDD[3].

  • In the last step we used the map and reduceByKey transformations to group the characters and get their counts, generating a new RDD, ShuffledRDD[5].

  • As we applied 2 transformations in one go, i.e. map and reduceByKey, you will notice RDD[4] is missing. Spark internally keeps the intermediate RDD[4] to generate the resultant ShuffledRDD[5]; it is simply not printed in the output.

  • ParallelCollectionRDD[0], MapPartitionsRDD[2], MapPartitionsRDD[3], RDD[4] and ShuffledRDD[5] together are what is called the lineage. You can think of it as the chain of intermediate RDDs that Spark needs in order to evaluate your next action.
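
The steps in this list can be approximated on a plain Scala List, which may help if you want to follow the logic without a Spark shell. This is a sketch under assumptions: the input string is invented, filter is assumed to keep the matching character, and groupBy plus a sum stands in for reduceByKey(_ + _).

```scala
// Hypothetical input: the characters of a made-up string
val chars: List[Char] = "banana axe".toList

// filter creates a new collection each time; `chars` itself is never modified
val onlyA = chars.filter(_ == 'a')
val onlyX = chars.filter(_ == 'x')

// map each character to (char, 1), then sum the 1s per key,
// which is what reduceByKey(_ + _) does on a pair RDD
val pairs: List[(Char, Int)] = chars.map(c => (c, 1))
val counts: Map[Char, Int] =
  pairs.groupBy(_._1).map { case (c, ps) => (c, ps.map(_._2).sum) }
```

Each intermediate value (onlyA, onlyX, pairs, counts) plays the role of one RDD in the lineage, and the original chars stays available for further transformations.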



4. Now you can notice that res7, res8 and res9 are nothing but actions which we applied on the lineage RDDs to get the results.


Thank you!! If you liked the post or have any questions, please don't forget to write in the comments section below.



Next: Loading data in Apache Spark



©2019 by Data Nebulae