In this example, we'll see how to use ConsumerAwareListenerErrorHandler to handle exceptions thrown by a @KafkaListener method.
Create Topic —
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-food-order
Created topic t-food-order.
Producer
FoodOrder.java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Food order payload: an item name together with the quantity ordered.
 *
 * <p>All boilerplate is generated by Lombok: accessors, {@code equals},
 * {@code hashCode} and {@code toString} via {@code @Data}, a fluent builder via
 * {@code @Builder}, plus a no-argument and an all-arguments constructor.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class FoodOrder {

    /** Quantity of the item being ordered. */
    private int amount;

    /** Name of the ordered item. */
    private String item;
}
FoodOrderProducer.java
package com.course.kafka.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.FoodOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Publishes {@link FoodOrder} objects to Kafka as JSON strings.
 */
@Service
public class FoodOrderProducer {

    /** Topic that every food order is written to. */
    private static final String TOPIC = "t-food-order";

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Serializes the given order to JSON and sends it to the food order topic
     * without a record key.
     *
     * @param foodOrder the order to publish
     * @throws JsonProcessingException if the order cannot be serialized to JSON
     */
    public void send(FoodOrder foodOrder) throws JsonProcessingException {
        final String payload = objectMapper.writeValueAsString(foodOrder);
        kafkaTemplate.send(TOPIC, payload);
    }
}
KafkaProducerApplication.java
package com.course.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.course.kafka.entity.FoodOrder;
import com.course.kafka.producer.FoodOrderProducer;
/**
 * Spring Boot entry point of the producer application.
 *
 * <p>Once the application context has started, {@link #run(String...)} publishes
 * three sample food orders through {@link FoodOrderProducer}.
 */
@SpringBootApplication
@EnableScheduling
public class KafkaProducerApplication implements CommandLineRunner {

    @Autowired
    private FoodOrderProducer producer;

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

    /**
     * Sends the sample orders, in order: Chicken x3, Fish x10, Pizza x5.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if publishing any of the orders fails
     */
    @Override
    public void run(String... args) throws Exception {
        producer.send(sampleOrder("Chicken", 3));
        producer.send(sampleOrder("Fish", 10));
        producer.send(sampleOrder("Pizza", 5));
    }

    /** Builds a {@link FoodOrder} for the given item and amount. */
    private static FoodOrder sampleOrder(String item, int amount) {
        return FoodOrder.builder().item(item).amount(amount).build();
    }
}
Consumer
FoodOrderConsumer.java
package com.course.kafka.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.FoodOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Consumes JSON food orders from the {@code t-food-order} topic.
 *
 * <p>Orders whose amount exceeds {@link #MAX_ORDER_AMOUNT} are rejected with an
 * {@link IllegalArgumentException}; the listener is configured to delegate such
 * failures to the error handler bean named {@code myFoodOrderErrorHandler}.
 */
@Service
public class FoodOrderConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(FoodOrderConsumer.class);

    /** Largest amount a single order may have and still be processed. */
    private static final int MAX_ORDER_AMOUNT = 7;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Deserializes one message into a {@link FoodOrder} and validates its amount.
     *
     * @param message JSON representation of a {@link FoodOrder}
     * @throws JsonMappingException     if the JSON cannot be mapped onto {@link FoodOrder}
     * @throws JsonProcessingException  if the message is not valid JSON
     * @throws IllegalArgumentException if the order amount is greater than
     *                                  {@link #MAX_ORDER_AMOUNT}
     */
    @KafkaListener(topics = "t-food-order", errorHandler = "myFoodOrderErrorHandler")
    public void consume(String message) throws JsonMappingException, JsonProcessingException {
        var foodOrder = objectMapper.readValue(message, FoodOrder.class);

        // Diagnostic trace goes through the logger rather than System.out so that
        // it honours the application's logging configuration.
        LOG.debug("Food Order : {}", foodOrder);

        if (foodOrder.getAmount() > MAX_ORDER_AMOUNT) {
            throw new IllegalArgumentException("Order amount is too many : " + foodOrder.getAmount());
        }

        LOG.info("Processing food order : {}", foodOrder);
    }
}
FoodOrderErrorHandler.java
package com.course.kafka.error.handler;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
/**
 * Listener error handler registered under the bean name
 * {@code myFoodOrderErrorHandler}.
 *
 * <p>This variant only logs the failure and then returns normally, so nothing is
 * thrown out of the handler.
 */
@Service(value = "myFoodOrderErrorHandler")
public class FoodOrderErrorHandler implements ConsumerAwareListenerErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(FoodOrderErrorHandler.class);

    /**
     * Logs the payload of the failed message together with the exception message.
     *
     * <p>Note: the log text mentions elasticsearch, but this method performs no
     * such call; it only writes a warning to the log.
     *
     * @param message   the message whose processing failed
     * @param exception the exception raised by the listener invocation
     * @param consumer  the Kafka consumer that received the record (unused)
     * @return always {@code null}
     */
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        final Object payload = message.getPayload();
        final String reason = exception.getMessage();

        LOG.warn("Food order error, sending to elasticsearch : {}, because : {}", payload, reason);

        return null;
    }
}
Rethrow the exception
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
/**
 * Listener error handler registered under the bean name
 * {@code myFoodOrderErrorHandler}.
 *
 * <p>This variant logs the failure and then rethrows it when the underlying
 * cause is a {@link RuntimeException}; any other failure is swallowed after
 * logging.
 */
@Service(value = "myFoodOrderErrorHandler")
public class FoodOrderErrorHandler implements ConsumerAwareListenerErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(FoodOrderErrorHandler.class);

    /**
     * Logs the payload of the failed message together with the exception message,
     * then decides whether to propagate the failure.
     *
     * <p>Note: the log text mentions elasticsearch, but this method performs no
     * such call; it only writes a warning to the log.
     *
     * @param message   the message whose processing failed
     * @param exception the exception raised by the listener invocation
     * @param consumer  the Kafka consumer that received the record (unused)
     * @return {@code null} when the cause is not a {@link RuntimeException}
     * @throws ListenerExecutionFailedException the same {@code exception}, when its
     *         cause is a {@link RuntimeException}
     */
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        final Object payload = message.getPayload();
        final String reason = exception.getMessage();
        LOG.warn("Food order error, sending to elasticsearch : {}, because : {}", payload, reason);

        final boolean causedByRuntimeException = exception.getCause() instanceof RuntimeException;
        if (!causedByRuntimeException) {
            return null;
        }

        // Propagate so the failure is not silently dropped by this handler.
        throw exception;
    }
}