In this example, we’ll look at an alternative approach to idempotency: instead of treating the entire message as the unique identifier, the consumer considers only a few fields to determine uniqueness.
Create Topic
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-payment-request
Created topic t-payment-request.
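To double-check that the topic was created as expected, you can describe it:
kafka-topics --bootstrap-server localhost:9092 --describe --topic t-payment-request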
PaymentRequest.java
package com.course.kafka.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class PaymentRequest {
private String paymentNumber;
private int amount;
private String currency;
private String notes;
private String transactionType;
}
PaymentRequestProducer.java
package com.course.kafka.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.PaymentRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
public class PaymentRequestProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ObjectMapper objectMapper;
public void send(PaymentRequest paymentRequest) throws JsonProcessingException {
var json = objectMapper.writeValueAsString(paymentRequest);
kafkaTemplate.send("t-payment-request", paymentRequest.getPaymentNumber(), json);
}
}
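The producer keys each record by payment number, so all messages for the same payment land on the same partition and arrive in order. The JSON payload written by the ObjectMapper will look roughly like this (field order may vary):
{"paymentNumber":"Pay-Alpha","amount":551,"currency":"USD","notes":"Notes Alpha","transactionType":"Budget"}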
KafkaProducerApplication.java
package com.course.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.course.kafka.entity.PaymentRequest;
import com.course.kafka.producer.PaymentRequestProducer;
@SpringBootApplication
@EnableScheduling
public class KafkaProducerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerApplication.class, args);
}
@Autowired
private PaymentRequestProducer producer;
@Override
public void run(String... args) throws Exception {
var alphaTx1 = PaymentRequest.builder().paymentNumber("Pay-Alpha").amount(551).currency("USD")
.notes("Notes Alpha").transactionType("Budget").build();
var alphaTx2 = PaymentRequest.builder().paymentNumber("Pay-Alpha").amount(551).currency("USD")
.notes("Notes Alpha").transactionType("Approval").build();
var alphaTx3 = PaymentRequest.builder().paymentNumber("Pay-Alpha").amount(551).currency("USD")
.notes("Notes Alpha").transactionType("Notifications").build();
var betaTx1 = PaymentRequest.builder().paymentNumber("Pay-Beta").amount(552).currency("USD")
.notes("Notes Beta").transactionType("Budget").build();
var betaTx2 = PaymentRequest.builder().paymentNumber("Pay-Beta").amount(552).currency("USD")
.notes("Notes Beta").transactionType("Approval").build();
var betaTx3 = PaymentRequest.builder().paymentNumber("Pay-Beta").amount(552).currency("USD")
.notes("Notes Beta").transactionType("Notifications").build();
producer.send(alphaTx1);
producer.send(alphaTx2);
producer.send(alphaTx3);
producer.send(betaTx1);
producer.send(betaTx2);
producer.send(betaTx3);
producer.send(alphaTx2);
producer.send(betaTx3);
}
}
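Note that eight messages are sent in total, but only six are unique by payment number, amount, and transaction type: the final two calls resend alphaTx2 and betaTx3. If the idempotent consumer works correctly, it should log exactly six Processing lines.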
Consumer
CacheConfig.java
package com.course.kafka.config;
import java.time.Duration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.course.kafka.entity.PaymentRequestCacheKey;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@Configuration
public class CacheConfig {
@Bean(name = "cachePurchaseRequest")
public Cache<Integer, Boolean> cachePurchaseRequest() {
return Caffeine.newBuilder().expireAfterWrite(Duration.ofMinutes(2)).maximumSize(1000).build();
}
@Bean(name = "cachePaymentRequest")
public Cache<PaymentRequestCacheKey, Boolean> cachePaymentRequest() {
return Caffeine.newBuilder().expireAfterWrite(Duration.ofMinutes(2)).maximumSize(1000).build();
}
}
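The expireAfterWrite setting bounds how long a processed key is remembered. As a quick illustration, here is a minimal throwaway sketch of the cache behavior (not part of the project):
// Entries live for two minutes after being written, then disappear
Cache<String, Boolean> demo = Caffeine.newBuilder()
		.expireAfterWrite(Duration.ofMinutes(2))
		.maximumSize(1000)
		.build();
demo.put("Pay-Alpha", true);
demo.getIfPresent("Pay-Alpha"); // true for the next two minutes, null once the entry expires
The two-minute expiry and 1000-entry cap keep memory bounded; the tradeoff is that a duplicate arriving after its entry has expired (or been evicted) will be processed again.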
PaymentRequestConsumer.java
package com.course.kafka.consumer;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.PaymentRequest;
import com.course.kafka.entity.PaymentRequestCacheKey;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Cache;
@Service
public class PaymentRequestConsumer {
private static final Logger LOG = LoggerFactory.getLogger(PaymentRequestConsumer.class);
@Autowired
private ObjectMapper objectMapper;
@Autowired
@Qualifier("cachePaymentRequest")
private Cache<PaymentRequestCacheKey, Boolean> cache;
private boolean isExistsInCache(PaymentRequestCacheKey key) {
return Optional.ofNullable(cache.getIfPresent(key)).orElse(false);
}
@KafkaListener(topics = "t-payment-request")
public void consume(String message) throws JsonMappingException, JsonProcessingException {
var paymentRequest = objectMapper.readValue(message, PaymentRequest.class);
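// Build the cache key from paymentNumber, amount, and transactionType only;
// currency and notes are deliberately excluded from the uniqueness check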
var cacheKey = new PaymentRequestCacheKey(paymentRequest.getPaymentNumber(), paymentRequest.getAmount(),
paymentRequest.getTransactionType());
var processed = isExistsInCache(cacheKey);
if (processed) {
return;
}
LOG.info("Processing {}", paymentRequest);
cache.put(cacheKey, true);
}
}
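To confirm that all eight records actually reached the topic even though only six are processed, you can inspect the raw messages with the console consumer:
kafka-console-consumer --bootstrap-server localhost:9092 --topic t-payment-request --from-beginning --property print.key=true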
PaymentRequestCacheKey.java
package com.course.kafka.entity;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class PaymentRequestCacheKey {
private String paymentNumber;
private int amount;
private String transactionType;
}
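The idempotency check hinges on Lombok's @Data, which generates equals() and hashCode() over all three fields, so two keys built from the same values compare equal inside the Caffeine cache:
var key1 = new PaymentRequestCacheKey("Pay-Alpha", 551, "Budget");
var key2 = new PaymentRequestCacheKey("Pay-Alpha", 551, "Budget");
key1.equals(key2); // true: same field values, so the cache treats them as one entry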
Now if we run the producer again and send the same data, the consumer will not process it again, provided the duplicates arrive before the cache entries expire.