Kafka — Handler

Prateek
3 min readMay 9, 2022

--

In this example, we’ll see how to use ConsumerAwareListenerErrorHandler to handle exceptions thrown by a @KafkaListener method.

Create Topic —

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-food-order
Created topic t-food-order.

Producer

FoodOrder.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Payload describing a single food order: which item and how many of it.
 *
 * <p>No constructors or accessors are written by hand here; the Lombok annotations below
 * ({@code @AllArgsConstructor}, {@code @NoArgsConstructor}, {@code @Data}, {@code @Builder})
 * are relied on to generate them.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class FoodOrder {
// How many units of the item were ordered.
private int amount;
// Name of the ordered item.
private String item;
}

FoodOrderProducer.java

package com.course.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.FoodOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Publishes {@link FoodOrder} instances to Kafka as JSON strings.
 */
@Service
public class FoodOrderProducer {

    /** Name of the topic every food order is sent to. */
    private static final String TOPIC = "t-food-order";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Serializes the given order to JSON and sends it to the food-order topic.
     *
     * @param foodOrder the order to publish
     * @throws JsonProcessingException if the order cannot be written as JSON
     */
    public void send(FoodOrder foodOrder) throws JsonProcessingException {
        // Serialize first so a serialization failure surfaces before anything is sent.
        final String payload = objectMapper.writeValueAsString(foodOrder);
        kafkaTemplate.send(TOPIC, payload);
    }
}

KafkaProducerApplication.java

package com.course.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.course.kafka.entity.FoodOrder;
import com.course.kafka.producer.FoodOrderProducer;

/**
 * Spring Boot entry point of the producer application.
 *
 * <p>Implements {@link CommandLineRunner}; its {@link #run(String...)} method publishes three
 * sample food orders through the injected {@link FoodOrderProducer}.
 */
@SpringBootApplication
@EnableScheduling
public class KafkaProducerApplication implements CommandLineRunner {

    @Autowired
    private FoodOrderProducer producer;

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

    /**
     * Builds the three sample orders and sends them one after another.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if sending any order fails
     */
    @Override
    public void run(String... args) throws Exception {
        // All orders are built up front, then sent in declaration order.
        final FoodOrder[] sampleOrders = {
            order("Chicken", 3),
            order("Fish", 10),
            order("Pizza", 5),
        };

        for (FoodOrder sampleOrder : sampleOrders) {
            producer.send(sampleOrder);
        }
    }

    /** Builds a food order for the given item and amount via the builder. */
    private static FoodOrder order(String item, int amount) {
        return FoodOrder.builder().item(item).amount(amount).build();
    }
}

Consumer

FoodOrderConsumer.java

package com.course.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.FoodOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Kafka listener for the {@code t-food-order} topic.
 *
 * <p>Each message is parsed from JSON into a {@link FoodOrder}. Orders whose amount exceeds
 * {@link #MAX_ORDER_AMOUNT} are rejected with an {@link IllegalArgumentException}. The listener
 * is declared with {@code errorHandler = "myFoodOrderErrorHandler"}, so exceptions thrown here
 * are expected to be handed to the bean registered under that name.
 */
@Service
public class FoodOrderConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(FoodOrderConsumer.class);

    /** Largest amount accepted; anything above this is rejected. */
    private static final int MAX_ORDER_AMOUNT = 7;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Parses and validates a single food-order message.
     *
     * @param message the raw JSON text of the order
     * @throws JsonMappingException if the JSON cannot be mapped onto {@link FoodOrder}
     * @throws JsonProcessingException if the message is not valid JSON
     * @throws IllegalArgumentException if the order amount is greater than {@link #MAX_ORDER_AMOUNT}
     */
    @KafkaListener(topics = "t-food-order", errorHandler = "myFoodOrderErrorHandler")
    public void consume(String message) throws JsonMappingException, JsonProcessingException {
        var foodOrder = objectMapper.readValue(message, FoodOrder.class);

        // Go through the logger rather than stdout so this line obeys the logging configuration.
        LOG.info("Food Order : {}", foodOrder);

        if (foodOrder.getAmount() > MAX_ORDER_AMOUNT) {
            throw new IllegalArgumentException("Order amount is too many : " + foodOrder.getAmount());
        }

        LOG.info("Processing food order : {}", foodOrder);
    }

}

FoodOrderErrorHandler.java

package com.course.kafka.error.handler;

import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 * Listener error handler registered as the bean {@code myFoodOrderErrorHandler}.
 *
 * <p>This variant only logs the failed message and reports the error as handled by returning
 * {@code null}; it never rethrows.
 */
@Service(value = "myFoodOrderErrorHandler")
public class FoodOrderErrorHandler implements ConsumerAwareListenerErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(FoodOrderErrorHandler.class);

    /**
     * Logs the payload of the failed message together with the exception message.
     *
     * @param message the message whose processing failed
     * @param exception the failure raised by the listener
     * @param consumer the Kafka consumer (not used here)
     * @return always {@code null}
     */
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception,
            Consumer<?, ?> consumer) {
        final Object failedPayload = message.getPayload();
        final String reason = exception.getMessage();

        // NOTE: the log text mentions elasticsearch, but nothing is sent anywhere in this method.
        LOG.warn("Food order error, sending to elasticsearch : {}, because : {}", failedPayload, reason);

        return null;
    }
}

Rethrow the exception

import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 * Listener error handler registered as the bean {@code myFoodOrderErrorHandler}.
 *
 * <p>This variant logs the failed message and then rethrows the exception when its cause is a
 * {@link RuntimeException}; otherwise it returns {@code null}.
 */
@Service(value = "myFoodOrderErrorHandler")
public class FoodOrderErrorHandler implements ConsumerAwareListenerErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(FoodOrderErrorHandler.class);

    /**
     * Logs the failure, then either rethrows it or reports it as handled.
     *
     * @param message the message whose processing failed
     * @param exception the failure raised by the listener
     * @param consumer the Kafka consumer (not used here)
     * @return {@code null} when the cause is not a {@link RuntimeException}
     * @throws ListenerExecutionFailedException the same {@code exception} instance, when its
     *     cause is a {@link RuntimeException}
     */
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception,
            Consumer<?, ?> consumer) {
        final Object failedPayload = message.getPayload();
        final String reason = exception.getMessage();

        // NOTE: the log text mentions elasticsearch, but nothing is sent anywhere in this method.
        LOG.warn("Food order error, sending to elasticsearch : {}, because : {}", failedPayload, reason);

        final boolean causedByRuntimeException = exception.getCause() instanceof RuntimeException;
        if (!causedByRuntimeException) {
            return null;
        }

        // Propagate the original exception unchanged.
        throw exception;
    }
}

--

--