Updated: Sep 6, 2019
There are many technologies available today that provide real-time data ingestion (refer to my previous blog). Apache Kafka is one of my favorites because it is a distributed streaming platform. What exactly does that mean?
First, it can act as a "publish-subscribe" data streaming platform.
Second, it can process data streams as they occur.
Lastly, it can store data streams in a durable, fault-tolerant way.
Kafka can run as a cluster on one or more servers that can span multiple data centers. The Kafka cluster stores streams of records in categories called "topics". For instance, if you have data streams coming from Twitter (refer to my blog), you can name this topic "Tweets".
We will learn how to define these topics and how to use the pub-sub model. Each record consists of a key, a value, and a timestamp. Kafka has four core APIs:
Producer API & Consumer API
Streams API & Connector API
What's the need for Zookeeper with Kafka?
Kafka brokers need a coordination service to keep track of the cluster's state: which brokers are alive, which topics and partitions exist, their configuration, and which broker acts as the controller. Zookeeper is a centralized service for distributed systems that offers a hierarchical key-value store, which is used to provide configuration information, naming, synchronization and group services.
The Apache Kafka package comes with an inbuilt Zookeeper, but in production environments with multiple nodes, people usually install Zookeeper separately. I will explain both ways to run Kafka: one with the inbuilt Zookeeper and another with a separately installed Zookeeper.
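To get a feel for what Kafka keeps in Zookeeper, you can browse Zookeeper's tree of nodes (znodes) with the zookeeper-shell.sh script in Kafka's bin directory, once both services are up as described later in this post. Below is a minimal sketch, assuming Kafka is unpacked in /var/kafka and Zookeeper listens on localhost:2181 as in this setup; show_kafka_metadata is a hypothetical helper name, not part of Kafka:

```shell
#!/bin/sh
# Hypothetical helper: list a few znodes that Kafka registers in Zookeeper.
# $1 = Kafka directory (assumed default /var/kafka), $2 = Zookeeper address.
show_kafka_metadata() {
  local kafka_home="${1:-/var/kafka}" zk="${2:-localhost:2181}"
  # /brokers/ids has one child per live broker, /brokers/topics one per topic
  for path in /brokers/ids /brokers/topics; do
    echo "== $path"
    "$kafka_home/bin/zookeeper-shell.sh" "$zk" ls "$path"
  done
}
```

For example, run show_kafka_metadata /var/kafka. With the single default broker (broker.id=0 in config/server.properties) you would expect the /brokers/ids listing to contain [0], and /brokers/topics to include Tweets once that topic is created.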
Before we start the Kafka installation, make sure you have Java installed on your machine; if not, please refer to my previous blog.
Once you have downloaded the Kafka .tgz file to your local machine, move it to the Linux environment where you want to run Kafka. I use MobaXterm (a free tool) to transfer the file from my Windows machine to the Linux environment (a Red Hat Linux client without GUI in this case).
Navigate to the directory where you transferred the .tgz file and untar it. For instance, I have kept it in the /var folder:
cd /var
tar -xzf kafka_2.11-1.1.0.tgz
Now remove the .tgz file. This is not a necessary step, but it saves some space.
rm kafka_2.11-1.1.0.tgz
Rename the folder for your convenience.
mv kafka_2.11-1.1.0 kafka
Follow similar steps to download the latest Zookeeper release from this link.
Move the .tar.gz file to your Linux machine (the /var folder in my case), then run the below commands to untar it and set up the .cfg configuration file.
tar -xzvf zookeeper-3.4.11.tar.gz
mv zookeeper-3.4.11 zookeeper
Now let's set up the configuration file. Go to the conf directory and copy the sample file:
cd zookeeper/conf
cp zoo_sample.cfg zoo.cfg
By default, the sample configuration stores Zookeeper's data in dataDir=/tmp/zookeeper. You can change dataDir if you don't want to depend on your /tmp directory: if the server reboots due to some infrastructure issue, you might lose the /tmp data, so people usually don't rely on it. Or you can simply leave it as it is.
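If you do move dataDir, the resulting zoo.cfg stays small. As a sketch, assuming snapshots should live under /var/zookeeper/data (a hypothetical path; any persistent directory works), the function below writes an equivalent file. All values other than dataDir are the defaults from zoo_sample.cfg in the 3.4.x releases, and write_zoo_cfg is a hypothetical helper name:

```shell
#!/bin/sh
# Hypothetical helper: write a minimal standalone zoo.cfg.
# $1 = conf directory (e.g. /var/zookeeper/conf), $2 = persistent data directory.
write_zoo_cfg() {
  local conf_dir="$1" data_dir="$2"
  mkdir -p "$data_dir" || return 1
  # unquoted EOF so that $data_dir is expanded into the file
  cat > "$conf_dir/zoo.cfg" <<EOF
# length of a single tick in milliseconds
tickTime=2000
# ticks a follower may take to connect and sync to the leader
initLimit=10
# ticks a follower may lag behind the leader before being dropped
syncLimit=5
# snapshot directory, kept out of /tmp so it survives a reboot
dataDir=$data_dir
# port that clients such as Kafka connect to
clientPort=2181
EOF
}
```

For example: write_zoo_cfg /var/zookeeper/conf /var/zookeeper/data. Keeping clientPort at 2181 matters because Kafka's config/server.properties points zookeeper.connect at localhost:2181 by default.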
At this point you are done with the Zookeeper setup. Now we will start Zookeeper, since Kafka needs it. There are two ways to do this.
First, as I said, Kafka comes with an inbuilt Zookeeper (its start script is in the kafka/bin directory). So you can either start the Zookeeper that comes with Kafka or run the Zookeeper you installed separately. For the inbuilt one, navigate to your Kafka directory and run the below command:
bin/zookeeper-server-start.sh config/zookeeper.properties
Or you can start the Zookeeper you just installed by running the below command in the /var/zookeeper directory (this script starts Zookeeper in the background):
bin/zkServer.sh start
You will get output something like this.
Next, go to your Kafka directory and start the Kafka server:
bin/kafka-server-start.sh config/server.properties
You will see lots of log events, and then the output will stop at one point because the server keeps running in the foreground. Keep this terminal open and open a new terminal to verify that the Zookeeper and Kafka services are running fine.
Type the jps command to check the status of active Java processes. QuorumPeerMain (12050) is our Zookeeper process and Kafka (11021) is our Kafka broker process. Process IDs might vary for you.
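If you run this check often, the jps output can be scanned by a small function. A sketch, assuming jps prints one "<pid> <main class>" pair per line (its default format) and that the broker shows up as Kafka and Zookeeper as QuorumPeerMain, which holds for both the inbuilt and the standalone Zookeeper; check_kafka_services is a hypothetical name:

```shell
#!/bin/sh
# Hypothetical helper: read jps-style lines ("<pid> <MainClass>") on stdin and
# report whether Zookeeper (QuorumPeerMain) and the Kafka broker (Kafka) are up.
# Returns 0 only when both processes are found.
check_kafka_services() {
  local zk=0 kafka=0 pid name
  while read -r pid name; do
    case "$name" in
      QuorumPeerMain) zk=1; echo "Zookeeper running (pid $pid)" ;;
      Kafka) kafka=1; echo "Kafka running (pid $pid)" ;;
    esac
  done
  [ "$zk" -eq 0 ] && echo "Zookeeper not found" >&2
  [ "$kafka" -eq 0 ] && echo "Kafka not found" >&2
  [ "$zk" -eq 1 ] && [ "$kafka" -eq 1 ]
}
```

Usage: jps | check_kafka_services. The exit status makes it easy to use in other scripts, for example to wait until both services are up before creating topics.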
If you close that terminal, the Kafka service will stop. Another way is to run these services in the background with "nohup" and a trailing "&" (their output then goes to nohup.out in the current directory), like:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &
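Because the broker connects to Zookeeper on startup, it helps to start Zookeeper first, give it a few seconds, and keep each service's output in its own log file instead of one shared nohup.out. A sketch, assuming Kafka lives in /var/kafka; the function name, the log file names and the 5-second pause are illustrative choices, not Kafka defaults. The Kafka scripts also accept a -daemon flag (for example bin/kafka-server-start.sh -daemon config/server.properties), and bin/kafka-server-stop.sh and bin/zookeeper-server-stop.sh stop the services again.

```shell
#!/bin/sh
# Hypothetical helper: start the inbuilt Zookeeper, then the Kafka broker, in the background.
# $1 = Kafka directory (assumed default /var/kafka);
# ZK_WAIT = seconds to wait before starting Kafka (assumed default 5).
start_kafka_background() {
  (
    cd "${1:-/var/kafka}" || exit 1
    # nohup + & keeps each service running after the terminal is closed
    nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.out 2>&1 &
    echo "Zookeeper started with pid $!"
    sleep "${ZK_WAIT:-5}"   # give Zookeeper time to open port 2181
    nohup bin/kafka-server-start.sh config/server.properties > kafka.out 2>&1 &
    echo "Kafka started with pid $!"
  )
}
```

Usage: start_kafka_background /var/kafka, then tail -f /var/kafka/kafka.out to follow the broker log. The subshell keeps the cd from changing your current directory.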
Create Kafka Topic
As discussed earlier, the Kafka cluster stores streams of records in categories called "topics". Let's create a topic called "Tweets". To do this, run the below command in your Kafka directory.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Tweets
You will get a message saying Created topic "Tweets". You can also see the list of topics by running the below command.
bin/kafka-topics.sh --list --zookeeper localhost:2181
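When you script this step, it is handy to create the topic only if it is not already listed. A sketch that uses only the two kafka-topics.sh commands above, with the same --zookeeper option this Kafka 1.1.0 release accepts; ensure_topic is a hypothetical name, and the partition and replication counts default to the 1 and 1 used here:

```shell
#!/bin/sh
# Hypothetical helper: create a topic unless kafka-topics.sh already lists it.
# $1 = topic, $2 = partitions (default 1), $3 = replication factor (default 1).
# KAFKA_HOME = Kafka directory (assumed /var/kafka), ZK = Zookeeper address.
ensure_topic() {
  local topic="$1" partitions="${2:-1}" replication="${3:-1}"
  local topics_sh="${KAFKA_HOME:-/var/kafka}/bin/kafka-topics.sh"
  local zk="${ZK:-localhost:2181}"
  # --list prints one topic per line; -x -F matches the whole line literally
  if "$topics_sh" --list --zookeeper "$zk" | grep -qxF "$topic"; then
    echo "Topic $topic already exists"
    return 0
  fi
  "$topics_sh" --create --zookeeper "$zk" \
    --replication-factor "$replication" --partitions "$partitions" --topic "$topic"
}
```

Usage: ensure_topic Tweets. kafka-topics.sh also accepts --if-not-exists together with --create, which gives the same effect in a single call; the explicit check above just makes the logic visible.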
Running Kafka Producer and Consumer
Run the below command to start the console producer. You need to tell it which topic to write to and the broker's host and port; check config/server.properties in your Kafka directory for details (the broker listens on port 9092 by default). For example, I am running it for Tweets on port 9092:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Tweets
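Now open a new terminal and run the console consumer. Make sure you enter the correct topic name. With the --zookeeper option, the port is the one Zookeeper is running on (2181 by default). On Kafka 2.0 and later this option was removed from the console consumer; use --bootstrap-server localhost:9092 instead.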
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Tweets --from-beginning
Now go back to the producer terminal, type anything and hit Enter. The same message will show up in your consumer terminal.
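The console producer reads messages from standard input, one per line, so you can also send test messages without typing them, and the console consumer's --max-messages option makes it exit after reading a fixed number of messages. A sketch, assuming the broker on localhost:9092 and Zookeeper on localhost:2181 as above; send_messages and read_messages are hypothetical helper names:

```shell
#!/bin/sh
# Hypothetical helpers around the console producer and consumer used above.
# KAFKA_HOME defaults to /var/kafka (assumption); $1 = topic name.
send_messages() {
  # every line on stdin becomes one record in the topic
  "${KAFKA_HOME:-/var/kafka}/bin/kafka-console-producer.sh" \
    --broker-list localhost:9092 --topic "$1"
}

read_messages() {
  # $2 = number of messages to read before exiting (default 1)
  "${KAFKA_HOME:-/var/kafka}/bin/kafka-console-consumer.sh" \
    --zookeeper localhost:2181 --topic "$1" --from-beginning --max-messages "${2:-1}"
}
```

Usage: printf 'hello\nworld\n' | send_messages Tweets, then read_messages Tweets 2 should print the two lines and exit instead of waiting for more input.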
Thank you! If you have any questions, please write in the comments section below.