Understanding SparkContext textFile & parallelize methods
In this blog you will learn:
How Spark reads a text file or any other external dataset.
Referencing a dataset with SparkContext's textFile method, SparkContext's parallelize method, and Spark's Dataset textFile method.
As we saw in the previous post, Apache Spark has mainly three types of objects, or you can say data structures (also called Spark APIs): RDDs, DataFrames, and Datasets. RDD was the primary API when Apache Spark was created.
RDD - Resilient Distributed Dataset
Suppose you have a collection of 100 words and you distribute them across 10 partitions, so that each partition holds roughly 10 words. Each partition can be recovered in case of failure, because Spark tracks how it was built and can recompute it (that is what makes it resilient).
Now, this may seem very generic. In a practical environment the data will be distributed across a cluster with thousands of nodes, and to work with it you apply Spark transformations and actions, which you will learn about soon. This type of immutable, distributed collection of elements is called an RDD.
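To make this concrete, here is a minimal sketch (assuming a running spark-shell, where sc is the SparkContext) that mirrors the example above: a made-up collection of 100 words distributed across 10 partitions. The words themselves are hypothetical.

// hypothetical collection of 100 words
val words = (1 to 100).map(i => s"word$i")
// distribute it across 10 partitions
val wordsRdd = sc.parallelize(words, 10)
// check the number of partitions
wordsRdd.partitions.size   // returns 10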
Dataframes
A DataFrame has a similar distributed nature as an RDD, but the data is organized into a structure, like a table in a relational database. Think of it as a distributed collection of Row objects, like records distributed across thousands of nodes. You will get a clearer picture when we create a DataFrame, so don't worry.
Datasets
The Dataset API was introduced in 2016 (in Spark 1.6, and unified with DataFrames in Spark 2.0). Do you remember the case class you created in "Just enough Scala for Spark"? A Dataset is a collection of such strongly typed objects, like the following case class Order, which has 2 attributes: orderNum (Int) and orderItem (String).
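For illustration, here is a minimal sketch of such a case class and a strongly typed Dataset built from it (assuming spark-shell, where the SparkSession is available as spark; the sample orders are made up):

// the Order case class with 2 attributes, as described above
case class Order(orderNum: Int, orderItem: String)

// import the implicits needed for the toDS() encoder
import spark.implicits._

// a strongly typed Dataset[Order] built from a local, made-up collection
val orders = Seq(Order(1, "Laptop"), Order(2, "Mouse")).toDS()
orders.show()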
This was just an introduction, so even if you don't fully understand it yet, that's fine. You will get a clearer picture with the practical examples.
The question is: which data structure should you use?
It totally depends on your business use case.
For instance, Datasets and RDDs are generally used for unstructured data, like streams of media or text, when a schema and columnar format are not mandatory requirements (such as accessing data by column name and other tabular attributes).
Also, RDDs are often used when you want full control over the physical distribution of data across thousands of nodes in a cluster.
Similarly, DataFrames are often used with Spark SQL when you have structured data and you need a schema and columnar format maintained throughout the process.
Datasets are also used in scenarios where you have unstructured or semi-structured data and you want to run Spark SQL.
That being said, these are the main methods to load data in Spark:
SparkContext's textFile method, which results in an RDD.
SparkContext's parallelize method, which also results in an RDD.
Spark's read textFile method, which results in a Dataset.
SQLContext's read json method, which results in a DataFrame.
SparkSession's read json method, which results in a DataFrame.
You can also create these from Parquet files with the read parquet method. There are other methods as well; it's difficult to list them all, but these examples will give you a picture of how to create them.
1. SparkContext textFile [spark.rdd family]
Text file RDDs can be created using SparkContext's textFile method. Define SparkConf and SparkContext like we did in the earlier post and use the SparkContext to read the text file. I have created a sample text file with data about "Where is Mount Everest?", taken from Wikipedia.
scala> val dataFile = sc.textFile("/Users/Rajput/Documents/testdata/MountEverest.txt")
dataFile: org.apache.spark.rdd.RDD[String] = /Users/Rajput/Documents/testdata/MountEverest.txt MapPartitionsRDD[1] at textFile at <console>:27
The file has 9 lines, and the output above shows the resulting RDD.
Further, you can count the number of words in the file by splitting the text (on the space character) and applying the count() action. You will learn about transformations like flatMap and actions like count soon, so don't worry.
scala> dataFile.flatMap(line => line.split(" ")).count()
res4: Long = 544
Right now the goal is simply to show how to read a text file with the textFile method of SparkContext. The result is an RDD.
Important notes:
You can use wildcard characters to read multiple files together ("/file/path/*.txt"), as shown in the sketch below.
It can read compressed files (*.gz) and files from HDFS, Amazon S3, and other Hadoop-supported storage systems.
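For example, here is a minimal sketch (all paths are hypothetical) of these variations:

// wildcard: read all .txt files in a directory into a single RDD
val manyFiles = sc.textFile("/data/logs/*.txt")
// compressed file: gzip files are decompressed automatically
val gzFile = sc.textFile("/data/archive/events.txt.gz")
// HDFS path
val fromHdfs = sc.textFile("hdfs://namenode:9000/data/input.txt")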
2. SparkContext parallelize collection [spark.rdd family]
This method distributes a local collection of elements of the same type (an Array, List, etc.) across the cluster. The resulting distributed dataset can be operated on in parallel.
// Parallelizing list of strings
scala> val distData = sc.parallelize(List("apple","orange","banana","grapes"))
distData: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:27
// 4 total elements
scala> distData.count()
res5: Long = 4
or like these,
scala> sc.parallelize(Array("Hello Dataneb! How are you?"))
res3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> sc.parallelize(Array("Hello","Spark","Dataneb","Apache"))
res4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:25
// note: this parallelizes a List containing one Range element, not ten numbers
scala> sc.parallelize(List(1 to 10))
res6: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[2] at parallelize at <console>:25
scala> sc.parallelize(1 to 10)
res7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:25
scala> sc.parallelize(1 to 10 by 2)
res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:25
You can also check the number of partitions:
scala> res8.partitions.size
res13: Int = 4
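If you want, you can also control the number of partitions explicitly by passing a second argument (numSlices) to parallelize. A minimal sketch:

// distribute 1 to 100 across 8 partitions explicitly
val explicit = sc.parallelize(1 to 100, 8)
// verify the partition count
explicit.partitions.size   // returns 8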
3. Read text file to create Dataset [spark.sql family]
You can create a Dataset from a text file on the local file system or from any other file system like HDFS. Here, you can use the default SparkSession (spark), which gets created when you start spark-shell.
// creating dataset
scala> val distDataset = spark.read.textFile("/Users/Rajput/Documents/testdata/MountEverest.txt")
distDataset: org.apache.spark.sql.Dataset[String] = [value: string]
// 9 lines
scala> distDataset.count()
res0: Long = 9
// 544 total word count
scala> distDataset.flatMap(line => line.split(" ")).count()
res2: Long = 544
// 5 Lines with Everest
scala> distDataset.filter(line => line.contains("Everest")).count()
res3: Long = 5
4. SQLContext read json to create Dataframe [spark.sql family]
You can create DataFrames with SQLContext. SQLContext is a class in Spark that acts as an entry point for Spark SQL (in Spark 2.x and later, SparkSession is the preferred entry point).
// you need to import sql library to create SQLContext
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
// create an SQLContext from the existing SparkContext (it reuses the same configuration)
scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@40eb85e9
My JSON file looks like this:
[ { "color": "red", "value": "#f00" }, { "color": "green", "value": "#0f0" }, { "color": "blue", "value": "#00f" }, { "color": "cyan", "value": "#0ff" }, { "color": "magenta", "value": "#f0f" }, { "color": "yellow", "value": "#ff0" }, { "color": "black", "value": "#000" } ]
// creating dataframe
scala> val df = sqlContext.read.json("/Volumes/MYLAB/testdata/multilinecolors.json")
df: org.apache.spark.sql.DataFrame = [color: string, value: string]
// printing schema of dataframe, like a table
scala> df.printSchema()
root
|-- color: string (nullable = true)
|-- value: string (nullable = true)
// storing this dataframe into temp table
scala> df.registerTempTable("tmpTable")
// retrieving data
scala> sqlContext.sql("select * from tmpTable").show()
+-------+-----+
| color|value|
+-------+-----+
| red| #f00|
| green| #0f0|
| blue| #00f|
| cyan| #0ff|
|magenta| #f0f|
| yellow| #ff0|
| black| #000|
+-------+-----+
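Note that registerTempTable is deprecated in Spark 2.x and later; the equivalent (and preferred) calls on newer versions look like this:

// register the DataFrame as a temporary view (Spark 2.x+)
df.createOrReplaceTempView("tmpTable")
// query it through the SparkSession
spark.sql("select * from tmpTable").show()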
5. Spark Session to create dataframe [spark.sql family]
You can also create a DataFrame from the default SparkSession, which is created when you start spark-shell. Refer to the spark-shell blog.
scala> spark
res14: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6c9fe061
scala> spark.read.json("/Volumes/MYLAB/testdata/multilinecolors.json")
res16: org.apache.spark.sql.DataFrame = [color: string, value: string]
scala> res16.show()
+-------+-----+
| color|value|
+-------+-----+
| red| #f00|
| green| #0f0|
| blue| #00f|
| cyan| #0ff|
|magenta| #f0f|
| yellow| #ff0|
| black| #000|
+-------+-----+
scala> res16.printSchema()
root
|-- color: string (nullable = true)
|-- value: string (nullable = true)
scala> res16.select("color").show()
+-------+
| color|
+-------+
| red|
| green|
| blue|
| cyan|
|magenta|
| yellow|
| black|
+-------+
scala> res16.filter($"color"==="blue").show()
+-----+-----+
|color|value|
+-----+-----+
| blue| #00f|
+-----+-----+
You can also convert the DataFrame back to JSON like this:
scala> res16.toJSON.show(false)
+----------------------------------+
|value |
+----------------------------------+
|{"color":"red","value":"#f00"} |
|{"color":"green","value":"#0f0"} |
|{"color":"blue","value":"#00f"} |
|{"color":"cyan","value":"#0ff"} |
|{"color":"magenta","value":"#f0f"}|
|{"color":"yellow","value":"#ff0"} |
|{"color":"black","value":"#000"} |
+----------------------------------+
You can also create DataFrames from Parquet files, text files, etc. You will learn this soon; a quick sketch follows below.
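As a teaser, here is a minimal sketch (all paths are hypothetical) of those other read methods on the SparkSession:

// DataFrame from a Parquet file
val parquetDf = spark.read.parquet("/path/to/data.parquet")
// DataFrame from a plain text file (a single column named "value")
val textDf = spark.read.text("/path/to/data.txt")
// DataFrame from a CSV file with a header row
val csvDf = spark.read.option("header", "true").csv("/path/to/data.csv")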
That's all, folks! If you have any questions or suggestions, please write them in the comments section below. Thank you.
Next: Spark Transformations