Spark read Text file into Dataframe
In this Spark Scala tutorial you will learn how to read data from a text file and from a CSV file into a DataFrame. This blog has two sections:
Spark read Text File
Spark read CSV with schema/header
There are various methods to load a text file in Spark. You can refer to the Spark documentation for the full list.
Spark Read Text File
I am loading a text file that is space (" ") delimited. I have chosen this format because in most practical cases you will find delimited text files with a fixed number of fields.
Further, I will add a header to the DataFrame and transform it to some extent. I have tried to keep the code as simple as possible so that anyone can understand it. You can change the separator, the names and number of fields, and the data types according to your requirements.
I am using squid logs as sample data for this example. They contain date, integer and string fields, which lets us apply data type conversions and play around with Spark SQL. The complete squid log file structure is described in the Squid documentation.
No. of fields = 10
Separator is a space character
Sample Data
1286536309.586 921 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml
1286536309.608 829 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml
1286536309.660 785 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml
1286536309.684 808 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml
1286536309.775 195 192.168.0.227 TCP_MISS/200 4120 GET http://i4.ytimg.com/vi/gTHZnIAzmdY/default.jpg - DIRECT/209.85.153.118 image/jpeg
1286536309.795 215 192.168.0.227 TCP_MISS/200 5331 GET http://i2.ytimg.com/vi/-jBxVLD4fzg/default.jpg - DIRECT/209.85.153.118 image/jpeg
1286536309.815 234 192.168.0.227 TCP_MISS/200 5261 GET http://i1.ytimg.com/vi/dCjp28ps4qY/default.jpg - DIRECT/209.85.153.118 image/jpeg
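Before bringing Spark into the picture, it can help to check how one of these lines splits into the 10 fields. The following is a minimal plain-Scala sketch that can be pasted into spark-shell or the Scala REPL. The object name SquidFields is hypothetical, the field names are the ones used for the header later in this post, and the sketch assumes fields are separated by exactly one space, as in the sample above.

```scala
// Hypothetical helper, for illustration only.
object SquidFields {
  // Field names taken from the header defined later in this post.
  val names: Array[String] =
    "time duration client_add result_code bytes req_method url user hierarchy_code type".split(" ")

  // Split one log line on single spaces and pair each value with its field name.
  def toMap(line: String): Map[String, String] =
    names.zip(line.split(" ")).toMap
}

// One of the sample records shown above.
val sample = "1286536309.775 195 192.168.0.227 TCP_MISS/200 4120 GET http://i4.ytimg.com/vi/gTHZnIAzmdY/default.jpg - DIRECT/209.85.153.118 image/jpeg"

val fields = SquidFields.toMap(sample)
println(sample.split(" ").length) // 10
println(fields("client_add"))     // 192.168.0.227
println(fields("type"))           // image/jpeg
```

If the count printed is not 10 for your own data, the separator or the header needs adjusting before you build the DataFrame.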
Creating Sample Text File
I have created a sample text file, squid.txt, with the records shown above (just copy and paste them).
Filename: squid.txt
Path: /Users/Rajput/Documents/testdata
Eclipse IDE Setup (for beginners)
Before writing the Spark program, you need to set up a Scala project in Eclipse IDE. I assume that you have installed Eclipse; if not, please refer to my previous blogs for installation steps (Windows | Mac users). These steps are the same for other sections such as reading CSV, JSON and JDBC.
1. Create a new Scala project "txtReader"
Go to File → New → Project, enter txtReader in the project name field and click Finish.
2. Create a new Scala Package "com.dataneb.spark"
Right-click on the txtReader project in the Package Explorer panel → New → Package, enter the name com.dataneb.spark and click Finish.
3. Create a Scala object "textfileReader"
Expand the txtReader project tree and right-click on the com.dataneb.spark package → New → Scala Object, enter textfileReader as the object name and click Finish.
4. Add external jar files (if needed)
Right-click on the txtReader project → Properties → Java Build Path → Add External JARs.
Now navigate to the path where you have installed Spark. You will find all the jar files under the /spark/jars folder.
Select all the jar files and click Open, then Apply and Close.
After adding these jar files you will find a Referenced Libraries folder in the left panel of the screen, below the Scala object.
5. Set up the Scala compiler
Now right-click on the txtReader project → Properties → Scala Compiler, check the box Use Project Settings and select Fixed Scala installation: 2.11.11 (built-in) from the drop-down options.
Write the code!
[For beginners] Before you write the Spark program:
I have written a separate blog that explains Spark RDDs and the various transformations and actions. You can go through it for a basic understanding.
Refer to my blogs on Spark-shell and SparkContext basics if you are new to Spark programming.
I have also explained briefly, in the comments above each line of code, what it actually does. For a list of Spark functions you can refer to the Spark API documentation.
Now open textfileReader.scala and copy and paste the code below.
// Your package name
package com.dataneb.spark

// Each library has its significance, I have commented where it is used
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.log4j._
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row

object textfileReader {

  // Reducing the log level to just "ERROR" messages
  // It uses library org.apache.log4j._
  // You can apply other logging levels like ALL, DEBUG, ERROR, INFO, FATAL, OFF etc
  Logger.getLogger("org").setLevel(Level.ERROR)

  // Defining Spark configuration to set application name and master
  // It uses library org.apache.spark._
  val conf = new SparkConf().setAppName("textfileReader")
  conf.setMaster("local")

  // Using above configuration to define our SparkContext
  val sc = new SparkContext(conf)

  // Defining SQL context to run Spark SQL
  // It uses library org.apache.spark.sql._
  val sqlContext = new SQLContext(sc)

  // Main function where all operations will occur
  def main(args: Array[String]): Unit = {

    // Reading the text file
    val squidString = sc.textFile("/Users/Rajput/Documents/testdata/squid.txt")

    // Defining the data-frame header structure
    val squidHeader = "time duration client_add result_code bytes req_method url user hierarchy_code type"

    // Defining schema from header which we defined above
    // It uses library org.apache.spark.sql.types.{StructType, StructField, StringType}
    val schema = StructType(squidHeader.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Converting String RDD to Row RDD for 10 attributes
    val rowRDD = squidString.map(_.split(" ")).map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9)))

    // Creating data-frame based on Row RDD and schema
    val squidDF = sqlContext.createDataFrame(rowRDD, schema)

    // Registering as a temporary view
    // (createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0)
    squidDF.createOrReplaceTempView("squid")

    // Retrieving all the records
    val allrecords = sqlContext.sql("select * from squid")

    // Showing top 5 records with false truncation i.e. showing complete row value
    allrecords.show(5, false)

    /* Further you can apply Spark transformations according to your need */
    allrecords.write.saveAsTable("allrecords")

    // Printing schema before transformation
    allrecords.printSchema()

    // Something like this for date, integer and string conversion
    // To have multiline sql use triple quotes
    val transformedData = sqlContext.sql("""
      -- multiline sql
      select from_unixtime(time) as time, -- you can apply to_date
      cast(duration as int) as duration, -- casting to integer
      cast (req_method as string) as req_method from allrecords -- casting to string just to explain
      where type like '%application%' -- filtering
      """)

    // To print schema after transformation, you can see new fields data types
    transformedData.printSchema()
    transformedData.show()

    sc.stop()
  }
}
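As a possible alternative to the RDD route above, Spark 2.x can also read a delimited text file directly through the CSV reader with a custom separator. The sketch below assumes Spark 2.x jars on the build path and the same squid.txt path as above. The object name textfileReaderCsv is hypothetical and not part of the original project, and the sketch has not been checked against your particular log files (for instance, the CSV reader treats double quotes specially, which may matter for some URLs).

```scala
package com.dataneb.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical object name, for illustration only.
object textfileReaderCsv {

  def main(args: Array[String]): Unit = {

    // SparkSession is the Spark 2.x entry point that wraps SparkContext and SQLContext
    val spark = SparkSession.builder()
      .appName("textfileReaderCsv")
      .master("local")
      .getOrCreate()

    // Same header and all-string schema as in the program above
    val squidHeader = "time duration client_add result_code bytes req_method url user hierarchy_code type"
    val schema = StructType(squidHeader.split(" ").map(name => StructField(name, StringType, nullable = true)))

    // The CSV reader accepts a single-character separator, here a space
    val squidDF = spark.read
      .option("sep", " ")
      .schema(schema)
      .csv("/Users/Rajput/Documents/testdata/squid.txt")

    // Showing top 5 records without truncation
    squidDF.show(5, false)

    spark.stop()
  }
}
```

This avoids building the Row RDD by hand, at the cost of relying on the CSV parser's handling of quotes and malformed lines.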
Result
Right-click anywhere in the editor and select Run As → Scala Application.
If you have followed the steps properly you will find the result in the Console.
Key Notes
The first output is the complete DataFrame with all the fields as string type.
The second output is the schema without any transformation; all the data types are string.
The third output is the schema after applying the data type conversions.
The fourth output is our transformed data (minor transformations).
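To see what these conversions amount to for a single record, here is a small plain-Scala sketch that does roughly the same thing outside Spark. The names Converted and SquidConvert are hypothetical. Note one deliberate difference: Spark's from_unixtime returns a formatted string in the session time zone, whereas this sketch keeps a UTC instant including the milliseconds, so the printed values will not look identical to the Spark output.

```scala
import java.time.Instant
import scala.util.Try

// Hypothetical container for the three converted fields, for illustration only.
case class Converted(time: Instant, duration: Int, reqMethod: String)

object SquidConvert {
  // The squid timestamp is epoch seconds with a millisecond fraction, e.g. "1286536309.586".
  def parseTime(raw: String): Instant =
    Instant.ofEpochMilli((BigDecimal(raw) * 1000).toLong)

  // Returns None instead of throwing when a value cannot be converted.
  def convert(time: String, duration: String, reqMethod: String): Option[Converted] =
    Try(Converted(parseTime(time), duration.toInt, reqMethod)).toOption
}

println(SquidConvert.parseTime("1286536309.586"))             // 2010-10-08T11:11:49.586Z
println(SquidConvert.convert("1286536309.586", "921", "POST").map(_.duration)) // Some(921)
println(SquidConvert.convert("not-a-time", "921", "POST"))    // None
```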
You might face errors if:
You have not added the required jar files to the build path.
You have not configured the Scala compiler.
You have not imported the referenced libraries.
You have defined rowRDD with the wrong number of fields, for example x(0) to x(10); in that case you will see an "ArrayIndexOutOfBoundsException" error.
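The same exception can also appear when a line in the file has fewer fields than expected. One way to guard against that, sketched here in plain Scala, is to keep only the lines that split into exactly 10 fields. The object name SafeSplit and the second, deliberately short test line are hypothetical.

```scala
// Hypothetical helper, for illustration only.
object SafeSplit {
  val ExpectedFields = 10

  // Returns Some(fields) only when the line has exactly the expected number of fields.
  def fieldsOf(line: String): Option[Array[String]] = {
    val parts = line.split(" ")
    if (parts.length == ExpectedFields) Some(parts) else None
  }
}

val lines = Seq(
  // A sample record from this post
  "1286536309.775 195 192.168.0.227 TCP_MISS/200 4120 GET http://i4.ytimg.com/vi/gTHZnIAzmdY/default.jpg - DIRECT/209.85.153.118 image/jpeg",
  // A made-up malformed line with too few fields
  "truncated line with too few fields"
)

// Lines with the wrong number of fields are dropped instead of causing an exception later.
val good = lines.flatMap(line => SafeSplit.fieldsOf(line))
println(good.length) // 1
```

In the Spark program the equivalent guard would be something like squidString.map(_.split(" ")).filter(_.length == 10) before the Row is built.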
Spark Read CSV
To demonstrate this I am using Spark-shell, but you can always follow steps similar to those above to create a Scala project in Eclipse IDE.
I have downloaded the sample “books” data from Kaggle. I like Kaggle for free data files; you should try it as well. The sample books.csv has 10 columns and is approximately 1.5 MB. Yes, I know that is very small for Apache Spark, but this is just for demonstration purposes, so it should be fine.
Columns - bookID, title, authors, average_rating, isbn, isbn13, language_code, num_pages, ratings_count, text_reviews_count
Path - /Volumes/MYLAB/testdata
Files - books.csv
Start Spark-shell
I am using Spark version 2.3.1 and Scala version 2.11.8
// Create books dataframe using SparkSession available as spark
scala> val booksDF = spark.read.csv("/Volumes/MYLAB/testdata/")
booksDF: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 8 more fields]
// Showing top 10 records in dataframe
scala> booksDF.show(10)
// To include header you can set option header => true
scala> spark
         .read
         .format("csv")
         .option("header", "true")
         .load("/Volumes/MYLAB/testdata/")
         .show()
// Also, if you want Spark to infer the schema (column data types) of the dataframe, set option inferSchema => true
scala> val booksDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/MYLAB/testdata/")
booksDF: org.apache.spark.sql.DataFrame = [bookID: string, title: string ... 8 more fields]
scala> booksDF.printSchema
root
|-- bookID: string (nullable = true)
|-- title: string (nullable = true)
|-- authors: string (nullable = true)
|-- average_rating: string (nullable = true)
|-- isbn: string (nullable = true)
|-- isbn13: string (nullable = true)
|-- language_code: string (nullable = true)
|-- # num_pages: string (nullable = true)
|-- ratings_count: string (nullable = true)
|-- text_reviews_count: string (nullable = true)
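In the output above every column is still reported as string. If you want specific types, one option is to pass an explicit schema instead of relying on inference. The following is a sketch for spark-shell (it uses the spark session that the shell provides). The column order follows the list given earlier, but the chosen types are my assumptions about the data, and values that do not parse with these types will typically show up as null.

```scala
import org.apache.spark.sql.types._

// Column names follow the list earlier in this post; the types are assumptions.
// With an explicit schema, the names defined here are used for the columns.
val booksSchema = StructType(Seq(
  StructField("bookID", IntegerType, nullable = true),
  StructField("title", StringType, nullable = true),
  StructField("authors", StringType, nullable = true),
  StructField("average_rating", DoubleType, nullable = true),
  StructField("isbn", StringType, nullable = true),
  StructField("isbn13", StringType, nullable = true),
  StructField("language_code", StringType, nullable = true),
  StructField("num_pages", IntegerType, nullable = true),
  StructField("ratings_count", IntegerType, nullable = true),
  StructField("text_reviews_count", IntegerType, nullable = true)
))

// Parentheses let the shell treat the whole chain as one expression.
val typedBooksDF = (spark.read
  .format("csv")
  .option("header", "true")
  .schema(booksSchema)
  .load("/Volumes/MYLAB/testdata/"))

typedBooksDF.printSchema
```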
// You can register this data as a temporary view and run SQL
// (createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0)
scala> booksDF.createOrReplaceTempView("books")
scala> spark.sql("select title from books").show(false)
// You can write any SQL you want; for example, let's say you want to see books with a rating over 4.5
scala> spark.sql("select title, average_rating from books where average_rating > 4.5").show(false)
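The same filter can also be written with the DataFrame API instead of SQL. This is a short sketch for spark-shell, assuming booksDF is the dataframe created above. Because average_rating was read as a string, the sketch casts it to double explicitly before comparing; topRated is just an illustrative name.

```scala
import org.apache.spark.sql.functions.col

// Select two columns and keep rows whose rating, read as a number, is above 4.5
val topRated = booksDF.select("title", "average_rating").filter(col("average_rating").cast("double") > 4.5)

// Show the result without truncating long titles
topRated.show(false)
```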
You can see all the operations available on a dataframe in Spark-shell by typing booksDF. and pressing Tab.
Thank you folks! If you have any questions, please mention them in the comments section below.