Kafka — Message Filter

Prateek
3 min readMay 9, 2022

In this example, we’ll look at the way to filter messages from the Kafka

CarLocation.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class CarLocation {
private String carId;
private long timestamp;
private int distance;
}

CarLocationProducer.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.CarLocation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class CarLocationProducer {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private ObjectMapper objectMapper;

public void send(CarLocation carLocation) throws JsonProcessingException {
var json = objectMapper.writeValueAsString(carLocation);
kafkaTemplate.send("t-location", json);
}
}

MainApp.java

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class KafkaProducerApplication implements CommandLineRunner {

public static void main(String[] args) {
SpringApplication.run(KafkaProducerApplication.class, args);
}

@Override
public void run(String... args) throws Exception {

}
}

Create Topic

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-location
Created topic t-location.

Consumer —

CarLocationConsumer.java

import org.springframework.stereotype.Service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import com.course.kafka.entity.CarLocation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class CarLocationConsumer {

private static final Logger LOG = LoggerFactory.getLogger(CarLocationConsumer.class);

@Autowired
private ObjectMapper objectMapper;

@KafkaListener(topics = "t-location", groupId = "cg-all-location")
public void listenAll(String message) throws JsonMappingException, JsonProcessingException {
var carLocation = objectMapper.readValue(message, CarLocation.class);
LOG.info("listenAll : {}", carLocation);
}


@KafkaListener(topics = "t-location", groupId = "cg-far-location",containerFactory = "farLocationContainerFactory")
public void listenFar(String message) throws JsonMappingException, JsonProcessingException {
var carLocation = objectMapper.readValue(message, CarLocation.class);
LOG.info("---listenFar: {}", carLocation);
}
}

KafkaConfig.java

package com.course.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.util.backoff.FixedBackOff;

import com.course.kafka.entity.CarLocation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
public class KafkaConfig {

@Autowired
private KafkaProperties kafkaProperties;

@Autowired
private ObjectMapper objectMapper;

@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
var properties = kafkaProperties.buildConsumerProperties();
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "120000");
return new DefaultKafkaConsumerFactory<>(properties);
}

@Bean(name = "farLocationContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> farLocationContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());
factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {

@Override
public boolean filter(ConsumerRecord<Object, Object> consumerRecord) {
try {
CarLocation carLocation = objectMapper.readValue(consumerRecord.value().toString(), CarLocation.class);
return carLocation.getDistance() <= 100;
} catch (JsonProcessingException e) {
return false;
}
}
});
return factory;
}
}

CarLocationScheduler.java

package com.course.kafka.scheduler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.CarLocation;
import com.course.kafka.producer.CarLocationProducer;
import com.fasterxml.jackson.core.JsonProcessingException;

@Service
public class CarLocationScheduler {

private static final Logger LOG = LoggerFactory.getLogger(CarLocationScheduler.class);

private CarLocation carOne;
private CarLocation carTwo;
private CarLocation carThree;

@Autowired
private CarLocationProducer producer;

public CarLocationScheduler() {
var now = System.currentTimeMillis();

carOne = new CarLocation("car-one", now, 0);
carTwo = new CarLocation("car-two", now, 110);
carThree = new CarLocation("car-three", now, 95);
}

@Scheduled(fixedRate = 10000)
public void generateCarLocation() throws JsonProcessingException {
var now = System.currentTimeMillis();

carOne.setTimestamp(now);
carTwo.setTimestamp(now);
carThree.setTimestamp(now);

carOne.setDistance(carOne.getDistance() + 1);
carTwo.setDistance(carTwo.getDistance() - 1);
carThree.setDistance(carThree.getDistance() + 1);

producer.send(carOne);
producer.send(carTwo);
producer.send(carThree);

LOG.info("Sent : {}", carOne);
LOG.info("Sent : {}", carTwo);
LOG.info("Sent : {}", carThree);
}
}

We were able to filter the messages from the Kafka!

--

--