
Kafka Producer and Consumer example (in Java)

In this Kafka pub/sub example you will learn:

  • Kafka producer components (producer API, serializer, and partitioner)

  • Kafka producer architecture

  • Kafka producer send method (fire and forget, sync and async types)

  • Kafka producer config (connection properties) example

  • Kafka producer example

  • Kafka consumer example


Prerequisite: refer to my previous post, Apache Kafka Overview (Windows).


The Kafka producer is one of the clients of the Kafka broker. It publishes messages to a Kafka topic. Messages are serialized before they are sent over the network.


 

Kafka Producer Components


Producer API

  • Kafka provides a set of producer APIs for publishing messages to a topic.

  • Message delivery can be tuned by setting different configuration parameters.

  • A developer can control which partition a message lands on by supplying a key: messages with the same key go to the same partition.



Serializer

  • The serializer converts a message to bytes so that it can be sent over the network. The developer can configure a built-in or a custom serializer. The String serializer is configured as follows:

    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    key.serializer=org.apache.kafka.common.serialization.StringSerializer

  • A serializer is set for both the key and the value.
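
To make the serializer's job concrete, here is a minimal sketch of the round trip a String serializer performs: text to UTF-8 bytes on the producer side, and bytes back to text on the consumer side. The class and method names (StringSerdeSketch, serialize, deserialize) are invented for illustration and are not part of the Kafka API; the sketch only assumes that Kafka's StringSerializer encodes as UTF-8 by default.

```java
import java.nio.charset.StandardCharsets;

// Illustrative stand-in, NOT Kafka's StringSerializer/StringDeserializer.
class StringSerdeSketch {

    // Producer side: turn the message into bytes for the network.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the received bytes back into a String.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("Hello Kafka");
        System.out.println(wire.length + " bytes on the wire");
        System.out.println("Round trip: " + deserialize(wire));
    }
}
```

A custom serializer follows the same idea: it only has to agree with the deserializer on the consumer side about how bytes map back to objects.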


Partitioner

  • This component applies a hashing algorithm to the key to find the partition for the message, if a key is provided.

  • If the developer does not provide a key, the partitioner uses a round-robin algorithm to assign a partition for the message. (From Kafka 2.4 onward, the default partitioner instead sticks to one partition per batch before moving to another.)
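
The key-to-partition mapping can be sketched as "hash the key bytes, force the result positive, take it modulo the partition count". The sketch below is an illustration only: Kafka's default partitioner uses a murmur2 hash, while this example substitutes Arrays.hashCode as a stand-in, so the partition numbers it prints will not match a real cluster. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative sketch of hash-based partitioning, NOT Kafka's partitioner.
class HashPartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Stand-in hash (assumption); Kafka itself uses murmur2 here.
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (String key : new String[] {"1", "2", "3", "1"}) {
            System.out.println("key=" + key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

The property that matters is determinism: the same key always maps to the same partition as long as the partition count does not change, which is what preserves per-key ordering.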

Kafka Producer Architecture


 

Kafka Producer Send Methods


Fire and Forget


The producer sends the message and does not check whether it arrived at the destination.


ProducerRecord<String, String> data = new ProducerRecord<String, String> ("topic", key, message );

producer.send(data);



Synchronous Send


The send() method returns a Future object. The developer can call get() on that Future to block until the broker responds and learn the status of the message.


ProducerRecord<String, String> data = new ProducerRecord<String, String> ("topic", key, message );

producer.send(data).get();
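
The blocking behaviour of get() is plain java.util.concurrent.Future semantics, so it can be sketched without a broker. In the example below, fakeSend is a hypothetical stand-in for producer.send() (it is not a Kafka call) and the offset 42 is made up; the point is only that a failed send surfaces as an ExecutionException whose cause is the original error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Illustrative sketch of the synchronous-send pattern, using a fake send().
class SyncSendSketch {

    // Hypothetical stand-in for producer.send(): returns a Future holding a pretend offset.
    static Future<Long> fakeSend(String message, boolean brokerFails) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (brokerFails) {
            result.completeExceptionally(new IllegalStateException("broker unavailable"));
        } else {
            result.complete(42L);
        }
        return result;
    }

    static String sendAndWait(String message, boolean brokerFails) {
        try {
            // get() blocks until the Future completes, like producer.send(data).get().
            long offset = fakeSend(message, brokerFails).get();
            return "acked at offset " + offset;
        } catch (ExecutionException e) {
            // The original error is wrapped; unwrap it with getCause().
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("hello", false));
        System.out.println(sendAndWait("hello", true));
    }
}
```

Waiting on every message this way is simple but serializes the sends, which is why it is usually slower than the asynchronous style.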



Asynchronous Send


Developers can call send() with a callback function, which is invoked once the broker sends its response back to the producer.


TestCallback callback = new TestCallback();
ProducerRecord<String, String> data = new ProducerRecord<String, String>("topic", key, message);
producer.send(data, callback);

private static class TestCallback implements Callback {
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            // On failure, report the exception; the metadata carries no usable offset.
            System.out.println("Error while producing message to topic: " + e.getMessage());
            e.printStackTrace();
        } else {
            String message = String.format("sent message to topic:%s partition:%s offset:%s",
                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
            System.out.println(message);
        }
    }
}
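
A callback is also a convenient place to keep a tally of successes and failures. The sketch below shows that idea without a broker: SendCallback is a hypothetical interface that mirrors the shape of Kafka's Callback (metadata plus exception), and the metadata string is invented. Atomic counters are used because the real producer invokes callbacks on its own I/O thread, not on the thread that called send().

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of a counting callback, using a stand-in interface.
class CallbackSketch {

    // Hypothetical mirror of Kafka's Callback: exactly one of the arguments is meaningful.
    interface SendCallback {
        void onCompletion(String metadata, Exception error);
    }

    static class CountingCallback implements SendCallback {
        final AtomicInteger succeeded = new AtomicInteger();
        final AtomicInteger failed = new AtomicInteger();

        @Override
        public void onCompletion(String metadata, Exception error) {
            if (error != null) {
                failed.incrementAndGet();
                System.out.println("Send failed: " + error.getMessage());
            } else {
                succeeded.incrementAndGet();
                System.out.println("Sent: " + metadata);
            }
        }
    }

    public static void main(String[] args) {
        CountingCallback callback = new CountingCallback();
        callback.onCompletion("TEST.TOPIC-0@15", null);               // simulated success
        callback.onCompletion(null, new RuntimeException("timeout")); // simulated failure
        System.out.println(callback.succeeded.get() + " ok, " + callback.failed.get() + " failed");
    }
}
```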


 

Producer Configuration


bootstrap.servers=localhost:9092
acks=all
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
retries=2
batch.size=32768
linger.ms=5
buffer.memory=33554432
max.block.ms=60000
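
Before handing a properties file to the producer, it can help to check that the keys the producer cannot start without are present. The sketch below is one possible way to do that with the standard library only; the class name, method names, and the choice of required keys (bootstrap.servers plus the two serializers, assuming they are not passed to the constructor instead) are assumptions made for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Illustrative pre-flight check for a producer properties file.
class ProducerConfigCheck {

    // Keys assumed mandatory when the producer is built from Properties alone.
    static final String[] REQUIRED_KEYS = {"bootstrap.servers", "key.serializer", "value.serializer"};

    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the required keys that are absent or blank, in declaration order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = parse("bootstrap.servers=localhost:9092\nacks=all\n");
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```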


 

Kafka Producer Example


Step-1: Start Zookeeper


Step-2: Start Kafka broker and create a topic TEST.TOPIC


Step-3: Create a Java project


Step-4: Create a properties file - kconnection.properties


bootstrap.servers=localhost:9092
acks=all
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
retries=2
TOPIC_NAME=TEST.TOPIC
batch.size=32768
linger.ms=5
buffer.memory=33554432
max.block.ms=60000



Step-5: KafkaConnection.java


package com.demo.twitter.util;

import java.io.InputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class KafkaConnection {

    // Custom (non-Kafka) key in kconnection.properties that holds the topic name.
    private static final String TOPIC_NAME = "TOPIC_NAME";

    static Properties props = null;

    // Loads kconnection.properties once. The file must sit next to this class on the classpath.
    private static Properties loadConPropsFromClasspath() throws Exception {
        if (props == null) {
            try (InputStream stream = KafkaConnection.class.getResourceAsStream("kconnection.properties")) {
                Properties loaded = new Properties();
                loaded.load(stream);
                props = loaded;
            }
            System.out.println("Configuration " + props);
        }
        return props;
    }

    // Creates a new producer from the loaded configuration.
    public static Producer<String, String> getKafkaConnection() throws Exception {
        return new KafkaProducer<String, String>(loadConPropsFromClasspath());
    }

    // Returns the topic name from the configuration, loading the file if needed.
    public static String getTopicName() throws Exception {
        return loadConPropsFromClasspath().getProperty(TOPIC_NAME);
    }
}



Step-6: KafkaProducerClient.java


package com.demo.client.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import com.demo.twitter.util.KafkaConnection;

public class KafkaProducerClient {

    static Producer<String, String> producer = null;

    public static void main(String[] args) {
        KafkaProducerClient pclient = new KafkaProducerClient();
        try {
            long i = 1;
            for (; i <= 10; i++) {
                KafkaProducerClient.sendMessage("" + i, "Hello This is test message ..Demo" + i);
            }
            System.out.println("Number of messages sent " + (i - 1));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // close() flushes any buffered messages before shutting down.
            pclient.closeProducer();
        }
    }

    // Sends one message asynchronously; the callback reports the outcome.
    public static void sendMessage(String key, String message) throws Exception {
        if (producer == null) {
            producer = KafkaConnection.getKafkaConnection();
            System.out.println("Kafka connection created for topic " + KafkaConnection.getTopicName());
        }
        TestCallback callback = new TestCallback();
        long startTime = System.currentTimeMillis();
        ProducerRecord<String, String> data =
                new ProducerRecord<String, String>(KafkaConnection.getTopicName(), key, message);
        producer.send(data, callback);
        // send() is asynchronous, so this measures only the time to enqueue the record.
        System.out.println("Time to enqueue (ms): " + (System.currentTimeMillis() - startTime));
    }

    public void closeProducer() {
        try {
            if (producer != null) {
                producer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static class TestCallback implements Callback {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                // On failure, report the exception; the metadata carries no usable offset.
                System.out.println("Error while producing message to topic: " + e.getMessage());
                e.printStackTrace();
            } else {
                String message = String.format("sent message to topic:%s partition:%s offset:%s",
                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                System.out.println(message);
            }
        }
    }
}


 

Apache Kafka Consumer Example


Continue in the same project.


Step-1: Create a properties file, kconsumer.properties, with the contents below

bootstrap.servers=localhost:9092
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=group1
TOPIC_NAME=TEST.TOPIC
CONSUMER_TIMEOUT=1000
worker.thread.count=5
consumer.count=3



Step-2: Create KafkaConsumerClient.java


package com.demo.kafka.consumer;

import java.io.InputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.demo.twitter.util.KafkaConnection;

public class KafkaConsumerClient {

    Properties props = null;

    // Key and value types match the StringDeserializer set in kconsumer.properties.
    KafkaConsumer<String, String> consumer = null;

    public static void main(String[] args) {
        KafkaConsumerClient conClient = new KafkaConsumerClient();
        try {
            conClient.subscribeMessage("kconsumer.properties");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Subscribes to the configured topic and polls for messages until an error occurs.
    public synchronized void subscribeMessage(String configPropsFile) throws Exception {
        try {
            if (consumer == null) {
                consumer = getKafkaConnection(configPropsFile);
            }
            consumer.subscribe(Collections.singletonList(getTopicName()));
            while (true) {
                // poll(Duration) requires kafka-clients 2.0 or later.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received Message topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                // Commit the offsets of the records returned by the last poll.
                consumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    public KafkaConsumer<String, String> getKafkaConnection(String fileName) throws Exception {
        return new KafkaConsumer<String, String>(loadConPropsFromClasspath(fileName));
    }

    // Loads the properties file once. It is resolved relative to the KafkaConnection class.
    private Properties loadConPropsFromClasspath(String fileName) throws Exception {
        if (props == null) {
            try (InputStream stream = KafkaConnection.class.getResourceAsStream(fileName)) {
                Properties loaded = new Properties();
                loaded.load(stream);
                props = loaded;
            }
            System.out.println("Configuration " + props);
        }
        return props;
    }

    public String getTopicName() throws Exception {
        if (props != null) {
            return props.getProperty("TOPIC_NAME");
        } else {
            return null;
        }
    }
}


Thank you. If you have any questions, please write them in the comments section below.


[09/09/2019 10:38 PM CST - Reviewed by: PriSin]
