<?xml version="1.0" encoding="UTF-8"?>
<!--
Maven build descriptor for the kafka-ms-order module.
Dependency versions that are not declared here are expected to be supplied by
the spring-boot-starter-parent dependency management.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>4.0.0</modelVersion>
<!-- Parent POM: Spring Boot 2.7.0 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates of this module -->
<groupId>com.example</groupId>
<artifactId>kafka-ms-order</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-ms-order</name>
<description>Demo project for Spring Boot</description>
<properties>
<!-- Java language level used for compilation -->
<java.version>11</java.version>
</properties>
<dependencies>
<!-- Persistence (JPA), REST endpoints and Kafka messaging -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<!-- H2 database driver, needed at runtime only -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Lombok: optional, and excluded from the repackaged artifact in the build section below -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Apache Commons Lang utilities -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<!-- Test-scoped dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Spring Boot packaging plugin; Lombok is excluded from the packaged application -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>
# Application configuration for the kafka-ms-order service.

logging:
  pattern:
    # Console log layout; every line is prefixed with "[Kafka Order]".
    console: "[Kafka Order] %clr(%d{HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:%5p}) %clr(---){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:%wEx}"

# HTTP port of the embedded web server.
server:
  port: 9001

spring:
  main:
    banner-mode: OFF
  h2:
    console:
      enabled: true
  datasource:
    # In-memory H2 database.
    url: jdbc:h2:mem:kafkaorderdb

  kafka:
    listener:
      missing-topics-fatal: false
    producer:
      bootstrap-servers:
        - localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: kafka-order-cg
      enable-auto-commit: true
      auto-offset-reset: earliest
      bootstrap-servers:
        - localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              # Package whose classes the JSON deserializer is allowed to instantiate.
              packages: com.course.kafka.broker.message
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
@Entity
@Table(name = "orders")
public class Order {

@Id
@GeneratedValue
private int orderId;

@Column
private String orderNumber;

@Column
private String orderLocation;

@Column
private LocalDateTime orderDateTime;

@Column
private String creditCardNumber;

@OneToMany(mappedBy = "order")
private List<OrderItem> items;
}
/**
 * JPA entity mapped to the {@code order_items} table; one line item of an {@link Order}.
 *
 * <p>NOTE(review): {@code @Data} includes {@code order} in the generated
 * {@code toString}/{@code equals}/{@code hashCode}; verify this cannot recurse
 * back through the items collection of {@code Order}.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
@Entity
@Table(name = "order_items")
public class OrderItem {

/** Primary key, generated by the persistence provider. */
@Id
@GeneratedValue
private int orderItemId;

@Column
private String itemName;

@Column
private int price;

@Column
private int quantity;

/** Owning side of the association; stored in the {@code order_id} join column. */
@JoinColumn(name = "order_id")
@ManyToOne
private Order order;
}
/** CRUD repository for {@link OrderItem} entities, keyed by their {@code Integer} id. */
@Repository
public interface OrderItemRepository extends CrudRepository<OrderItem, Integer> {
}
/** CRUD repository for {@link Order} entities, keyed by their {@code Integer} id. */
@Repository
public interface OrderRepository extends CrudRepository<Order, Integer> {
}
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

/**
 * Declares the Kafka topics used by the order service as {@link NewTopic} beans.
 */
@Configuration
public class KafkaConfig {

    /** Topic {@code t-commodity-order}. */
    @Bean
    public NewTopic topicOrder() {
        return twoPartitionTopic("t-commodity-order");
    }

    /** Topic {@code t-commodity-order-reply}. */
    @Bean
    public NewTopic topicOrderReply() {
        return twoPartitionTopic("t-commodity-order-reply");
    }

