Kafka — Retry

In this example, we’ll implement retry in a Spring Kafka consumer.

Producer

Image.java

package com.course.kafka.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Message payload describing an image.
 *
 * <p>The Lombok annotations supply the all-args and no-args constructors,
 * the accessors/equals/hashCode/toString ({@code @Data}) and a builder
 * ({@code @Builder}); no hand-written members are needed.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Image {
/** Image name. */
private String name;
/** Image size (unit is not specified here -- presumably bytes; confirm with the generator). */
private long size;
/** Image type, e.g. "jpg" or "svg". */
private String type;
}

ImageProducer.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.Image;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Publishes {@link Image} entities to Kafka as JSON strings.
 */
@Service
public class ImageProducer {

    /** Topic every image message is written to. */
    private static final String IMAGE_TOPIC = "t-image";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Serializes the image to JSON and sends it to an explicit partition of
     * the image topic, using the image type as the record key.
     *
     * @param image     image to publish
     * @param partition target partition number
     * @throws JsonProcessingException if the image cannot be serialized to JSON
     */
    public void send(Image image, int partition) throws JsonProcessingException {
        final String payload = objectMapper.writeValueAsString(image);
        final String recordKey = image.getType();
        kafkaTemplate.send(IMAGE_TOPIC, partition, recordKey, payload);
    }
}

KafkaProducerApplication.java

package com.course.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.course.kafka.entity.FoodOrder;
import com.course.kafka.entity.Image;
import com.course.kafka.entity.SimpleNumber;
import com.course.kafka.producer.FoodOrderProducer;
import com.course.kafka.producer.ImageProducer;
import com.course.kafka.producer.ImageService;
import com.course.kafka.producer.SimpleNumberProducer;

/**
 * Producer application entry point. On startup it generates six sample
 * images and publishes them: the first three to partition 0, the last three
 * to partition 1.
 */
@SpringBootApplication
@EnableScheduling
public class KafkaProducerApplication implements CommandLineRunner {

    @Autowired
    private ImageService imageService;

    @Autowired
    private ImageProducer imageProducer;

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

    /**
     * Generates one image per sample type, then sends them in the same order.
     * All images are generated before the first one is sent.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if generating or sending an image fails
     */
    @Override
    public void run(String... args) throws Exception {
        final String[] sampleTypes = { "jpg", "svg", "png", "gif", "bmp", "tiff" };

        final Image[] samples = new Image[sampleTypes.length];
        for (int i = 0; i < sampleTypes.length; i++) {
            samples[i] = imageService.generateImage(sampleTypes[i]);
        }

        // First half of the samples goes to partition 0, second half to partition 1.
        final int perPartition = 3;
        for (int i = 0; i < samples.length; i++) {
            final int targetPartition = i < perPartition ? 0 : 1;
            imageProducer.send(samples[i], targetPartition);
        }
    }
}

Consumer

ImageConsumer.java

package com.course.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.Image;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Consumes image messages from the {@code t-image} topic and simulates a
 * processing failure for SVG images so that the retry behaviour of the
 * {@code imageRetryContainerFactory} container factory can be observed.
 */
@Service
public class ImageConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(ImageConsumer.class);

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Deserializes the record value into an {@link Image} and "processes" it.
     * Images whose type is {@code svg} (case-insensitive) always fail with an
     * {@link IllegalArgumentException}; every other image is only logged.
     *
     * @param consumerRecord record whose value is the JSON form of an {@link Image}
     * @throws JsonMappingException    if the JSON cannot be mapped onto {@link Image}
     * @throws JsonProcessingException if the record value is not valid JSON
     * @throws IllegalArgumentException for SVG images, to simulate a failed API call
     */
    @KafkaListener(topics = "t-image", containerFactory = "imageRetryContainerFactory", concurrency = "2")
    public void consume(ConsumerRecord<String, String> consumerRecord)
            throws JsonMappingException, JsonProcessingException {
        var image = objectMapper.readValue(consumerRecord.value(), Image.class);

        // Constant-first comparison: a payload without a "type" is simply not
        // an SVG. Calling equalsIgnoreCase on the (possibly null) type would
        // raise a NullPointerException and push a processable record through
        // the retry cycle for no reason.
        if ("svg".equalsIgnoreCase(image.getType())) {
            LOG.warn("Throwing exception on partition {} for image {}", consumerRecord.partition(), image);
            throw new IllegalArgumentException("Simulate API call failed");
        }

        LOG.info("Processing on partition {} for image {}", consumerRecord.partition(), image);
    }

}

