Kafka Producer and Consumer example (in Java)

Updated: Sep 14, 2020

In this Kafka pub/sub example you will learn:

  • Kafka producer components (producer API, serializer and partition strategy)

  • Kafka producer architecture

  • Kafka producer send method (fire and forget, sync and async types)

  • Kafka producer config (connection properties) example

  • Kafka producer example

  • Kafka consumer example


Prerequisite - refer to my previous post on Apache Kafka Overview (Windows).


The Kafka producer is one of the clients of the Kafka broker. It publishes messages to a Kafka topic. Messages are serialized before they are transferred over the network.



Kafka Producer Components


Producer API

  • Kafka provides a collection of producer APIs to publish messages to a topic.

  • Messaging can be optimized by setting different configuration parameters.

  • A developer can choose to publish a message to a particular partition by providing a custom key for the message.



Serializer

  • The serializer converts the message to bytes so that it can be sent over the network. The developer can set a default or a custom serializer. The String serializer is configured as follows:

    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer

  • A serializer is set for both the key and the value.
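
To make the serializer's job concrete, here is a minimal sketch of what a String serializer has to do. It assumes UTF-8 encoding, and `SimpleSerializer` is a hypothetical stand-in that only mirrors the `serialize(topic, data)` method shape of Kafka's `Serializer<T>` interface, so the example runs without the Kafka client library on the classpath.

```java
import java.nio.charset.StandardCharsets;

class SerializerSketch {
    // Hypothetical stand-in with the same method shape as Kafka's Serializer<T>.
    interface SimpleSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Turns a String into UTF-8 bytes; a null payload stays null.
    static final SimpleSerializer<String> UTF8_STRING = (topic, data) ->
            data == null ? null : data.getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        byte[] bytes = UTF8_STRING.serialize("topic", "hello");
        System.out.println(bytes.length + " bytes ready to send");
    }
}
```

A custom serializer for your own message type would follow the same pattern: take the object, return the bytes that should travel over the network.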


Partitioner

  • This component applies a hashing algorithm to the key and finds the partition for the message, if a key is provided.

  • If the developer does not provide a key for the message, it uses a round-robin algorithm to assign a partition for the message (clients from Kafka 2.4 onward instead stick to one partition per batch before moving on).
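
The two strategies above can be sketched as follows. This is only an illustration, not Kafka's actual partitioner: `Arrays.hashCode` is a stand-in for the murmur2 hash that Kafka's default partitioner uses, and the class and method names are made up for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class PartitionSketch {
    private final AtomicInteger counter = new AtomicInteger();

    // Keyed record: hash the serialized key and map it onto a partition.
    // Arrays.hashCode is a stand-in here; Kafka's default partitioner uses murmur2.
    static int forKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int positiveHash = Arrays.hashCode(keyBytes) & 0x7fffffff; // clear the sign bit
        return positiveHash % numPartitions;
    }

    // Record without a key: rotate through the partitions in order.
    int roundRobin(int numPartitions) {
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionSketch sketch = new PartitionSketch();
        System.out.println("order-42 -> partition " + forKey("order-42", 3));
        for (int i = 0; i < 4; i++) {
            System.out.println("no key   -> partition " + sketch.roundRobin(3));
        }
    }
}
```

The point to notice is that the same key always maps to the same partition (as long as the number of partitions does not change), while records without a key are spread across all partitions.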

Kafka Producer Architecture



Kafka Producer Send Methods


Fire and Forget


The producer does not check whether the message arrives at its destination or not.


ProducerRecord<String, String> data = new ProducerRecord<>("topic", key, message);

// The Future returned by send() is ignored
producer.send(data);



Synchronous Send


The send() method returns a Future object. Calling get() on that Future blocks until the broker responds, so the developer knows the status of the message.


ProducerRecord<String, String> data = new ProducerRecord<>("topic", key, message);

// get() blocks until the broker responds and throws if the send failed
producer.send(data).get();
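
Because get() declares checked exceptions, a synchronous send normally sits inside a try/catch. The sketch below shows that handling in isolation. `fakeSend` is a hypothetical stand-in for producer.send() that returns a plain Java Future, so the example runs without a broker; with the real producer the Future carries a RecordMetadata instead of a Long.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class SyncSendSketch {
    // Hypothetical stand-in for producer.send(): returns a Future that either
    // completes with an offset or fails, without needing a running broker.
    static Future<Long> fakeSend(String message, boolean brokerFails) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (brokerFails) {
            result.completeExceptionally(new IllegalStateException("broker unavailable"));
        } else {
            result.complete(0L); // pretend the record landed at offset 0
        }
        return result;
    }

    // Blocks until the send finishes and reports the outcome as a string.
    static String sendAndWait(String message, boolean brokerFails) {
        try {
            long offset = fakeSend(message, brokerFails).get();
            return "sent at offset " + offset;
        } catch (ExecutionException e) {
            // The send itself failed; the real cause is wrapped inside.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("hello", false)); // prints "sent at offset 0"
        System.out.println(sendAndWait("hello", true));  // prints "failed: broker unavailable"
    }
}
```

The trade-off of this style is throughput: every message waits for the broker before the next one is sent.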



Asynchronous Send


Developers can call send() with a callback function, which is invoked once the broker sends its response back to the producer.


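import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

TestCallback callback = new TestCallback();
ProducerRecord<String, String> data = new ProducerRecord<>("topic", key, message);
producer.send(data, callback);

private static class TestCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            // The send failed: report the exception, since the metadata is not meaningful here
            System.out.println("Error while producing message to topic: " + e.getMessage());
            e.printStackTrace();
        } else {
            // The send succeeded: the metadata says where the record was written
            String message = String.format("sent message to topic:%s partition:%s offset:%s",
                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
            System.out.println(message);
        }
    }
}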



Producer Configuration