Kafka Consumer Error Handling, Retry, and Recovery

Nishanth Reddy Emmadi
Jul 15, 2020

In the previous post, I covered error handling, retry, and recovery for the Kafka producer. In this post, let's look at how to handle errors, retries, and recovery in a Kafka consumer.

As a scenario, let's assume there is a package events topic from which our consumer polls events. Once consumed, each event is inserted into a database with the help of a service class (PackageService). The database could equally be another API or a third-party application.

In this post, I cover how to handle exceptions at the service level. An exception can come from validation in the service, from persisting to the database, or from a call to an API.

The Kafka consumer application is responsible for consuming events, processing them, and calling the third-party API.
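To make the two kinds of service-level failures concrete, here is a minimal, dependency-free sketch. PackageInfo, PackageRepository, and PackageService are hypothetical names used only for illustration; the repository stands in for the database or third-party API. A validation failure (IllegalArgumentException) fails the same way on every attempt, while a TimeoutException from the downstream call may succeed if it is tried later, which is the distinction the retry policy later in this post relies on.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: these types are illustrative and not part of Spring Kafka.
public class PackageServiceSketch {

    // Simplified event payload (the listener in this post receives a PackageInfoEvent).
    public record PackageInfo(String packageId, int weightGrams) {}

    // Stand-in for the database, API, or third-party application.
    public interface PackageRepository {
        void save(PackageInfo info) throws TimeoutException;
    }

    public static class PackageService {
        private final PackageRepository repository;

        public PackageService(PackageRepository repository) {
            this.repository = repository;
        }

        public void persist(PackageInfo info) throws TimeoutException {
            // Validation failure: retrying the same event can never succeed.
            if (info.packageId() == null || info.packageId().isBlank()) {
                throw new IllegalArgumentException("packageId is required");
            }
            // Downstream failure: a TimeoutException here may succeed on a later attempt.
            repository.save(info);
        }
    }
}
```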

Let's start with the Kafka consumer. To create a consumer that listens to a certain topic, we put @KafkaListener(topics = {"packages-received"}) on a method in the Spring Boot application. Here, the topic to poll messages from is “packages-received”.

@KafkaListener(topics = {"packages-received"})
public void packagesListener(ConsumerRecord<String, PackageInfoEvent> packageInfoEvent) {
    log.info("Received event to persist packageInfoEvent :{}", packageInfoEvent.value());
}

In general, the Kafka listener gets all of its properties, such as the groupId and the key and value deserializers specified in the property files, from the “kafkaListenerContainerFactory” bean. Put simply, the “kafkaListenerContainerFactory” bean is the key to configuring the Kafka listener.

To override the default listener configuration, create your own “kafkaListenerContainerFactory” bean and set the configuration you want. This is what we will use to set up error handling, retry, and recovery for the Kafka listener/consumer.

@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // configure() expects a ConsumerFactory, so resolve it from the ObjectProvider
    configurer.configure(factory, kafkaConsumerFactory.getIfAvailable());

    return factory;
}

First, let's look at retries. Retries are generally for exceptions raised at runtime in the service layer because the service you depend on (a database or an API) is down or having issues. These exceptions may succeed when the operation is tried again later. The following snippet shows how to configure retries with a RetryTemplate. The RetryTemplate in turn is given a retry policy, which specifies the maximum number of attempts and which exceptions should and should not be retried.

@Configuration
public class ConsumerConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory.getIfAvailable());

        factory.setRetryTemplate(retryTemplate());

        return factory;
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        /* The retry policy sets the number of attempts and which exceptions should or should not be retried. */
        retryTemplate.setRetryPolicy(getSimpleRetryPolicy());

        return retryTemplate;
    }

    private SimpleRetryPolicy getSimpleRetryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
        // The boolean value in the map determines whether the exception should be retried.
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);

        // 3 = maximum attempts (including the first one); true = also inspect the exception's causes
        return new SimpleRetryPolicy(3, exceptionMap, true);
    }

}
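The policy above can be modelled without Spring to show what its three constructor arguments mean. The sketch below is a simplified illustration, not Spring Retry's implementation, and RetryPolicySketch and its execute() method are hypothetical names. maxAttempts counts every attempt including the first one, the map is consulted for the exception's class and its superclasses, and with traverseCauses set the cause chain is searched too. That last flag matters because the listener's exception usually reaches the retry template wrapped in another exception, which is also why the recovery callback later inspects getCause().

```java
import java.util.Map;
import java.util.concurrent.Callable;

// Simplified model of SimpleRetryPolicy(maxAttempts, retryableExceptions, traverseCauses).
// Illustrative only; Spring Retry's own classification logic differs in details.
public class RetryPolicySketch {

    private final int maxAttempts; // total attempts, including the first one
    private final Map<Class<? extends Throwable>, Boolean> retryableExceptions;
    private final boolean traverseCauses;

    public RetryPolicySketch(int maxAttempts,
                             Map<Class<? extends Throwable>, Boolean> retryableExceptions,
                             boolean traverseCauses) {
        this.maxAttempts = maxAttempts;
        this.retryableExceptions = retryableExceptions;
        this.traverseCauses = traverseCauses;
    }

    // Retryable if the exception (or, with traverseCauses, one of its causes) maps to true.
    public boolean isRetryable(Throwable error) {
        for (Throwable current = error; current != null; current = current.getCause()) {
            Boolean decision = lookup(current.getClass());
            if (decision != null) {
                return decision;
            }
            if (!traverseCauses) {
                break;
            }
        }
        return false; // unclassified exceptions are not retried
    }

    // Checks the class itself, then its superclasses, against the map.
    private Boolean lookup(Class<?> type) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            Boolean decision = retryableExceptions.get(c);
            if (decision != null) {
                return decision;
            }
        }
        return null;
    }

    // Runs the task until it succeeds, hits a non-retryable error, or uses up maxAttempts.
    public boolean execute(Callable<?> task) {
        int failures = 0;
        while (true) {
            try {
                task.call();
                return true;
            } catch (Exception e) {
                failures++;
                if (!isRetryable(e) || failures >= maxAttempts) {
                    return false; // this is the point where recovery takes over
                }
            }
        }
    }
}
```

With the same map as the configuration above, a wrapped TimeoutException is attempted three times in total, while an IllegalArgumentException fails after the first attempt.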

Even after the maximum number of retries, an event can still fail; this is where the recovery phase kicks in. Retry and recovery go hand in hand: once the retries are exhausted, the recovery step checks whether the event's exception is recoverable and takes the necessary recovery steps, such as putting the event back onto a retry topic or saving it to the database to try again later. If the exception is not recoverable, recovery simply passes it on to the error handler, which we will cover in a minute.

Now let's talk about recovery. ConcurrentKafkaListenerContainerFactory has a handy setRecoveryCallback() method. The callback receives a RetryContext once the maximum number of attempts has been made, and that context carries information about the failed event, including the last exception thrown. It can be implemented as follows:

@Configuration
@Slf4j
public class ConsumerConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory.getIfAvailable());

        factory.setRetryTemplate(retryTemplate());

        factory.setRecoveryCallback(context -> {
            // The listener's exception arrives wrapped, so inspect its cause.
            if (context.getLastThrowable().getCause() instanceof RecoverableDataAccessException) {
                // Here you can implement your recovery mechanism, e.g. put the event back onto the topic using a Kafka producer.
            } else {
                // Here you can log, then throw a custom exception for the error handler to take care of.
                throw new RuntimeException(context.getLastThrowable().getMessage());
            }
            return null;
        });

        return factory;
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        /* The retry policy sets the number of attempts and which exceptions should or should not be retried. */
        retryTemplate.setRetryPolicy(getSimpleRetryPolicy());

        return retryTemplate;
    }

    private SimpleRetryPolicy getSimpleRetryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);

        return new SimpleRetryPolicy(3, exceptionMap, true);
    }

}
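The hand-off from retry to recovery to error handler can also be sketched in plain Java. This is a simplified model under stated assumptions: every exception is retried up to maxAttempts times (the real policy classifies exceptions first), RecoverableException is a hypothetical stand-in for RecoverableDataAccessException and is checked directly rather than through getCause(), and the retryTopic and errorHandlerLog lists are hypothetical in-memory stand-ins for a Kafka producer and the configured error handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of retry -> recovery -> error handler. All names are hypothetical.
public class RecoveryFlowSketch {

    // Stand-in for RecoverableDataAccessException: a failure worth re-processing later.
    public static class RecoverableException extends RuntimeException {
        public RecoverableException(String message) {
            super(message);
        }
    }

    // Stand-in for "put it back onto a retry topic using a Kafka producer".
    public final List<String> retryTopic = new ArrayList<>();
    // Stand-in for what the error handler receives (here it just records the failure).
    public final List<String> errorHandlerLog = new ArrayList<>();

    private final int maxAttempts;

    public RecoveryFlowSketch(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
    }

    public void process(String event, Consumer<String> listener) {
        try {
            RuntimeException lastError = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    listener.accept(event);
                    return; // processed successfully, nothing to recover
                } catch (RuntimeException e) {
                    lastError = e;
                }
            }
            recover(event, lastError); // retries exhausted
        } catch (RuntimeException fromRecovery) {
            // Whatever the recovery step rethrows ends up with the error handler.
            errorHandlerLog.add(event + ": " + fromRecovery.getMessage());
        }
    }

    // Mirrors the recovery callback above: republish recoverable failures, rethrow the rest.
    private void recover(String event, RuntimeException lastError) {
        if (lastError instanceof RecoverableException) {
            retryTopic.add(event);
        } else {
            throw new RuntimeException(lastError.getMessage());
        }
    }
}
```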

Lastly, let's talk about error handling. By default, any exception raised while processing a consumed event is logged by the “LoggingErrorHandler” class in org.springframework.kafka.listener, which implements the “ErrorHandler” interface. By implementing the “ErrorHandler” interface ourselves, we can provide our own error handler.

@Configuration
@Slf4j
public class ConsumerConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory.getIfAvailable());

        factory.setErrorHandler((exception, data) -> {
            /* Do your custom handling here. I am just logging, the same as the default error handler does.
               If you only want to log, you do not need to configure an error handler here; the default
               handler does it for you. Generally, you will persist the failed records to a DB to track them. */
            log.error("Error in process with Exception {} and the record is {}", exception, data);
        });

        return factory;
    }
}
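Following the comment in the handler above, here is a minimal sketch of an error handler that keeps failed records for tracking instead of only logging them. It is dependency-free and every name is hypothetical: EventRecord is a simplified view of a ConsumerRecord, FailedRecordStore is an in-memory stand-in for a failed-records table, and the BiConsumer only mirrors the (exception, data) shape of the lambda passed to setErrorHandler(); it is not Spring Kafka's ErrorHandler type.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of an error handler that keeps failed records for tracking.
public class FailedRecordTrackingSketch {

    // Simplified view of the failed ConsumerRecord.
    public record EventRecord(String topic, long offset, String payload) {}

    // One row in a hypothetical failed-records table.
    public record FailedRecord(String topic, long offset, String payload, String error, Instant failedAt) {}

    // In-memory stand-in for a repository backed by a database table.
    public static final class FailedRecordStore {
        private final List<FailedRecord> rows = new ArrayList<>();

        public void save(FailedRecord row) {
            rows.add(row);
        }

        public List<FailedRecord> all() {
            return Collections.unmodifiableList(rows);
        }
    }

    // Same (exception, data) shape as the lambda passed to setErrorHandler().
    public static BiConsumer<Exception, EventRecord> errorHandler(FailedRecordStore store) {
        return (exception, data) -> {
            String error = exception.getClass().getSimpleName() + ": " + exception.getMessage();
            if (data == null) {
                // Guard against a missing record: store the error without event details.
                store.save(new FailedRecord(null, -1L, null, error, Instant.now()));
            } else {
                store.save(new FailedRecord(data.topic(), data.offset(), data.payload(), error, Instant.now()));
            }
        };
    }
}
```

Storing the topic and offset alongside the error makes it possible to find and replay the original event later.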

Code snippet: all strategies working together

package com.pack.events.consumer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.RecoverableDataAccessException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Configuration
@Slf4j
public class ConsumerConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // configure() expects a ConsumerFactory, so resolve it from the ObjectProvider
        configurer.configure(factory, kafkaConsumerFactory.getIfAvailable());

        // 1. Retry: re-run the listener according to the retry policy
        factory.setRetryTemplate(retryTemplate());

        // 2. Recovery: called once the retries are exhausted
        factory.setRecoveryCallback(context -> {
            // The listener's exception arrives wrapped, so inspect its cause.
            if (context.getLastThrowable().getCause() instanceof RecoverableDataAccessException) {
                // Here you can implement your recovery mechanism, e.g. put the event back onto the topic using a Kafka producer.
            } else {
                // Here you can log, then throw a custom exception for the error handler to take care of.
                throw new RuntimeException(context.getLastThrowable().getMessage());
            }
            return null;
        });

        // 3. Error handling: receives whatever recovery rethrows
        factory.setErrorHandler((exception, data) -> {
            /* Do your custom handling here. I am just logging, the same as the default error handler does.
               If you only want to log, you do not need to configure an error handler here; the default
               handler does it for you. Generally, you will persist the failed records to a DB to track them. */
            log.error("Error in process with Exception {} and the record is {}", exception, data);
        });

        return factory;
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        /* The retry policy sets the number of attempts and which exceptions should or should not be retried. */
        retryTemplate.setRetryPolicy(getSimpleRetryPolicy());

        return retryTemplate;
    }

    private SimpleRetryPolicy getSimpleRetryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
        // The boolean value in the map determines whether the exception should be retried.
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);

        // 3 = maximum attempts (including the first one); true = also inspect the exception's causes
        return new SimpleRetryPolicy(3, exceptionMap, true);
    }

}

Thank you