KafkaConfig.java

package com.course.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.util.backoff.FixedBackOff;

import com.course.kafka.entity.CarLocation;
import com.course.kafka.error.handler.GlobalErrorHandler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Consumer-side Kafka configuration: one shared consumer factory and three
 * listener container factories (record filtering, global error handling and
 * fixed back-off retry).
 */
@Configuration
public class KafkaConfig {

    /** Interval between delivery attempts for the image retry factory, in milliseconds. */
    private static final long IMAGE_RETRY_INTERVAL_MS = 10_000;

    /** Maximum attempts value handed to the fixed back-off of the image retry factory. */
    private static final long IMAGE_RETRY_MAX_ATTEMPTS = 3;

    @Autowired
    private KafkaProperties kafkaProperties;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Builds the consumer factory from the Spring Boot Kafka properties,
     * overriding the metadata max age with {@code "120000"}.
     *
     * @return consumer factory shared by all container factories below
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        var consumerProps = kafkaProperties.buildConsumerProperties();
        consumerProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "120000");
        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }

    /**
     * Container factory that installs a record filter based on the distance
     * of the {@link CarLocation} carried by each record.
     *
     * @param configurer Spring Boot configurer applying the default settings
     * @return the configured container factory
     */
    @Bean(name = "farLocationContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> farLocationContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(containerFactory, consumerFactory());
        containerFactory.setRecordFilterStrategy(this::hasDistanceAtMostHundred);
        return containerFactory;
    }

    /**
     * Filter predicate for {@code farLocationContainerFactory}: answers
     * {@code true} when the record's car location has a distance of at most
     * 100, and {@code false} otherwise or when the value is not parseable as
     * a {@link CarLocation}.
     *
     * @param consumerRecord record whose value's string form is CarLocation JSON
     * @return the filter decision handed to the record filter strategy
     */
    private boolean hasDistanceAtMostHundred(ConsumerRecord<Object, Object> consumerRecord) {
        try {
            String payload = consumerRecord.value().toString();
            CarLocation location = objectMapper.readValue(payload, CarLocation.class);
            return location.getDistance() <= 100;
        } catch (JsonProcessingException ignored) {
            // Unparseable payloads are deliberately answered with false.
            return false;
        }
    }

    /**
     * Default container factory, wired with the project's
     * {@link GlobalErrorHandler} as its common error handler.
     *
     * @param configurer Spring Boot configurer applying the default settings
     * @return the configured container factory
     */
    @Bean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(containerFactory, consumerFactory());
        containerFactory.setCommonErrorHandler(new GlobalErrorHandler());
        return containerFactory;
    }

    /**
     * Container factory used by the image listener. Failures are handled by a
     * {@link DefaultErrorHandler} with a fixed back-off; no custom recoverer
     * is configured.
     *
     * @param configurer Spring Boot configurer applying the default settings
     * @return the configured container factory
     */
    @Bean(name = "imageRetryContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> imageRetryContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(containerFactory, consumerFactory());

        FixedBackOff retryBackOff = new FixedBackOff(IMAGE_RETRY_INTERVAL_MS, IMAGE_RETRY_MAX_ATTEMPTS);
        containerFactory.setCommonErrorHandler(new DefaultErrorHandler(retryBackOff));
        return containerFactory;
    }
}

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store