pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the kafka-ms-order module (the promotion producer side). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits plugin and dependency management from the Spring Boot parent POM. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.example</groupId>
    <artifactId>kafka-ms-order</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-ms-order</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <!--
        None of the dependencies below declares a <version>; the versions are
        expected to be supplied by the parent's dependency management.
    -->
    <dependencies>
        <!-- Application stack: persistence, web layer and Kafka client support. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- H2 is only needed on the runtime classpath, not at compile time. -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- Optional, so Lombok is not passed on transitively to dependents. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <!-- Test-scoped dependencies. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- Keep Lombok out of the artifact produced by the Spring Boot plugin. -->
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

PromotionRequest.java

/**
 * Request payload carrying a single promotion code.
 *
 * <p>Constructors, accessors and the builder are generated by the Lombok
 * annotations on this class.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PromotionRequest {

    /** Promotion code supplied by the caller. */
    private String promotionCode;
}

PromotionAction.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.course.kafka.api.request.PromotionRequest;
import com.course.kafka.broker.message.PromotionMessage;
import com.course.kafka.broker.producer.PromotionProducer;

/**
 * Converts promotion requests into broker messages and passes them to the
 * {@link PromotionProducer}.
 */
@Component
public class PromotionAction {

    @Autowired
    private PromotionProducer producer;

    /**
     * Wraps the request's promotion code in a {@link PromotionMessage} and
     * hands it to the producer.
     *
     * @param request request whose promotion code is published
     */
    public void publishToKafka(PromotionRequest request) {
        final String promotionCode = request.getPromotionCode();
        producer.publish(new PromotionMessage(promotionCode));
    }
}

PromotionService.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.course.kafka.api.request.PromotionRequest;
import com.course.kafka.command.action.PromotionAction;

/**
 * Service entry point for promotion commands; all work is delegated to
 * {@link PromotionAction}.
 */
@Service
public class PromotionService {

    @Autowired
    private PromotionAction action;

    /**
     * Creates a promotion by forwarding the request to
     * {@link PromotionAction#publishToKafka(PromotionRequest)}.
     *
     * @param request promotion request to forward
     */
    public void createPromotion(PromotionRequest request) {
        this.action.publishToKafka(request);
    }
}

PromotionProducer.java

import java.util.concurrent.ExecutionException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.course.kafka.broker.message.PromotionMessage;

/**
 * Publishes {@link PromotionMessage} instances to the
 * {@code t-commodity-promotion} topic through a {@link KafkaTemplate}.
 */
@Service
public class PromotionProducer {

    private static final Logger LOG = LoggerFactory.getLogger(PromotionProducer.class);

    @Autowired
    private KafkaTemplate<String, PromotionMessage> kafkaTemplate;

    /**
     * Sends the message and waits for the send future to complete, logging
     * either the record value that was sent or the failure.
     *
     * <p>Failures are logged and not rethrown, so callers are not notified of
     * a failed send. If the waiting thread is interrupted, its interrupt
     * status is restored before returning.
     *
     * @param message payload to publish
     */
    public void publish(PromotionMessage message) {
        try {
            var sendResult = kafkaTemplate.send("t-commodity-promotion", message).get();
            LOG.info("Send result success for message {}", sendResult.getProducerRecord().value());
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; set it
            // again so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOG.error("Error publishing {}, because {}", message, e.getMessage(), e);
        } catch (ExecutionException e) {
            // The trailing throwable makes the logger emit the stack trace,
            // including the underlying cause of the failed send.
            LOG.error("Error publishing {}, because {}", message, e.getMessage(), e);
        }
    }
}

PromotionApi.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.course.kafka.api.request.PromotionRequest;
import com.course.kafka.command.service.PromotionService;

/**
 * REST endpoint for promotions, mapped under {@code /api/promotion}.
 */
@RestController
@RequestMapping("/api/promotion")
public class PromotionApi {

    @Autowired
    private PromotionService service;

