Kafka — Idempotency Alternative

Prateek
2 min read · May 9, 2022

--

In this example, we’ll look at an alternative approach to idempotency in which only a few fields of the message are used to identify uniqueness.

Create Topic

# Create the single-partition, unreplicated topic "t-payment-request" on the local broker.
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-payment-request
Created topic t-payment-request.

PaymentRequest.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Payment message that is serialized to JSON and exchanged over Kafka.
 *
 * <p>Lombok supplies the boilerplate: {@code @Data} generates getters, setters,
 * {@code equals}, {@code hashCode} and {@code toString}; {@code @Builder} adds a fluent
 * builder; the two constructor annotations add the no-argument and all-argument
 * constructors. The all-argument constructor takes the fields in declaration order.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PaymentRequest {

    /** Identifier of the payment this request belongs to. */
    private String paymentNumber;

    /** Payment amount, as a whole number. */
    private int amount;

    /** Currency code of the amount, e.g. {@code "USD"}. */
    private String currency;

    /** Free-text notes attached to the payment. */
    private String notes;

    /** Kind of transaction, e.g. {@code "Budget"} or {@code "Approval"}. */
    private String transactionType;
}

PaymentRequestProducer.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.PaymentRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Publishes {@link PaymentRequest} messages to Kafka as JSON strings.
 */
@Service
public class PaymentRequestProducer {

    /** Topic that every payment request is sent to. */
    private static final String TOPIC = "t-payment-request";

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Serializes the request to JSON and sends it, using the payment number as the record key.
     *
     * @param paymentRequest the request to publish
     * @throws JsonProcessingException if the request cannot be written as JSON
     */
    public void send(PaymentRequest paymentRequest) throws JsonProcessingException {
        String payload = objectMapper.writeValueAsString(paymentRequest);
        String recordKey = paymentRequest.getPaymentNumber();
        kafkaTemplate.send(TOPIC, recordKey, payload);
    }
}

KafkaProducerApplication.java

package com.course.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.course.kafka.entity.PaymentRequest;
import com.course.kafka.entity.PurchaseRequest;
import com.course.kafka.producer.PaymentRequestProducer;
import com.course.kafka.producer.PurchaseRequestProducer;

/**
 * Spring Boot entry point that, on startup, publishes a fixed set of sample payment requests,
 * including two deliberate repeats so that duplicate handling can be observed downstream.
 */
@SpringBootApplication
@EnableScheduling
public class KafkaProducerApplication implements CommandLineRunner {

    @Autowired
    private PaymentRequestProducer producer;

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

    /**
     * Builds three "Pay-Alpha" and three "Pay-Beta" requests (one per transaction type),
     * sends all six, then re-sends one Alpha and one Beta request as duplicates.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if a request cannot be serialized or sent
     */
    @Override
    public void run(String... args) throws Exception {
        PaymentRequest alphaBudget = samplePayment("Pay-Alpha", 551, "Notes Alpha", "Budget");
        PaymentRequest alphaApproval = samplePayment("Pay-Alpha", 551, "Notes Alpha", "Approval");
        PaymentRequest alphaNotifications = samplePayment("Pay-Alpha", 551, "Notes Alpha", "Notifications");

        PaymentRequest betaBudget = samplePayment("Pay-Beta", 552, "Notes Beta", "Budget");
        PaymentRequest betaApproval = samplePayment("Pay-Beta", 552, "Notes Beta", "Approval");
        PaymentRequest betaNotifications = samplePayment("Pay-Beta", 552, "Notes Beta", "Notifications");

        // The last two entries repeat earlier requests on purpose.
        PaymentRequest[] outbound = {
            alphaBudget, alphaApproval, alphaNotifications,
            betaBudget, betaApproval, betaNotifications,
            alphaApproval, betaNotifications
        };

        for (PaymentRequest request : outbound) {
            producer.send(request);
        }
    }

    /** Creates a USD payment request with the given number, amount, notes and transaction type. */
    private static PaymentRequest samplePayment(String paymentNumber, int amount, String notes,
            String transactionType) {
        return PaymentRequest.builder()
                .paymentNumber(paymentNumber)
                .amount(amount)
                .currency("USD")
                .notes(notes)
                .transactionType(transactionType)
                .build();
    }
}

