Kafka Consumer: Advanced (Java example)
Updated: Nov 18, 2022
Prerequisite
Commits and Offset in Kafka Consumer
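Once the client commits an offset, Kafka stores that position for the consumer group. The message is not deleted from the topic; it is only treated as consumed for that group, so after a restart or rebalance the group resumes from the committed offset and the already-read message is not delivered again.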
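The effect of a commit can be illustrated without a broker. The following is a minimal in-memory sketch, not Kafka code: the class `CommittedOffsetSketch` and its methods are hypothetical names invented for this illustration. It assumes a single partition and a single consumer group, and shows that a commit only moves the group's read position while the log itself stays intact.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for one partition and one group's committed offset. */
class CommittedOffsetSketch {
    private final List<String> log = new ArrayList<>(); // list index plays the role of the offset
    private long committed = 0;                         // next offset the group will read

    void append(String message) {
        log.add(message);
    }

    /** What a restarted consumer of this group would read: everything from the committed position. */
    List<String> readAfterRestart() {
        return new ArrayList<>(log.subList((int) committed, log.size()));
    }

    /** Commits "last processed offset + 1", i.e. the position of the next message to read. */
    void commit(long lastProcessedOffset) {
        committed = lastProcessedOffset + 1;
    }

    /** Number of messages still stored; committing never reduces it. */
    int logSize() {
        return log.size();
    }

    public static void main(String[] args) {
        CommittedOffsetSketch partition = new CommittedOffsetSketch();
        partition.append("m0");
        partition.append("m1");
        partition.append("m2");
        System.out.println(partition.readAfterRestart()); // nothing committed yet: all messages
        partition.commit(1);                              // offsets 0 and 1 are done
        System.out.println(partition.readAfterRestart()); // only the uncommitted tail
        System.out.println(partition.logSize());          // the log still holds every message
    }
}
```

If the consumer had crashed before calling `commit(1)`, a restart would have returned all three messages again, which is why the timing of the commit matters in the sections that follow.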
Properties used in the below example
bootstrap.servers=localhost:9092
ProducerConfig.RETRIES_CONFIG=0
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
retries=0
group.id=group1
HQ_TOPIC_NAME=EK.TWEETS.TOPIC
CONSUMER_TIMEOUT=1000
worker.thread.count=5
counsumer.count=3
auto.offset.reset=earliest
enable.auto.commit=false
Configuration Level Setting
Commit behaviour can be set at the configuration level in the properties file, using the enable.auto.commit property.
enable.auto.commit=false - The consumer application decides in client code when to commit an offset or to retain it (leave it uncommitted). This is the setting used in the examples here; note that Kafka's own default is true.
enable.auto.commit=true - This is Kafka's default. The consumer commits the offsets returned by poll automatically, at the interval given by auto.commit.interval.ms, without any commit call in client code.
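As a small illustration of reading this setting, the sketch below parses properties text with the standard `java.util.Properties` class and reports whether auto-commit is on. The class and method names are hypothetical. It assumes the Kafka consumer key `enable.auto.commit` (as in the properties list above) and falls back to `true` when the key is absent, mirroring Kafka's default.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: checks the auto-commit flag in a consumer properties file. */
class AutoCommitConfigSketch {

    /** Returns true when auto-commit is enabled; a missing key counts as enabled (Kafka's default). */
    static boolean autoCommitEnabled(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            // Not expected for an in-memory reader; rethrown unchecked to keep callers simple.
            throw new UncheckedIOException(e);
        }
        return Boolean.parseBoolean(props.getProperty("enable.auto.commit", "true"));
    }

    public static void main(String[] args) {
        String manual = "group.id=group1\nenable.auto.commit=false\n";
        String defaults = "group.id=group1\n";
        System.out.println(autoCommitEnabled(manual));   // manual commits expected in client code
        System.out.println(autoCommitEnabled(defaults)); // key absent, so the default applies
    }
}
```

A check like this can be useful at startup, since commit calls in client code are redundant when auto-commit is left enabled by accident.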
Consumer API Level Setting
Synchronous Commit
The offset is committed when the consumer code calls commitSync().
Called without arguments, commitSync() commits the latest offsets returned by the last poll.
The example below commits after processing all messages of the current poll.
A synchronous commit blocks until the broker responds to the commit request.
Sample Code
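// Needs java.time.Duration, java.util.Collections and the Kafka consumer classes on the import list.
public synchronized void subscribeMessage(String configPropsFile) throws Exception {
    try {
        if (consumer == null) {
            consumer = (KafkaConsumer<byte[], byte[]>) getKafkaConnection(configPropsFile);
            System.out.println("Kafka Connection created...on TOPIC : " + getTopicName());
        }
        consumer.subscribe(Collections.singletonList(getTopicName()));
        while (true) {
            // poll(long) is deprecated since Kafka 2.0; use the Duration overload instead.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.printf("Received Message topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            // Blocks until the broker acknowledges the offsets returned by this poll.
            consumer.commitSync();
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Close in finally, and guard against a consumer that was never created.
        if (consumer != null) {
            consumer.close();
        }
    }
}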
Asynchronous Commit
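The consumer does not wait for the response from the broker.
It sends the commit request and continues processing; the result is reported later through an optional callback.
Throughput is higher than with synchronous commit.
A failed asynchronous commit is not retried automatically, so messages may be read again after a restart or rebalance. The application needs to handle such duplicates on its own.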
Sample code
while (true) {
    // poll(long) is deprecated since Kafka 2.0; the Duration overload needs java.time.Duration.
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
    System.out.println("Number of messages polled by consumer " + records.count());
    for (ConsumerRecord<byte[], byte[]> record : records) {
        System.out.printf("Received Message topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    // Returns immediately; the callback is invoked later, once the commit result is known.
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                // printf needs %s placeholders; the {} style only works with logging frameworks.
                System.err.printf("Commit failed for offsets %s: %s%n", offsets, exception);
            } else {
                System.out.println("Messages are Committed Asynchronously...");
            }
        }
    });
}
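Because an asynchronous commit can fail silently, the same record may be delivered twice. One possible way for the application to handle this is to remember the highest offset it has already processed per partition and skip anything at or below it. The sketch below is only an illustration under that assumption: `DuplicateFilterSketch` is a hypothetical class, it keeps its state in memory (so it does not survive a process restart), and it identifies a partition by a plain "topic-partition" string rather than Kafka's `TopicPartition`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory filter that skips records already processed on a partition. */
class DuplicateFilterSketch {
    // Highest offset processed so far, keyed by "topic-partition".
    private final Map<String, Long> lastProcessed = new HashMap<>();

    /** Returns true if the record is new and should be processed, false if it is a replay. */
    boolean shouldProcess(String topic, int partition, long offset) {
        String key = topic + "-" + partition;
        Long last = lastProcessed.get(key);
        if (last != null && offset <= last) {
            return false; // already seen: a duplicate delivery
        }
        lastProcessed.put(key, offset);
        return true;
    }

    public static void main(String[] args) {
        DuplicateFilterSketch filter = new DuplicateFilterSketch();
        System.out.println(filter.shouldProcess("EK.TWEETS.TOPIC", 0, 5)); // first delivery
        System.out.println(filter.shouldProcess("EK.TWEETS.TOPIC", 0, 5)); // redelivery of the same offset
    }
}
```

Inside the polling loop, such a check would sit at the top of the `for` loop, before the record is handed to business logic. A production system would typically persist this state, or make the processing itself idempotent, rather than rely on memory alone.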
Offset Level Commit
Sometimes an application needs to commit at a specific offset, for example as soon as a particular message has been read, instead of waiting for the whole batch to finish.
Sample Code
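// Next offset to read for each partition, updated as records are processed.
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
while (true) {
    // poll(long) is deprecated since Kafka 2.0; the Duration overload needs java.time.Duration.
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<byte[], byte[]> record : records) {
        System.out.printf("Received Message topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        // Store offset + 1: the committed value is the position of the next message to read.
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1, "no metadata"));
        // Example trigger: commit as soon as one specific offset has been read.
        if (record.offset() == 18098) {
            consumer.commitAsync(currentOffsets, null);
        }
    }
}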
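The sample above commits on one hard-coded offset. A more general variant of the same idea is to commit every N processed records. The bookkeeping for that can be sketched without a broker as follows. `OffsetTrackerSketch` is a hypothetical helper, and it uses "topic-partition" strings and plain `Long` values where a real consumer would use `TopicPartition` and `OffsetAndMetadata`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: tracks the next offset per partition and signals when a commit is due. */
class OffsetTrackerSketch {
    private final Map<String, Long> nextOffsets = new HashMap<>();
    private final int commitEvery;
    private int sinceLastCommit = 0;

    OffsetTrackerSketch(int commitEvery) {
        this.commitEvery = commitEvery;
    }

    /** Records a processed message; returns true when commitEvery records have accumulated. */
    boolean record(String topic, int partition, long offset) {
        // Store offset + 1, the position of the next message to read.
        nextOffsets.put(topic + "-" + partition, offset + 1);
        sinceLastCommit++;
        if (sinceLastCommit >= commitEvery) {
            sinceLastCommit = 0;
            return true;
        }
        return false;
    }

    /** Copy of the offsets that would be handed to the commit call. */
    Map<String, Long> snapshot() {
        return new HashMap<>(nextOffsets);
    }

    public static void main(String[] args) {
        OffsetTrackerSketch tracker = new OffsetTrackerSketch(3);
        tracker.record("EK.TWEETS.TOPIC", 0, 10);
        tracker.record("EK.TWEETS.TOPIC", 0, 11);
        boolean due = tracker.record("EK.TWEETS.TOPIC", 1, 4);
        System.out.println(due + " " + tracker.snapshot());
    }
}
```

In the polling loop, a `true` result from `record(...)` would be the point at which the consumer passes its offsets map to `commitAsync` or `commitSync`, in place of the `record.offset() == 18098` check.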