    /** Builds a topic definition with 2 partitions and 1 replica for the given name. */
    private static NewTopic twoPartitionTopic(String topicName) {
        TopicBuilder builder = TopicBuilder.name(topicName);
        builder = builder.partitions(2);
        builder = builder.replicas(1);
        return builder.build();
    }
}
/**
 * Request body accepted by the order API ({@code OrderApi#createOrder}).
 *
 * <p>{@code OrderAction#convertToOrder} copies {@code orderLocation} and
 * {@code creditCardNumber} onto the {@code Order} entity and maps each entry of
 * {@code items} to an {@code OrderItem}.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class OrderRequest {
private String orderLocation;
private String creditCardNumber;
// One entry per ordered item.
private List<OrderItemRequest> items;
}
/**
 * A single item inside an {@link OrderRequest}; its three fields are copied
 * one-to-one onto an {@code OrderItem} entity by {@code OrderAction}.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class OrderItemRequest {
private String itemName;
private int price;
private int quantity;
}
/**
 * Response body of the order API; carries the order number returned by
 * {@code OrderService#saveOrder}.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class OrderResponse {
private String orderNumber;
}
import java.time.LocalDateTime;

import com.fasterxml.jackson.annotation.JsonFormat;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Message payload published by {@code OrderProducer}.
 *
 * <p>Each message combines the order-level fields (location, number, credit card
 * number, date/time) with the fields of one order item; {@code OrderAction#publishToKafka}
 * builds one message per item.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class OrderMessage {

private String orderLocation;
private String orderNumber;
private String creditCardNumber;

// JSON representation uses second precision without a zone, e.g. 2022-06-01T13:45:10.
@JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
private LocalDateTime orderDateTime;

private String itemName;
private int price;
private int quantity;
}
/**
 * Publishes {@link OrderMessage}s to the {@code t-commodity-order} topic.
 *
 * <p>Every record is keyed by the order number and carries a
 * {@code surpriseBonus} header.
 */
@Service
@Slf4j
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, OrderMessage> kafkaTemplate;

    /**
     * Sends the message and logs the outcome from a callback.
     *
     * @param message the order message to publish
     */
    public void publish(OrderMessage message) {
        ProducerRecord<String, OrderMessage> producerRecord = buildProducerRecord(message);

        kafkaTemplate.send(producerRecord)
                .addCallback(new ListenableFutureCallback<SendResult<String, OrderMessage>>() {

                    @Override
                    public void onSuccess(SendResult<String, OrderMessage> result) {
                        log.info("Order {}, item {} published successfully", message.getOrderNumber(),
                                message.getItemName());
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        // The trailing throwable makes the logger print the stack trace
                        // in addition to the formatted message.
                        log.warn("Order {}, item {} failed to publish because {}", message.getOrderNumber(),
                                message.getItemName(), ex.getMessage(), ex);
                    }
                });

        // Logged right after send(...) returns, independently of the callback above.
        log.info("Just a dummy message for order {}, item {}", message.getOrderNumber(), message.getItemName());
    }

    /**
     * Builds the record for the given message.
     *
     * <p>The {@code surpriseBonus} header is 25 when the order location starts
     * with "A" (case-insensitive) and 15 otherwise. Its value is the decimal
     * number encoded as UTF-8 so that it does not depend on the platform
     * default charset.
     *
     * @param message the order message to wrap
     * @return a record for topic {@code t-commodity-order} with no explicit partition
     */
    private ProducerRecord<String, OrderMessage> buildProducerRecord(OrderMessage message) {
        int surpriseBonus = StringUtils.startsWithIgnoreCase(message.getOrderLocation(), "A") ? 25 : 15;
        byte[] surpriseBonusBytes = Integer.toString(surpriseBonus)
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);

        var headers = new ArrayList<Header>();
        headers.add(new RecordHeader("surpriseBonus", surpriseBonusBytes));

        return new ProducerRecord<>("t-commodity-order", null, message.getOrderNumber(), message, headers);
    }
}
import java.time.LocalDateTime;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.course.kafka.api.request.OrderItemRequest;
import com.course.kafka.api.request.OrderRequest;
import com.course.kafka.broker.message.OrderMessage;
import com.course.kafka.broker.producer.OrderProducer;
import com.course.kafka.entity.Order;
import com.course.kafka.entity.OrderItem;
import com.course.kafka.repository.OrderItemRepository;
import com.course.kafka.repository.OrderRepository;

/**
 * Building blocks used by the order service: request-to-entity conversion,
 * persistence, and publishing of order items to Kafka.
 */
@Component
public class OrderAction {
    @Autowired
    private OrderProducer orderProducer;

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private OrderItemRepository orderItemRepository;

