top of page
BlogPageTop

Spark read Text file into Dataframe


In this Spark Scala tutorial you will learn how to read data from a text file & CSV to dataframe. This blog has two sections:

  1. Spark read Text File

  2. Spark read CSV with schema/header


There are various methods to load a text file in Spark. You can refer Spark documentation.


 

Spark Read Text File


I am loading a text file which is space (" ") delimited. I have chosen this format because in most of the practical cases you will find delimited text files with fixed number of fields.

Further, I will be adding a header to dataframe and transform it to some extent. I have tried to keep the code as simple as possible so that anyone can understand it. You can change the separator, name/number of fields, data type according to your requirement.


I am using squid logs as sample data for this example. It has date, integer and string fields which will help us to apply data type conversions and play around with Spark SQL. You can find complete squid file structure details at this.

  • No. of fields = 10

  • Separator is a space character


 

Sample Data


1286536309.586 921 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml

1286536309.608 829 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml

1286536309.660 785 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml

1286536309.684 808 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml

1286536309.775 195 192.168.0.227 TCP_MISS/200 4120 GET http://i4.ytimg.com/vi/gTHZnIAzmdY/default.jpg - DIRECT/209.85.153.118 image/jpeg

1286536309.795 215 192.168.0.227 TCP_MISS/200 5331 GET http://i2.ytimg.com/vi/-jBxVLD4fzg/default.jpg - DIRECT/209.85.153.118 image/jpeg

1286536309.815 234 192.168.0.227 TCP_MISS/200 5261 GET http://i1.ytimg.com/vi/dCjp28ps4qY/default.jpg - DIRECT/209.85.153.118 image/jpeg




Creating Sample Text File


I have created sample text file - squid.txt with above mentioned records (just copy-paste).

  • Filename: squid.txt

  • Path: /Users/Rajput/Documents/testdata



 


Eclipse IDE Setup (for beginners)


Before writing the Spark program it's necessary to setup Scala project in Eclipse IDE. I assume that you have installed Eclipse, if not please refer my previous blogs for installation steps (Windows | Mac users). These steps will be same for other sections like reading CSV, JSON, JDBC.


1. Create a new Scala project "txtReader"

  • Go to File → New → Project and enter txtReader in project name field and click finish.






2. Create a new Scala Package "com.dataneb.spark"

  • Right click on the txtReader project in the Package Explorer panel → New → Package and enter name com.dataneb.spark and finish.


 



3. Create a Scala object "textfileReader"

  • Expand the txtReader project tree and right click on the com.dataneb.spark package → New → Scala Object → enter textfileReader in the object name and press finish.


 




4. Add external jar files (if needed)

  • Right click on txtReader project → properties → Java Build Path → Add External Jars

  • Now navigate to the path where you have installed Spark. You will find all the jar files under /spark/jars folder.


 
  • Now select all the jar files and click open. Apply and Close.


After adding these jar files you will find Referenced Library folder created on left panel of the screen below Scala object.



5. Setup Scala compiler

  • Now right click on txtReader project → properties → Scala Compiler and check the box Use Project Settings and select Fixed Scala installation: 2.11.11 (built-in) from drop-down options.





Write the code!


[For beginners] Before you write the Spark program,

  • I have written separate blog to explain Spark RDD, various transformations and actions. You can go through this for basic understanding.

  • Refer these blogs for Spark-shell and SparkContext basics if you are new to Spark programming.

However, I have explained little bit in comments above each line of code what it actually does. For list of spark functions you can refer this.



Now, open textfileReader.scala and copy-paste the below code.


// Your package name

package com.dataneb.spark

// Each library has its significance, I have commented when it's used import org.apache.spark._ import org.apache.spark.sql._ import org.apache.log4j._ import org.apache.spark.sql.types.{StructType, StructField, StringType} import org.apache.spark.sql.Row


object textfileReader { // Reducing the error level to just "ERROR" messages // It uses library org.apache.log4j._ // You can apply other logging levels like ALL, DEBUG, ERROR, INFO, FATAL, OFF etc Logger.getLogger("org").setLevel(Level.ERROR) // Defining Spark configuration to set application name and master // It uses library org.apache.spark._ val conf = new SparkConf().setAppName("textfileReader") conf.setMaster("local") // Using above configuration to define our SparkContext val sc = new SparkContext(conf) // Defining SQL context to run Spark SQL // It uses library org.apache.spark.sql._ val sqlContext = new SQLContext(sc) // Main function where all operations will occur def main (args:Array[String]): Unit = { // Reading the text file val squidString = sc.textFile("/Users/Rajput/Documents/testdata/squid.txt") // Defining the data-frame header structure val squidHeader = "time duration client_add result_code bytes req_method url user hierarchy_code type" // Defining schema from header which we defined above // It uses library org.apache.spark.sql.types.{StructType, StructField, StringType} val schema = StructType(squidHeader.split(" ").map(fieldName => StructField(fieldName,StringType, true))) // Converting String RDD to Row RDD for 10 attributes val rowRDD = squidString.map(_.split(" ")).map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5) , x(6) , x(7) , x(8), x(9))) // Creating data-frame based on Row RDD and schema val squidDF = sqlContext.createDataFrame(rowRDD, schema) // Saving as temporary table squidDF.registerTempTable("squid") // Retrieving all the records val allrecords = sqlContext.sql("select * from squid") // Showing top 5 records with false truncation i.e. showing complete row value allrecords.show(5,false)


/* Further you can apply Spark transformations according to your need */ allrecords.write.saveAsTable("allrecords") // Printing schema before transformation allrecords.printSchema() // Something like this for date, integer and string conversion // To have multiline sql use triple quotes val transformedData = sqlContext.sql(""" -- multiline sql select from_unixtime(time) as time, -- you can apply to_date cast(duration as int) as duration, -- casting to integer cast (req_method as string) as req_method from allrecords -- casting to string just to explain where type like '%application%' -- filtering """) // To print schema after transformation, you can see new fields data types transformedData.printSchema() transformedData.show() sc.stop() } }



Result


Right click anywhere on the screen and select Run As Scala Application.


If you have followed the steps properly you will find the result in Console.



Key Notes

