Prateek
3 min readMay 14, 2022

Kafka — Dead Letter Topic

In this example, we’ll see how to make the use of DeadLetterTopic to send messages even after retry.

Scenario —

  • Publish message to t-invoice
  • If amount less than 1 throw exception, retry 5 times
  • After 5 failed retry attempts, publish to t-invoice-dead
  • Another consumer will consume from t-invoice-dead for further processing.

Create Topic —

kafka-topics --bootstrap-server localhost:9092 --create --partitions 2 --replication-factor 1 --topic t-invoice
Created topic t-invoice.
kafka-topics --bootstrap-server localhost:9092 --create --partitions 2 --replication-factor 1 --topic t-invoice-dead
Created topic t-invoice-dead.

Invoice.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Invoice {
private String invoiceNumber;
private int amount;
private String currency;
}

InvoiceService.java

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.stereotype.Service;

import com.course.kafka.entity.Invoice;

@Service
public class InvoiceService {

private AtomicInteger counter = new AtomicInteger();

public Invoice generateInvoice() {
var invoiceNumber = "INV-" + counter.incrementAndGet();
var amount = ThreadLocalRandom.current().nextInt(1, 1000);

return new Invoice(invoiceNumber, amount, "USD");
}
}

InvoiceProducer.java

package com.course.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.Invoice;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class InvoiceProducer {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private ObjectMapper objectMapper;

public void send(Invoice invoice) throws JsonProcessingException {
var json = objectMapper.writeValueAsString(invoice);
kafkaTemplate.send("t-invoice", invoice.getAmount() % 2, invoice.getInvoiceNumber(), json);
}

}

Consumer

KafkaConfig.java

package com.course.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.util.backoff.FixedBackOff;

import com.course.kafka.entity.CarLocation;
import com.course.kafka.error.handler.GlobalErrorHandler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
public class KafkaConfig {

@Autowired
private KafkaProperties kafkaProperties;

@Autowired
private ObjectMapper objectMapper;

@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
var properties = kafkaProperties.buildConsumerProperties();

properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "120000");

return new DefaultKafkaConsumerFactory<>(properties);
}

@Bean(name = "farLocationContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> farLocationContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());

factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {

@Override
public boolean filter(ConsumerRecord<Object, Object> consumerRecord) {
try {
CarLocation carLocation = objectMapper.readValue(consumerRecord.value().toString(), CarLocation.class);
return carLocation.getDistance() <= 100;
} catch (JsonProcessingException e) {
return false;
}
}
});

return factory;
}


@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());

factory.setCommonErrorHandler(new GlobalErrorHandler());

return factory;
}

@Bean(name = "imageRetryContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> imageRetryContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());

factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(10_000, 3)));

return factory;
}


@Bean(name = "invoiceDltContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> invoiceDltContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer, KafkaTemplate<String, String> kafkaTemplate) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());

//var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate, (record, ex) -> new TopicPartition("t-invoice-dead", record.partition()));

factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer, new FixedBackOff(1000, 5)));

return factory;
}
}

InvoiceConsumer.java

package com.course.kafka.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.Invoice;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class InvoiceConsumer {

private static final Logger LOG = LoggerFactory.getLogger(InvoiceConsumer.class);

@Autowired
private ObjectMapper objectMapper;

@KafkaListener(topics = "t-invoice", concurrency = "2", containerFactory = "invoiceDltContainerFactory")
public void consume(String message) throws JsonMappingException, JsonProcessingException {
var invoice = objectMapper.readValue(message, Invoice.class);

if (invoice.getAmount() < 1) {
throw new IllegalArgumentException("Invalid amount for " + invoice);
}

LOG.info("Processing invoice : {}", invoice);
}
}