In this example, we’ll look at the way to filter messages from the Kafka
CarLocation.java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class CarLocation {
private String carId;
private long timestamp;
private int distance;
}
CarLocationProducer.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.CarLocation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
public class CarLocationProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ObjectMapper objectMapper;
public void send(CarLocation carLocation) throws JsonProcessingException {
var json = objectMapper.writeValueAsString(carLocation);
kafkaTemplate.send("t-location", json);
}
}
MainApp.java
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class KafkaProducerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
}
}
Create Topic
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-location
Created topic t-location.
Consumer —
CarLocationConsumer.java
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import com.course.kafka.entity.CarLocation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
public class CarLocationConsumer {
private static final Logger LOG = LoggerFactory.getLogger(CarLocationConsumer.class);
@Autowired
private ObjectMapper objectMapper;
@KafkaListener(topics = "t-location", groupId = "cg-all-location")
public void listenAll(String message) throws JsonMappingException, JsonProcessingException {
var carLocation = objectMapper.readValue(message, CarLocation.class);
LOG.info("listenAll : {}", carLocation);
}
@KafkaListener(topics = "t-location", groupId = "cg-far-location",containerFactory = "farLocationContainerFactory")
public void listenFar(String message) throws JsonMappingException, JsonProcessingException {
var carLocation = objectMapper.readValue(message, CarLocation.class);
LOG.info("---listenFar: {}", carLocation);
}
}
KafkaConfig.java
package com.course.kafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.util.backoff.FixedBackOff;
import com.course.kafka.entity.CarLocation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@Configuration
public class KafkaConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Autowired
private ObjectMapper objectMapper;
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
var properties = kafkaProperties.buildConsumerProperties();
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "120000");
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean(name = "farLocationContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> farLocationContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());
factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {
@Override
public boolean filter(ConsumerRecord<Object, Object> consumerRecord) {
try {
CarLocation carLocation = objectMapper.readValue(consumerRecord.value().toString(), CarLocation.class);
return carLocation.getDistance() <= 100;
} catch (JsonProcessingException e) {
return false;
}
}
});
return factory;
}
}
CarLocationScheduler.java
package com.course.kafka.scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.CarLocation;
import com.course.kafka.producer.CarLocationProducer;
import com.fasterxml.jackson.core.JsonProcessingException;
@Service
public class CarLocationScheduler {
private static final Logger LOG = LoggerFactory.getLogger(CarLocationScheduler.class);
private CarLocation carOne;
private CarLocation carTwo;
private CarLocation carThree;
@Autowired
private CarLocationProducer producer;
public CarLocationScheduler() {
var now = System.currentTimeMillis();
carOne = new CarLocation("car-one", now, 0);
carTwo = new CarLocation("car-two", now, 110);
carThree = new CarLocation("car-three", now, 95);
}
@Scheduled(fixedRate = 10000)
public void generateCarLocation() throws JsonProcessingException {
var now = System.currentTimeMillis();
carOne.setTimestamp(now);
carTwo.setTimestamp(now);
carThree.setTimestamp(now);
carOne.setDistance(carOne.getDistance() + 1);
carTwo.setDistance(carTwo.getDistance() - 1);
carThree.setDistance(carThree.getDistance() + 1);
producer.send(carOne);
producer.send(carTwo);
producer.send(carThree);
LOG.info("Sent : {}", carOne);
LOG.info("Sent : {}", carTwo);
LOG.info("Sent : {}", carThree);
}
}
We were able to filter the messages from the Kafka!