
Advanced Kafka Consumer (Java example)

Prerequisite

 

Commits and Offset in Kafka Consumer


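Once the client commits an offset, Kafka stores that position for the consumer group. After a restart or rebalance the consumer resumes from the committed offset, so messages before it are not delivered to the group again. The messages themselves are not deleted from the broker; they remain until the retention limit is reached.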
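To make the commit semantics concrete, here is a minimal sketch that models a single partition as an in-memory list and the group's committed position as a plain number. It does not use the Kafka client at all; the class and method names (CommitSemanticsSketch, pollAfterRestart, commit) are hypothetical and only mirror the idea that the committed offset is the offset of the next message to read, while the log itself is left untouched.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one partition and one consumer group's committed position.
// Illustration only: this does not use the Kafka client library.
class CommitSemanticsSketch {
    private final List<String> log = new ArrayList<>(); // messages; list index = offset
    private long committed = 0;                          // offset of the next message to read

    void append(String message) {
        log.add(message);
    }

    // What a restarted consumer would receive: everything from the committed position onward.
    List<String> pollAfterRestart() {
        return new ArrayList<>(log.subList((int) committed, log.size()));
    }

    // Commit after processing the message at lastProcessedOffset: the position becomes offset + 1.
    void commit(long lastProcessedOffset) {
        committed = lastProcessedOffset + 1;
    }

    long committedOffset() {
        return committed;
    }

    int logSize() {
        return log.size();
    }

    public static void main(String[] args) {
        CommitSemanticsSketch partition = new CommitSemanticsSketch();
        partition.append("m0");
        partition.append("m1");
        partition.append("m2");
        System.out.println("Before commit: " + partition.pollAfterRestart());
        partition.commit(1); // m0 and m1 have been processed
        System.out.println("After commit:  " + partition.pollAfterRestart());
        System.out.println("Messages still in the log: " + partition.logSize());
    }
}
```

Committing changes only the stored position; the number of messages in the log stays the same, which is what makes re-reading from an older offset possible later on.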


Properties used in the below example


bootstrap.servers=localhost:9092

ProducerConfig.RETRIES_CONFIG=0

value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

retries=0

group.id=group1

HQ_TOPIC_NAME=EK.TWEETS.TOPIC

CONSUMER_TIMEOUT=1000

worker.thread.count=5

counsumer.count=3

auto.offset.reset=earliest

enable.auto.commit=false
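The listing above mixes Kafka client settings (for example bootstrap.servers and group.id) with application-specific keys (for example HQ_TOPIC_NAME and worker.thread.count). The post does not show how getKafkaConnection reads the file, so the following is only a sketch of one way to load such a file with java.util.Properties and separate the two groups. The class name ConsumerPropsSketch, its methods, and the chosen set of Kafka keys are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper: loads properties text and keeps only the Kafka consumer settings.
class ConsumerPropsSketch {
    // Kafka consumer keys from the listing; every other key is treated as application-specific.
    private static final Set<String> KAFKA_KEYS = new HashSet<>(Arrays.asList(
            "bootstrap.servers", "group.id", "key.deserializer", "value.deserializer",
            "auto.offset.reset", "enable.auto.commit"));

    // Parses properties-file text into a Properties object.
    static Properties load(String text) {
        Properties all = new Properties();
        try {
            all.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return all;
    }

    // Copies only the entries whose key is a known Kafka consumer setting.
    static Properties kafkaOnly(Properties all) {
        Properties kafka = new Properties();
        for (String key : all.stringPropertyNames()) {
            if (KAFKA_KEYS.contains(key)) {
                kafka.setProperty(key, all.getProperty(key));
            }
        }
        return kafka;
    }

    public static void main(String[] args) {
        String text = "bootstrap.servers=localhost:9092\n"
                + "group.id=group1\n"
                + "enable.auto.commit=false\n"
                + "HQ_TOPIC_NAME=EK.TWEETS.TOPIC\n"
                + "worker.thread.count=5\n";
        Properties all = load(text);
        Properties kafka = kafkaOnly(all);
        System.out.println("Kafka settings: " + kafka.stringPropertyNames());
        System.out.println("Topic: " + all.getProperty("HQ_TOPIC_NAME"));
    }
}
```

Passing only the Kafka keys to the consumer keeps the client from logging warnings about configuration entries it does not recognise.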



Configuration Level Setting


Commit behaviour can be set at configuration level in the properties file.

  • enable.auto.commit=false - The consumer does not commit offsets on its own; the client code decides when to commit an offset or to hold it back. This is the setting used in the properties above.

  • enable.auto.commit=true - This is Kafka's default. The consumer commits the offsets of polled messages automatically, at the interval set by auto.commit.interval.ms, without any commit call in the client code.
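As a small illustration of the two modes, the sketch below builds the commit-related part of a consumer configuration with java.util.Properties. The keys enable.auto.commit and auto.commit.interval.ms are real Kafka consumer settings (enable.auto.commit is the key already used in the properties listing above); the class CommitModeSketch and its method are hypothetical, and the 5000 ms interval is simply the Kafka default written out explicitly.

```java
import java.util.Properties;

// Hypothetical helper that produces the commit-related consumer settings for either mode.
class CommitModeSketch {
    static Properties commitSettings(boolean autoCommit) {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", Boolean.toString(autoCommit));
        if (autoCommit) {
            // The interval only has an effect when auto commit is switched on.
            props.setProperty("auto.commit.interval.ms", "5000");
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println("Manual commit: " + commitSettings(false));
        System.out.println("Auto commit:   " + commitSettings(true));
    }
}
```

With manual commit the client code has to call commitSync() or commitAsync() itself, which is what the rest of the examples in this post do.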


 

Consumer API Level Setting


Synchronous Commit

  • The offset is committed as soon as the consumer calls commitSync().

  • commitSync() with no arguments commits the latest offsets returned by the last poll().

  • The example below commits once, after all messages of the current poll have been processed.

  • A synchronous commit blocks until the broker responds to the commit request.


Sample Code


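// Requires: java.time.Duration, java.util.Collections, org.apache.kafka.clients.consumer.*
public synchronized void subscribeMessage(String configPropsFile) throws Exception {
    try {
        if (consumer == null) {
            // String key/value types match the StringDeserializer settings in the properties above
            consumer = (KafkaConsumer<String, String>) getKafkaConnection(configPropsFile);
            System.out.println("Kafka Connection created...on TOPIC : " + getTopicName());
        }
        consumer.subscribe(Collections.singletonList(getTopicName()));
        while (true) {
            // poll(long) is deprecated since Kafka 2.0; poll(Duration) replaces it
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            // Blocks until the broker acknowledges the offsets returned by the last poll
            consumer.commitSync();
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Close on every exit path, and only if the consumer was actually created
        if (consumer != null) {
            consumer.close();
        }
    }
}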


 

Asynchronous Commit

  • The consumer does not wait for the response from the broker.

  • commitAsync() sends the commit request to the broker and the consumer continues processing.

  • Throughput is higher than with synchronous commit.

  • A failed asynchronous commit is not retried, so messages can be read again after a restart or rebalance; the application needs to handle such duplicates itself.


Sample code


while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
    System.out.println("Number of messages polled by consumer " + records.count());
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    // Returns immediately; the callback runs once the broker has responded
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                // printf needs %s placeholders; {} is logger syntax and would print literally
                System.out.printf("Commit failed for offsets %s: %s%n", offsets, exception);
            } else {
                System.out.println("Messages are Committed Asynchronously...");
            }
        }
    });
}
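Because a lost asynchronous commit can lead to the same message being delivered twice, the application usually needs some form of duplicate detection. One simple approach, sketched below under the assumption that records of a partition arrive in increasing offset order, is to remember the highest offset processed per partition and skip anything at or below it. The class DuplicateFilterSketch and its method are hypothetical and keep their state in memory only; a real application would have to persist this state together with its processing results for it to survive a restart.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical duplicate filter keyed by "topic-partition".
// Illustration only: state is in memory and is lost when the process stops.
class DuplicateFilterSketch {
    private final Map<String, Long> lastProcessed = new HashMap<>();

    // Returns true if the record is new and should be processed, false if it was seen before.
    boolean shouldProcess(String topic, int partition, long offset) {
        String key = topic + "-" + partition;
        Long last = lastProcessed.get(key);
        if (last != null && offset <= last) {
            return false; // already handled, for example re-delivered after a missed commit
        }
        lastProcessed.put(key, offset);
        return true;
    }

    public static void main(String[] args) {
        DuplicateFilterSketch filter = new DuplicateFilterSketch();
        System.out.println(filter.shouldProcess("EK.TWEETS.TOPIC", 0, 5)); // first delivery
        System.out.println(filter.shouldProcess("EK.TWEETS.TOPIC", 0, 6)); // next message
        System.out.println(filter.shouldProcess("EK.TWEETS.TOPIC", 0, 5)); // re-delivery of offset 5
    }
}
```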


 

Offset Level Commit

  • Sometimes the application needs to commit at a particular offset, for example once a specific message has been read, rather than after a whole poll.

Sample Code


Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
        // The committed position is the offset of the next record to read, hence offset + 1
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1, "no metadata"));
        // Example trigger: commit once the record at offset 18098 has been read
        if (record.offset() == 18098) {
            consumer.commitAsync(currentOffsets, null);
        }
    }
}
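The sample above commits when one specific offset is read. A more common trigger is a record count, for instance committing after every N processed records. The sketch below shows that bookkeeping without the Kafka client: OffsetTrackerSketch is a hypothetical class that stores offset + 1 per partition, using a plain "topic-partition" string as the key, and hands back a snapshot whenever a commit is due. In a real consumer the snapshot would be a Map<TopicPartition, OffsetAndMetadata> passed to commitSync or commitAsync.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical bookkeeping for "commit every N records", independent of the Kafka client.
class OffsetTrackerSketch {
    private final Map<String, Long> currentOffsets = new HashMap<>();
    private final int commitEvery;
    private int sinceLastCommit = 0;

    OffsetTrackerSketch(int commitEvery) {
        this.commitEvery = commitEvery;
    }

    // Records a processed message. Returns the offsets to commit when the threshold
    // is reached, or an empty Optional when no commit is due yet.
    Optional<Map<String, Long>> track(String topic, int partition, long offset) {
        // Store the position of the next record to read, as in the sample above.
        currentOffsets.put(topic + "-" + partition, offset + 1);
        sinceLastCommit++;
        if (sinceLastCommit >= commitEvery) {
            sinceLastCommit = 0;
            return Optional.of(new HashMap<>(currentOffsets));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        OffsetTrackerSketch tracker = new OffsetTrackerSketch(3);
        long[] offsets = {10, 11, 12, 13};
        for (long offset : offsets) {
            tracker.track("EK.TWEETS.TOPIC", 0, offset)
                   .ifPresent(toCommit -> System.out.println("Commit due: " + toCommit));
        }
    }
}
```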


 

Retention of Message

  • Kafka retains messages until the retention limit defined in the configuration is reached, whether or not they have been consumed.

  • Retention can be defined at broker level or at topic level.

  • Retention can be based on time or on size (bytes).

  • Retention defined at topic level overrides the retention defined at broker level.

  • retention.bytes - The maximum size, in bytes, that a partition of this topic may reach before old log segments are discarded.

  • retention.ms - How long messages should be retained for this topic, in milliseconds.


1. Defining retention at topic level

  • Set the retention for the topic named “test-topic” to 1 hour (3,600,000 ms):

# kafka-configs.sh --zookeeper localhost:2181/kafka-cluster --alter --entity-type topics --entity-name test-topic --add-config retention.ms=3600000


2. Defining retention at broker level


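Define one of the properties below in server.properties. If more than one is set, log.retention.ms takes precedence over log.retention.minutes, which takes precedence over log.retention.hours. The three values shown are equivalent (7 days).

  • # Configures retention time in milliseconds => log.retention.ms=604800000

  • # Configures retention time in minutes => log.retention.minutes=10080

  • # Configures retention time in hours => log.retention.hours=168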
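When several of these broker properties are present, Kafka uses log.retention.ms first, then log.retention.minutes, then log.retention.hours, whose default is 168. The sketch below works through that lookup and the unit conversion on a plain java.util.Properties object. RetentionSketch and effectiveRetentionMs are hypothetical names, and special values such as -1 (no time limit) are deliberately not handled.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that resolves the effective time-based retention of a broker config.
// Simplification: special values such as -1 (no limit) are not treated specially.
class RetentionSketch {
    static long effectiveRetentionMs(Properties broker) {
        String ms = broker.getProperty("log.retention.ms");
        if (ms != null) {
            return Long.parseLong(ms);
        }
        String minutes = broker.getProperty("log.retention.minutes");
        if (minutes != null) {
            return TimeUnit.MINUTES.toMillis(Long.parseLong(minutes));
        }
        // Fall back to hours, using the broker default of 168 when nothing is set.
        String hours = broker.getProperty("log.retention.hours", "168");
        return TimeUnit.HOURS.toMillis(Long.parseLong(hours));
    }

    public static void main(String[] args) {
        Properties broker = new Properties();
        broker.setProperty("log.retention.hours", "168");
        System.out.println("Hours only: " + effectiveRetentionMs(broker) + " ms");
        broker.setProperty("log.retention.minutes", "60");
        System.out.println("Minutes override hours: " + effectiveRetentionMs(broker) + " ms");
    }
}
```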


 

Fetching Message From A Specific Offset

  • A consumer can go down or fail while handling messages; if offsets were already committed for messages that were not fully processed, the application has effectively lost those messages.

  • Since the Kafka broker retains messages for the configured retention period, the consumer can point to a specific offset and read them again.

  • The consumer can go back from the current offset to a particular offset, or start polling from the beginning.



Sample Code


Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

public synchronized void subscribeMessage(String configPropsFile) throws Exception {
    try {
        if (consumer == null) {
            // String key/value types match the StringDeserializer settings in the properties above
            consumer = (KafkaConsumer<String, String>) getKafkaConnection(configPropsFile);
            System.out.println("Kafka Connection created...on TOPIC : " + getTopicName());
        }
        // Manual assignment of partition 0; assign() does not take part in group rebalancing
        TopicPartition topicPartition = new TopicPartition(getTopicName(), 0);
        List<TopicPartition> topics = Arrays.asList(topicPartition);
        consumer.assign(topics);
        // Jump to the end of the partition, then step back 10 messages
        consumer.seekToEnd(topics);
        long current = consumer.position(topicPartition);
        // seek() rejects negative offsets, which would occur with fewer than 10 messages
        consumer.seek(topicPartition, Math.max(0, current - 10));
        System.out.println("Topic partitions are " + consumer.assignment());
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            System.out.println("Number of records polled " + records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1, "no metadata"));
            }
            consumer.commitAsync(currentOffsets, null);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Close on every exit path, and only if the consumer was actually created
        if (consumer != null) {
            consumer.close();
        }
    }
}
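Stepping back a fixed number of messages from the end needs a lower bound: the partition may hold fewer messages than requested, or older messages may already have been removed by retention so that the first available offset is no longer 0. The arithmetic can be sketched as below. RewindSketch and rewindOffset are hypothetical names; in a real consumer the two bounds could be obtained from the consumer's beginningOffsets and endOffsets methods.

```java
// Hypothetical helper: computes the offset to seek to when going back `count` messages
// from the end of a partition, without going below the first available offset.
class RewindSketch {
    static long rewindOffset(long beginningOffset, long endOffset, long count) {
        return Math.max(beginningOffset, endOffset - count);
    }

    public static void main(String[] args) {
        // Plenty of messages available: simply end minus count
        System.out.println(rewindOffset(0, 100, 10));
        // Fewer than 10 messages in the partition: clamp to the beginning
        System.out.println(rewindOffset(0, 4, 10));
        // Older messages removed by retention: clamp to the first retained offset
        System.out.println(rewindOffset(50, 55, 10));
    }
}
```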



Thank you. If you have any questions, please feel free to post them in the comments section below.


[23/09/2019 04:38 PM CST - Reviewed by: PriSin]
