Kafka — Dead Letter Topic
In this example, we’ll see how to make the use of DeadLetterTopic to send messages even after retry.
Scenario —
- Publish message to t-invoice
- If amount less than 1 throw exception, retry 5 times
- After 5 failed retry attempts, publish to t-invoice-dead
- Another consumer will consume from t-invoice-dead for further processing.
Create Topic —
kafka-topics --bootstrap-server localhost:9092 --create --partitions 2 --replication-factor 1 --topic t-invoice
Created topic t-invoice.kafka-topics --bootstrap-server localhost:9092 --create --partitions 2 --replication-factor 1 --topic t-invoice-dead
Created topic t-invoice-dead.
Invoice.java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Invoice {
private String invoiceNumber;
private int amount;
private String currency;
}
InvoiceService.java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.Invoice;
@Service
public class InvoiceService {
private AtomicInteger counter = new AtomicInteger();
public Invoice generateInvoice() {
var invoiceNumber = "INV-" + counter.incrementAndGet();
var amount = ThreadLocalRandom.current().nextInt(1, 1000);
return new Invoice(invoiceNumber, amount, "USD");
}
}
InvoiceProducer.java
package com.course.kafka.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.Invoice;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
public class InvoiceProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ObjectMapper objectMapper;
public void send(Invoice invoice) throws JsonProcessingException {
var json = objectMapper.writeValueAsString(invoice);
kafkaTemplate.send("t-invoice", invoice.getAmount() % 2, invoice.getInvoiceNumber(), json);
}
}
Consumer
KafkaConfig.java
package com.course.kafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.util.backoff.FixedBackOff;
import com.course.kafka.entity.CarLocation;
import com.course.kafka.error.handler.GlobalErrorHandler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@Configuration
public class KafkaConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Autowired
private ObjectMapper objectMapper;
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
var properties = kafkaProperties.buildConsumerProperties();
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "120000");
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean(name = "farLocationContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> farLocationContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());
factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {
@Override
public boolean filter(ConsumerRecord<Object, Object> consumerRecord) {
try {
CarLocation carLocation = objectMapper.readValue(consumerRecord.value().toString(), CarLocation.class);
return carLocation.getDistance() <= 100;
} catch (JsonProcessingException e) {
return false;
}
}
});
return factory;
}
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());
factory.setCommonErrorHandler(new GlobalErrorHandler());
return factory;
}
@Bean(name = "imageRetryContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> imageRetryContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(10_000, 3)));
return factory;
}
@Bean(name = "invoiceDltContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> invoiceDltContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer, KafkaTemplate<String, String> kafkaTemplate) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());
//var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate, (record, ex) -> new TopicPartition("t-invoice-dead", record.partition()));
factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer, new FixedBackOff(1000, 5)));
return factory;
}
}
InvoiceConsumer.java
package com.course.kafka.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.Invoice;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
public class InvoiceConsumer {
private static final Logger LOG = LoggerFactory.getLogger(InvoiceConsumer.class);
@Autowired
private ObjectMapper objectMapper;
@KafkaListener(topics = "t-invoice", concurrency = "2", containerFactory = "invoiceDltContainerFactory")
public void consume(String message) throws JsonMappingException, JsonProcessingException {
var invoice = objectMapper.readValue(message, Invoice.class);
if (invoice.getAmount() < 1) {
throw new IllegalArgumentException("Invalid amount for " + invoice);
}
LOG.info("Processing invoice : {}", invoice);
}
}