Kafka — KafkaListener Error Handler

In this example, we’ll see how to use ConsumerAwareListenerErrorHandler to handle exceptions thrown by a @KafkaListener method.

Create Topic —

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-food-order
Created topic t-food-order.

Producer

FoodOrder.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Payload describing a single food order.
 *
 * <p>Elsewhere in this example the producer serializes instances to JSON with a
 * Jackson {@code ObjectMapper}, and the consumer reads them back the same way.
 * Instances are created through the Lombok-generated builder
 * ({@code FoodOrder.builder().item(...).amount(...).build()}).
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class FoodOrder {
// Ordered quantity; the consumer in this example rejects orders whose amount exceeds its limit.
private int amount;
// Name of the ordered item, e.g. "Chicken".
private String item;
}

FoodOrderProducer.java

package com.course.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.FoodOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Publishes {@link FoodOrder} instances to Kafka as JSON strings.
 */
@Service
public class FoodOrderProducer {

    /** Topic every food order is published to. */
    private static final String TOPIC = "t-food-order";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Serializes the given order to JSON and sends it to the food-order topic.
     *
     * @param foodOrder the order to publish
     * @throws JsonProcessingException if the order cannot be serialized to JSON
     */
    public void send(FoodOrder foodOrder) throws JsonProcessingException {
        kafkaTemplate.send(TOPIC, objectMapper.writeValueAsString(foodOrder));
    }
}

KafkaProducerApplication.java

package com.course.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.course.kafka.entity.FoodOrder;
import com.course.kafka.producer.FoodOrderProducer;

/**
 * Spring Boot entry point for the producer side of the example.
 *
 * <p>On startup it publishes three sample food orders through
 * {@link FoodOrderProducer}.
 */
@SpringBootApplication
@EnableScheduling
public class KafkaProducerApplication implements CommandLineRunner {

    @Autowired
    private FoodOrderProducer producer;

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

    /**
     * Sends the sample orders, in a fixed sequence: Chicken (3), Fish (10), Pizza (5).
     *
     * @param args command-line arguments (unused)
     * @throws Exception if any order fails to be sent
     */
    @Override
    public void run(String... args) throws Exception {
        producer.send(order("Chicken", 3));
        producer.send(order("Fish", 10));
        producer.send(order("Pizza", 5));
    }

    /** Builds a {@link FoodOrder} for the given item and amount. */
    private static FoodOrder order(String item, int amount) {
        return FoodOrder.builder().item(item).amount(amount).build();
    }
}

Consumer

FoodOrderConsumer.java

package com.course.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.FoodOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Consumes JSON food orders from the {@code t-food-order} topic.
 *
 * <p>Orders whose amount exceeds {@link #MAX_ORDER_AMOUNT} are rejected with an
 * {@link IllegalArgumentException}. The listener is registered with the error
 * handler bean named {@code myFoodOrderErrorHandler}.
 */
@Service
public class FoodOrderConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(FoodOrderConsumer.class);

    /** Largest order amount accepted; anything above this is rejected. */
    private static final int MAX_ORDER_AMOUNT = 7;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Deserializes an incoming message into a {@link FoodOrder} and validates its amount.
     *
     * @param message the raw JSON payload read from the topic
     * @throws JsonMappingException if the JSON cannot be mapped onto {@link FoodOrder}
     * @throws JsonProcessingException if the payload is not valid JSON
     * @throws IllegalArgumentException if the order amount exceeds {@link #MAX_ORDER_AMOUNT}
     */
    @KafkaListener(topics = "t-food-order", errorHandler = "myFoodOrderErrorHandler")
    public void consume(String message) throws JsonMappingException, JsonProcessingException {
        var foodOrder = objectMapper.readValue(message, FoodOrder.class);

        // Logged before validation so that rejected orders are still traceable;
        // uses the logger rather than stdout so output respects the logging configuration.
        LOG.info("Food Order : {}", foodOrder);

        if (foodOrder.getAmount() > MAX_ORDER_AMOUNT) {
            throw new IllegalArgumentException("Order amount is too many : " + foodOrder.getAmount());
        }

        LOG.info("Processing food order : {}", foodOrder);
    }

}

FoodOrderErrorHandler.java

package com.course.kafka.error.handler;

import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 * Listener error handler registered under the bean name
 * {@code myFoodOrderErrorHandler}.
 *
 * <p>This variant only logs the failure and returns {@code null}; it never rethrows.
 */
@Service("myFoodOrderErrorHandler")
public class FoodOrderErrorHandler implements ConsumerAwareListenerErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(FoodOrderErrorHandler.class);

    /**
     * Logs the failed message payload together with the exception message.
     *
     * <p>NOTE: the log text mentions elasticsearch, but nothing is sent anywhere
     * by this method; it only writes a warning.
     *
     * @param message   the message whose processing failed
     * @param exception the exception raised while invoking the listener
     * @param consumer  the Kafka consumer (unused here)
     * @return always {@code null}
     */
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        final Object payload = message.getPayload();
        final String reason = exception.getMessage();
        LOG.warn("Food order error, sending to elasticsearch : {}, because : {}", payload, reason);
        return null;
    }
}

Rethrow the exception

import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 * Listener error handler registered under the bean name
 * {@code myFoodOrderErrorHandler}.
 *
 * <p>This variant logs the failure and then rethrows the exception when its
 * cause is a {@link RuntimeException}; otherwise it returns {@code null}.
 */
@Service("myFoodOrderErrorHandler")
public class FoodOrderErrorHandler implements ConsumerAwareListenerErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(FoodOrderErrorHandler.class);

    /**
     * Logs the failed message, then either rethrows or swallows the failure.
     *
     * <p>NOTE: the log text mentions elasticsearch, but nothing is sent anywhere
     * by this method; it only writes a warning.
     *
     * @param message   the message whose processing failed
     * @param exception the exception raised while invoking the listener
     * @param consumer  the Kafka consumer (unused here)
     * @return {@code null} when the cause is not a {@link RuntimeException}
     * @throws ListenerExecutionFailedException the same {@code exception}, when its
     *         cause is a {@link RuntimeException}
     */
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        final Object payload = message.getPayload();
        final String reason = exception.getMessage();
        LOG.warn("Food order error, sending to elasticsearch : {}, because : {}", payload, reason);

        final Throwable cause = exception.getCause();
        if (!(cause instanceof RuntimeException)) {
            return null;
        }

        throw exception;
    }
}

--

--

--

Java Developer and enthusiast

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Why is Cloud a Board Level Key Business Initiative

Razorpay payment DJANGO

Automate UI Testing with PyAutoGUI in Python

How to set up Windows Subsystem for Linux (WSL)

Source and Target Profiling is Key to Estimating Salesforce Data Migrations With Accuracy

More goroutines means… faster? It depends.

Why Feature Briefs > Epics

Simplify Workload Migration to AWS EKS (Elastic Kubernetes Service) by using Jamcracker Cloud…

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
PA

PA

Java Developer and enthusiast

More from Medium

Kafka — Hello Producer

Kafka and Its Components

Kafka Transactions: Part 2: Spring Boot Demo

Kafka internals : how Kafka brokers work