View By

Categories

 

Spark read Text file into Dataframe

Updated: Nov 9, 2019

Main menu: Spark Scala Tutorial

In this Spark Scala tutorial you will learn how to read data from a text file & CSV to dataframe. This blog has two sections:

  1. Spark read Text File

  2. Spark read CSV with schema/header


There are various methods to load a text file in Spark. You can refer Spark documentation.



Spark Read Text File


I am loading a text file which is space (" ") delimited. I have chosen this format because in most of the practical cases you will find delimited text files with fixed number of fields.

Further, I will be adding a header to dataframe and transform it to some extent. I have tried to keep the code as simple as possible so that anyone can understand it. You can change the separator, name/number of fields, data type according to your requirement.


I am using squid logs as sample data for this example. It has date, integer and string fields which will help us to apply data type conversions and play around with Spark SQL. You can find complete squid file structure details at this.

  • No. of fields = 10

  • Separator is a space character



Sample Data


1286536309.586 921 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml

1286536309.608 829 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml

1286536309.660 785 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml

1286536309.684 808 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml

1286536309.775 195 192.168.0.227 TCP_MISS/200 4120 GET http://i4.ytimg.com/vi/gTHZnIAzmdY/default.jpg - DIRECT/209.85.153.118 image/jpeg

1286536309.795 215 192.168.0.227 TCP_MISS/200 5331 GET http://i2.ytimg.com/vi/-jBxVLD4fzg/default.jpg - DIRECT/209.85.153.118 image/jpeg

1286536309.815 234 192.168.0.227 TCP_MISS/200 5261 GET http://i1.ytimg.com/vi/dCjp28ps4qY/default.jpg - DIRECT/209.85.153.118 image/jpeg




Creating Sample Text File


I have created sample text file - squid.txt with above mentioned records (just copy-paste).

  • Filename: squid.txt

  • Path: /Users/Rajput/Documents/testdata




Eclipse IDE Setup (for beginners)


Before writing the Spark program it's necessary to setup Scala project in Eclipse IDE. I assume that you have installed Eclipse, if not please refer my previous blogs for installation steps (Windows | Mac users). These steps will be same for other sections like reading CSV, JSON, JDBC.


1. Create a new Scala project "txtReader"

  • Go to File → New → Project and enter txtReader in project name field and click finish.






2. Create a new Scala Package "com.dataneb.spark"

  • Right click on the txtReader project in the Package Explorer panel → New → Package and enter name com.dataneb.spark and finish.





3. Create a Scala object "textfileReader"

  • Expand the txtReader project tree and right click on the com.dataneb.spark package → New → Scala Object → enter textfileReader in the object name and press finish.






4. Add external jar files (if needed)

  • Right click on txtReader project propertiesJava Build Path Add External Jars

  • Now navigate to the path where you have installed Spark. You will find all the jar files under /spark/jars folder.


  • Now select all the jar files and click open. Apply and Close.


After adding these jar files you will find Referenced Library folder created on left panel of the screen below Scala object.



5. Setup Scala compiler

  • Now right click on txtReader project properties Scala Compiler and check the box Use Project Settings and select Fixed Scala installation: 2.11.11 (built-in) from drop-down options.





Write the code!


[For beginners] Before you write the Spark program,

  • I have written separate blog to explain Spark RDD, various transformations and actions. You can go through this for basic understanding.

  • Refer these blogs for Spark-shell and SparkContext basics if you are new to Spark programming.

However, I have explained little bit in comments above each line of code what it actually does. For list of spark functions you can refer this.



Now, open textfileReader.scala and copy-paste the below code.


// Your package name

package com.dataneb.spark

// Each library has its significance, I have commented when it's used import org.apache.spark._ import org.apache.spark.sql._ import org.apache.log4j._ import org.apache.spark.sql.types.{StructType, StructField, StringType} import org.apache.spark.sql.Row