    /**
     * Handles a POST by passing the request body to the service, then echoes
     * the promotion code back with HTTP status 201 (Created).
     *
     * @param request promotion request read from the request body
     * @return response whose body is the request's promotion code
     */
    @PostMapping(value = "")
    public ResponseEntity<String> create(@RequestBody PromotionRequest request) {
        service.createPromotion(request);

        final String createdCode = request.getPromotionCode();
        return new ResponseEntity<>(createdCode, HttpStatus.CREATED);
    }
}

Create Topic -

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-promotion

Created topic t-commodity-promotion.

Consumer

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the kafka-ms-storage module (the promotion consumer side). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits plugin and dependency management from the Spring Boot parent POM. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.example</groupId>
    <artifactId>kafka-ms-storage</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-ms-storage</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <!--
        None of the dependencies below declares a <version>; the versions are
        expected to be supplied by the parent's dependency management.
    -->
    <dependencies>
        <!-- Core starter plus Kafka client support. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Jackson, including the java.time (JSR-310) datatype module. -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>

        <!-- Optional, so Lombok is not passed on transitively to dependents. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Test-scoped dependencies. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- Keep Lombok out of the artifact produced by the Spring Boot plugin. -->
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

application.yml

# Spring Boot configuration for the Kafka storage (consumer) service.
#
# Nesting matters in YAML: every key below must sit under its parent so that it
# resolves to the intended property path (for example
# spring.kafka.consumer.group-id).

logging:
  pattern:
    console: "[Kafka Storage] %clr(%d{HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:%5p}) %clr(---){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:%wEx}"

spring:
  main:
    banner-mode: OFF
    log-startup-info: false
  kafka:
    listener:
      missing-topics-fatal: false
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: kafka-storage-cg
      enable-auto-commit: true
      auto-offset-reset: earliest
      # NOTE(review): placed under consumer because it is listed between the
      # consumer keys; confirm it was not meant to be spring.kafka.bootstrap-servers.
      bootstrap-servers:
        - localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Extra properties for the JSON deserializer: the package it is allowed
      # to deserialize into, and the fallback value type.
      properties:
        spring:
          json:
            trusted:
              packages: com.course.kafka.broker.message
            value:
              default:
                type: java.lang.Object

OrderMessage.java

/**
 * Message describing one ordered item together with its order details.
 *
 * <p>Constructors, accessors and the builder are generated by the Lombok
 * annotations on this class.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderMessage {

    private String orderLocation;
    private String orderNumber;

    // NOTE(review): Lombok's @Data generates a toString() that includes this
    // field; confirm instances are never logged as-is.
    private String creditCardNumber;

    /** Order timestamp, formatted in JSON as {@code yyyy-MM-dd'T'HH:mm:ss}. */
    @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
    private LocalDateTime orderDateTime;

    private String itemName;
    private int price;
    private int quantity;
}

DiscountMessage.java

/**
 * Message carrying a discount code and its percentage.
 *
 * <p>Constructors, accessors and the builder are generated by the Lombok
 * annotations on this class.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DiscountMessage {

    /** Code identifying the discount. */
    private String discountCode;

    /** Discount size as a whole-number percentage. */
    private int discountPercentage;
}

PromotionMessage.java

/**
 * Message carrying a single promotion code.
 *
 * <p>Constructors, accessors and the builder are generated by the Lombok
 * annotations on this class.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PromotionMessage {

    /** Code identifying the promotion. */
    private String promotionCode;
}

PromotionConsumer.java

/**
 * Listener for the {@code t-commodity-promotion} topic.
 *
 * <p>The listener is declared on the class, and each {@code @KafkaHandler}
 * method accepts one payload type.
 */
@Slf4j
@Service
@KafkaListener(topics = "t-commodity-promotion")
public class PromotionConsumer {

    /**
     * Handles payloads of type {@link PromotionMessage} by logging them.
     *
     * @param message received promotion payload
     */
    @KafkaHandler
    public void listenPromotion(PromotionMessage message) {
        log.info("Processing promotion : {}", message);
    }

    /**
     * Handles payloads of type {@link DiscountMessage} by logging them.
     *
     * @param message received discount payload
     */
    @KafkaHandler
    public void listenDiscount(DiscountMessage message) {
        log.info("Processing discount : {}", message);
    }
}

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store