How to write single CSV file using spark?

Updated: 6 days ago


By default, Apache Spark writes CSV output as multiple part-*.csv files inside a directory, because each partition is saved individually. Spark is built for distributed processing, so multiple files are expected, but there are several ways to end up with a single file. In previous posts we only read data files (flat files, JSON) and created RDDs and dataframes using Spark SQL; we have not yet written a file back to disk or any other storage system. In this Apache Spark tutorial you will learn how to write files back to disk.



For this post, I am creating a Scala object, textfileWriter, in the same project and the same txtReader folder where we created textfileReader.





Source File


I am using the same source file, squid.txt (with duplicate records), that I created in the previous post. In a practical scenario the source could be anything: a relational database, HDFS, a message queue, and so on. In practice you would never read and write the same file; this is just for demo purposes.


1286536309.586 921 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml

1286536309.608 829 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml

1286536309.660 785 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml

1286536309.684 808 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml

1286536309.775 195 192.168.0.227 TCP_MISS/200 4120 GET http://i4.ytimg.com/vi/gTHZnIAzmdY/default.jpg - DIRECT/209.85.153.118 image/jpeg

1286536309.795 215 192.168.0.227 TCP_MISS/200 5331 GET http://i2.ytimg.com/vi/-jBxVLD4fzg/default.jpg - DIRECT/209.85.153.118 image/jpeg

1286536309.815 234 192.168.0.227 TCP_MISS/200 5261 GET http://i1.ytimg.com/vi/dCjp28ps4qY/default.jpg - DIRECT/209.85.153.118 image/jpeg



Sample Code

  • Open textfileWriter.scala and copy-paste the code written below.

  • I have written separate posts that explain the basic terminology used in Spark, such as RDD, SparkContext, SQLContext, and the various transformations and actions. You can go through these for a basic understanding:

  1. Spark shell, Spark context and configuration

  2. Spark RDD, Transformations and Actions


However, I have briefly explained in the comments above each line of code what it actually does. For a list of Spark functions, you can refer to this.


You can make this code much simpler, but my aim is to teach as well, so I have intentionally introduced the header structure, SQL context, string RDD, and so on. If you are already familiar with these, you can focus on the part that writes the dataframe: the squidDF ... .save("targetfile.csv") chain at the end of main.



package com.dataneb.spark

// Each library has its significance; the comments below note how it's being used
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.log4j._
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row

object textfileWriter {

  // Reducing the log level to just "ERROR" messages
  // It uses library org.apache.log4j._
  // You can apply other logging levels like ALL, DEBUG, ERROR, INFO, FATAL, OFF etc
  Logger.getLogger("org").setLevel(Level.ERROR)

  // Defining Spark configuration to define application name and the local resources to use
  // It uses library org.apache.spark._
  val conf = new SparkConf().setAppName("textfileWriter")
  conf.setMaster("local")

  // Using above configuration to define our SparkContext
  val sc = new SparkContext(conf)

  // Defining SQL context to run Spark SQL
  // It uses library org.apache.spark.sql._
  val sqlContext = new SQLContext(sc)

  // Main function where all operations will occur
  def main(args: Array[String]): Unit = {

    // Reading the text file
    val squidString = sc.textFile("/Users/Rajput/Documents/testdata/squid.txt")

    // Defining the data-frame header structure
    val squidHeader = "time duration client_add result_code bytes req_method url user hierarchy_code type"

    // Defining schema from header which we defined above
    // It uses library org.apache.spark.sql.types.{StructType, StructField, StringType}
    val schema = StructType(squidHeader.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Converting String RDD to Row RDD for 10 attributes
    val rowRDD = squidString.map(_.split(" ")).map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9)))

    // Creating dataframe based on Row RDD and schema
    val squidDF = sqlContext.createDataFrame(rowRDD, schema)

    // Writing dataframe to a file with overwrite mode, header and single partition.
    squidDF
      .repartition(1)
      .write
      .mode("overwrite")
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("targetfile.csv")

    sc.stop()
  }
}
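The Row conversion above indexes x(0) through x(9) directly, so any line with fewer than ten space-separated fields would fail with an ArrayIndexOutOfBoundsException. Below is a minimal sketch of a more defensive parser in plain Scala, with no Spark dependency. The helper name parseSquidLine and the decision to split on runs of whitespace are assumptions made for illustration; they are not part of the original program.

```scala
// Hypothetical helper (not in the original program): split a squid log line
// into exactly 10 fields, or return None when the line is malformed.
def parseSquidLine(line: String): Option[Vector[String]] = {
  // Split on runs of whitespace so padded columns do not produce empty fields
  val fields = line.trim.split("\\s+").toVector
  if (fields.length == 10) Some(fields) else None
}

// One of the sample records shown in the Source File section
val sample =
  "1286536309.586 921 192.168.0.68 TCP_MISS/200 507 POST " +
  "http://rcv-srv37.inplay.tubemogul.co...eiver/services - " +
  "DIRECT/174.129.41.128 application/xml"

val parsed = parseSquidLine(sample)        // a well-formed line
val rejected = parseSquidLine("too short") // a malformed line
```

Inside the Spark job, one possible way to use it would be squidString.flatMap(parseSquidLine(_)).map(f => Row.fromSeq(f)), which silently drops malformed lines instead of failing the whole write.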



Run the code!




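Output

The job creates a directory named targetfile.csv that contains a single part file holding all the rows, rather than a bare file called targetfile.csv.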



There are several other methods to write these files.


Method 1


This is what we did above. If the expected dataframe size is small, you can use either repartition or coalesce to create a single part file under the filename.csv directory (for example filename.csv/part-00000).


scala> dataframe
  .repartition(1)
  .write
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("filename.csv")

repartition(1) shuffles all the data into one partition before writing, so the write cost is high and it might take a long time if the data is huge.
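Even with a single partition, the result is still a directory with one part file inside it. If you need a plain file at a fixed path, you can move the part file afterwards. The sketch below assumes the output sits on the local file system (as in this demo) and uses only the Java standard library; the helper name promoteSinglePart is hypothetical. For HDFS you would use the Hadoop FileSystem API instead.

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

// Hypothetical helper: move the single part file out of Spark's output
// directory to a plain file path. Assumes a local file system.
def promoteSinglePart(outputDir: String, target: String): File = {
  // listFiles() returns null when the directory does not exist
  val parts = Option(new File(outputDir).listFiles())
    .getOrElse(Array.empty[File])
    // Skips _SUCCESS and the hidden .crc checksum files
    .filter(f => f.isFile && f.getName.startsWith("part-"))

  require(parts.length == 1,
    s"expected exactly one part file in $outputDir, found ${parts.length}")

  val dest = new File(target)
  Files.move(parts.head.toPath, dest.toPath, StandardCopyOption.REPLACE_EXISTING)
  dest
}
```

For example, promoteSinglePart("filename.csv", "final.csv") would leave the data in final.csv; the now almost empty filename.csv directory can then be deleted.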



Method 2


coalesce(1) avoids the full shuffle, but all the data still ends up in one partition handled by a single task. It therefore needs a lot of memory, and you can run out of memory if the output is huge.

scala> dataframe
  .coalesce(1)
  .write
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("filename.csv")




Coalesce() vs repartition()


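Coalesce and repartition both change the number of partitions. repartition() can increase or decrease the partition count and always performs a full shuffle, which makes it the costlier operation. coalesce() by default only decreases the partition count, and it does so by merging existing partitions instead of shuffling every record. For example,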


scala> val distData = sc.parallelize(1 to 16, 4)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[128] at parallelize at <console>:24

// current number of partitions
scala> distData.partitions.size
res63: Int = 4

// checking data across each partition
scala> distData.mapPartitionsWithIndex((index, iter) => if (index == 0) iter else Iterator()).collect
res64: Array[Int] = Array(1, 2, 3, 4)

scala> distData.mapPartitionsWithIndex((index, iter) => if (index == 1) iter else Iterator()).collect
res65: Array[Int] = Array(5, 6, 7, 8)

scala> distData.mapPartitionsWithIndex((index, iter) => if (index == 2) iter else Iterator()).collect
res66: Array[Int] = Array(9, 10, 11, 12)

scala> distData.mapPartitionsWithIndex((index, iter) => if (index == 3) iter else Iterator()).collect
res67: Array[Int] = Array(13, 14, 15, 16)

// decreasing partitions to 2
scala> val coalData = distData.coalesce(2)
coalData: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[133] at coalesce at <console>:25

// see how the data was combined: instead of moving all the data, coalesce just merged adjacent partitions
scala> coalData.mapPartitionsWithIndex((index, iter) => if (index == 0) iter else Iterator()).collect
res68: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)

scala> coalData.mapPartitionsWithIndex((index, iter) => if (index == 1) iter else Iterator()).collect
res69: Array[Int] = Array(9, 10, 11, 12, 13, 14, 15, 16)


repartition()


Notice how repartition() re-shuffles everything to create the new partitions, compared with the previous RDDs distData and coalData. Hence repartition is a costlier operation than coalesce.


scala> val repartData = distData.repartition(2)
repartData: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[139] at repartition at <console>:25

// checking data across each partition
scala> repartData.mapPartitionsWithIndex((index, iter) => if (index == 0) iter else Iterator()).collect
res70: Array[Int] = Array(1, 3, 6, 8, 9, 11, 13, 15)

scala> repartData.mapPartitionsWithIndex((index, iter) => if (index == 1) iter else Iterator()).collect
res71: Array[Int] = Array(2, 4, 5, 7, 10, 12, 14, 16)



Method 3


Let Spark write the output as multiple partition files, and merge them afterwards with a separate shell script. How fast this is depends mainly on your disk write speed.

#!/bin/bash
# Assumes the part files were written without a header; the header is added once here
echo "ColName1, ColName2, ColName3, ... , ColNameX" > filename.csv
for i in /spark/output/*.csv ; do
    echo "FileNumber $i"
    cat "$i" >> filename.csv
    rm "$i"
done
echo "Done"
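If the part files were written with header=true, each of them starts with its own header line, and a plain concatenation would repeat that header in the middle of the merged file. Here is a minimal sketch of the same merge in plain Scala that keeps only the first header. It assumes a local file system, UTF-8 text, and that every part file begins with the same one-line header; the helper name mergeCsvParts is hypothetical.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical helper: concatenate part-*.csv files into one CSV file,
// writing the header line only once. Returns the number of part files merged.
def mergeCsvParts(partDir: String, target: String): Int = {
  val parts = Option(new File(partDir).listFiles())
    .getOrElse(Array.empty[File])
    .filter(f => f.isFile && f.getName.startsWith("part-") && f.getName.endsWith(".csv"))
    .sortBy(_.getName) // keep partition order

  val out = Files.newBufferedWriter(new File(target).toPath, StandardCharsets.UTF_8)
  var headerWritten = false
  try {
    for (part <- parts) {
      val src = scala.io.Source.fromFile(part, "UTF-8")
      try {
        val lines = src.getLines()
        if (lines.hasNext) {
          // The first line of every part file is treated as the header
          val header = lines.next()
          if (!headerWritten) {
            out.write(header); out.write("\n")
            headerWritten = true
          }
          lines.foreach { l => out.write(l); out.write("\n") }
        }
      } finally src.close()
    }
  } finally out.close()
  parts.length
}
```

Unlike the shell script, this sketch leaves the part files in place and normalizes line endings to "\n"; delete the source directory yourself once you have checked the merged file.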



Method 4


If you are using the Hadoop file system to store the output files, you can leverage HDFS to merge them with the getmerge utility.

Give it the source directory containing all the partition files and a destination output file; it concatenates all the files in the source into the destination local file. You can also set -nl to add a newline character at the end of each file. Further, -skip-empty-file can be used to avoid unwanted newline characters in case of empty files.

Syntax : hadoop fs -getmerge [-nl] [-skip-empty-file] <src> <localdst>

hadoop fs -getmerge -nl /spark/source /spark/filename.csv

hadoop fs -getmerge /spark/source/file1.csv /spark/source/file2.txt filename.csv



Method 5


Use FileUtil.copyMerge() to merge all the files. Note that copyMerge() is available in Hadoop 2.x but was removed in Hadoop 3.0.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

// Merges every file under srcPath into dstPath and deletes the source (deleteSource = true)
def merge(srcPath: String, dstPath: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
}

val newData = << Your dataframe >>

val outputfile = "/spark/outputs/subject"
val filename = "sampleFile"
val outputFileName = outputfile + "/temp_" + filename
val mergedFileName = outputfile + "/merged_" + filename
val mergeFindGlob = outputFileName

// With header = true, each part file carries its own header line
newData.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .mode("overwrite")
  .save(outputFileName)

merge(mergeFindGlob, mergedFileName)



If you have any questions, please leave them in the comments section below. Thank you!




©2020 by Data Nebulae