object textfileReader { // Reducing the error level to just "ERROR" messages // It uses library org.apache.log4j._ // You can apply other logging levels like ALL, DEBUG, ERROR, INFO, FATAL, OFF etc Logger.getLogger("org").setLevel(Level.ERROR) // Defining Spark configuration to set application name and master // It uses library org.apache.spark._ val conf = new SparkConf().setAppName("textfileReader") conf.setMaster("local") // Using above configuration to define our SparkContext val sc = new SparkContext(conf) // Defining SQL context to run Spark SQL // It uses library org.apache.spark.sql._ val sqlContext = new SQLContext(sc) // Main function where all operations will occur def main (args:Array[String]): Unit = { // Reading the text file val squidString = sc.textFile("/Users/Rajput/Documents/testdata/squid.txt") // Defining the data-frame header structure val squidHeader = "time duration client_add result_code bytes req_method url user hierarchy_code type" // Defining schema from header which we defined above // It uses library org.apache.spark.sql.types.{StructType, StructField, StringType} val schema = StructType(squidHeader.split(" ").map(fieldName => StructField(fieldName,StringType, true))) // Converting String RDD to Row RDD for 10 attributes val rowRDD = squidString.map(_.split(" ")).map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5) , x(6) , x(7) , x(8), x(9))) // Creating data-frame based on Row RDD and schema val squidDF = sqlContext.createDataFrame(rowRDD, schema) // Saving as temporary table squidDF.registerTempTable("squid") // Retrieving all the records val allrecords = sqlContext.sql("select * from squid") // Showing top 5 records with false truncation i.e. showing complete row value allrecords.show(5,false)


/* Further you can apply Spark transformations according to your need */ allrecords.write.saveAsTable("allrecords") // Printing schema before transformation allrecords.printSchema() // Something like this for date, integer and string conversion // To have multiline sql use triple quotes val transformedData = sqlContext.sql(""" -- multiline sql select from_unixtime(time) as time, -- you can apply to_date cast(duration as int) as duration, -- casting to integer cast (req_method as string) as req_method from allrecords -- casting to string just to explain where type like '%application%' -- filtering """) // To print schema after transformation, you can see new fields data types transformedData.printSchema() transformedData.show() sc.stop() } }



Result


Right click anywhere on the screen and select Run As Scala Application.


If you have followed the steps properly you will find the result in Console.



Key Notes

  1. First output is complete data-frame with all the fields as string type.

  2. Second output is the schema without any transformation, you will find all the datatypes as string.

  3. Third output is schema after applying datatype conversions.

  4. Fourth output is our transformed data (minor transformations).


You might face error if;

  • You have missed to import required jar files.

  • You have missed to configure Scala compiler.

  • You have missed to import referenced libraries.

  • You have defined rowRDD with wrong number of fields like (x(0) to x(10)) you will see "ArrayIndexOutOfBoundsException" error.



Spark Read CSV


To demonstrate this I am using Spark-shell but you can always follow similar steps like above to create Scala project in Eclipse IDE.


I have downloaded sample “books” data from Kaggle. I like Kaggle for free data files, you should try as well. Sample books.csv has 10 columns and its approximately 1.5 MB file, yeah I know it’s very small for Apache Spark. But this is just for demonstration purpose so it should be fine.


Columns - bookID, title, authors, average_rating, isbn, isbn13, language_code, num_pages, ratings_count, text_reviews_count

Path - /Volumes/MYLAB/testdata

Files - book.csv





Start Spark-shell


I am using Spark version 2.3.1 and Scala version 2.11.8


// Create books dataframe using SparkSession available as spark

scala> val booksDF = spark.read.csv("/Volumes/MYLAB/testdata/")

booksDF: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 8 more fields]


// Showing top 10 records in dataframe

scala> booksDF.show(10)


// To include header you can set option header => true

scala> spark

.read

.format("csv")

.option("header", "true")

.load("/Volumes/MYLAB/testdata/")

.show()


// Also if you want to store Schema of dataframe you need to set option inferSchema => true

scala> val booksDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/MYLAB/testdata/")

booksDF: org.apache.spark.sql.DataFrame = [bookID: string, title: string ... 8 more fields]


scala> booksDF.printSchema

root

|-- bookID: string (nullable = true)

|-- title: string (nullable = true)

|-- authors: string (nullable = true)

|-- average_rating: string (nullable = true)

|-- isbn: string (nullable = true)

|-- isbn13: string (nullable = true)

|-- language_code: string (nullable = true)

|-- # num_pages: string (nullable = true)

|-- ratings_count: string (nullable = true)

|-- text_reviews_count: string (nullable = true)


// You can save this data in a temp table and run SQL

scala> booksDF.registerTempTable("books")


scala> booksDF.sqlContext.sql("select title from books").show(false)


// You can write any sql you want, for example lets say you want to see books with rating over 4.5

scala> booksDF.sqlContext.sql("select title, average_rating from books where average_rating > 4.5").show(false)



You can see what all options you can apply on a dataframe by pressing tab, for example,


Thank you folks! If you have any questions please mention in comments section below.



Next: Loading JSON file using Spark Scala



Navigation menu

1. Apache Spark and Scala Installation

1.1 Spark installation on Windows​

1.2 Spark installation on Mac

2. Getting Familiar with Scala IDE

2.1 Hello World with Scala IDE​

3. Spark data structure basics

3.1 Spark RDD Transformations and Actions example

4. Spark Shell

4.1 Starting Spark shell with SparkContext example​

5. Reading data files in Spark

5.1 SparkContext Parallelize and read textFile method

5.2 Loading JSON file using Spark Scala

5.3 Loading TEXT file using Spark Scala

5.4 How to convert RDD to dataframe?

6. Writing data files in Spark

​6.1 How to write single CSV file in Spark

7. Spark streaming

7.1 Word count example Scala

7.2 Analyzing Twitter texts

8. Sample Big Data Architecture with Apache Spark

9. What's Artificial Intelligence, Machine Learning, Deep Learning, Predictive Analytics, Data Science?

10. Spark Interview Questions and Answers

17,026 views1 comment

Help others, write your first blog today! 

Home   |   Contact Us

©2020 by Data Nebulae