In this example, we’ll implement a retry mechanism in a Spring Kafka consumer.
Producer
Image.java
package com.course.kafka.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Image metadata exchanged between the producer and the consumer as a JSON payload.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString}, the builder and both
 * constructors are generated by the Lombok annotations on this class.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Image {

    /** Name of the image. */
    private String name;

    /** Size of the image; the unit is not specified by this class. */
    private long size;

    /** Type of the image, for example {@code "jpg"} or {@code "svg"}. */
    private String type;
}
ImageProducer.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.Image;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Publishes {@link Image} messages, serialized as JSON, to the {@code t-image} topic.
 */
@Service
public class ImageProducer {

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Serializes the given image to JSON and sends it to the requested partition of
     * {@code t-image}, using the image type as the record key.
     *
     * @param image     the image to publish
     * @param partition the partition the record is sent to
     * @throws JsonProcessingException if the image cannot be written as JSON
     */
    public void send(Image image, int partition) throws JsonProcessingException {
        // Serialize first so that a serialization failure surfaces before anything is sent.
        final String payload = objectMapper.writeValueAsString(image);
        final String recordKey = image.getType();

        kafkaTemplate.send("t-image", partition, recordKey, payload);
    }
}
KafkaProducerApplication.java
package com.course.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.course.kafka.entity.FoodOrder;
import com.course.kafka.entity.Image;
import com.course.kafka.entity.SimpleNumber;
import com.course.kafka.producer.FoodOrderProducer;
import com.course.kafka.producer.ImageProducer;
import com.course.kafka.producer.ImageService;
import com.course.kafka.producer.SimpleNumberProducer;
/**
 * Spring Boot entry point of the producer application.
 *
 * <p>On startup it generates six images of different types and publishes the first
 * three to partition 0 and the last three to partition 1.
 */
@SpringBootApplication
@EnableScheduling
public class KafkaProducerApplication implements CommandLineRunner {

    @Autowired
    private ImageProducer imageProducer;

    @Autowired
    private ImageService imageService;

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

    /**
     * Generates one image per type and then publishes them, three per partition.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if generating or sending an image fails
     */
    @Override
    public void run(String... args) throws Exception {
        final String[] imageTypes = {"jpg", "svg", "png", "gif", "bmp", "tiff"};

        // Generate every image before sending anything, matching the publish order below.
        final Image[] images = new Image[imageTypes.length];
        for (int i = 0; i < imageTypes.length; i++) {
            images[i] = imageService.generateImage(imageTypes[i]);
        }

        // Indexes 0-2 map to partition 0, indexes 3-5 map to partition 1.
        final int imagesPerPartition = 3;
        for (int i = 0; i < images.length; i++) {
            imageProducer.send(images[i], i / imagesPerPartition);
        }
    }
}
Consumer
ImageConsumer.java
package com.course.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.Image;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Consumes image messages from the {@code t-image} topic and simulates a failing
 * downstream call for SVG images so that the container's retry handling can be observed.
 */
@Service
public class ImageConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(ImageConsumer.class);

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Deserializes the record value into an {@link Image} and processes it.
     *
     * <p>Images of type {@code svg} (case-insensitive) always fail with an
     * {@link IllegalArgumentException}; the exception is left to the error handler of the
     * {@code imageRetryContainerFactory} container factory this listener is bound to.
     *
     * @param consumerRecord the record read from {@code t-image}; its value is a JSON image
     * @throws JsonMappingException    if the JSON cannot be mapped onto {@link Image}
     * @throws JsonProcessingException if the record value is not valid JSON
     * @throws IllegalArgumentException if the image type is {@code svg} (simulated failure)
     */
    @KafkaListener(topics = "t-image", containerFactory = "imageRetryContainerFactory", concurrency = "2")
    public void consume(ConsumerRecord<String, String> consumerRecord)
            throws JsonMappingException, JsonProcessingException {
        var image = objectMapper.readValue(consumerRecord.value(), Image.class);

        // Constant-first comparison: a payload without a "type" field yields a null type,
        // which must not raise a NullPointerException (and be retried for no benefit).
        if ("svg".equalsIgnoreCase(image.getType())) {
            LOG.warn("Throwing exception on partition {} for image {}", consumerRecord.partition(), image);
            throw new IllegalArgumentException("Simulate API call failed");
        }

        LOG.info("Processing on partition {} for image {}", consumerRecord.partition(), image);
    }
}
KafkaConfig.java
package com.course.kafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.util.backoff.FixedBackOff;
import com.course.kafka.entity.CarLocation;
import com.course.kafka.error.handler.GlobalErrorHandler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Kafka consumer configuration: the shared consumer factory plus the listener container
 * factories referenced by name from the {@code @KafkaListener} methods.
 */
@Configuration
public class KafkaConfig {

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private KafkaProperties kafkaProperties;

    /**
     * Builds the consumer factory from the Spring Boot Kafka properties, overriding the
     * metadata max age with {@code 120000}.
     *
     * @return the consumer factory shared by all container factories in this class
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        var consumerProperties = kafkaProperties.buildConsumerProperties();
        consumerProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "120000");

        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

    /**
     * Container factory that drops car-location records whose distance is at most 100,
     * so listeners only see locations farther away than that.
     *
     * @param configurer Spring Boot configurer applying the default container settings
     * @return the filtering container factory
     */
    @Bean(name = "farLocationContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> farLocationContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = newConfiguredFactory(configurer);

        // The strategy returns true for records that must be discarded.
        factory.setRecordFilterStrategy(consumerRecord -> {
            try {
                final String json = consumerRecord.value().toString();
                final CarLocation location = objectMapper.readValue(json, CarLocation.class);
                return location.getDistance() <= 100;
            } catch (JsonProcessingException e) {
                // Unparseable payloads are not filtered out; they are passed to the listener.
                return false;
            }
        });

        return factory;
    }

    /**
     * Default container factory, using {@link GlobalErrorHandler} as its error handler.
     *
     * @param configurer Spring Boot configurer applying the default container settings
     * @return the default container factory
     */
    @Bean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = newConfiguredFactory(configurer);
        factory.setCommonErrorHandler(new GlobalErrorHandler());

        return factory;
    }

    /**
     * Container factory whose error handler retries a failed record with a fixed back-off:
     * an interval of 10 000 ms and at most 3 retry attempts.
     *
     * @param configurer Spring Boot configurer applying the default container settings
     * @return the retrying container factory
     */
    @Bean(name = "imageRetryContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> imageRetryContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = newConfiguredFactory(configurer);

        final FixedBackOff retryBackOff = new FixedBackOff(10_000, 3);
        factory.setCommonErrorHandler(new DefaultErrorHandler(retryBackOff));

        return factory;
    }

    /** Creates a container factory and applies the Boot defaults with the shared consumer factory. */
    private ConcurrentKafkaListenerContainerFactory<Object, Object> newConfiguredFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory());

        return factory;
    }
}