Understanding SparkContext textFile & parallelize method

Updated: Oct 25, 2019


In this post you will learn:

  • How Spark reads a text file or any other external dataset.

  • How to reference a dataset with SparkContext's textFile method, SparkContext's parallelize method, and the SparkSession read.textFile method.

As we saw in the previous post, Apache Spark has three main data structures (also called Spark APIs): RDDs, DataFrames, and Datasets. The RDD was the primary API when Apache Spark was first released.



RDD - Resilient Distributed Dataset

  • Suppose you have a collection of 100 words and you distribute them across 10 partitions so that each partition holds about 10 words. If a partition is lost, Spark can rebuild it from the recorded lineage of operations that produced it, which is what makes the collection resilient.

  • This is a simplified picture. In a real environment the data is distributed across a cluster that may have thousands of nodes, and to get results back you apply Spark actions, which you will learn about soon. This kind of immutable, distributed collection of elements is called an RDD.
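
The 100-words picture above can be tried out directly. This is only a sketch, assuming it is pasted into spark-shell (or run anywhere Spark is on the classpath); the word list and the names words, wordsRdd and partitionSizes are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() simply returns the session the shell already started
val spark = SparkSession.builder().master("local[2]").appName("rdd-partitions-sketch").getOrCreate()
val sc = spark.sparkContext

// 100 placeholder words: "word1" ... "word100" (hypothetical data)
val words = (1 to 100).map(i => s"word$i")

// Ask for 10 partitions explicitly; the second argument is numSlices
val wordsRdd = sc.parallelize(words, 10)

// glom() turns each partition into an array so its size can be inspected
val partitionSizes = wordsRdd.glom().map(_.length).collect()

println(wordsRdd.getNumPartitions)       // 10
println(partitionSizes.mkString(", "))   // 10, 10, 10, 10, 10, 10, 10, 10, 10, 10
```

Nothing is computed until collect() runs; that is the "action" part mentioned above.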


Dataframes

  • A DataFrame distributes its elements the same way an RDD does, but the data is organized into named columns, like a table in a relational database. Think of it as a distributed collection of Row objects, with the records spread across the nodes of the cluster. The picture will become clearer when we create a DataFrame below, so don't worry.


Datasets

  • The Dataset API arrived in 2016 (experimental in Spark 1.6, then unified with DataFrames in Spark 2.0). Do you remember the case class you created in "Just enough Scala for Spark"? A Dataset is a distributed collection of such strongly typed objects, for example a case class Order with two attributes, orderNum (Int) and orderItem (String).
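
To make the three structures concrete, here is a small sketch built around the Order case class described above. It assumes spark-shell; the sample orders and the val names are invented for illustration and are not from the original post.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() returns the session the shell already started
val spark = SparkSession.builder().master("local[2]").appName("dataset-sketch").getOrCreate()
import spark.implicits._

// Strongly typed record with the two attributes named in the text
case class Order(orderNum: Int, orderItem: String)

// Hypothetical sample data
val orders = Seq(Order(1, "laptop"), Order(2, "mouse"), Order(3, "keyboard"))

val ordersDs  = orders.toDS()    // Dataset[Order]: typed objects
val ordersDf  = ordersDs.toDF()  // DataFrame: rows with named columns
val ordersRdd = ordersDs.rdd     // RDD[Order]: low-level distributed collection

// Typed access: the compiler knows orderNum is an Int
val bigOrders = ordersDs.filter(o => o.orderNum > 1).map(_.orderItem).collect()

// Column access by name on the DataFrame
val items = ordersDf.select("orderItem").as[String].collect()

println(bigOrders.mkString(", "))  // mouse, keyboard
println(items.mkString(", "))      // laptop, mouse, keyboard
```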



That was the introduction, so it's fine if not everything is clear yet. The practical examples below will give you a clearer picture.



The question is: which data structure should you use?


It depends entirely on the business use case.

  • Datasets and RDDs are typically used for unstructured data such as streams of media text, where a schema and columnar format are not mandatory (that is, you don't need to access data by column name or other tabular attributes).

  • RDDs are also often used when you want full control over the physical distribution of data across the nodes of a cluster.

  • DataFrames are often used with Spark SQL when you have structured data and need the schema and columnar format maintained throughout the process.

  • Datasets are also used when you have unstructured or semi-structured data and still want to run Spark SQL on it.


That said, these are the main ways to load data in Spark:

  1. SparkContext's textFile method, which returns an RDD.

  2. SparkContext's parallelize method, which also returns an RDD.

  3. SparkSession's read.textFile method, which returns a Dataset.

  4. SQLContext's read.json method, which returns a DataFrame.

  5. SparkSession's read.json method, which returns a DataFrame.

  6. You can also create these from Parquet files with the read.parquet method. There are other methods as well; it is difficult to list all of them, but these examples show how the pieces fit together.



1. SparkContext textFile [spark.rdd family]


Text file RDDs can be created with SparkContext's textFile method. Define SparkConf and SparkContext as we did in the earlier post, then use the SparkContext to read the text file. I created a sample text file containing Wikipedia's answer to the question "Where is Mount Everest?".

scala> val dataFile = sc.textFile("/Users/Rajput/Documents/testdata/MountEverest.txt")

dataFile: org.apache.spark.rdd.RDD[String] = /Users/Rajput/Documents/testdata/MountEverest.txt MapPartitionsRDD[1] at textFile at <console>:27



  • The file has 9 lines; textFile returns an RDD[String] with one element per line.

  • You can count the number of words in the file by splitting each line on the space character and applying the count() action. You will learn about transformations like flatMap and actions like count soon, so don't worry.

scala> dataFile.flatMap(line => line.split(" ")).count()

res4: Long = 544

  • For now, the point is how to read a text file with SparkContext's textFile method. The result is an RDD.
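
If the flatMap step in the word count looks opaque, the sketch below contrasts it with map on two made-up lines (standing in for the real file, which is not reproduced here). It assumes spark-shell; the names lines, perLine and words are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() returns the session the shell already started
val spark = SparkSession.builder().master("local[2]").appName("flatmap-sketch").getOrCreate()
val sc = spark.sparkContext

// Two made-up lines standing in for the contents of a text file
val lines = sc.parallelize(Seq("Mount Everest is in the Himalayas", "It is very high"))

// map keeps one output per line: an RDD holding 2 arrays of words
val perLine = lines.map(line => line.split(" "))

// flatMap flattens those arrays: an RDD holding 10 individual words
val words = lines.flatMap(line => line.split(" "))

println(perLine.count())  // 2
println(words.count())    // 10
```

One caveat: split(" ") produces empty strings when a line contains consecutive spaces, so a count like 544 can include a few of those. Splitting on "\\s+" is a common alternative.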


Important notes:

  • You can use wildcard characters to read multiple files together ("/file/path/*.txt").

  • textFile can read compressed files (*.gz) as well as files on HDFS, Amazon S3, and other Hadoop-supported storage.
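
Here is a minimal sketch of the wildcard note. It assumes a local spark-shell where plain paths resolve to the local file system; the scratch directory, file names, and contents are invented for the example.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() returns the session the shell already started
val spark = SparkSession.builder().master("local[2]").appName("wildcard-sketch").getOrCreate()
val sc = spark.sparkContext

// Create a scratch directory with two small text files (hypothetical contents)
val dir = Files.createTempDirectory("textfile-sketch")
Files.write(dir.resolve("a.txt"), "line one\nline two\n".getBytes(StandardCharsets.UTF_8))
Files.write(dir.resolve("b.txt"), "line three\n".getBytes(StandardCharsets.UTF_8))

// The wildcard matches both files; the result is a single RDD[String] of lines
val allLines = sc.textFile(dir.toString + "/*.txt")

println(allLines.count())  // 3
```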



2. SparkContext parallelize collection [spark.rdd family]


This method distributes a local collection of elements of the same type (an Array, List, etc.) across the cluster. The resulting distributed dataset can be operated on in parallel.


// Parallelizing list of strings

scala> val distData = sc.parallelize(List("apple","orange","banana","grapes"))

distData: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:27


// 4 total elements

scala> distData.count()

res5: Long = 4


Or like these:

scala> sc.parallelize(Array("Hello Dataneb! How are you?"))

res3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:25


scala> sc.parallelize(Array("Hello","Spark","Dataneb","Apache"))

res4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:25

// note: a List holding one Range gives an RDD with a single Range element, not 10 Ints

scala> sc.parallelize(List(1 to 10))

res6: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[2] at parallelize at <console>:25

scala> sc.parallelize(1 to 10)

res7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:25


scala> sc.parallelize(1 to 10 by 2)

res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:25


You can also check the number of partitions:


scala> res8.partitions.size

res13: Int = 4
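
The 4 above most likely reflects the default parallelism of the machine the shell ran on, so you may see a different number. As a sketch (assuming spark-shell; the val names are made up), parallelize also accepts a second argument, numSlices, that sets the partition count explicitly:

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() returns the session the shell already started
val spark = SparkSession.builder().master("local[2]").appName("parallelize-sketch").getOrCreate()
val sc = spark.sparkContext

// The second argument (numSlices) fixes the partition count instead of using the default
val odds = sc.parallelize(1 to 10 by 2, 2)        // elements 1, 3, 5, 7, 9
val sizes = odds.glom().map(_.length).collect()   // number of elements in each partition

println(odds.getNumPartitions)  // 2
println(sizes.sum)              // 5

// A List wrapping one Range is a one-element collection; a bare Range has 10 elements
val oneElement = sc.parallelize(List(1 to 10)).count()
val tenElements = sc.parallelize(1 to 10).count()

println(oneElement)   // 1
println(tenElements)  // 10
```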



3. Read text file to create Dataset [spark.sql family]


You can create a Dataset from a text file on the local file system or on another file system such as HDFS. Here you can use the default SparkSession (spark) that is created when you start spark-shell.


// creating dataset

scala> val distDataset = spark.read.textFile("/Users/Rajput/Documents/testdata/MountEverest.txt")

distDataset: org.apache.spark.sql.Dataset[String] = [value: string]


// 9 lines

scala> distDataset.count()

res0: Long = 9


// 544 total word count

scala> distDataset.flatMap(line => line.split(" ")).count()

res2: Long = 544


// 5 Lines with Everest

scala> distDataset.filter(line => line.contains("Everest")).count()

res3: Long = 5
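
Because a Dataset[String] has a single column named value (see the [value: string] in the output above), the same filter can be written either as a typed lambda or as a column expression. A small sketch, assuming spark-shell and three made-up lines instead of the real file:

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() returns the session the shell already started
val spark = SparkSession.builder().master("local[2]").appName("dataset-filter-sketch").getOrCreate()
import spark.implicits._

// Hypothetical lines standing in for the text file
val linesDs = Seq(
  "Mount Everest is Earth's highest mountain",
  "It lies in the Himalayas",
  "Everest attracts many climbers"
).toDS()

// Typed style: the lambda receives each line as a String
val typedCount = linesDs.filter(line => line.contains("Everest")).count()

// Column style: refer to the single column by its name, "value"
val columnCount = linesDs.filter($"value".contains("Everest")).count()

println(typedCount)   // 2
println(columnCount)  // 2
```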


4. SQLContext read json to create Dataframe [spark.sql family]


You can create DataFrames with SQLContext. SQLContext is a class that serves as an entry point for Spark SQL; since Spark 2.0 it has been superseded by SparkSession (see the next section), but it still works.


// you need to import the sql package to create SQLContext

scala> import org.apache.spark.sql._

import org.apache.spark.sql._


// creating the SQLContext on top of the existing SparkContext

scala> val sqlContext = new SQLContext(sc)

sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@40eb85e9


My JSON file looks like this:

[ { "color": "red", "value": "#f00" }, { "color": "green", "value": "#0f0" }, { "color": "blue", "value": "#00f" }, { "color": "cyan", "value": "#0ff" }, { "color": "magenta", "value": "#f0f" }, { "color": "yellow", "value": "#ff0" }, { "color": "black", "value": "#000" } ]
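
By default, Spark's JSON reader expects each line of the file to hold a complete JSON value, which a one-line layout like the one above satisfies. If the same array were pretty-printed across several lines, the usual approach (in Spark 2.2 or later, as far as I know) is the multiLine option. The sketch below assumes spark-shell with a local file system; the temp file and its two records are made up for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() returns the session the shell already started
val spark = SparkSession.builder().master("local[2]").appName("multiline-json-sketch").getOrCreate()

// A pretty-printed JSON array spread over several lines (hypothetical sample)
val prettyJson =
  """[
    |  { "color": "red", "value": "#f00" },
    |  { "color": "green", "value": "#0f0" }
    |]""".stripMargin

val jsonFile = Files.createTempFile("colors-sketch", ".json")
Files.write(jsonFile, prettyJson.getBytes(StandardCharsets.UTF_8))

// multiLine tells the reader to parse the whole file as one JSON document
val colorsDf = spark.read.option("multiLine", true).json(jsonFile.toString)

println(colorsDf.count())                 // 2
println(colorsDf.columns.mkString(", "))  // color, value
```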


// creating dataframe

scala> val df = sqlContext.read.json("/Volumes/MYLAB/testdata/multilinecolors.json")

df: org.apache.spark.sql.DataFrame = [color: string, value: string]


// printing schema of dataframe, like a table

scala> df.printSchema()

root
 |-- color: string (nullable = true)
 |-- value: string (nullable = true)


// registering this dataframe as a temporary view (registerTempTable is deprecated since Spark 2.0)

scala> df.createOrReplaceTempView("tmpTable")


// retrieving data

scala> sqlContext.sql("select * from tmpTable").show()

+-------+-----+
|  color|value|
+-------+-----+
|    red| #f00|
|  green| #0f0|
|   blue| #00f|
|   cyan| #0ff|
|magenta| #f0f|
| yellow| #ff0|
|  black| #000|
+-------+-----+
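
The same temp-table idea also works without a SQLContext. This is a sketch assuming spark-shell; the in-memory rows, the view name colors_view, and the val names are invented so the example does not depend on the JSON file.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() returns the session the shell already started
val spark = SparkSession.builder().master("local[2]").appName("temp-view-sketch").getOrCreate()
import spark.implicits._

// Small in-memory DataFrame with the same two columns as the JSON file
val colors = Seq(("red", "#f00"), ("green", "#0f0"), ("blue", "#00f")).toDF("color", "value")

// Register the DataFrame under a name that SQL queries can refer to
colors.createOrReplaceTempView("colors_view")

// Query it with plain SQL through the SparkSession
val blueValue = spark.sql("SELECT value FROM colors_view WHERE color = 'blue'").as[String].collect()

println(blueValue.mkString(", "))  // #00f
```

A temporary view lives only as long as the session that created it; nothing is written to disk.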



5. Spark Session to create dataframe [spark.sql family]


You can also create a DataFrame from the default SparkSession that is created when you start spark-shell. Refer to the spark-shell post for details.


scala> spark

res14: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6c9fe061


scala> spark.read.json("/Volumes/MYLAB/testdata/multilinecolors.json")

res16: org.apache.spark.sql.DataFrame = [color: string, value: string]


scala> res16.show()

+-------+-----+
|  color|value|
+-------+-----+
|    red| #f00|
|  green| #0f0|
|   blue| #00f|
|   cyan| #0ff|
|magenta| #f0f|
| yellow| #ff0|
|  black| #000|
+-------+-----+


scala> res16.printSchema()

root
 |-- color: string (nullable = true)
 |-- value: string (nullable = true)


scala> res16.select("color").show()

+-------+
|  color|
+-------+
|    red|
|  green|
|   blue|
|   cyan|
|magenta|
| yellow|
|  black|
+-------+


scala> res16.filter($"color"==="blue").show()

+-----+-----+
|color|value|
+-----+-----+
| blue| #00f|
+-----+-----+



You can also convert the DataFrame back to JSON like this:

scala> res16.toJSON.show(false)

+----------------------------------+
|value                             |
+----------------------------------+
|{"color":"red","value":"#f00"}    |
|{"color":"green","value":"#0f0"}  |
|{"color":"blue","value":"#00f"}   |
|{"color":"cyan","value":"#0ff"}   |
|{"color":"magenta","value":"#f0f"}|
|{"color":"yellow","value":"#ff0"} |
|{"color":"black","value":"#000"}  |
+----------------------------------+
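
Since toJSON returns a Dataset[String], the JSON strings can be fed straight back into the reader. A rough sketch of that round trip, assuming spark-shell and Spark 2.2 or later (where read.json accepts a Dataset[String]); the two sample rows and the val names are made up.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate() returns the session the shell already started
val spark = SparkSession.builder().master("local[2]").appName("json-roundtrip-sketch").getOrCreate()
import spark.implicits._

// Small in-memory DataFrame with the same two columns as the JSON file
val colors = Seq(("red", "#f00"), ("blue", "#00f")).toDF("color", "value")

// DataFrame -> Dataset[String], one JSON document per row
val asJson = colors.toJSON
val firstJson = asJson.first()

// Dataset[String] -> DataFrame, with the schema inferred from the JSON
val roundTrip = spark.read.json(asJson)

println(firstJson)                         // {"color":"red","value":"#f00"}
println(roundTrip.columns.mkString(", "))  // color, value
println(roundTrip.count())                 // 2
```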


You can also create DataFrames from Parquet files, text files, and so on. You will learn this soon.


That's all! If you have any questions or suggestions, please write in the comments section below. Thank you.


Next: Spark Transformations


©2019 by Data Nebulae