Consumer

CacheConfig.java

package com.course.kafka.config;

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.course.kafka.entity.PaymentRequestCacheKey;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

/**
 * Declares the in-memory Caffeine caches exposed as Spring beans.
 *
 * <p>Both caches share the same settings: an entry expires two minutes after it was written,
 * and each cache is capped at 1000 entries.
 */
@Configuration
public class CacheConfig {

    /** How long an entry stays in a cache after being written. */
    private static final Duration ENTRY_TTL = Duration.ofMinutes(2);

    /** Maximum number of entries each cache may hold. */
    private static final long MAX_ENTRIES = 1000;

    /** Cache keyed by {@code Integer}, registered as {@code cachePurchaseRequest}. */
    @Bean(name = "cachePurchaseRequest")
    public Cache<Integer, Boolean> cachePurchaseRequest() {
        return newCache();
    }

    /** Cache keyed by {@link PaymentRequestCacheKey}, registered as {@code cachePaymentRequest}. */
    @Bean(name = "cachePaymentRequest")
    public Cache<PaymentRequestCacheKey, Boolean> cachePaymentRequest() {
        return newCache();
    }

    /** Builds a fresh cache instance with the shared expiry and size settings. */
    private static <K> Cache<K, Boolean> newCache() {
        return Caffeine.newBuilder()
                .expireAfterWrite(ENTRY_TTL)
                .maximumSize(MAX_ENTRIES)
                .build();
    }
}

PaymentRequestConsumer.java

package com.course.kafka.consumer;

import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.PaymentRequest;
import com.course.kafka.entity.PaymentRequestCacheKey;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Cache;

/**
 * Consumes payment requests from Kafka and skips any request whose cache key has already
 * been recorded in the {@code cachePaymentRequest} cache.
 */
@Service
public class PaymentRequestConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(PaymentRequestConsumer.class);

    @Autowired
    @Qualifier("cachePaymentRequest")
    private Cache<PaymentRequestCacheKey, Boolean> cache;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Parses the JSON message and processes it only if its key is not yet in the cache;
     * the key is recorded after processing.
     *
     * @param message JSON representation of a {@link PaymentRequest}
     * @throws JsonMappingException if the JSON cannot be mapped onto {@link PaymentRequest}
     * @throws JsonProcessingException if the message is not valid JSON
     */
    @KafkaListener(topics = "t-payment-request")
    public void consume(String message) throws JsonMappingException, JsonProcessingException {
        PaymentRequest request = objectMapper.readValue(message, PaymentRequest.class);
        PaymentRequestCacheKey dedupKey = toCacheKey(request);

        if (!isAlreadyProcessed(dedupKey)) {
            LOG.info("Processing {}", request);

            // Remember the key so that a later message with the same key is skipped.
            cache.put(dedupKey, true);
        }
    }

    /** Returns {@code true} only when the cache holds {@code true} for the key. */
    private boolean isAlreadyProcessed(PaymentRequestCacheKey key) {
        return Boolean.TRUE.equals(cache.getIfPresent(key));
    }

    /** Builds the key from payment number, amount and transaction type. */
    private static PaymentRequestCacheKey toCacheKey(PaymentRequest request) {
        return new PaymentRequestCacheKey(
                request.getPaymentNumber(),
                request.getAmount(),
                request.getTransactionType());
    }
}

PaymentRequestCacheKey.java

import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;

/**
 * Cache key made of the three payment fields used to tell requests apart:
 * payment number, amount and transaction type.
 *
 * <p>Lombok's {@code @Data} generates value-based {@code equals} and {@code hashCode}, so two
 * keys with equal field values address the same cache entry. {@code @Data} also generates
 * setters; do not modify an instance after it has been used as a key. The all-argument
 * constructor takes the fields in declaration order.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PaymentRequestCacheKey {

    /** Identifier of the payment. */
    private String paymentNumber;

    /** Payment amount, as a whole number. */
    private int amount;

    /** Kind of transaction. */
    private String transactionType;
}

Now, if I run the producer again and generate the same data, the consumer will not process it again, as long as the matching entries are still in its cache (they expire two minutes after being written).

--

--