    /**
     * Creates a new {@link Order} from an API request.
     *
     * <p>The order date/time is set to the current time and the order number is
     * a random 8-character alphanumeric string, upper-cased with
     * {@link Locale#ROOT} so the result does not depend on the JVM default
     * locale. Every converted item is linked back to the new order.
     *
     * @param request the incoming order request
     * @return an unsaved order with its items attached
     */
    public Order convertToOrder(OrderRequest request) {
        var result = new Order();
        result.setCreditCardNumber(request.getCreditCardNumber());
        result.setOrderLocation(request.getOrderLocation());
        result.setOrderDateTime(LocalDateTime.now());
        result.setOrderNumber(RandomStringUtils.randomAlphanumeric(8).toUpperCase(Locale.ROOT));

        List<OrderItem> items = request.getItems()
                .stream()
                .map(this::convertToOrderItem)
                .collect(Collectors.toList());

        // Set the owning side of the association on every item.
        items.forEach(item -> item.setOrder(result));

        result.setItems(items);
        return result;
    }

    /** Copies name, price and quantity from the request item onto a new entity. */
    private OrderItem convertToOrderItem(OrderItemRequest itemRequest) {
        var result = new OrderItem();
        result.setItemName(itemRequest.getItemName());
        result.setPrice(itemRequest.getPrice());
        result.setQuantity(itemRequest.getQuantity());
        return result;
    }

    /**
     * Saves the order first and then each of its items through their repositories.
     *
     * <p>NOTE(review): no transaction boundary is declared here, so the saves
     * are presumably not atomic as a group; confirm whether that is intended.
     *
     * @param order the order to persist, with its items attached
     */
    public void saveToDB(Order order) {
        orderRepository.save(order);
        order.getItems().forEach(orderItemRepository::save);
    }

    /**
     * Builds an {@link OrderMessage} from one item plus the fields of its parent
     * order and hands it to the producer.
     *
     * @param orderItem the item to publish; its {@code order} must be set
     */
    public void publishToKafka(OrderItem orderItem) {
        var orderMessage = new OrderMessage();
        orderMessage.setItemName(orderItem.getItemName());
        orderMessage.setPrice(orderItem.getPrice());
        orderMessage.setQuantity(orderItem.getQuantity());

        var order = orderItem.getOrder();
        orderMessage.setOrderDateTime(order.getOrderDateTime());
        orderMessage.setOrderLocation(order.getOrderLocation());
        orderMessage.setOrderNumber(order.getOrderNumber());
        orderMessage.setCreditCardNumber(order.getCreditCardNumber());

        // send message to kafka!
        orderProducer.publish(orderMessage);
    }
}
/**
 * Application service that stores an incoming order and publishes its items.
 */
@Service
public class OrderService {

    @Autowired
    private OrderAction orderAction;

    /**
     * Converts the request to an order, saves it, then publishes one Kafka
     * message per order item.
     *
     * @param request the incoming order request
     * @return the order number of the saved order
     */
    public String saveOrder(OrderRequest request) {
        final Order savedOrder = orderAction.convertToOrder(request);
        orderAction.saveToDB(savedOrder);

        // One message per item: the order is flattened before publishing.
        for (var item : savedOrder.getItems()) {
            orderAction.publishToKafka(item);
        }

        return savedOrder.getOrderNumber();
    }
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;

import com.course.kafka.api.request.OrderRequest;
import com.course.kafka.api.response.OrderResponse;
import com.course.kafka.command.service.OrderService;

/**
 * REST endpoint for creating orders, served under {@code /api/order}.
 */
@RestController
@RequestMapping("/api/order")
public class OrderApi {
    @Autowired
    private OrderService orderService;

