
  • Kafka Producer Example

    In this Apache Kafka tutorial you will learn how to install Apache Kafka on Mac using Homebrew. To install Kafka on a Linux machine, refer to this post. Kafka & Zookeeper Installation: $ brew install kafka The command automatically installs Zookeeper as a dependency. The installation takes a minute or so. If you are working in cluster mode, you need to install it on all the nodes. How to start Kafka & Zookeeper? You don't need to run these commands right now, but for understanding you can see how to start them in the installation output log. To start Zookeeper now and restart it at login, run: $ brew services start zookeeper Or, if you don't want/need a background service, you can just run: $ zkServer start To start Kafka now and restart it at login: $ brew services start kafka Or, if you don't want/need a background service, you can just run: $ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties Zookeeper & Kafka Server Configuration: You can open the Zookeeper properties file to see the default configuration; there is not much to explain here. You can see the port on which clients (Kafka in this case) will connect, the directory where snapshots are stored, and the maximum number of connections per IP address. $ vi /usr/local/etc/kafka/zookeeper.properties # the directory where the snapshot is stored. dataDir=/usr/local/var/lib/zookeeper # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 Similarly, you can see the default Kafka server properties. You mainly need to change the listener setting here to localhost (standalone mode) or to the IP address of the node (cluster mode). $ vi /usr/local/etc/kafka/server.properties The main sections are described below, and a consolidated sketch of these settings appears just before the topic-creation step. Server basics - you define the broker id here; it's a unique integer value for each broker. Socket server settings - you define the listener hostname and port; by default it's commented out. For this example the hostname will be localhost, but in a cluster you need to use the respective IP addresses, set up like this: listeners=PLAINTEXT://localhost:9092 Log basics - you define the log directory, the number of log partitions per topic, and the number of recovery threads per data directory. Internal topic settings - you can change the topic replication factor, which is 1 by default and usually > 1 in a production environment. Log flush policy - everything is commented out by default. Log retention policy - the default retention is 168 hours. Zookeeper - the default port is the same one you saw during installation: 2181. Group coordinator settings - this is the rebalance delay in milliseconds when a new member joins as a consumer. Kafka topics are usually multi-subscriber, i.e. a topic typically has multiple consumers, although it can have zero, one, or more consumers. Starting Zookeeper & Kafka: To start Zookeeper and Kafka, you can start them together as below or run each command separately, i.e. start Zookeeper first and then Kafka. $ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties This will print a long list of INFO, WARN and ERROR messages. You can scroll back up and look for WARNs and ERRORs, if any. You can also see the producer id, broker id, and the other properties set by default in the Kafka properties file explained earlier. Let this process run, don't kill it.
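For quick reference, here is a minimal sketch of the server.properties settings described above. The keys are standard Kafka broker settings, but the values and the log directory path shown are illustrative assumptions and may differ between Kafka versions and Homebrew releases:

broker.id=0                                  # server basics: unique integer per broker
listeners=PLAINTEXT://localhost:9092         # socket server settings: use the node's IP address in cluster mode
log.dirs=/usr/local/var/lib/kafka-logs       # log basics: where partition data is stored
num.partitions=1                             # log basics: default number of partitions per topic
offsets.topic.replication.factor=1           # internal topic settings: usually > 1 in production
log.retention.hours=168                      # log retention policy
zookeeper.connect=localhost:2181             # must match clientPort in zookeeper.properties
group.initial.rebalance.delay.ms=0           # group coordinator settings: rebalance delay for new consumers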
Create a Topic and Start Kafka Producer: To create a topic and start a producer, run this command: $ kafka-console-producer --broker-list localhost:9092 --topic topic1 Here my topic name is "topic1" and this terminal will act as the producer; you can send messages from this terminal. Start Kafka Consumer: Now start the Kafka consumer by running this command: $ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic1 --from-beginning The bootstrap server is simply the server to connect to; for this example it's localhost with the default port 9092. In the screenshots, the terminal on the left is the producer and the one on the right is the consumer, and you can see how messages are transferred from one terminal to the other. Thank you. If you have any question please mention it in the comments section below. #Kafka
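Since the post is titled "Kafka Producer Example", here is a minimal programmatic sketch in Scala using the standard kafka-clients producer API. It is an illustrative assumption rather than part of the original post: it assumes the broker started above is reachable at localhost:9092, that the kafka-clients jar is on the classpath, and it reuses the topic "topic1", so the console consumer above will print whatever it sends.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SimpleProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // the broker started earlier
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // send a few test messages to topic1; the console consumer will display them
    (1 to 5).foreach { i =>
      producer.send(new ProducerRecord[String, String]("topic1", s"key-$i", s"message $i"))
    }
    producer.flush()   // make sure all messages are delivered before closing
    producer.close()
  }
}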

  • How to write your first blog on Dataneb?

    01. Sign Up - First sign up; after this, you should automatically receive writer's privileges within 24 hours. If you don't get writer's privileges, please email us. 02. Start Writing - Go to your profile > Create a post. Before you start, please read the blogging guidelines carefully. Blogging Guidelines - These rules are meant to keep a quality blogging environment at Dataneb. Blog Uniqueness - Blogs should be unique. Dataneb does not accept syndicated/unoriginal posts, research papers, duplicate posts, copies of others' content/articles, etc. NOTE: Violation of these guidelines will result in direct loss of writer's privileges. Blog Length - Blogs should have a minimum length of 2000 characters; there is no upper limit. You will find the total number of characters in the top left corner of the editor while drafting blogs. Blogs not fulfilling this criterion will be automatically moved to draft status. Image Requirement - You can add images (but they should not be copyrighted images). Or, you can leave it to us and one of our moderators will handle the image requirements. Back-links - Backlinks are allowed (maximum 5, sometimes more) as long as the intention is clear. Make sure you are not linking to any blacklisted websites. Miscellaneous - Moderators have the authority to add keywords and modify text, images, etc. so that your blog can get a higher Google ranking. This will help your blog get more organic views. You can delete your post anytime, but Dataneb retains full rights to re-publish that content. Wait! There is an Easier Way to Publish Your Blog - We understand that you may not want to publish your blog without a review. Don't worry! Just draft the blog and save it. Email us when your blog is ready to publish and one of our moderators will review and publish it for you. If you are just a member and don't want to become a writer, you can also write your post in a Word document and email it to us for submission. What's next? Share your blog post on Facebook, Twitter, etc. to get more views, earn badges, and invite others. Sharing blogs on social media is the easiest and fastest way to earn views. We value your words; please don't hurt others' feelings while commenting on blog posts, and help maintain a quality environment at Dataneb. Email us if you have any queries.

  • SSB Interview Procedure

    SSB interview procedure is a mandatory process to become a Service Selection Board officer in any Indian defense force irrespective of the age group and kind of entry it is. There just can’t be any Indian defense officer who has not experienced this five-day Service Selection Board (SSB) Interview process. Because of its high rejection rate, it has gained too much importance for aspiring candidates and so has expert guidance on it in terms of coaching and books published over it. Still, it’s very difficult to get a correct methodology to prepare for it and clear it successfully. I believe this complete phenomenon of the SSB Interview has become more commercialized and made it a kind of hype and correct and easier procedure had been lost in the process. Table of Contents: SSB Interview Procedure Introduction Day of reporting Day 1: Screening Test Day 2: Psychology Test Day 3-4: GTO Tasks & Interview Day 5: Conference SSB interview is a very scientifically designed evaluation system that ensures the correct intake of officers into the system for its overall growth. The board assesses the suitability of the candidate for becoming an officer, It consists of personality, intelligence tests, and interviews. The tests are of both types i.e. written and practical task-based. In total there are thirteen Service Selection Boards across India, out of which four boards are for the Indian Army, four boards are for the Indian Air Force, and five boards for the Indian Navy. The Service Selection Boards of the Indian Army are located at: SSB North (Kapurthala, PB) SSB South (Bangalore, KN) SSB Central (Bhopal, MP) SSB East (Allahabad, UP) SSB interviews consist of three separate evaluation systems spread over five days. The day-wise procedure is given below. Day of reporting The selected candidates are provided exact date and time of reaching the respective SSB center in their call letter. The reception center is established at the nearest railway station which further arranges the necessary pick and drop from the station to the center. Upon arrival, a distinguishing chest no is provided to each candidate which in turn becomes their identity for this exam process. Their educational documents are checked for initial verification and they are allotted a berth for stay. A briefing about the schedule, various tests, and general instructions are given. Day 1: Screening Test On the first day, the Screening test is conducted which segregates the best from the crowd. Normally more than half of the candidates don’t make it beyond this point. Screening Tests include; Intelligence Test – This consists of two Tests. Verbal and Non-Verbal. (About 50 questions each) Picture Perception & Picture Description Test (PPDT) – In this test, a picture is shown to the candidates for 30 seconds. Each candidate observes it and then, in the next one minute, must record the number of characters seen in the picture. Then, in four minutes, draft a story from the picture (and not just describe the picture). The candidate must record the mood, approximate age, and gender of the "main character". Group discussion on the PPDT – In stage two of the PPDT, the candidates are given their stories, which they may revise. Then, in a group, each candidate must narrate his story in under one minute. The group is then asked to create a common story involving each of their perceived picture stories. Selected candidates are shifted to different accommodations where they are going to stay for the next four days of the interview process. 
The remaining candidates are sent back home. Day 2: Psychology Test - The following tests are conducted on the second day of the SSB interview. Thematic Apperception Test (TAT) - Candidates are shown a picture for thirty seconds and then write a story in the next four minutes. Twelve such pictures are shown sequentially. The last picture is a blank slide inviting the candidates to write a story of their choice. Word Association Test (WAT) - Candidates are shown sixty simple, everyday words for fifteen seconds each, and they need to write a sentence on each word. Situation Reaction Test (SRT) - A booklet of 60 situations is given, for which responses are to be completed in 30 minutes. Self Description Test (SDT) - The candidate is asked 5 questions about their parents, teachers, friends, and their perception of themselves. Day 3-4: GTO Tasks & Interview - The following tests are conducted during these days of the SSB interview: Group Discussion Test (GD), Military Planning Exercise (MPE), Progressive Group Task (PGT), Individual Lecturettes, Group Obstacle Race, Half Group Task. A personal interview of each candidate is taken by the president of the SSB board. Day 5: Conference - All the officers (in proper uniform) attend the conference, where each candidate has a conversation with a panel of assessors. The assessors look for confidence and expression when speaking, a positive attitude in adversity and life, and honesty. Following this, the final results are announced. Successful candidates remain for an intensive medical examination taking three to five days at a military hospital. Thank you. If you have any questions simply comment below. Related Topics

  • Permanent Commission in Indian Army

    In the Indian Army, a cadet upon completion of training is commissioned into service in either Permanent Commission (PC) or Short Service Commission (SSC). In addition to the initial contract of service, there are several other differences between these two commissions. We will discuss these differences in a subsequent article; here I present a brief about the Permanent commissions and various entries that are part of this and the academy at which cadets are trained before commissioning. Table of Contents: Permanent Commission in Indian Army Indian Military Academy, Dehradun 1. National Defence Academy, Pune 2. Combined Defence Service Examination (CDSE) 3. University Entry Scheme (Pre-Final Year Students Only) 4. Technical Graduate Course 5. AEC (Men) Officers Training Academy, Gaya(10+2 (TES) Entry) A Permanent Commission means a career in the Army till you retire. For Permanent Commission, you have to join the Indian Military Academy, Dehradun, or the Officers Training Academy, Gaya. Indian Military Academy, Dehradun Indian Military Academy is a cradle of leadership. The IMA trains you to lead from the front. You are trained in all aspects of combat and tactics using technology and other modern tools and technologies. The IMA has excellent facilities for all-round development. At the I.M.A Army Cadets are known as Gentlemen Cadets and are given strenuous military training for one year. On successful completion of training Gentlemen Cadets are granted Permanent Commission in the rank of Lieutenant subject to being medically fit in "SHAPE-I” one. Gentlemen Cadets during the entire duration of training in Service Academies i.e. during the training period at IMA shall be entitled to a stipend of Rs 21,000/- p.m. From the IMA, you're commissioned as a "Lieutenant" in the Indian Army, to go out into the world and live up to the IMA motto - "Valour & Wisdom". The main entries to get into IMA are as under: 1. National Defence Academy, Pune The cadets for the three services, viz, Army, Navy, and Air Force are given preliminary training in both academic and physical for 3 Years at the National Defence Academy(Pune), which is an Inter-Service Institution. The training during the first two and half years is common to the cadets of three wings. The cadets on passing out will be awarded B.Sc./B.Sc.(Computer)/BA. Degree from Jawaharlal Nehru University, Delhi. On passing out from the National Defence Academy, Army Cadets go to the Indian Military Academy, Dehradun for training and commissioning. 2. Combined Defence Service Examination (CDSE) You can take the CDS Entrance Exam conducted by UPSC while you are in the final year of Graduation / possess any Graduate Degree. Clear the SSB interview, be medically fit, and join IMA as a Direct Entry subject to meeting all eligibility conditions and availability of vacancies. (For details of exam dates / Notification visit the UPSC website) Candidates finally selected will undergo Military Training for 18 Months at IMA. Candidates will be enrolled under the Army Act as Gentlemen Cadets. On successful completion of training gentlemen cadets are granted permanent commission in the rank of Lt subject to being medically fit in SHAPE – I. The final allocation of Arms/Services will be made before passing out of Gentlemen Cadets from IMA. The other entries are Non UPSC entries and there is no written exam for them. You are directly called for an SSB interview and the details are as under: 3. 
University Entry Scheme (Pre-Final Year Students Only) This entry is for those of you who wish to apply for the Army in the Pre-Final Year of Engineering. Look out for the advertisement published in leading newspapers/employment news in May every year. Selected candidates for the UES Course will be detailed for training at the Indian Military Academy, Dehradun according to their position in the final order of merit, discipline up to the number of vacancies available at that time. The duration of training is one year. Candidates during the period of training will be given a stipend at the rate of the minimum basic pay of a Lieutenant. However, the entire stipend will be paid in lump sum only on reporting for training at IMA. From the date of joining IMA, they will be entitled to full pay and allowances, and other benefits as admissible to regular officers. Engineering graduates who possess the prescribed qualification will be granted a one-year ante-date for seniority, promotion, and increments of pay. 4. Technical Graduate Course Those who are studying in their final year/ have completed BE/B Tech in notified streams can also join IMA through the Technical Graduate Course. Look out for the advertisement published in leading newspapers/employment news in May/Jun & Nov/Dec every year. The duration of training is one year. Selected candidates will be detailed for training at Indian Military Academy, Dehradun according to their position in the final order of merit up to the number of vacancies available in each subject. On successful completion of training, cadets will be granted Permanent Commission in the Army in the rank of Lieutenant. Officers may be granted commission in any Arms/Services and will be liable for service in any part of the world on selected appointments as decided by Army Headquarters from time to time. One year ante date seniority from the date of commission will be granted to Engineering Graduates of TGC Entry. 5. AEC (Men) Candidates who have passed Post Graduate Degree MA/M.Sc in subjects as per Notification /M.Com/MCA/MBA with 1st or 2nd Division are only eligible. Final Year appearing/Result awaiting students are not eligible. The duration of training is one year. Selected candidates will be detailed for training at Indian Military Academy, Dehradun according to their position in the final order of merit up to the number of vacancies available in each subject. On successful completion of training, cadets will be granted Permanent Commission in the Army in the rank of Lieutenant. Officers Training Academy, Gaya(10+2 (TES) Entry) You can apply after passing your 12th Exams. A minimum aggregate of 70% is mandatory in Physics, Chemistry, and Mathematics. You will be detailed for the SSB interview based on the cut-off as decided by the Recruiting Directorate. Look out for the advertisement published in leading newspapers/employment news in May/Oct every year. The duration of training for TES Entry is 5 years, with which the first year is Basic Military Training from Officer Training Academy, Gaya, and the remaining four years of Training at CME Pune or MCTE Mhow or MCEME Secunderabad. The candidates will be awarded an Engineering degree after successful completion of training. The candidates will be given a stipend of Rs. 21,000/- p.m. (Rs 15,600/- as pay in Pay Band plus Grade Pay of Rs. 5,400/-) as is admissible to NDA cadets on completion of 3 years of training. 
On completion of 4 Years of training, they will be commissioned in the rank of Lieutenant and entitled to pay as admissible to the rank. Thank you. If you have any questions simply comment below. Related Topics

  • Do NDA cadets get holidays like Sunday?

    Yes, NDA cadets get holidays, and the same applies to OTA and IMA cadets. A Sunday, whether in the NDA, IMA or OTA, is nothing less than a holiday for a cadet. The life of a cadet in the National Defence Academy (Pune), Indian Military Academy (Dehradun), or Officers Training Academy (Chennai) is a busy schedule of events comprising various training activities that drain the cadet's energy. Irrespective of the academy, the life of the cadet remains similar. Academy life is an everlasting experience that every military officer cherishes, and Sunday is a special day within it. The free Sunday is the only savior for a cadet who is desperately looking for a break from training schedules and needs to revitalize for the coming weeks. Table of Contents: NDA cadets' holidays - Zero haircut, Sleep, Sleep & Sleep, Liberty, Letters, Canteen, Phone, Weapon Cleaning. In this blog, I describe a few activities that take place on Sunday or any other holiday in the NDA, IMA, and OTA. This will help an aspiring cadet to sense and visualize a Sunday in the academy. As an OTA alumnus, I recall rigorous and tough training days, full of chaos due to the various scheduled activities of day-to-day life, with most activities overlapping each other and no spare time at all. We were so mentally and physically occupied that there was nothing else to think about other than how to get some free time. So now you can understand the importance of this GREAT SUNDAY in a cadet's life. Sunday is the only break from the busy schedule of the academy, a much-needed time to rest physically, mentally, logistically, and emotionally. It helps relieve the body from weeklong training and fatigue and prepare it for the coming week. Here is the list of activities that most cadets do on Sunday: 1. Zero Haircut - You can call it the most important event of the day, which a cadet looks forward to and makes every effort to complete against all his will. Even slight remnants of hair can bring unwanted punishment to a cadet in the upcoming week. 2. Sleep, Sleep & Sleep - Yes, you read it right, that's what most cadets do. The Sunday break is needed for full-time rest after a week of tough training activities that result in fatigue. 3. Liberty - Going out into the local city market during the daytime is commonly known as liberty. A time to satisfy your eyes by seeing young girls and your taste buds by hogging everything available. 4. Letters - Yeah, yeah, a bit old-fashioned, but we do write letters to our friends and family members. It helps to express your emotions and keeps you energized for the week ahead. Before the academy, I didn't realize that letters are such a good means of communication. 5. Canteen - Yes, it's time to fill up your stock of emergency rations (in the literal sense) for the upcoming days. Anything that can be eaten without cooking is kept in the room. 6. Phone - During weekdays one rarely gets enough time to speak with family members and friends. Therefore, THE GREAT Sunday comes to the rescue. 7. Weapon Cleaning - Occasionally cadets need to clean weapons that were left dirty from the week's training, particularly after a week in which some outdoor exercise was conducted. I hope I've painted a clear picture of how Sunday is spent in the academy. Comment if you have any questions about the life of a cadet in a training academy.
Related Topics

  • Short Service Commission in Indian Army

    In the Indian Army, Short Service Commission (SSC) recruitment grants qualified candidates the option of joining the Indian Army as a "Short Service Commissioned Officer" for 10/14 years. At the end of 10 years, you have three options: convert the Short Service Commission to a Permanent Commission (PC); opt for a 4-year extension, during which you can resign at any time; or simply opt out! Table of Contents: Short Service Commission in Indian Army Introduction What happens after SSC selection? SSC Training 1. Short Service Commission (Non-Technical) Men & Women 2. Short Service Commission (Technical) Men & Women 3. Short Service Commission (NCC) Men & Women 4. Short Service Commission (JAG) Men & Women Short Service Commission empowers you with analytical thinking, planning skills, and administrative and organizational abilities. These qualities will make you an invaluable asset for any organization that you join after the Army, and there are opportunities to sidestep into an alternate career, for which the Army will also help you. Officers Training Academy, Chennai What happens after SSC selection? Once selected for the Short Service Commission, you go to the Officers Training Academy in Chennai. Selected candidates will be detailed for training at the Officers Training Academy, Chennai according to their position in the final order of merit, up to the number of vacancies available at the time. SSC Training The duration of training is 49 weeks (approximately 11 months) at OTA, Chennai, on successful completion of which candidates will be granted a commission in the rank of Lieutenant. Ante-date seniority will be granted only to Technical Graduates joining through the Technical Entry. Training at OTA is at Government expense; all candidates who complete pre-commission training at the Officers Training Academy, Chennai will be awarded a "Post Graduate Diploma in Defence Management and Strategic Studies" by the University of Madras. OTA training provides you with opportunities to broaden your perspective and widen your horizon. Whether it's skeet-shooting, golf, riding, or angling, at OTA you can do it all. Officers granted Short Service Commission are liable to serve anywhere in India and abroad. The main entries for both Men & Women are as under: 1. Short Service Commission (Non-Technical) Men & Women Officers' recruitment through this entry is advertised through the Employment News and all important national and regional newspapers/dailies in Jul and Nov. The mode of application is online only, through the official website of UPSC. Selection for Short Service Commission is through a written exam conducted by the UPSC twice every year, in the months of Sept and Feb, followed by an interview by the Service Selection Board (SSB). Courses are conducted at OTA, Chennai twice every year, in the months of April and October. 2. Short Service Commission (Technical) Men & Women Officers' recruitment through this entry is advertised through the Employment News and all important national and regional newspapers/dailies in January and July. The mode of application is online only, through the official website of the Recruitment Directorate. Selection for Short Service Commission is through the Services Selection Board (SSB) and there is no written exam for this entry as of date. Candidates are shortlisted for the SSB interview based on a cut-off percentage applied to the marks obtained by the candidates up to the 6th semester.
Courses are conducted at OTA, Chennai twice every year in the Months of April and October. 3. Short Service Commission (NCC) Men & Women Officers Recruitment through this entry is advertised through the Employment News and All important National and Regional Newspapers/dailies in Jun and Dec. If you have done NCC Senior Division (Army) and obtained a 'C' certificate with a minimum 'B' grade, you can apply through your NCC Battalion from where NCC 'C' certificate has been issued to you. 4. Short Service Commission (JAG) Men & Women To apply for JAG Entry, you have to be a Law Graduate with a minimum of 55% aggregate marks in LLB Degree (Three years professional after graduation or five years after the 10+2 examination). The candidates should be eligible for registration with the Bar Council of India/State. The candidate should be from a College/University recognized by the Bar Council of India. Officers Recruitment through this entry is advertised through the Employment News and All important National and Regional Newspapers/dailies in June and December. Thank you. If you have any questions simply comment below. Related Topics

  • Write CSV/JSON data to Elasticsearch using Spark dataframes

    The elasticsearch-hadoop connector allows Spark-Elasticsearch integration in Scala and Java; the library helps Apache Spark integrate with Elasticsearch. Contents: Write JSON data to Elasticsearch using Spark dataframe Write CSV file to Elasticsearch using Spark dataframe I am using Elasticsearch version [7.3.0], Spark [2.3.1] and Scala [2.11]. Download Jar In order to execute Spark with Elasticsearch, you need to download the proper version of the elasticsearch-spark jar file and add it to Spark's classpath. If you are running Spark in local mode it will be added to just one machine, but if you are running in a cluster, you need to add it on every node. I assume you have already installed Elasticsearch; if not, please follow these installation steps (Linux | Mac users). Elasticsearch installation is very easy and it will be done in a few minutes. I would encourage you all to install Kibana as well. Now, you can download the complete set of elasticsearch-hadoop libraries (for Storm, MapReduce, Hive and Pig) from here. I have added elasticsearch-spark-20_2.10-7.3.0.jar because I am running Elasticsearch version 7.3. [Tip] Make sure you are downloading the correct version of the jar, otherwise you will get this error during execution: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Unsupported/Unknown Elasticsearch version x.x.x Adding Jar (Scala IDE) If you are using Scala IDE, just right click on the project folder => go to properties => Java build path => add external jars and add the downloaded jar file. Apply and close. Adding Jar (Spark-shell) If you are using Spark-shell, just navigate to the Spark jars directory, where you can see all the other jar files, and copy the downloaded jar file there. Start Elasticsearch & Kibana Now, make sure Elasticsearch is running. If Elasticsearch is not running, Spark will not be able to make a connection and you will get this error: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed. To start Elasticsearch and Kibana run these commands on your terminal: $ elasticsearch $ kibana Writing JSON data to Elasticsearch In all sections these three steps are mandatory: import the necessary elasticsearch-spark library, configure the ES nodes, and configure the ES port. If you are running ES on AWS just add this line to your configuration - .config("spark.es.nodes.wan.only","true") JSON file multilinecolors.json sample data: [ { "color": "red", "value": "#f00" }, { "color": "green", "value": "#0f0" }, { "color": "blue", "value": "#00f" }, { "color": "cyan", "value": "#0ff" }, { "color": "magenta", "value": "#f0f" }, { "color": "yellow", "value": "#ff0" }, { "color": "black", "value": "#000" } ] package com.dataneb.spark import org.apache.spark.sql.SparkSession import org.elasticsearch.spark.sql._ object toES { def main(args: Array[String]): Unit = { // Configuration val spark = SparkSession .builder() .appName("WriteJSONToES") .master("local[*]") .config("spark.es.nodes","localhost") .config("spark.es.port","9200") .getOrCreate() // Create dataframe val colorsDF = spark.read.json("/Volumes/MYLAB/testdata/multilinecolors.json") // Write to ES with index name in lower case colorsDF.saveToEs("dataframejsonindex") } } [Tip] Make sure you are writing the index name in lower case otherwise you will get this error: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Illegal write index name [ABCindex].
Write resources must be lowercase singular index names, with no illegal pattern characters except for multi-resource writes. Here is the Scala IDE output. You can also check the index created in Elasticsearch: go to Management => ES Index Management. You can further discover the index pattern in Kibana. Writing CSV data to Elasticsearch books.csv sample data: bookID,title,authors,average_rating,isbn,isbn13,language_code,# num_pages,ratings_count,text_reviews_count 1,Harry Potter and the Half-Blood Prince (Harry Potter #6),J.K. Rowling-Mary GrandPré,4.56,0439785960,9780439785969,eng,652,1944099,26249 2,Harry Potter and the Order of the Phoenix (Harry Potter #5),J.K. Rowling-Mary GrandPré,4.49,0439358078,9780439358071,eng,870,1996446,27613 3,Harry Potter and the Sorcerer's Stone (Harry Potter #1),J.K. Rowling-Mary GrandPré,4.47,0439554934,9780439554930,eng,320,5629932,70390 4,Harry Potter and the Chamber of Secrets (Harry Potter #2),J.K. Rowling,4.41,0439554896,9780439554893,eng,352,6267,272 5,Harry Potter and the Prisoner of Azkaban (Harry Potter #3),J.K. Rowling-Mary GrandPré,4.55,043965548X,9780439655484,eng,435,2149872,33964 8,Harry Potter Boxed Set Books 1-5 (Harry Potter #1-5),J.K. Rowling-Mary GrandPré,4.78,0439682584,9780439682589,eng,2690,38872,154 Everything is the same except the read method (json => csv) and the index name. package com.dataneb.spark import org.apache.spark.sql.SparkSession import org.elasticsearch.spark.sql._ object toES { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("WriteJSONToES") .master("local[*]") .config("spark.es.nodes","localhost") .config("spark.es.port","9200") .getOrCreate() val colorsDF = spark.read.csv("/Volumes/MYLAB/testdata/books*.csv") colorsDF.saveToEs("dataframecsvindex") } } Here is the Scala IDE output. I have two csv files, books1.csv and books2.csv, so you see two task IDs in the result. You can also check the index created in Elasticsearch: go to Management => ES Index Management. You can further create and discover the index pattern in Kibana. I haven't applied the header option while reading the CSV files in the Spark program, hence you see the header records in the index; a short sketch addressing this follows at the end of this post. Thank you. If you have any question please write in the comments section below. Navigation Menu: Introduction to ELK Stack Installation Loading data into Elasticsearch with Logstash Create Kibana Dashboard Example Kibana GeoIP Dashboard Example
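Following up on the header note above, here is a minimal sketch (not part of the original post) of how the CSV read could be adjusted so that header rows don't end up in the index. It reuses the same SparkSession, file path and index name as the example above and assumes the same org.elasticsearch.spark.sql._ import; the options used are the standard Spark DataFrameReader CSV options.

// read the CSV files, treating the first line of each file as column names
val booksDF = spark.read
  .option("header", "true")       // skip header rows and use them as column names
  .option("inferSchema", "true")  // optionally infer column types instead of reading everything as strings
  .csv("/Volumes/MYLAB/testdata/books*.csv")

booksDF.saveToEs("dataframecsvindex")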

  • Just enough Scala for Spark

    In this tutorial you will learn just enough Scala for Spark. It's a quick guide to the Scala basics needed for Spark programming, Scala syntax, and a few Scala examples. Well, you can't become a Scala expert in a day, but after reading this post you will be able to write Spark programs. I will be using Spark-shell to run Scala commands, so no installation is needed if you have a Spark shell running on your machine. I would encourage you to run these commands side-by-side on your machine. Starting with printing "Hello World", for example, scala> println("Hello World") Hello World For comments you can use a double forward slash, or for multiline comments you can use syntax similar to Java. For example (ignore the pipe character, it appears because I am using spark-shell): scala> // Hello Data Nebulae - This is single line comment scala> /* Hello World | This is multi-line comment | Data Nebulae | */ We have two types of variables in Scala - mutable and immutable. Mutable variables are defined with the var keyword and immutable variables with the val keyword. You can't re-assign immutable variables. For example, scala> val myNumber :Int = 7 myNumber: Int = 7 scala> var myWord :String = "Hello" myWord: String = Hello Because myNumber is an immutable variable, re-assignment fails: scala> myNumber = 10 :25: error: reassignment to val myNumber = 10 scala> myWord = "Dataneb" myWord: String = Dataneb You can specify the datatype (Int, Double, Boolean, String) after the variable name; if not, the Scala compiler will automatically assign the type (called variable type inference). scala> val myNumber :Int = 10 myNumber: Int = 10 scala> val myFlag = true myFlag: Boolean = true You can also assign variables in pairs, basically tuples similar to Python, scala> val (x, y) = (1, 5) x: Int = 1 y: Int = 5 keep going.. scala> var (x, y, z) = (1, 2, "Hello") x: Int = 1 y: Int = 2 z: String = Hello You can pass these variables to the println function scala> println (x) 1 String interpolation works like in other languages: prefix the double-quoted string with s; scala> println(s"Value of x is: $x") Value of x is: 1 Similar to other languages, you can create a range with a step size and print each element.
scala> (1 to 5).foreach(println) 1 2 3 4 5 scala> (5 to 1 by -1) res144: scala.collection.immutable.Range = Range(5, 4, 3, 2, 1) scala> (5 to 1 by -2) res145: scala.collection.immutable.Range = Range(5, 3, 1) Strings are surrounded by double quotes and characters with single quotes, for example, scala> "Hello Word" res111: String = Hello Word scala> 'H' res112: Char = H scala> :type ('H') Char You can apply similar methods like other languages, length, substring, replace etc, for example scala> "Hello World".length res113: Int = 11 scala> "Hello World".size res1: Int = 11 scala> "Hello World".toUpperCase res2: String = HELLO WORLD scala> "Hello World".contains('H') res5: Boolean = true scala> 19.toHexString res4: String = 13 scala> "Hello World".take(3) res114: String = Hel scala> "Hello World".drop(3) res115: String = lo World scala> "Hello World".substring(3,6) res116: String = "lo " scala> "Hello World".replace("H","3") res123: String = 3ello World scala> "Hello".map(x=>(x,1)) res7: scala.collection.immutable.IndexedSeq[(Char, Int)] = Vector((H,1), (e,1), (l,1), (l,1), (o,1)) Array, List, Map, Set - behaves similarly like other languages data structures scala> val a = Array("Hello", "World", "Scala", "Spark") a: Array[String] = Array(Hello, World, Scala, Spark) // you can access the elements with index positions scala> a(0) res159: String = Hello scala> (a(0),a(3)) res160: (String, String) = (Hello,Spark) Similarly List.. // List of Integers scala> val l = List(1, 2, 3, 4, 5) l: List[Int] = List(1, 2, 3, 4, 5) // List of strings scala> val strings = List("Hello", "World", "Dataneb", "Spark") strings: List[String] = List(Hello, World, Dataneb, Spark) // List of List scala> val listOfList = List(List(1,2,3), List(2,6,7), List(2,5,3)) listOfList: List[List[Int]] = List(List(1, 2, 3), List(2, 6, 7), List(2, 5, 3)) scala> val emptyList = List() emptyList: List[Nothing] = List() Similarly Map.. scala> val m = Map("one" -> 1, "two" -> 2 ) m: scala.collection.immutable.Map[String,Int] = Map(one -> 1, two -> 2) scala> m("two") res163: Int = 2 Set, returns boolean scala> val s = Set("Apple", "Orange", "Banana") s: scala.collection.immutable.Set[String] = Set(Apple, Orange, Banana) scala> s("Apple") res164: Boolean = true scala> s("Grapes") res165: Boolean = false Arithmetic operations + (adds), -(subtracts), *(multiply), / (divide), %(remainder) for example, scala> val (x, y) = (5, 8) x: Int = 5 y: Int = 8 scala> y%x res95: Int = 3 scala> res95 + 7 res110: Int = 10 scala> "Hello" + " World" res0: String = Hello World Relational operators ==, !=, <, >, >=, <= for example, scala> y > x res96: Boolean = true Logical operators &&, ||, ! for example, scala> !(y>x && x>y) res98: Boolean = true Assignment operators =, +=, %= etc for example, like other languages x+=y is same as x=x+y; scala> var (x, y) = (5, 8) x: Int = 5 y: Int = 8 scala> x+=y scala> x res102: Int = 13 Array of integers, with println and index scala> val a = Array(1, 2, 3) a: Array[Int] = Array(1, 2, 3) scala> println(s"Sum is ${a(0) + a(1) + a(2)}") Sum is 6 Defining function has also similar syntax (ignore | character), (Int, Int) => (Int, Int) means function takes two integer argument and returns two integers. 
scala> def squareOfNumbers(x: Int, y: Int): (Int,Int) = {(x*x, y*y) | // for multiline you have to use curly {} brackets | } squareOfNumbers: (x: Int, y: Int)(Int, Int) scala> squareOfNumbers(2,3) res131: (Int, Int) = (4,9) Lambda function, if you will not mention datatype, Scala compiler will automatically decide it (inference). scala> (x:Int) => x+x res132: Int => Int = Int => Int means function takes integer return integer scala> val func: Int => Int = x => x + x func: Int => Int = scala> func(3) res133: Int = 6 Takes two integer and returns one integer, first _ for first input and so on.. scala> val underscoreFunc: (Int, Int) => Int = (_ * 3 + _ * 2) underscoreFunc: (Int, Int) => Int = scala> underscoreFunc(7, 5) res134: Int = 31 if-else statements, for example scala> x res139: Int = 5 scala> if (x==5) { println("five") } // curly braces not needed here but in case of multiline program five scala> println(if (x==4) println("Hello") else "Bye") Bye Loops, while, do-while and for loop scala> while (i<5) {println(i); i+=1} 0 1 2 3 4 scala> do {println(i); i-=1} while (i>0) 5 4 3 2 1 In Scala, <- is like a generator, read like x in range(1 to 5) similar to Python scala> for (x <- 1 to 5) println(x) 1 2 3 4 5 Pattern matching, for example; scala> def patternMatch (x: Int) :String = x match { | case 1 => "one" | case 2 => "two" | case _ => "unknown" | } patternMatch: (x: Int)String scala> patternMatch(2) res40: String = two scala> patternMatch(4) res41: String = unknown Classes can be defined like other languages, for example scala> class Dog(breed: String){ | var br: String = breed | def bark = "Woof woof!" | private def eat(food: String) = | println(s"I am eating $food") | } defined class Dog scala> val myDog = new Dog("pitbull") myDog: Dog = Dog@62882596 scala> myDog.br res155: String = pitbull scala> myDog.bark res156: String = Woof woof! Case classes, these will be useful while performing data operations, for example scala> case class Order(orderNum: Int, orderItem: String) defined class Order scala> val myOrder = Order(123, "iPhone") myOrder: Order = Order(123,iPhone) scala> val anotherOrder = Order(124, "macBook") anotherOrder: Order = Order(124, macBook) scala> myOrder.orderItem res158: String = iPhone Exercise For Spark, most of the time you will be writing lambda functions. I have hardly seen complex functions written to transform the data in Spark. Spark has built-in transformations which takes care of complex transformations which you will learn soon. For practice, try these examples. 
Example 1: Area of Circle scala> def areaCircle(radius:Double ) : Double = 3.14 * radius * radius areaCircle: (radius: Double)Double scala> areaCircle(5) res17: Double = 78.5 Example 2: Sum of Squares of input numbers scala> def sumOfSquares(x: Int, y:Int) : Int = x*x + y*y sumOfSquares: (x: Int, y: Int)Int scala> sumOfSquares(2,3) res18: Int = 13 Example 3: Reverse the Sign of input number scala> def reverseTheSign (x: Int) : Int = if (x>0) -x else -x reverseTheSign: (x: Int)Int scala> reverseTheSign(-6) res23: Int = 6 scala> reverseTheSign(6) res24: Int = -6 Example 4: Factorial of a number (to explain recursion), note how we are calling func within func; scala> def factorial (x: Int) :Int = if (x==1) x else factorial(x-1)*x factorial: (x: Int)Int scala> factorial(4) res26: Int = 24 Example 5: Defining objects and methods, you can define it like (ignore |) scala> object MyObject{ | val MyVal = 1 | def MyMethod = "Hello" | } defined object MyObject scala> MyObject.MyMethod res30: String = Hello for example; scala> object Foo {val x = 1} defined object Foo scala> object Bar {val x = 2} defined object Bar scala> object fooBar { | val y = Bar.x | } defined object fooBar scala> fooBar.y res31: Int = 2 Example 6: Sum of Squares using Lambda or anonymous func scala> val z = (x:Int, y:Int) => x*x + y*y z: (Int, Int) => Int = scala> z(2,3) res34: Int = 13 Example 7: Filtering the list with anonymous func scala> List(1,2,3,4,5,6).filter(x => x % 2 == 0) res39: List[Int] = List(2, 4, 6) Example 8: For loops with yield scala> for (x <- 1 to 5) yield x res42: scala.collection.immutable.IndexedSeq[Int] = Vector(1, 2, 3, 4, 5) scala> for (x <- 1 to 3; y <- Array("Hello","World")) yield (x, y) res47: scala.collection.immutable.IndexedSeq[(Int, String)] = Vector((1,Hello), (1,World), (2,Hello), (2,World), (3,Hello), (3,World)) That's all guys! If you have any question please mention in the comments section below. Thank you! Next: Hello with Eclipse Scala IDE Navigation menu 1. Apache Spark and Scala Installation 1.1 Spark installation on Windows​ 1.2 Spark installation on Mac 2. Getting Familiar with Scala IDE 2.1 Hello World with Scala IDE​ 3. Spark data structure basics 3.1 Spark RDD Transformations and Actions example 4. Spark Shell 4.1 Starting Spark shell with SparkContext example​ 5. Reading data files in Spark 5.1 SparkContext Parallelize and read textFile method 5.2 Loading JSON file using Spark Scala 5.3 Loading TEXT file using Spark Scala 5.4 How to convert RDD to dataframe? 6. Writing data files in Spark ​6.1 How to write single CSV file in Spark 7. Spark streaming 7.1 Word count example Scala 7.2 Analyzing Twitter texts 8. Sample Big Data Architecture with Apache Spark 9. What's Artificial Intelligence, Machine Learning, Deep Learning, Predictive Analytics, Data Science? 10. Spark Interview Questions and Answers

  • Spark Transformation example in Scala (Part 2)

    Main menu: Spark Scala Tutorial In this post I will walk you through the groupByKey, reduceByKey, aggregateByKey, sortByKey, join, cartesian, coalesce and repartition Spark transformations. In the previous blog we covered the map, flatMap, mapPartitions, mapPartitionsWithIndex, filter, distinct, union, intersection and sample Spark transformations. I would encourage you all to go through these posts first. Spark Transformation Examples in Scala (Part 1) Spark RDD, Transformations and Actions example groupByKey() As the name says, it groups the (K, V) key-value pair dataset by key and stores the values as an Iterable, (K, V) => (K, Iterable(V)). It's a very expensive operation and consumes a lot of memory if the dataset is huge. For example, scala> val rdd = sc.parallelize(List("Hello Hello Spark Apache Hello Dataneb Dataneb Dataneb Spark")) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at :24 scala> rdd.collect res3: Array[String] = Array(Hello Hello Spark Apache Hello Dataneb Dataneb Dataneb Spark) // Splitting the array and creating (K, V) pair scala> val keyValue = rdd.flatMap(words => words.split(" ")).map(x=>(x,1)) keyValue: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[16] at map at :25 // Iterable[Int] Value "1" tells number of occurrences of Key scala> keyValue.groupByKey.collect res12: Array[(String, Iterable[Int])] = Array((Spark,CompactBuffer(1, 1)), (Dataneb,CompactBuffer(1, 1, 1)), (Hello,CompactBuffer(1, 1, 1)), (Apache,CompactBuffer(1))) reduceByKey() Operates on a (K, V) pair dataset; the reduce func must be of type (V, V) => V. For example, if you want to reduce all the values to get the total number of occurrences: scala> rdd .flatMap(words => words.split(" ")) .map(x=>(x,1)) .reduceByKey((x, y)=>x+y) .collect res14: Array[(String, Int)] = Array((Spark,2), (Dataneb,3), (Hello,3), (Apache,1)) aggregateByKey() It's similar to reduceByKey(); I hardly use this transformation because for a simple sum you can achieve the same result with the previous one, but it becomes useful when the accumulator type differs from the value type (see the sketch at the end of this post). For example, scala> rdd .flatMap(words => words.split(" ")) .map(x=>(x,1)) .aggregateByKey(0)((x, y)=> x+y, (k, v)=> k+v ) .collect res24: Array[(String, Int)] = Array((Spark,2), (Dataneb,3), (Hello,3), (Apache,1)) sortByKey() Called on a key-value pair dataset, it returns the dataset sorted by keys. For example, scala> rdd .flatMap(words => words.split(" ")) .distinct .map(x => (x,1)) .sortByKey() -- by default Ascending .collect res36: Array[(String, Int)] = Array((Apache,1), (Dataneb,1), (Hello,1), (Spark,1)) scala> rdd .flatMap(words => words.split(" ")) .distinct .map(x => (x,1)) .sortByKey(false) -- Descending order (false) .collect res37: Array[(String, Int)] = Array((Spark,1), (Hello,1), (Dataneb,1), (Apache,1)) join() It takes key-value pair datasets and works like SQL joins. Where there is no match, the value will be None.
For example, scala> val rdd1 = sc.parallelize(List("Apple","Orange", "Banana", "Grapes", "Strawberry", "Papaya")).map(words => (words,1)) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[96] at map at :24 scala> val rdd2 = sc.parallelize(List("Apple", "Grapes", "Peach", "Fruits")).map(words => (words,1)) rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[98] at map at :24 scala> rdd1.join(rdd2).collect res40: Array[(String, (Int, Int))] = Array((Grapes,(1,1)), (Apple,(1,1))) scala> rdd1.rightOuterJoin(rdd2).collect res41: Array[(String, (Option[Int], Int))] = Array((Grapes,(Some(1),1)), (Peach,(None,1)), (Apple,(Some(1),1)), (Fruits,(None,1))) scala> rdd1.leftOuterJoin(rdd2).collect res43: Array[(String, (Int, Option[Int]))] = Array((Grapes,(1,Some(1))), (Banana,(1,None)), (Papaya,(1,None)), (Orange,(1,None)), (Apple,(1,Some(1))), (Strawberry,(1,None))) scala> rdd1.fullOuterJoin(rdd2).collect res44: Array[(String, (Option[Int], Option[Int]))] = Array((Grapes,(Some(1),Some(1))), (Peach,(None,Some(1))), (Banana,(Some(1),None)), (Papaya,(Some(1),None)), (Orange,(Some(1),None)), (Apple,(Some(1),Some(1))), (Fruits,(None,Some(1))), (Strawberry,(Some(1),None))) cartesian() Like a Cartesian product, it returns all possible pairs of elements across the two datasets. scala> val rdd1 = sc.parallelize(List("Apple","Orange", "Banana", "Grapes", "Strawberry", "Papaya")) rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[111] at parallelize at :24 scala> val rdd2 = sc.parallelize(List("Apple", "Grapes", "Peach", "Fruits")) rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[112] at parallelize at :24 scala> rdd1.cartesian(rdd2).collect res46: Array[(String, String)] = Array((Apple,Apple), (Apple,Grapes), (Apple,Peach), (Apple,Fruits), (Orange,Apple), (Banana,Apple), (Orange,Grapes), (Banana,Grapes), (Orange,Peach), (Banana,Peach), (Orange,Fruits), (Banana,Fruits), (Grapes,Apple), (Grapes,Grapes), (Grapes,Peach), (Grapes,Fruits), (Strawberry,Apple), (Papaya,Apple), (Strawberry,Grapes), (Papaya,Grapes), (Strawberry,Peach), (Papaya,Peach), (Strawberry,Fruits), (Papaya,Fruits)) coalesce() coalesce and repartition both change the number of partitions, but repartition is the costlier operation because it re-shuffles all the data and creates new partitions, whereas coalesce merges existing partitions and avoids a full shuffle. For example, scala> val distData = sc.parallelize(1 to 16, 4) distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[128] at parallelize at :24 // current partition size scala> distData.partitions.size res63: Int = 4 // checking data across each partition scala> distData.mapPartitionsWithIndex((index, iter) => if (index == 0) iter else Iterator()).collect res64: Array[Int] = Array(1, 2, 3, 4) scala> distData.mapPartitionsWithIndex((index, iter) => if (index == 1) iter else Iterator()).collect res65: Array[Int] = Array(5, 6, 7, 8) scala> distData.mapPartitionsWithIndex((index, iter) => if (index == 2) iter else Iterator()).collect res66: Array[Int] = Array(9, 10, 11, 12) scala> distData.mapPartitionsWithIndex((index, iter) => if (index == 3) iter else Iterator()).collect res67: Array[Int] = Array(13, 14, 15, 16) // decreasing partitions to 2 scala> val coalData = distData.coalesce(2) coalData: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[133] at coalesce at :25 // see how the data moved: instead of re-shuffling everything, coalesce just merged the existing partitions.
scala> coalData.mapPartitionsWithIndex((index, iter) => if (index == 0) iter else Iterator()).collect res68: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8) scala> coalData.mapPartitionsWithIndex((index, iter) => if (index == 1) iter else Iterator()).collect res69: Array[Int] = Array(9, 10, 11, 12, 13, 14, 15, 16) repartition() Notice how it re-shuffled everything to create new partitions, compared with the previous RDDs - distData and coalData. Hence repartition is a costlier operation than coalesce. scala> val repartData = distData.repartition(2) repartData: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[139] at repartition at :25 // checking data across each partition scala> repartData.mapPartitionsWithIndex((index, iter) => if (index == 0) iter else Iterator()).collect res70: Array[Int] = Array(1, 3, 6, 8, 9, 11, 13, 15) scala> repartData.mapPartitionsWithIndex((index, iter) => if (index == 1) iter else Iterator()).collect res71: Array[Int] = Array(2, 4, 5, 7, 10, 12, 14, 16) That's all folks. If you have any question please mention it in the comments section below. Thank you. Next: Spark RDD, Transformation and Actions Navigation menu 1. Apache Spark and Scala Installation 1.1 Spark installation on Windows 1.2 Spark installation on Mac 2. Getting Familiar with Scala IDE 2.1 Hello World with Scala IDE 3. Spark data structure basics 3.1 Spark RDD Transformations and Actions example 4. Spark Shell 4.1 Starting Spark shell with SparkContext example 5. Reading data files in Spark 5.1 SparkContext Parallelize and read textFile method 5.2 Loading JSON file using Spark Scala 5.3 Loading TEXT file using Spark Scala 5.4 How to convert RDD to dataframe? 6. Writing data files in Spark 6.1 How to write single CSV file in Spark 7. Spark streaming 7.1 Word count example Scala 7.2 Analyzing Twitter texts 8. Sample Big Data Architecture with Apache Spark 9. What's Artificial Intelligence, Machine Learning, Deep Learning, Predictive Analytics, Data Science? 10. Spark Interview Questions and Answers
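As referenced in the aggregateByKey() section above, here is a small sketch (not from the original post) of a case where aggregateByKey is genuinely handy: computing an average per key, where the (sum, count) accumulator type differs from the value type, something reduceByKey alone can't express as directly. The sample data is made up purely for illustration.

// average value per key using aggregateByKey
val pairs = sc.parallelize(List(("a", 3), ("a", 5), ("b", 7), ("b", 9), ("b", 11)))

// zero value (0, 0) holds (runningSum, count); note the accumulator type differs from the value type
val sumCount = pairs.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),      // seqOp: fold a value into the per-partition accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2))      // combOp: merge accumulators across partitions

val avgByKey = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
avgByKey.collect   // Array((a,4.0), (b,9.0)) - element order may vary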

  • Spark Transformations example (Part 1)

    Apache Spark transformations like Spark reduceByKey, groupByKey, mapPartitions, map etc are very widely used. Apart from these transformations there are several others, I will explain each of them with examples. But before I proceed with Spark transformation examples, if you are new to Spark and Scala I would highly encourage you to go through this post - Spark RDD, Transformation and Actions example. Main menu: Spark Scala Tutorial We will be using lambda functions to pass through most of these Spark transformations. So those who are new to Scala should have basic understanding of lambda functions. Lambda Functions In brief, lambda functions are like normal functions except the fact that they are more convenient when we have to use functions just in one place so that you don't need to worry about defining functions separately. For example, if you want to double the number you can simply write; x => x + x like you do in Python and other languages. Syntax in Scala would be like this, scala> val lfunc = (x:Int) => x + x lfunc: Int => Int = scala> lfunc(3) res0: Int = 6 Sample Data We will be using "Where is the Mount Everest?" text data which we created in earlier post (SparkContext and text files). I just picked some random data to go through these examples. Where is Mount Everest? (MountEverest.txt) Mount Everest (Nepali: Sagarmatha सगरमाथा; Tibetan: Chomolungma ཇོ་མོ་གླང་མ; Chinese Zhumulangma 珠穆朗玛) is Earth's highest mountain above sea level, located in the Mahalangur Himal sub-range of the Himalayas. The international border between Nepal (Province No. 1) and China (Tibet Autonomous Region) runs across its summit point. - Reference Wikipedia scala> val mountEverest = sc.textFile("/Users/Rajput/Documents/testdata/MountEverest.txt") mountEverestRDD: org.apache.spark.rdd.RDD[String] = /Users/Rajput/Documents/testdata/MountEverest.txt MapPartitionsRDD[1] at textFile at :24 Spark Transformations I encourage you all to run these examples on Spark-shell side-by-side. map(func) This transformation redistributes the data after passing each element through func. For example, if you want to split the Mount Everest text into individual words, you just need to pass this lambda func x => x.split(" ") and it will create a new RDD as shown below. What is this func doing? It's just reading each element and splitting on the basis of space character. scala> val words = mountEverest.map(x => x.split(" ")) words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at map at :25 scala> words.collect() res1: Array[Array[String]] = Array(Array(Mount, Everest, (Nepali:, Sagarmatha, सगरमाथा;, Tibetan:, Chomolungma, ཇོ་མོ་གླང་མ;, Chinese, Zhumulangma, 珠穆朗玛), is, Earth's, highest, mountain, above, sea, level,, located, in, the, Mahalangur, Himal, sub-range, of, the, Himalayas., The, international, border, between, Nepal, (Province, No., 1), and, China, (Tibet, Autonomous, Region), runs, across, its, summit, point.)) Don't worry about collect() action, it's very basic Spark action which is used to return all the element. Now, suppose you want to get the word count in this text file, you can do something like this - first split the file and then get the length or size. scala> mountEverest.map(x => x.split(" ").length).collect() res6: Array[Int] = Array(45) scala> mountEverest.map(x => x.split(" ").size).collect() res7: Array[Int] = Array(45) Lets say you want to get total number of characters in file, you can do this. 
scala> mountEverest.map(x => x.length).collect() res5: Array[Int] = Array(329) Making all text upper case, you can do it like this. scala> mountEverest.map(x => x.toUpperCase()).collect() res9: Array[String] = Array(MOUNT EVEREST (NEPALI: SAGARMATHA सगरमाथा; TIBETAN: CHOMOLUNGMA ཇོ་མོ་གླང་མ; CHINESE ZHUMULANGMA 珠穆朗玛) IS EARTH'S HIGHEST MOUNTAIN ABOVE SEA LEVEL, LOCATED IN THE MAHALANGUR HIMAL SUB-RANGE OF THE HIMALAYAS. THE INTERNATIONAL BORDER BETWEEN NEPAL (PROVINCE NO. 1) AND CHINA (TIBET AUTONOMOUS REGION) RUNS ACROSS ITS SUMMIT POINT.) flatmap(func) This is also similar to map, except the fact that it gives you more flattened output. For example, scala> val rdd = sc.parallelize(Seq("Where is Mount Everest","Himalayas India")) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at :24 scala> rdd.collect res26: Array[String] = Array(Where is Mount Everest, Himalayas India) scala> rdd.map(x => x.split(" ")).collect res21: Array[Array[String]] = Array(Array(Where, is, Mount, Everest), Array(Himalayas, India)) scala> rdd.flatMap(x => x.split(" ")).collect res23: Array[String] = Array(Where, is, Mount, Everest, Himalayas, India) In above case we have two elements in rdd - "Where is Mount Everest" and second "Himalayas India". When map() transformation is applied, it returned array of array string Array[Array[String]]. It has basically two separate array of strings within an array. So for each element we got one output (1st element => 1 element (Where, is, Mount, Everest), 2nd element => 1 element (Himalayas, India)). And those individual elements are collection of words separated by comma. But if you see flatMap(), output is flattened to single array of string Array[String]. Thus flatMap() is similar to map, but each input item is mapped to 0 or more output items (1st element => 4 elements, 2nd element => 2 elements). This will give you clear picture, scala> rdd.map(x => x.split(" ")).count() res24: Long = 2 scala> rdd.flatMap(x => x.split(" ")).count() res25: Long = 6 map() => [Where is Mount Everest, Himalayas India] => [[Where, is, Mount, Everest],[Himalayas, India]] flatMap() => [Where is Mount Everest, Himalayas India] => [Where, is, Mount, Everest, Himalayas, India] filter(func) As name says it is used to filter elements same like where clause in SQL and it is case sensitive. For example, // returns one element which contains match scala> rdd.filter(x=>x.contains("Himalayas")).collect res31: Array[String] = Array(Himalayas India) // No match scala> rdd.filter(x=>x.contains("Dataneb")).collect res32: Array[String] = Array() // Case sensitive scala> rdd.filter(x=>x.contains("himalayas")).collect res33: Array[String] = Array() scala> rdd.filter(x=>x.toLowerCase.contains("himalayas")).collect res37: Array[String] = Array(Himalayas India) mapPartitions(func) Similar to map() transformation but in this case function runs separately on each partition (block) of RDD unlike map() where it was running on each element of partition. Hence mapPartitions are also useful when you are looking for performance gain (calls your function once/partition not once/element). Suppose you have elements from 1 to 100 distributed among 10 partitions i.e. 10 elements/partition. map() transformation will call func 100 times to process these 100 elements but in case of mapPartitions(), func will be called once/partition i.e. 10 times. Secondly, mapPartitions() holds the data in-memory i.e. it will store the result in memory until all the elements of the partition has been processed. 
mapPartitions(func)

This is similar to the map() transformation, but here the function runs separately on each partition (block) of the RDD, unlike map() where it runs on each element of a partition. Hence mapPartitions() is also useful when you are looking for a performance gain (it calls your function once per partition, not once per element). Suppose you have elements from 1 to 100 distributed among 10 partitions, i.e. 10 elements per partition. The map() transformation will call func 100 times to process those 100 elements, whereas mapPartitions() will call func once per partition, i.e. 10 times. Secondly, mapPartitions() holds the data of a partition in memory, i.e. it keeps the results in memory until all the elements of the partition have been processed, and it returns the result only after it finishes processing the whole partition. Also note that mapPartitions() takes an iterator as input (and returns an iterator), unlike map().

What is an iterator? (for new programmers) - An iterator is a way to access a collection of elements one by one. It's similar to collections like List(), Array(), Map() etc. in a few ways, but the difference is that an iterator doesn't load the whole collection into memory at once; it loads elements one after another. In Scala you access these elements with the hasNext and next operations. For example,

scala> sc.parallelize(1 to 9, 3).map(x => (x, "Hello")).collect
res3: Array[(Int, String)] = Array((1,Hello), (2,Hello), (3,Hello), (4,Hello), (5,Hello), (6,Hello), (7,Hello), (8,Hello), (9,Hello))

scala> sc.parallelize(1 to 9, 3).mapPartitions(x => (Array("Hello").iterator)).collect
res7: Array[String] = Array(Hello, Hello, Hello)

scala> sc.parallelize(1 to 9, 3).mapPartitions(x => (List(x.next).iterator)).collect
res11: Array[Int] = Array(1, 4, 7)

In the first example, I applied the map() transformation on a dataset distributed across 3 partitions, so the function is called 9 times. In the second example, when we applied mapPartitions(), you will notice it ran 3 times, i.e. once per partition. We had to convert the string "Hello" into an iterator because mapPartitions() works with iterators. In the third step, I pulled a value out of the iterator with next to showcase the dataset. Note that next always moves forward, so you can't step back. See this:

scala> sc.parallelize(1 to 9, 3).mapPartitions(x => (List(x.next, x.next, "|").iterator)).collect
res18: Array[Any] = Array(1, 2, |, 4, 5, |, 7, 8, |)

In the first call, the next value for partition 1 moved from 1 => 2, for partition 2 it moved from 4 => 5, and similarly for partition 3 it moved from 7 => 8. You can keep advancing until hasNext is false (hasNext is a method of the iterator which tells you whether the collection has ended or not; it returns true or false based on the items left in the collection). For example,

scala> sc.parallelize(1 to 9, 3).mapPartitions(x => (List(x.next, x.hasNext).iterator)).collect
res19: Array[AnyVal] = Array(1, true, 4, true, 7, true)

You can see hasNext is true because there are elements left in each partition. Now suppose we access all three elements from each partition; then hasNext will return false. For example,

scala> sc.parallelize(1 to 9, 3).mapPartitions(x => (List(x.next, x.next, x.next, x.hasNext).iterator)).collect
res20: Array[AnyVal] = Array(1, 2, 3, false, 4, 5, 6, false, 7, 8, 9, false)

Just for our understanding, if you try to access next a fourth time, you will get an error, which is expected:

scala> sc.parallelize(1 to 9, 3).mapPartitions(x => (List(x.next, x.next, x.next, x.next, x.hasNext).iterator)).collect
19/07/31 11:14:42 ERROR Executor: Exception in task 1.0 in stage 18.0 (TID 56)
java.util.NoSuchElementException: next on empty iterator

Think of the map() transformation as a special case of mapPartitions() where you have just one element in each partition. Isn't it?
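To tie this back to the performance point above, here is a short sketch of my own (not from the original post) showing the typical mapPartitions() use case: aggregating a whole partition in a single function call, so func runs 3 times instead of 9.

scala> sc.parallelize(1 to 9, 3).mapPartitions(iter => Iterator(iter.sum)).collect
// one sum per partition: Array(6, 15, 24)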
mapPartitionsWithIndex(func)

This is similar to mapPartitions, but the nice part is that you also get the index of each partition. For example,

scala> val mp = sc.parallelize(List("One","Two","Three","Four","Five","Six","Seven","Eight","Nine"), 3)
mp: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> mp.collect
res23: Array[String] = Array(One, Two, Three, Four, Five, Six, Seven, Eight, Nine)

scala> mp.mapPartitionsWithIndex((index, iterator) => {iterator.toList.map(x => x + "=>" + index).iterator}).collect
res26: Array[String] = Array(One=>0, Two=>0, Three=>0, Four=>1, Five=>1, Six=>1, Seven=>2, Eight=>2, Nine=>2)

Index 0 (the first partition) has three values as expected, and similarly for the other two partitions. If you have any question, please mention it in the comments section at the end of this blog.

sample(withReplacement, fraction, seed)

Generates a sampled RDD from the input RDD. Note that the second argument, fraction, doesn't represent the fraction of the actual RDD; it is the probability of each element in the dataset getting selected for the sample, so the sample size is only approximate. The seed is optional, and the first boolean argument decides whether sampling is done with replacement. For example,

scala> sc.parallelize(1 to 10).sample(true, .4).collect
res103: Array[Int] = Array(4)

scala> sc.parallelize(1 to 10).sample(true, .4).collect
res104: Array[Int] = Array(1, 4, 6, 6, 6, 9)

// here you can see fraction 0.2 doesn't represent a fraction of the rdd - 4 elements were selected out of 10
scala> sc.parallelize(1 to 10).sample(true, .2).collect
res109: Array[Int] = Array(2, 4, 7, 10)

// fraction set to 1, the maximum value (probability ranges from 0 to 1), so every element gets selected
scala> sc.parallelize(1 to 10).sample(false, 1).collect
res111: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

union()

Similar to SQL UNION, except that it keeps duplicate data (see the short sketch after the distinct() section below for how to drop those duplicates).

scala> val rdd1 = sc.parallelize(List("apple","orange","grapes","mango","orange"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[159] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List("red","green","yellow"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[160] at parallelize at <console>:24

scala> rdd1.union(rdd2).collect
res116: Array[String] = Array(apple, orange, grapes, mango, orange, red, green, yellow)

scala> rdd2.union(rdd1).collect
res117: Array[String] = Array(red, green, yellow, apple, orange, grapes, mango, orange)

intersection()

Returns the intersection of two datasets. For example,

scala> val rdd1 = sc.parallelize(-5 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[171] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(1 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[172] at parallelize at <console>:24

scala> rdd1.intersection(rdd2).collect
res119: Array[Int] = Array(4, 1, 5, 2, 3)

distinct()

Returns a new dataset with distinct elements. For example, we don't have the duplicate orange now.

scala> val rdd = sc.parallelize(List("apple","orange","grapes","mango","orange"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[186] at parallelize at <console>:24

scala> rdd.distinct.collect
res121: Array[String] = Array(grapes, orange, apple, mango)
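As referenced in the union() section above, here is a final hedged sketch of my own (not from the original post) combining the two operations: union() keeps duplicates (like SQL UNION ALL), so chaining distinct() after it gives you classic SQL UNION behaviour. The val names fruits and colors are just illustrative; they recreate the string RDDs from the union() example.

scala> val fruits = sc.parallelize(List("apple","orange","grapes","mango","orange"))
scala> val colors = sc.parallelize(List("red","green","yellow"))
scala> fruits.union(colors).distinct.collect
// orange appears only once now; the ordering of the result may vary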
That's all guys, please refer to the next post for the next set of Spark transformations.

Next: Spark Transformations (Part 2)