  1. First output is complete data-frame with all the fields as string type.

  2. Second output is the schema without any transformation, you will find all the datatypes as string.

  3. Third output is schema after applying datatype conversions.

  4. Fourth output is our transformed data (minor transformations).


You might face error if;

  • You have missed to import required jar files.

  • You have missed to configure Scala compiler.

  • You have missed to import referenced libraries.

  • You have defined rowRDD with wrong number of fields like (x(0) to x(10)) you will see "ArrayIndexOutOfBoundsException" error.


 

Spark Read CSV


To demonstrate this I am using Spark-shell but you can always follow similar steps like above to create Scala project in Eclipse IDE.


I have downloaded sample “books” data from Kaggle. I like Kaggle for free data files, you should try as well. Sample books.csv has 10 columns and its approximately 1.5 MB file, yeah I know it’s very small for Apache Spark. But this is just for demonstration purpose so it should be fine.


Columns - bookID, title, authors, average_rating, isbn, isbn13, language_code, num_pages, ratings_count, text_reviews_count

Path - /Volumes/MYLAB/testdata

Files - book.csv





Start Spark-shell


I am using Spark version 2.3.1 and Scala version 2.11.8


// Create books dataframe using SparkSession available as spark

scala> val booksDF = spark.read.csv("/Volumes/MYLAB/testdata/")

booksDF: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 8 more fields]


// Showing top 10 records in dataframe

scala> booksDF.show(10)


// To include header you can set option header => true

scala> spark

.read

.format("csv")

.option("header", "true")

.load("/Volumes/MYLAB/testdata/")

.show()


// Also if you want to store Schema of dataframe you need to set option inferSchema => true

scala> val booksDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/MYLAB/testdata/")

booksDF: org.apache.spark.sql.DataFrame = [bookID: string, title: string ... 8 more fields]


scala> booksDF.printSchema

root

|-- bookID: string (nullable = true)

|-- title: string (nullable = true)

|-- authors: string (nullable = true)

|-- average_rating: string (nullable = true)

|-- isbn: string (nullable = true)

|-- isbn13: string (nullable = true)

|-- language_code: string (nullable = true)

|-- # num_pages: string (nullable = true)

|-- ratings_count: string (nullable = true)

|-- text_reviews_count: string (nullable = true)


// You can save this data in a temp table and run SQL

scala> booksDF.registerTempTable("books")


scala> booksDF.sqlContext.sql("select title from books").show(false)


// You can write any sql you want, for example lets say you want to see books with rating over 4.5

scala> booksDF.sqlContext.sql("select title, average_rating from books where average_rating > 4.5").show(false)



You can see what all options you can apply on a dataframe by pressing tab, for example,


Thank you folks! If you have any questions please mention in comments section below.





Navigation menu

1. Apache Spark and Scala Installation

2. Getting Familiar with Scala IDE

3. Spark data structure basics

4. Spark Shell

5. Reading data files in Spark

6. Writing data files in Spark

7. Spark streaming

ความคิดเห็น


Want to share your thoughts about this blog?

Disclaimer: Please note that the information provided on this website is for general informational purposes only and should not be taken as legal advice. Dataneb is a platform for individuals to share their personal experiences with visa and immigration processes, and their views and opinions may not necessarily reflect those of the website owners or administrators. While we strive to keep the information up-to-date and accurate, we make no representations or warranties of any kind, express or implied, about the completeness, accuracy, reliability, suitability, or availability with respect to the website or the information, products, services, or related graphics contained on the website for any purpose. Any reliance you place on such information is therefore strictly at your own risk. We strongly advise that you consult with a qualified immigration attorney or official government agencies for any specific questions or concerns related to your individual situation. We are not responsible for any losses, damages, or legal disputes arising from the use of information provided on this website. By using this website, you acknowledge and agree to the above disclaimer and Google's Terms of Use (https://policies.google.com/terms) and Privacy Policy (https://policies.google.com/privacy).

bottom of page