    /**
     * Handles {@code POST /api/order}: delegates to the order service and
     * answers with the order number.
     *
     * @param orderRequest the request body
     * @return HTTP 200 with an {@link OrderResponse} holding the order number
     */
    @PostMapping(value = "")
    public ResponseEntity<OrderResponse> createOrder(@RequestBody OrderRequest orderRequest) {
        final String orderNumber = orderService.saveOrder(orderRequest);
        return ResponseEntity.ok(new OrderResponse(orderNumber));
    }
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/** Spring Boot entry point of the kafka-ms-order application. */
@SpringBootApplication
public class KafkaMsOrderApplication {

/**
 * Starts the Spring application.
 *
 * @param args command-line arguments, passed through to {@code SpringApplication.run}
 */
public static void main(String[] args) {
SpringApplication.run(KafkaMsOrderApplication.class, args);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Maven build descriptor for the kafka-ms-patterns module.
Dependency versions that are not declared here are expected to be supplied by
the spring-boot-starter-parent dependency management.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>4.0.0</modelVersion>
<!-- Parent POM: Spring Boot 2.7.0 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates of this module -->
<groupId>com.example</groupId>
<artifactId>kafka-ms-patterns</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-ms-patterns</name>
<description>Demo project for Spring Boot</description>
<properties>
<!-- Java language level used for compilation -->
<java.version>11</java.version>
</properties>
<dependencies>
<!-- Core Spring Boot starter and Kafka messaging -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Jackson JSON binding plus the java.time (JSR-310) datatype module -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

<!-- Lombok: optional, and excluded from the repackaged artifact in the build section below -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test-scoped dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Spring Boot packaging plugin; Lombok is excluded from the packaged application -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>
# Application configuration for the kafka-ms-patterns service.

logging:
  pattern:
    # Console log layout; every line is prefixed with "[Kafka Pattern]".
    console: "[Kafka Pattern] %clr(%d{HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:%5p}) %clr(---){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:%wEx}"

spring:
  main:
    banner-mode: OFF
    log-startup-info: false
  kafka:
    listener:
      missing-topics-fatal: false
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: kafka-pattern-cg
      enable-auto-commit: true
      auto-offset-reset: earliest
      bootstrap-servers:
        - localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              # Package whose classes the JSON deserializer is allowed to instantiate.
              packages: com.course.kafka.broker.message
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

/**
 * Provides the application's Jackson {@link ObjectMapper} bean.
 */
@Configuration
public class JsonConfig {

    /**
     * Creates a mapper with all discoverable Jackson modules registered and
     * with {@code WRITE_DATES_AS_TIMESTAMPS} switched off.
     *
     * @return the configured mapper
     */
    @Bean
    public ObjectMapper objectMapper() {
        return new ObjectMapper()
                .findAndRegisterModules()
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    }
}
/**
 * Message payload received by {@code OrderConsumer}: order-level fields
 * (location, number, credit card number, date/time) together with the name,
 * price and quantity of one item.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class OrderMessage {

private String orderLocation;
private String orderNumber;
private String creditCardNumber;

// JSON representation uses second precision without a zone, e.g. 2022-06-01T13:45:10.
@JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
private LocalDateTime orderDateTime;

private String itemName;
private int price;
private int quantity;
}
/**
 * Consumes {@link OrderMessage}s from the {@code t-commodity-order} topic and
 * logs the total amount of each item.
 */
@Service
@Slf4j
public class OrderConsumer {

    /** Number of trailing card-number characters left readable in the log. */
    private static final int VISIBLE_CARD_CHARS = 4;

    /**
     * Logs one line per received message.
     *
     * <p>The total is computed in {@code long} arithmetic so that large
     * {@code price * quantity} products do not overflow {@code int}. The credit
     * card number is masked before logging so the full number never reaches the
     * log output.
     *
     * @param message the deserialized order message
     */
    @KafkaListener(topics = "t-commodity-order")
    public void listen(OrderMessage message) {
        long totalItemAmount = (long) message.getPrice() * message.getQuantity();

        log.info("Processing order {}, item {}, credit card number {}. Total amount for this item is {}",
                message.getOrderNumber(), message.getItemName(), maskCardNumber(message.getCreditCardNumber()),
                totalItemAmount);
    }

    /**
     * Replaces every character except the last {@link #VISIBLE_CARD_CHARS} with
     * {@code '*'}. Values no longer than that are returned unchanged and
     * {@code null} stays {@code null}.
     */
    private static String maskCardNumber(String cardNumber) {
        if (cardNumber == null) {
            return null;
        }
        int maskedLength = Math.max(0, cardNumber.length() - VISIBLE_CARD_CHARS);
        return "*".repeat(maskedLength) + cardNumber.substring(maskedLength);
    }
}
/** Spring Boot entry point of the kafka-ms-patterns application. */
@SpringBootApplication
public class KafkaMsPatternsApplication {

/**
 * Starts the Spring application.
 *
 * @param args command-line arguments, passed through to {@code SpringApplication.run}
 */
public static void main(String[] args) {
SpringApplication.run(KafkaMsPatternsApplication.class, args);
}

}

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store