

  • Spark read Text file into Dataframe

    Main menu: Spark Scala Tutorial In this Spark Scala tutorial you will learn how to read data from a text file & CSV to dataframe. This blog has two sections: Spark read Text File Spark read CSV with schema/header There are various methods to load a text file in Spark. You can refer Spark documentation. Spark Read Text File I am loading a text file which is space (" ") delimited. I have chosen this format because in most of the practical cases you will find delimited text files with fixed number of fields. Further, I will be adding a header to dataframe and transform it to some extent. I have tried to keep the code as simple as possible so that anyone can understand it. You can change the separator, name/number of fields, data type according to your requirement. I am using squid logs as sample data for this example. It has date, integer and string fields which will help us to apply data type conversions and play around with Spark SQL. You can find complete squid file structure details at this. No. of fields = 10 Separator is a space character Sample Data 1286536309.586 921 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml 1286536309.608 829 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml 1286536309.660 785 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml 1286536309.684 808 192.168.0.68 TCP_MISS/200 507 POST http://rcv-srv37.inplay.tubemogul.co...eiver/services - DIRECT/174.129.41.128 application/xml 1286536309.775 195 192.168.0.227 TCP_MISS/200 4120 GET http://i4.ytimg.com/vi/gTHZnIAzmdY/default.jpg - DIRECT/209.85.153.118 image/jpeg 1286536309.795 215 192.168.0.227 TCP_MISS/200 5331 GET http://i2.ytimg.com/vi/-jBxVLD4fzg/default.jpg - DIRECT/209.85.153.118 image/jpeg 1286536309.815 234 192.168.0.227 TCP_MISS/200 5261 GET http://i1.ytimg.com/vi/dCjp28ps4qY/default.jpg - DIRECT/209.85.153.118 image/jpeg Creating Sample Text File I have created sample text file - squid.txt with above mentioned records (just copy-paste). Filename: squid.txt Path: /Users/Rajput/Documents/testdata Eclipse IDE Setup (for beginners) Before writing the Spark program it's necessary to setup Scala project in Eclipse IDE. I assume that you have installed Eclipse, if not please refer my previous blogs for installation steps (Windows | Mac users). These steps will be same for other sections like reading CSV, JSON, JDBC. 1. Create a new Scala project "txtReader" Go to File → New → Project and enter txtReader in project name field and click finish. 2. Create a new Scala Package "com.dataneb.spark" Right click on the txtReader project in the Package Explorer panel → New → Package and enter name com.dataneb.spark and finish. 3. Create a Scala object "textfileReader" Expand the txtReader project tree and right click on the com.dataneb.spark package → New → Scala Object → enter textfileReader in the object name and press finish. 4. Add external jar files (if needed) Right click on txtReader project → properties → Java Build Path → Add External Jars Now navigate to the path where you have installed Spark. You will find all the jar files under /spark/jars folder. Now select all the jar files and click open. Apply and Close. After adding these jar files you will find Referenced Library folder created on left panel of the screen below Scala object. 5. 
Setup Scala compiler Now right click on txtReader project → properties → Scala Compiler and check the box Use Project Settings and select Fixed Scala installation: 2.11.11 (built-in) from drop-down options. Write the code! [For beginners] Before you write the Spark program, I have written separate blog to explain Spark RDD, various transformations and actions. You can go through this for basic understanding. Refer these blogs for Spark-shell and SparkContext basics if you are new to Spark programming. However, I have explained little bit in comments above each line of code what it actually does. For list of spark functions you can refer this. Now, open textfileReader.scala and copy-paste the below code. // Your package name package com.dataneb.spark // Each library has its significance, I have commented when it's used import org.apache.spark._ import org.apache.spark.sql._ import org.apache.log4j._ import org.apache.spark.sql.types.{StructType, StructField, StringType} import org.apache.spark.sql.Row object textfileReader { // Reducing the error level to just "ERROR" messages // It uses library org.apache.log4j._ // You can apply other logging levels like ALL, DEBUG, ERROR, INFO, FATAL, OFF etc Logger.getLogger("org").setLevel(Level.ERROR) // Defining Spark configuration to set application name and master // It uses library org.apache.spark._ val conf = new SparkConf().setAppName("textfileReader") conf.setMaster("local") // Using above configuration to define our SparkContext val sc = new SparkContext(conf) // Defining SQL context to run Spark SQL // It uses library org.apache.spark.sql._ val sqlContext = new SQLContext(sc) // Main function where all operations will occur def main (args:Array[String]): Unit = { // Reading the text file val squidString = sc.textFile("/Users/Rajput/Documents/testdata/squid.txt") // Defining the data-frame header structure val squidHeader = "time duration client_add result_code bytes req_method url user hierarchy_code type" // Defining schema from header which we defined above // It uses library org.apache.spark.sql.types.{StructType, StructField, StringType} val schema = StructType(squidHeader.split(" ").map(fieldName => StructField(fieldName,StringType, true))) // Converting String RDD to Row RDD for 10 attributes val rowRDD = squidString.map(_.split(" ")).map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5) , x(6) , x(7) , x(8), x(9))) // Creating data-frame based on Row RDD and schema val squidDF = sqlContext.createDataFrame(rowRDD, schema) // Saving as temporary table squidDF.registerTempTable("squid") // Retrieving all the records val allrecords = sqlContext.sql("select * from squid") // Showing top 5 records with false truncation i.e. 
showing complete row value allrecords.show(5,false) /* Further you can apply Spark transformations according to your need */ allrecords.write.saveAsTable("allrecords") // Printing schema before transformation allrecords.printSchema() // Something like this for date, integer and string conversion // To have multiline sql use triple quotes val transformedData = sqlContext.sql(""" -- multiline sql select from_unixtime(time) as time, -- you can apply to_date cast(duration as int) as duration, -- casting to integer cast (req_method as string) as req_method from allrecords -- casting to string just to explain where type like '%application%' -- filtering """) // To print schema after transformation, you can see new fields data types transformedData.printSchema() transformedData.show() sc.stop() } } Result Right click anywhere on the screen and select Run As Scala Application. If you have followed the steps properly you will find the result in Console. Key Notes First output is complete data-frame with all the fields as string type. Second output is the schema without any transformation, you will find all the datatypes as string. Third output is schema after applying datatype conversions. Fourth output is our transformed data (minor transformations). You might face error if; You have missed to import required jar files. You have missed to configure Scala compiler. You have missed to import referenced libraries. You have defined rowRDD with wrong number of fields like (x(0) to x(10)) you will see "ArrayIndexOutOfBoundsException" error. Spark Read CSV To demonstrate this I am using Spark-shell but you can always follow similar steps like above to create Scala project in Eclipse IDE. I have downloaded sample “books” data from Kaggle. I like Kaggle for free data files, you should try as well. Sample books.csv has 10 columns and its approximately 1.5 MB file, yeah I know it’s very small for Apache Spark. But this is just for demonstration purpose so it should be fine. Columns - bookID, title, authors, average_rating, isbn, isbn13, language_code, num_pages, ratings_count, text_reviews_count Path - /Volumes/MYLAB/testdata Files - book.csv Start Spark-shell I am using Spark version 2.3.1 and Scala version 2.11.8 // Create books dataframe using SparkSession available as spark scala> val booksDF = spark.read.csv("/Volumes/MYLAB/testdata/") booksDF: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 8 more fields] // Showing top 10 records in dataframe scala> booksDF.show(10) // To include header you can set option header => true scala> spark .read .format("csv") .option("header", "true") .load("/Volumes/MYLAB/testdata/") .show() // Also if you want to store Schema of dataframe you need to set option inferSchema => true scala> val booksDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/MYLAB/testdata/") booksDF: org.apache.spark.sql.DataFrame = [bookID: string, title: string ... 
    8 more fields]

    scala> booksDF.printSchema
    root
     |-- bookID: string (nullable = true)
     |-- title: string (nullable = true)
     |-- authors: string (nullable = true)
     |-- average_rating: string (nullable = true)
     |-- isbn: string (nullable = true)
     |-- isbn13: string (nullable = true)
     |-- language_code: string (nullable = true)
     |-- # num_pages: string (nullable = true)
     |-- ratings_count: string (nullable = true)
     |-- text_reviews_count: string (nullable = true)

    // You can save this data in a temp table and run SQL
    scala> booksDF.registerTempTable("books")
    scala> booksDF.sqlContext.sql("select title from books").show(false)

    // You can write any SQL you want, for example let's say you want to see books with a rating over 4.5
    scala> booksDF.sqlContext.sql("select title, average_rating from books where average_rating > 4.5").show(false)

    You can see all the options available on a DataFrame by pressing Tab at the scala> prompt. Thank you folks! If you have any questions please mention them in the comments section below. Next: Loading JSON file using Spark Scala
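    A side note on the two read approaches above (not from the original post): on Spark 2.x you can also load the same space-delimited squid.txt through the DataFrameReader CSV API, supplying the schema up front instead of building an RDD[Row] by hand. The sketch below assumes a SparkSession named spark (as you get in spark-shell) and reuses the path and field names from the text-file example; the integer types for duration and bytes are my choice, not the post's.

    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

    // Schema built from the same header fields used earlier; duration and bytes as integers
    val squidSchema = StructType(Seq(
      StructField("time", StringType, true),
      StructField("duration", IntegerType, true),
      StructField("client_add", StringType, true),
      StructField("result_code", StringType, true),
      StructField("bytes", IntegerType, true),
      StructField("req_method", StringType, true),
      StructField("url", StringType, true),
      StructField("user", StringType, true),
      StructField("hierarchy_code", StringType, true),
      StructField("type", StringType, true)
    ))

    // Space is the delimiter in the squid log, so read it as a "CSV" with a custom separator
    val squidDF = spark.read
      .option("delimiter", " ")
      .schema(squidSchema)
      .csv("/Users/Rajput/Documents/testdata/squid.txt")

    squidDF.printSchema()
    squidDF.show(5, false)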

  • Analyzing Twitter Data - Twitter sentiment analysis using Spark streaming

    Analyzing Twitter Data - Twitter sentiment analysis using Spark streaming. Twitter Spark Streaming - We will be analyzing Twitter data and we will be doing Twitter sentiment analysis using Spark streaming. You can do this in any programming language Python, Scala, Java or R. Main menu: Spark Scala Tutorial Spark streaming is very useful in analyzing real time data from IoT technologies which could be your smart watch, Google Home, Amazon Alexa, Fitbit, GPS, home security system, smart cameras or any other device which communicates with internet. Social accounts like Facebook, Twitter, Instagram etc generate enormous amount of data every minute. Below trend shows interest over time for three of these smart technologies over past 5 years. In this example we are going to stream Twitter API tweets in real time with OAuth authentication and filter the hashtags which are most famous among them. Prerequisite Download and install Apache Spark and Scala IDE (Windows | Mac) Create Twitter sample application and obtain your client secret, client secret key, access token and access token secret. Refer this to know how to get Twitter development account and api access keys. Authentication file setup Create a text file twitter.txt with Twitter OAuth details and place it anywhere on your local directory system (remember the path). File content should look like this. Basically it has two fields separated with single space - first field contains OAuth headers name and second column contains api keys. Make sure there is no extra space anywhere in your file else you will get authentication errors. There is a new line character at the end (i.e. hit enter after 4th line). You can see empty 5th line in below screenshot. Write the code! Now create a Scala project in Eclipse IDE (see how to create Scala project), refer the following code that prints out live tweets as they stream using Spark Streaming. I have written separate blog to explain what are basic terminologies used in Spark like RDD, SparkContext, SQLContext, various transformations and actions etc. You can go through this for basic understanding. However, I have explained little bit in comments above each line of code what it actually does. For list of spark functions you can refer this. // Our package package com.dataneb.spark // Twitter libraries used to run spark streaming import twitter4j._ import twitter4j.auth.Authorization import twitter4j.auth.OAuthAuthorization import twitter4j.conf.ConfigurationBuilder import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.streaming._ import org.apache.spark.streaming.twitter._ import org.apache.spark.streaming.StreamingContext._ import scala.io.Source import org.apache.spark.internal.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.Receiver /** Listens to a stream of Tweets and keeps track of the most popular * hashtags over a 5 minute window. */ object PopularHashtags { /** Makes sure only ERROR messages get logged to avoid log spam. */ def setupLogging() = { import org.apache.log4j.{Level, Logger} val rootLogger = Logger.getRootLogger() rootLogger.setLevel(Level.ERROR) } /** Configures Twitter service credentials using twiter.txt in the main workspace directory. 
Use the path where you saved the authentication file */ def setupTwitter() = { import scala.io.Source for (line <- Source.fromFile("/Volumes/twitter.txt").getLines) { val fields = line.split(" ") if (fields.length == 2) { System.setProperty("twitter4j.oauth." + fields(0), fields(1)) } } } // Main function where the action happens def main(args: Array[String]) { // Configure Twitter credentials using twitter.txt setupTwitter() // Set up a Spark streaming context named "PopularHashtags" that runs locally using // all CPU cores and one-second batches of data val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(1)) // Get rid of log spam (should be called after the context is set up) setupLogging() // Create a DStream from Twitter using our streaming context val tweets = TwitterUtils.createStream(ssc, None) // Now extract the text of each status update into DStreams using map() val statuses = tweets.map(status => status.getText()) // Blow out each word into a new DStream val tweetwords = statuses.flatMap(tweetText => tweetText.split(" ")) // Now eliminate anything that's not a hashtag val hashtags = tweetwords.filter(word => word.startsWith("#")) // Map each hashtag to a key/value pair of (hashtag, 1) so we can count them by adding up the values val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1)) // Now count them up over a 5 minute window sliding every one second val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow( (x,y) => x + y, (x,y) => x - y, Seconds(300), Seconds(1)) // Sort the results by the count values val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, false)) // Print the top 10 sortedResults.print // Set a checkpoint directory, and kick it all off // I can watch this all day! ssc.checkpoint("/Volumes/Macintosh HD/Users/Rajput/Documents/checkpoint/") ssc.start() ssc.awaitTermination() } } Run it! See the result! I actually went to my Twitter account to see top result tweet #MTVHottest and it was trending, see the snapshot below. You might face error if You are not using proper version of Scala and Spark Twitter libraries. I have listed them below for your reference. You can download these libraries from these two links Link1 & Link2. Scala version 2.11.11 Spark version 2.3.2 twitter4j-core-4.0.6.jar twitter4j-stream-4.0.6.jar spark-streaming-twitter_2.11-2.1.1.jar Thank you folks. I hope you are enjoying these blogs, if you have any doubt please mention in comments section below. #AnalyzingTwitterData #SparkStreamingExample #Scala #TwitterSparkStreaming #SparkStreaming Next: Sample Big Data Architecture with Apache Spark Navigation menu ​ 1. Apache Spark and Scala Installation 1.1 Spark installation on Windows​ 1.2 Spark installation on Mac 2. Getting Familiar with Scala IDE 2.1 Hello World with Scala IDE​ 3. Spark data structure basics 3.1 Spark RDD Transformations and Actions example 4. Spark Shell 4.1 Starting Spark shell with SparkContext example​ 5. Reading data files in Spark 5.1 SparkContext Parallelize and read textFile method 5.2 Loading JSON file using Spark Scala 5.3 Loading TEXT file using Spark Scala 5.4 How to convert RDD to dataframe? 6. Writing data files in Spark ​6.1 How to write single CSV file in Spark 7. Spark streaming 7.1 Word count example Scala 7.2 Analyzing Twitter texts 8. Sample Big Data Architecture with Apache Spark 9. What's Artificial Intelligence, Machine Learning, Deep Learning, Predictive Analytics, Data Science? 10. Spark Interview Questions and Answers
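    If you would rather manage the jars listed above with sbt instead of adding them to the Eclipse build path by hand, a build.sbt along these lines should work. This is only a sketch: the versions are copied from the list above, while the group IDs (org.twitter4j, and org.apache.bahir for the spark-streaming-twitter artifact) are assumptions on my part - verify them against the jars you actually downloaded.

    name := "PopularHashtags"

    scalaVersion := "2.11.11"

    libraryDependencies ++= Seq(
      // Spark core and streaming, matching the Spark version mentioned above
      "org.apache.spark" %% "spark-core"              % "2.3.2",
      "org.apache.spark" %% "spark-streaming"         % "2.3.2",
      // Twitter connector and twitter4j client libraries
      "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.1",
      "org.twitter4j"    %  "twitter4j-core"          % "4.0.6",
      "org.twitter4j"    %  "twitter4j-stream"        % "4.0.6"
    )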

  • StreamingContext: Spark streaming word count example Scala

    Main menu: Spark Scala Tutorial In this tutorial you will learn, How to stream data in real time using Spark streaming? Spark streaming is basically used for near real-time data processing. Why I said "near" real-time? Because data processing takes some time, few milliseconds. This lag can be reduced but obviously it can't be reduced to zero. This lag is so minute that we end up calling it real-time processing. Streaming word seems very cool but honestly speaking most of you have already implemented this in the form of "batch mode". Only difference is the time window. Everyone is aware of batch mode where you pull the data on hourly, daily, weekly or monthly basis and process it to fulfill your business requirements. What if you start pulling data every second and simultaneously you made your code so efficient that it can process the data in milliseconds. It would be automatically near real time processing, right? Spark streaming basically provides you ability to define sliding time windows where you can define the batch interval. After that, Spark automatically breaks the data into smaller batches for real-time processing. It basically uses RDD's (resilient distributed datasets) to perform the computation on unstructured dataset. RDD's are nothing but references to the actual data which are distributed across multiple nodes with some replication factor which reveal their values only when you perform an action (like collect) on top of it, called lazy evaluation. If you haven't installed Apache Spark, please refer this (Windows | Mac users). Screen 1 Open Scala IDE, create package com.dataneb.spark and define Scala object SparkStreaming. Check this link how you can create packages and objects using Scala IDE. Copy paste the below code on your Scala IDE and let the program run. package com.dataneb.spark /** Libraries needed for streaming the data. */ import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ object SparkStreaming { /** Reducing the logging level to print just ERROR. */ def setupLogging() = { import org.apache.log4j.{Level, Logger} val rootLogger = Logger.getRootLogger() rootLogger.setLevel(Level.ERROR) } def main(args: Array[String]) { /** Defining Spark configuration to utilize all the resources and * setting application name as TerminalWordCount*/ val conf = new SparkConf().setMaster("local[*]").setAppName("TerminalWordCount") /** Calling logging function */ setupLogging() /** Defining spark streaming context with above configuration and batch interval as 1*/ val ssc = new StreamingContext(conf, Seconds(1)) /** Terminal 9999 where we will entering real time messages */ val lines = ssc.socketTextStream("localhost", 9999) /** Flat map to split the words with spaces and reduce by key pair to perform count */ val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate } } Screen 2 Now open your local terminal and type "nc -lk 9999" , hit enter. What we did just now? We just made a network connection (nc or NetCat) to local port 9999. Now type some random inputs for Spark processing as shown below, Screen 1 Now go back to Scala IDE to see the processed records, you need to swap the screens quickly to see the results as Spark will process these lines within seconds. 
    Just kidding, you can simply pin the console and scroll back to see the results. You can see the output below. That's it, it's that simple. In a real-life scenario you could point a Kafka producer (instead of the local terminal) at Spark for processing, or configure Spark to communicate with your application directly. Thank you!! If you have any questions write them in the comments section below. #spark #scala #example #StreamingContext #streaming #word #count Next: Analyzing Twitter text with Spark Streaming
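    One addition to the explanation above: the listing counts words per one-second batch, while the sliding-window idea described earlier is not actually used in it. As a sketch only (window sizes are illustrative, not from the post), the same pairs DStream can be counted over a sliding window like this; the inverse-reduce form requires a checkpoint directory.

    // Counts over the last 30 seconds, refreshed every second.
    // Assumes the ssc and pairs values from the listing above.
    ssc.checkpoint("/tmp/spark-checkpoint")   // required for the inverse-reduce variant; path is illustrative

    val windowedCounts = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,   // add counts entering the window
      (a: Int, b: Int) => a - b,   // remove counts leaving the window
      Seconds(30),                 // window length
      Seconds(1))                  // slide interval

    windowedCounts.print()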

  • Kibana GeoIP example: How to index geographical location of IP addresses into Elasticsearch

    The relation between your IP address and geolocation is very simple. There are numerous websites available as of today like Maxmind, IP2Location, IPstack , Software77 etc where you can track the geolocation of an IP address. What's the benefit? It's very simple, it gives you another dimension to analyze your data. Let's say my data predicts that most of the users traffic is coming from 96.67.149.166. It doesn't make complete sense until I say most of the traffic is coming from New Jersey. When I say geolocation it includes multiple attributes like city, state, country, continent, region, currency, country flag, country language, latitude, longitude etc. Most of the websites which provide geolocation are paid sites. But there are few like IPstack which provides you free access token to make calls to their rest API's. Still there are limitations like how many rest API calls you can make per day and also how many types of attributes you can pull. Suppose I want to showcase specific city in the report and API provides limited access to country and continent only, then obviously that data is useless for me. Now the best part is Elastic stack provides you free plugin called "GeoIP" which grants you access to lookup millions of IP addresses. You would be thinking from where it gets the location details? The answer is Maxmind which I referred earlier. GeoIP plugin internally does a lookup from stored copy of Maxmind database which keeps on updating and creates number of extra fields with geo coordinates (longitude & latitude). These geo coordinates can be used to plot maps in Kibana. ELK Stack Installation I am installing ELK stack on Mac OS, for installation on Linux machine refer this. ELK installation is very easy on Mac with Homebrew. It's hardly few minutes task if done properly. 1. Homebrew Installation Run this command on your terminal. If you have already installed Homebrew move to the next step, or if this command doesn't work - copy it from here. $ /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" 2. Java Installation Check if java is installed on your machine. $ java -version java version "9.0.1" If java is not installed, run following steps to install java. $ brew tap caskroom/cask $ brew cask install java $ brew cask info java 3. Elasticsearch Installation $ brew tap elastic/tap $ brew install elastic/tap/elasticsearch-full $ elasticsearch If you see all INFO without any error, that means installation went fine. Let this run, don't kill the process. Now, simply open localhost:9200 in your local browser. You will see elasticsearch version. [TIP] You might face permission issue if you are not logged in with root user. To enable root user on Mac you can follow this. It's due to security reasons that root user is disabled by default on Mac. However another solution is to change folder permission itself. Run these commands if you want to change folder permissions, $ sudo chown -R $(whoami) /usr/local/include /usr/local/lib/pkgconfig $ chmod u+w /usr/local/include /usr/local/lib/pkgconfig Install xcode if it's missing, $ xcode-select --install 4. Kibana Installation $ brew install elastic/tap/kibana-full $ kibana Let this process run, don't kill. Now, open localhost:5601 in your local browser to check if kibana is running properly, 5. Logstash Installation $ brew install elastic/tap/logstash-full Configuring Logstash for GeoIP Let's begin with few sample IP addresses as listed below. 
I generated this sample data from browserling.com so please ignore if there is some known ip address in this list. Honestly speaking even I don't know where these IP addresses will point to when we generate the maps. Sample Data 1. Copy paste these records into a flat file with "ipaddress" header (sampleip.csv). ipaddress 0.42.56.104 82.67.74.30 55.159.212.43 108.218.89.226 189.65.42.171 62.218.183.66 210.116.94.157 80.243.180.223 169.44.232.173 232.117.72.103 242.14.158.127 14.209.62.41 4.110.11.42 135.235.149.26 93.60.177.34 145.121.235.122 170.68.154.171 206.234.141.195 179.22.18.176 178.35.233.119 145.156.239.238 192.114.2.154 212.36.131.210 252.185.209.0 238.49.69.205 2. Make sure your Elasticsearch and Kibana services are up and running. If not, please refer my previous blog - how to restart them. 3. [Update 9/Aug/2019: Not mandatory step now] Install GeoIP plugin for Elasticsearch. Run the below command in your Elasticsearch home directory. Once GeoIP plugin is installed successfully, you will be able to find plugin details under elasticsearch home plugin directory "/elasticsearch/plugins". You need to run installation command on each node if you are working in a clustered environment and then restart the services. /elasticsearch/bin/elasticsearch-plugin install ingest-geoip New version of elastics has built in GeoIP module, so you don't need to install it separately. Configure Logstash Configure logstash config file to create "logstash-iplocation" index. Please note your index name should start with logstash-name otherwise your attributes will not be mapped properly as geo_points datatype. This is because the default index name in logstash template is declared as logstash-* , you can change it if you want but as of now lets move ahead with logstash-iplocation. Below is the sample input, filter and output configuration. input { file { path => "/Volumes/MYLAB/testdata/sampleip.csv" start_position => "beginning" sincedb_path => "/Volumes/MYLAB/testdata/logstash.txt" } } filter { csv { columns => "ipaddress" } geoip { source => "message" } } output { elasticsearch { hosts => "localhost" index => "logstash-iplocation" } stdout{ codec => rubydebug } } My configuration file looks something like this: Important Notes Your index name should be in lower caps, starting with logstash- for example logstash-abcd Also, sincedb path is created once per file input, so if you want to reload the same file make sure you delete the sincedb file entry. It looks like this, You invoke geoip plugin from filter configuration, it has no relation with input/output. Run Logstash Load the data into elasticsearch by running below command (it's a single line command). Now wait, it will take few seconds to load. Change your home location accordingly, for me its homebrew linked as shown below. /usr/local/var/homebrew/linked/logstash-full/bin/logstash -f /usr/local/var/homebrew/linked/logstash-full/libexec/config/logstash_ip.config Sample output Important Notes See if filters geoip is invoked when you load the data into elasticsearch. Also, the datatype of location should be geo_point, otherwise there is some issue with your configuration. Latitude and longitude datatype should be float. These datatypes are like confirmation that logstash loaded this data as expected. Kibana Dashboard Creation 1. Once data is loaded into Elasticsearch, open Kibana UI and go to Management tab => Kibana Index pattern. 2. Create Kibana index with "logstash-iplocation" pattern and hit Next. 3. 
Select timestamp if you want to show it with your index and hit create index pattern. 4. Now go to Discover tab and select "logstash-iplocation" to see the data which we just loaded. You can expand the fields and see geoip.location has datatype as geo_point. You can verify this by "globe" sign which you will find just before geoip.location field. If it's not there then you have done some mistake and datatype mapping is incorrect. 5. Now go to Visualize tab and select coordinate map from the types of visualization and index name as "logstash-iplocation". 6. Apply the filters (Buckets: Geo coordinates, Aggregation: Geohash & Field: geoip.location) as shown below and hit the "Play" button. That's it !! You have located all the ip addresses. Thank you!! If you have any question please comment. Next: Loading data into Elasticsearch using Apache Spark Navigation Menu: Introduction to ELK Stack Installation Loading data into Elasticsearch with Logstash Create Kibana Dashboard Example Kibana GeoIP Dashboard Example Loading data into Elasticsearch using Apache Spark

  • Loading JSON file using Spark (Scala)

    Main menu: Spark Scala Tutorial In this Apache Spark Tutorial - We will be loading a simple JSON file. Now-a-days most of the time you will find files in either JSON format, XML or a flat file. JSON file format is very easy to understand and you will love it once you understand JSON file structure. JSON File Structure Before we ingest JSON file using spark, it's important to understand JSON data structure. Basically, JSON (JavaScript Object Notation) is a lightweight data-interchange format. It is easy for humans to read and write. It is easy for machines to parse and generate. JSON is built on two structures: A collection of name/value pairs, usually referred as an object and its value pair. An ordered list of values. You can think it like an array, list of values. An object is an unordered set of name/value pairs. An object begins with { (left brace) and ends with } (right brace). Each name is followed by : (colon) and the name/value pairs are separated by , (comma). An array is an ordered collection of values. An array begins with [ (left bracket) and ends with ] (right bracket). Values are separated by , (comma). A value can be a string in double quotes, or a number, or true or false or null, or an object or an array. These structures can be nested. One more fact, JSON files could exist in two formats. However, most of the time you will encounter multiline JSON files. Multiline JSON where each line could have multiple records. Single line JSON where each line depicts one record. Multiline JSON would look something like this: [ { "color": "red", "value": "#f00" }, { "color": "green", "value": "#0f0" }, { "color": "blue", "value": "#00f" }, { "color": "cyan", "value": "#0ff" }, { "color": "magenta", "value": "#f0f" }, { "color": "yellow", "value": "#ff0" }, { "color": "black", "value": "#000" } ] Single line JSON would look something like this (try to correlate object, array and value structure format which I explained earlier): { "color": "red", "value": "#f00" } { "color": "green", "value": "#0f0" } { "color": "blue", "value": "#00f" } { "color": "cyan", "value": "#0ff" } { "color": "magenta", "value": "#f0f" } { "color": "yellow", "value": "#ff0" } { "color": "black", "value": "#000" } Creating Sample JSON file I have created two different sample files - multiline and single line JSON file with above mentioned records (just copy-paste). singlelinecolors.json multilinecolors.json Sample files look like: Note: I assume that you have installed Scala IDE if not please refer my previous blogs for installation steps (Windows & Mac users). 1. Create a new Scala project "jsnReader" Go to File → New → Project and enter jsnReader in project name field and click finish. 2. Create a new Scala Package "com.dataneb.spark" Right click on the jsnReader project in the Package Explorer panel → New → Package and enter name com.dataneb.spark and finish. 3. Create a Scala object "jsonfileReader" Expand the jsnReader project tree and right click on the com.dataneb.spark package → New → Scala Object → enter jsonfileReader in the object name and press finish. 4. Add external jar files Right click on jsnReader project → properties → Java Build Path → Add External Jars Now navigate to the path where you have installed Spark. You will find all the jar files under /spark/jars folder. After adding these jar files you will find Referenced Library folder created on left panel of your screen below Scala object. You will also find that project has become invalid (red cross sign), we will fix it shortly. 5. 
Setup Scala Compiler Now right click on jsnReader project → properties → Scala Compiler and check the box Use Project Settings and select Fixed Scala installation: 2.11.11 (built-in) from drop-down options. After applying these changes, you will find project has become valid again (red cross sign is gone). 6. Sample code Open jsonfileReader.scala and copy-paste the code written below. I have written separate blog to explain what are basic terminologies used in Spark like RDD, SparkContext, SQLContext, various transformations and actions etc. You can go through this for basic understanding. However, I have explained little bit in comments above each line of code what it actually does. For list of spark functions you can refer this. // Your package name package com.dataneb.spark // Each library has its significance, I have commented in below code how its being used import org.apache.spark._ import org.apache.spark.sql._ import org.apache.log4j._ object jsonfileReader { // Reducing the error level to just "ERROR" messages // It uses library org.apache.log4j._ // You can apply other logging levels like ALL, DEBUG, ERROR, INFO, FATAL, OFF etc Logger.getLogger("org").setLevel(Level.ERROR) // Defining Spark configuration to define application name and the local resources to use // It uses library org.apache.spark._ val conf = new SparkConf().setAppName("Sample App") conf.setMaster("local") // Using above configuration to define our SparkContext val sc = new SparkContext(conf) // Defining SQL context to run Spark SQL // It uses library org.apache.spark.sql._ val sqlContext = new SQLContext(sc) // Main function where all operations will occur def main (args: Array[String]): Unit = { // Reading the json file val df = sqlContext.read.json("/Volumes/MYLAB/testdata/multilinecolors.json") // Printing schema df.printSchema() // Saving as temporary table df.registerTempTable("JSONdata") // Retrieving all the records val data=sqlContext.sql("select * from JSONdata") // Showing all the records data.show() // Stopping Spark Context sc.stop } } 7. Run the code! Right click anywhere on the screen and select Run As Scala Application. That's it!! If you have followed the steps properly you will find the result in Console. We have successfully loaded JSON file using Spark SQL dataframes. Printed JSON schema and displayed the data. Try reading single line JSON file which we created earlier. There is a multiline flag which you need to make true to read such files. Also, you can save this data in HDFS, database or CSV file depending upon your need. If you have any question, please don't forget to write in comments section below. Thank you. Next: How to convert RDD to dataframe? Navigation menu ​ 1. Apache Spark and Scala Installation 1.1 Spark installation on Windows​ 1.2 Spark installation on Mac 2. Getting Familiar with Scala IDE 2.1 Hello World with Scala IDE​ 3. Spark data structure basics 3.1 Spark RDD Transformations and Actions example 4. Spark Shell 4.1 Starting Spark shell with SparkContext example​ 5. Reading data files in Spark 5.1 SparkContext Parallelize and read textFile method 5.2 Loading JSON file using Spark Scala 5.3 Loading TEXT file using Spark Scala 5.4 How to convert RDD to dataframe? 6. Writing data files in Spark ​6.1 How to write single CSV file in Spark 7. Spark streaming 7.1 Word count example Scala 7.2 Analyzing Twitter texts 8. Sample Big Data Architecture with Apache Spark 9. What's Artificial Intelligence, Machine Learning, Deep Learning, Predictive Analytics, Data Science? 10. 
Spark Interview Questions and Answers
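    A short follow-up sketch for the single-line versus multiline distinction described in this post, using the Spark 2.x SparkSession API (a spark value as in spark-shell is assumed, and both sample files are assumed to sit in the same testdata folder used above). By default Spark expects JSON Lines, i.e. one record per line; the multiLine option is what lets it parse the pretty-printed, array-style file.

    // JSON Lines file: one record per line, read with the defaults
    val singleLineDF = spark.read
      .json("/Volumes/MYLAB/testdata/singlelinecolors.json")

    // Array-style, pretty-printed file: needs multiLine set to true
    val multiLineDF = spark.read
      .option("multiLine", true)
      .json("/Volumes/MYLAB/testdata/multilinecolors.json")

    singleLineDF.show()
    multiLineDF.show()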

  • "Hello World" with Scala IDE

    Main menu: Spark Scala Tutorial In this Apache Spark Scala tutorial you will learn how to create, "Hello World" Scala application with Eclipse Scala IDE. Scala application, project, package, objects, run configuration and debug the application. Motive is to get you guys familiar with Scala IDE. I assume that you have already installed Eclipse Scala IDE, if not please refer my previous post for installation (Windows | Mac users). Now, open Eclipse Scala IDE. 1. Create a new Scala project "hellooWorld" Go to File → New → Project and enter hellooWorld in project name field and click finish. 2. Create a new Scala Package "hellooWorld" Right click on the hellooWorld project in the Package Explorer panel → New → Package and enter name hellooWorld and finish. 3. Create a Scala object "hello": Expand the hellooWorld project tree and right click on the hellooWorld package → New → Scala Object → enter hello in the Object name field and press finish. 4. Write the program to print Hello World message. package hellooWorld object hello { def main (args: Array[String]) { println("Hello World") } } 5. Create a Run configuration for the Scala application: Right click on hello.scala in package explorer → Run As → Scala Application. Select the first matching item, the HellooWorld class and click the Ok. OR you can also define configuration manually and run it. Just mention the project and class name. That's it. You can see the output Hello World in output console. That's all guys! If you face any problem while running the program please mention in the comments section below. Thank you. Next: Spark Shell Navigation menu ​ 1. Apache Spark and Scala Installation 1.1 Spark installation on Windows​ 1.2 Spark installation on Mac 2. Getting Familiar with Scala IDE 2.1 Hello World with Scala IDE​ 3. Spark data structure basics 3.1 Spark RDD Transformations and Actions example 4. Spark Shell 4.1 Starting Spark shell with SparkContext example​ 5. Reading data files in Spark 5.1 SparkContext Parallelize and read textFile method 5.2 Loading JSON file using Spark Scala 5.3 Loading TEXT file using Spark Scala 5.4 How to convert RDD to dataframe? 6. Writing data files in Spark ​6.1 How to write single CSV file in Spark 7. Spark streaming 7.1 Word count example Scala 7.2 Analyzing Twitter texts 8. Sample Big Data Architecture with Apache Spark 9. What's Artificial Intelligence, Machine Learning, Deep Learning, Predictive Analytics, Data Science? 10. Spark Interview Questions and Answers
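    A tiny optional variation on the program above: Scala's App trait lets you drop the explicit main method. Either version runs the same way as a Scala application from the IDE.

    package hellooWorld

    // Same behaviour as the hello object above, written with the App trait
    object hello extends App {
      println("Hello World")
    }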

  • Scala IDE \.metadata\.log error fix (Mac)

    Scala IDE is not compatible with Java SE 9 and higher versions. You might need to downgrade or install Java SE 8 in order to fix the issue. Lets go through each step how you can fix it. Step 1. Check what version of Java you have installed on your machine. Run this command in your terminal: /usr/libexec/java_home --verbose As you can see I have three different versions of java running on my machine. Step 2: Install Java SE 8 (jdk1.8) if you don't find it in your list. Refer this blog for java installation steps. Step 3: Now open your .bashrc file (run command: vi ~/.bashrc) and copy-paste below line in your bashrc file. export JAVA_HOME=$(/usr/libexec/java_home -v 1.8) Step 4: Save the file (:wq!) and reload your profile (source ~/.bashrc) Step 5: Now you need to define eclipse.ini arguments in order to use Java 1.8 version. On a Mac OS X system, you can find eclipse.ini by right-clicking (or Ctrl+click) on the Scala IDE executable in Finder, choose Show Package Contents, and then locate eclipse.ini in the Eclipse folder under Contents. The path is often /Applications/Scala IDE.app/Contents/Eclipse/eclipse.ini Step 6: Open it with text editor and copy-paste below line in eclipse.ini file. Change the version (if needed) according to your java version. Mine is 1.8.0_171. -vm /Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/bin Step 7: Save the file and exit. Step 8: Run the Scala IDE application now and it should run: If you are still facing problem, please mention it in the comments section below. Thank you! #ScalaIDEinstallation #metadatalogerror #eclipse #metadata #log #error Learn Apache Spark in 7 days, start today! Navigation menu ​ 1. Apache Spark and Scala Installation 1.1 Spark installation on Windows​ 1.2 Spark installation on Mac 2. Getting Familiar with Scala IDE 2.1 Hello World with Scala IDE​ 3. Spark data structure basics 3.1 Spark RDD Transformations and Actions example 4. Spark Shell 4.1 Starting Spark shell with SparkContext example​ 5. Reading data files in Spark 5.1 SparkContext Parallelize and read textFile method 5.2 Loading JSON file using Spark Scala 5.3 Loading TEXT file using Spark Scala 5.4 How to convert RDD to dataframe? 6. Writing data files in Spark ​6.1 How to write single CSV file in Spark 7. Spark streaming 7.1 Word count example Scala 7.2 Analyzing Twitter texts 8. Sample Big Data Architecture with Apache Spark 9. What's Artificial Intelligence, Machine Learning, Deep Learning, Predictive Analytics, Data Science? 10. Spark Interview Questions and Answers

  • Installing Apache Spark and Scala (Mac)

    Main menu: Spark Scala Tutorial

    In this Spark Scala tutorial you will learn how to install Apache Spark on Mac OS. By the end of this tutorial you will be able to run Apache Spark with Scala on a Mac machine. You will also download Eclipse for Scala IDE. To install Apache Spark on a Windows machine visit this.

    Installing Homebrew
    You will be installing Apache Spark using Homebrew. If you don't have Homebrew, visit https://brew.sh/, copy-paste the command into your terminal and run it.

    Installing Apache Spark
    Open a terminal, type brew install apache-spark and hit Enter. Then create a log4j.properties file:
    Type cd /usr/local/Cellar/apache-spark/2.3.1/libexec/conf and hit Enter. Please change the version according to your downloaded version; Spark 2.3.1 is the version installed for me.
    Type cp log4j.properties.template log4j.properties and hit Enter.
    Edit the log4j.properties file and change the log level from INFO to ERROR on log4j.rootCategory. We are only changing the log level, nothing else.

    Download Scala IDE
    Install the Scala IDE from here. Open the IDE once, just to check that it's running fine. You should see panels like this;

    Test it out!
    Open a terminal and go to the directory where apache-spark was installed (such as cd /usr/local/Cellar/apache-spark/2.3.1/libexec/), then type ls to get a directory listing. Look for a text file, like README.md or CHANGES.txt.
    Type spark-shell and hit Enter. At this point you should see a scala> prompt. If not, double-check the steps above.
    Type val rdd = sc.textFile("README.md") (or whatever text file you've found) and hit Enter. You have just created an RDD from the README text file.
    Now type rdd.count() and hit Enter to count the number of lines in the text file (see the optional follow-up sketch at the end of this post). You should get a count of the number of lines in that file! Congratulations, you just ran your first Spark program! Don't worry about the commands, I will explain them.

    Sample Execution
    You've got everything set up! If you have any question please don't forget to mention it in the comments section below.

    Main Menu | Next: Just enough Scala for Spark
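    The optional follow-up referenced in the "Test it out!" steps above: a small sketch (not from the original post) you can paste at the same scala> prompt, reusing the rdd value you just created.

    // Count only the lines of README.md that mention Spark
    val sparkLines = rdd.filter(line => line.contains("Spark"))
    println("Lines mentioning Spark: " + sparkLines.count())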

  • Installing Apache Spark and Scala (Windows)

    Main menu: Spark Scala Tutorial In this Spark Scala tutorial you will learn how to download and install, Apache Spark (on Windows) Java Development Kit (JDK) Eclipse Scala IDE By the end of this tutorial you will be able to run Apache Spark with Scala on Windows machine, and Eclispe Scala IDE. JDK Download and Installation 1. First download JDK (Java Development Kit) from this link. If you have already installed Java on your machine please proceed to Spark download and installation. I have already installed Java SE 8u171/ 8u172 (Windows x64) on my machine. Java SE 8u171 means Java Standard Edition 8 Update 171. This version keeps on changing so just download the latest version available at the time of download and follow these steps. 2. Accept the license agreement and choose the OS type. In my case it is Windows 64 bit platform. 3. Double click on downloaded executable file (jdk*.exe; ~200 MB) to start the installation. Note down the destination path where JDK is installing and then complete the installation process (for instance in this case it says Install to: C:\Program Files\Java\jdk1.8.0_171\). Apache Spark Download & Installation 1. Download a pre-built version of Apache Spark from this link. Again, don't worry about the version, it might be different for you. Choose latest Spark release from drop down menu and package type as pre-built for Apache Hadoop. 2. If necessary, download and install WinRAR so that you can extract the .tgz file that you just downloaded. 3. Create a separate directory spark in C drive. Now extract Spark files using WinRAR, and copy its contents from downloads folder => C:\spark. ​Please note you should end up with directory structure like C:\spark\bin, C:\spark\conf, etc as shown above. Configuring windows environment for Apache Spark 4. Make sure you "Hide file extension properties" in your file explorer (view tab) is unchecked. Now go to C:\spark\conf folder and rename log4j.properties.template file to log4j.properties. You should see filename as log4j.properties and not just log4j. 5. Now open log4j.properties with word pad and change the statement log4j.rootCategory=INFO, console --> log4j.rootCategory=ERROR, console. Save the file and exit, we did this change to capture ERROR messages only when we run Apache Spark, instead of capturing all INFO. 6. Now create C:\winutils\bin directory. Download winutils.exe from GitHub and extract all the files. You will find multiple versions of Hadoop inside it, you just need to focus on Hadoop version which you selected while downloading package type pre-built Hadoop 2.x/3.x in Step 1. Copy all the underlying files (all .dll, .exe etc) from Hadoop version folder and move it into C:\winutils\bin folder. This step is needed to make windows fool as we are running Hadoop. This location (C:\winutils\bin) will act as Hadoop home. 7. Now right-click your Windows menu, Select Control Panel --> System and Security --> System --> “Advanced System Settings” --> then click “Environment Variables” button. Click on "New" button in User variables and add 3 variables: SPARK_HOME c:\spark JAVA_HOME (path you noted while JDK Installation Step 3, for example C:\Program Files\Java\jdk1.8.0_171) HADOOP_HOME c:\winutils 8. Add the following 2 paths to your PATH user variable. Select "PATH" user variable and edit, if not present create new. %SPARK_HOME%\bin %JAVA_HOME%\bin Download and Install Scala IDE 1. Now install the latest Scala IDE from here. I have installed Scala-SDK-4.7 on my machine. Download the zipped file and extract it. 
    That's it.

    2. Under the Scala-SDK folder you will find an eclipse folder; extract it to c:\eclipse. Run eclipse.exe and it will open the IDE (we will use this later).

    Now test it out! Open up a Windows command prompt in administrator mode (right-click on Command Prompt in the search menu and run as admin).
    Type java -version and hit Enter to check if Java is properly installed. If you see the Java version, Java is installed properly.
    Type cd c:\spark and hit Enter. Then type dir and hit Enter to get a directory listing. Look for any text file, like README.md or CHANGES.txt.
    Type spark-shell and hit Enter. At this point you should have a scala> prompt as shown below. If not, double-check the steps above and the environment variables (see the small sanity-check sketch at the end of this post); after making changes, close the command prompt and retry.
    Type val rdd = sc.textFile("README.md") and hit Enter. Now type rdd.count() and hit Enter. You should get a count of the number of lines from the README file! Congratulations, you just ran your first Spark program! We just created an RDD from the README text file and ran a count action on it. Don't worry, we will go through this in detail in the next sections.
    Hit Ctrl-D to exit the spark shell, and close the console window. You've got everything set up! Hooray!

    Note for Python lovers - To install pySpark continue to this blog.

    That's all! Guys, if it's not running, don't worry. Please mention it in the comments section below and I will help you out with the installation process. Thank you. Next: Just enough Scala for Spark
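    The sanity-check sketch referenced in the test steps above (not from the original post): if spark-shell starts but something still looks off, this prints the three variables from step 7 as the JVM sees them.

    // Paste at the scala> prompt; prints each variable, or "(not set)" if it is missing
    Seq("SPARK_HOME", "JAVA_HOME", "HADOOP_HOME").foreach { name =>
      println(name + " -> " + sys.env.getOrElse(name, "(not set)"))
    }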

  • Elasticsearch Tutorial - What is ELK stack (Elastic stack)

    In this Elasticsearch tutorial you will learn: what is the ELK stack (Elastic stack)? We will go through ELK stack examples, load data into the Elastic stack and create a Kibana dashboard.

    Navigation Menu: Introduction to ELK Stack Installation Load data into Elasticsearch stack with Logstash Create Kibana Dashboard Example Kibana GeoIP Dashboard Example

    What is the ELK stack (now called Elastic Stack)?
    ELK stack is an acronym for three open-source products - Elasticsearch, Logstash & Kibana - all maintained by Elastic. The ELK stack started as a log-analytics solution but later evolved into an enterprise search and analytics platform. Elasticsearch is based on the Lucene search engine; you can think of it as a NoSQL database with the capability to index (for text search) and store data. Logstash is basically a data pipeline tool that can connect to various sources through plugins, apply transformations and load data into various targets, including Elasticsearch. In short, Logstash collects and transforms the data, and is sometimes used for data shipping as well. Kibana is a data visualization platform where you will create dashboards. Another tool, Filebeat, is one of the Beats members and can also perform some tasks similar to Logstash.

    ELK Stack Architecture
    Here is the basic architecture of the Elastic stack. Notice I haven't mentioned the source in the diagram below. Usually the data sources for the ELK stack are various log files, for example application logs, server logs, database logs, network switch logs, router logs etc. These log files are consumed using Filebeat. Filebeat acts as a data collector which gathers various types of log files (when we have more than one type of log file). Nowadays Kafka is often used as another layer which distributes the files collected by Filebeat to various queues, from where Logstash transforms them and stores them in Elasticsearch for visualization. So the complete flow would look like: [application log, server logs, database log, network switch log, router log etc] => Filebeat => Kafka => ELK Stack. Please note this can change based on the architecture a project needs. If there are limited types of log files, sometimes you might not even use Filebeat or Kafka and instead dump logs directly into the ELK stack.

    Fun Fact: ELK stack Google Trend
    Elasticsearch is the most famous component of the stack; refer to the Google Trend shown below. Why is the ELK stack so popular worldwide? Basically for three major reasons. First of all, price: it is open source, easy to learn and free of cost. If you consider other visualization tools like QlikView and Tableau, Kibana provides similar capabilities without any hidden cost. Elasticsearch is used by many big companies, for example Wikipedia and GitHub. Second, its elegant user interface: you can spend time exploring and reviewing data, not figuring out how to navigate the interface. And last but not least, it is extensible. Elasticsearch is a schema-free NoSQL database and can scale horizontally. It is also used for real-time analytics.

    Next: ELK Installation

    #ELKStack #Elasticsearch #Logstash #Kibana #ElasticsearchTutorial #ElasticStack #ELKTutorial
