Kafka — Microservices Architecture Part-1
In this example, we'll look at how to publish a message to Kafka when an order is placed.
pom.xml — All dependencies needed for the application.
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the kafka-ms-order application (the order-producing service). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from spring-boot-starter-parent 2.7.0. None of the dependencies below declare a
     version of their own; presumably they are resolved through the parent's dependency management. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>kafka-ms-order</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-ms-order</name>
<description>Demo project for Spring Boot</description>
<properties>
<!-- Java language level; the sources rely on `var`, so Java 10 or later is required. -->
<java.version>11</java.version>
</properties>
<dependencies>
<!-- Spring Data JPA: backs the Order / OrderItem entities and their CrudRepository interfaces. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- Spring Web: provides the REST layer used by OrderApi. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring for Apache Kafka: KafkaTemplate, TopicBuilder and the JSON (de)serializers. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- H2 database, runtime scope only; application.yml points the datasource at an in-memory H2 URL. -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Lombok annotations (@Data, @Builder, @Slf4j, ...); marked optional. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Apache Commons Lang 3: OrderAction imports RandomStringUtils from it. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- Test-scoped dependencies. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!-- Lombok is listed under the plugin's excludes; presumably so that it is not packaged
     into the executable jar (TODO confirm against the plugin documentation). -->
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
application.yml
# Configuration for the order (producer) application.
# Nesting follows the Spring Boot property names (spring.main.*, spring.h2.*,
# spring.datasource.*, spring.kafka.listener/producer/consumer.*).
logging:
  pattern:
    # Console log layout; every line is prefixed with "[Kafka Order]".
    console: "[Kafka Order] %clr(%d{HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:%5p}) %clr(---){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:%wEx}"

server:
  port: 9001

spring:
  main:
    banner-mode: OFF
  h2:
    console:
      enabled: true
  datasource:
    # In-memory H2 database.
    url: jdbc:h2:mem:kafkaorderdb
  kafka:
    listener:
      missing-topics-fatal: false
    producer:
      bootstrap-servers:
        - localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # OrderMessage values are written to the topic as JSON.
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: kafka-order-cg
      enable-auto-commit: true
      auto-offset-reset: earliest
      bootstrap-servers:
        - localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              # Package the JsonDeserializer is allowed to instantiate classes from.
              packages: com.course.kafka.broker.message
Order.java — Model class for Order domain
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
@Entity
@Table(name = "orders")
public class Order {
@Id
@GeneratedValue
private int orderId;
@Column
private String orderNumber;
@Column
private String orderLocation;
@Column
private LocalDateTime orderDateTime;
@Column
private String creditCardNumber;
@OneToMany(mappedBy = "order")
private List<OrderItem> items;
}
OrderItem.java — Model class for OrderItem domain.
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
@Entity
@Table(name = "order_items")
public class OrderItem {
@Id
@GeneratedValue
private int orderItemId;
@Column
private String itemName;
@Column
private int price;
@Column
private int quantity;
@JoinColumn(name = "order_id")
@ManyToOne
private Order order;
}
OrderItemRepository.java
/**
 * Spring Data repository for {@link OrderItem} entities, keyed by their {@code Integer} id.
 * Declares no methods of its own; everything comes from {@link CrudRepository}.
 */
@Repository
public interface OrderItemRepository
        extends CrudRepository<OrderItem, Integer> {}
OrderRepository.java
/**
 * Spring Data repository for {@link Order} entities, keyed by their {@code Integer} id.
 * Declares no methods of its own; everything comes from {@link CrudRepository}.
 */
@Repository
public interface OrderRepository
        extends CrudRepository<Order, Integer> {}
KafkaConfig.java — This is used to create the topics automatically.
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
/**
 * Declares the Kafka topics used by the order flow as {@link NewTopic} beans.
 * Both topics are built with 2 partitions and 1 replica.
 */
@Configuration
public class KafkaConfig {

    private static final int PARTITION_COUNT = 2;
    private static final int REPLICA_COUNT = 1;

    /** Topic definition for {@code t-commodity-order}. */
    @Bean
    public NewTopic topicOrder() {
        return commodityTopic("t-commodity-order");
    }

    /** Topic definition for {@code t-commodity-order-reply}. */
    @Bean
    public NewTopic topicOrderReply() {
        return commodityTopic("t-commodity-order-reply");
    }

    /** Builds a topic definition with the shared partition and replica settings. */
    private static NewTopic commodityTopic(String topicName) {
        return TopicBuilder.name(topicName)
                .partitions(PARTITION_COUNT)
                .replicas(REPLICA_COUNT)
                .build();
    }
}
OrderRequest.java — Order Request model class.
/**
 * Request payload for creating an order: where it was placed, the credit card used,
 * and the requested items.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderRequest {

    /** Location the order was placed from. */
    private String orderLocation;

    /** Credit card number supplied with the order. */
    private String creditCardNumber;

    /** Line items requested in this order. */
    private List<OrderItemRequest> items;
}
OrderItemRequest.java
/**
 * A single requested line item inside an {@code OrderRequest}.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderItemRequest {

    /** Name of the item being ordered. */
    private String itemName;

    /** Price of the item. */
    private int price;

    /** Number of units ordered. */
    private int quantity;
}
OrderResponse.java
/**
 * Response body returned after an order is created; carries the generated order number.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderResponse {

    /** Order number assigned to the newly created order. */
    private String orderNumber;
}
OrderMessage.java — Class that defines the format of the message that will be sent to Kafka.
import java.time.LocalDateTime;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Message published to Kafka for one order item. It is a flattened view: order-level
 * fields (location, number, credit card, timestamp) are repeated alongside the item fields.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderMessage {

    // ----- order-level fields -----
    private String orderLocation;
    private String orderNumber;
    private String creditCardNumber;

    /** Order timestamp, serialized as {@code yyyy-MM-dd'T'HH:mm:ss}. */
    @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
    private LocalDateTime orderDateTime;

    // ----- item-level fields -----
    private String itemName;
    private int price;
    private int quantity;
}
OrderProducer.java — Send/Produce the message.
/**
 * Publishes {@link OrderMessage}s to the {@code t-commodity-order} topic.
 *
 * <p>Each record is keyed by the order number and carries a {@code surpriseBonus} header.
 */
@Service
@Slf4j
public class OrderProducer {

    private static final String ORDER_TOPIC = "t-commodity-order";
    private static final String SURPRISE_BONUS_HEADER = "surpriseBonus";

    @Autowired
    private KafkaTemplate<String, OrderMessage> kafkaTemplate;

    /**
     * Sends the message and registers a callback that logs the outcome of the send.
     *
     * @param message the order item message to publish
     */
    public void publish(OrderMessage message) {
        ProducerRecord<String, OrderMessage> producerRecord = buildProducerRecord(message);
        kafkaTemplate.send(producerRecord)
                .addCallback(new ListenableFutureCallback<SendResult<String, OrderMessage>>() {
                    @Override
                    public void onSuccess(SendResult<String, OrderMessage> result) {
                        log.info("Order {}, item {} published successfully", message.getOrderNumber(),
                                message.getItemName());
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        // The trailing throwable is logged with its stack trace, so the root
                        // cause is not reduced to just its message text.
                        log.warn("Order {}, item {} failed to publish because {}", message.getOrderNumber(),
                                message.getItemName(), ex.getMessage(), ex);
                    }
                });
        // Logged right after the send is handed off; it does not wait for the callback.
        log.info("Just a dummy message for order {}, item {}", message.getOrderNumber(), message.getItemName());
    }

    /**
     * Builds the record for the order topic: key is the order number, partition is left
     * unset ({@code null}), and a {@code surpriseBonus} header is attached whose value is
     * 25 when the order location starts with "A" (case-insensitive) and 15 otherwise.
     */
    private ProducerRecord<String, OrderMessage> buildProducerRecord(OrderMessage message) {
        int surpriseBonus = StringUtils.startsWithIgnoreCase(message.getOrderLocation(), "A") ? 25 : 15;
        var headers = new ArrayList<Header>();
        // Encode explicitly as UTF-8 so the header bytes do not depend on the JVM's default charset.
        var surpriseBonusHeader = new RecordHeader(SURPRISE_BONUS_HEADER,
                Integer.toString(surpriseBonus).getBytes(java.nio.charset.StandardCharsets.UTF_8));
        headers.add(surpriseBonusHeader);
        return new ProducerRecord<>(ORDER_TOPIC, null, message.getOrderNumber(), message, headers);
    }
}
OrderAction.java
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.course.kafka.api.request.OrderItemRequest;
import com.course.kafka.api.request.OrderRequest;
import com.course.kafka.broker.message.OrderMessage;
import com.course.kafka.broker.producer.OrderProducer;
import com.course.kafka.entity.Order;
import com.course.kafka.entity.OrderItem;
import com.course.kafka.repository.OrderItemRepository;
import com.course.kafka.repository.OrderRepository;
/**
 * Building blocks for the order flow: converting an incoming request into entities,
 * persisting them, and publishing one Kafka message per order item.
 */
@Component
public class OrderAction {

    @Autowired
    private OrderProducer orderProducer;

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private OrderItemRepository orderItemRepository;

    /**
     * Converts a request into a new {@link Order} with its items attached.
     *
     * <p>The order timestamp is the current time and the order number is a random
     * 8-character upper-case alphanumeric string. A request without an {@code items}
     * list produces an order with no items instead of failing with a
     * {@link NullPointerException}.
     *
     * @param request the incoming order request
     * @return an unsaved order whose items each reference the order
     */
    public Order convertToOrder(OrderRequest request) {
        var result = new Order();
        result.setCreditCardNumber(request.getCreditCardNumber());
        result.setOrderLocation(request.getOrderLocation());
        result.setOrderDateTime(LocalDateTime.now());
        // Locale.ROOT keeps the upper-casing locale-independent (e.g. under a Turkish default
        // locale "i" would otherwise become the non-ASCII dotted capital I).
        result.setOrderNumber(RandomStringUtils.randomAlphanumeric(8).toUpperCase(java.util.Locale.ROOT));

        List<OrderItemRequest> requestedItems = request.getItems() == null ? List.of() : request.getItems();
        List<OrderItem> items = requestedItems
                .stream()
                .map(this::convertToOrderItem)
                .collect(Collectors.toList());
        // Wire the back-reference so each item knows its order.
        items.forEach(item -> item.setOrder(result));
        result.setItems(items);
        return result;
    }

    /** Copies name, price and quantity from the request item into a new entity. */
    private OrderItem convertToOrderItem(OrderItemRequest itemRequest) {
        var result = new OrderItem();
        result.setItemName(itemRequest.getItemName());
        result.setPrice(itemRequest.getPrice());
        result.setQuantity(itemRequest.getQuantity());
        return result;
    }

    /**
     * Saves the order first, then each of its items.
     *
     * @param order the order to persist, with its items populated
     */
    public void saveToDB(Order order) {
        orderRepository.save(order);
        order.getItems().forEach(orderItemRepository::save);
    }

    /**
     * Flattens an order item together with its parent order's fields into an
     * {@link OrderMessage} and hands it to the producer.
     *
     * @param orderItem the item to publish; its order reference must be set
     */
    public void publishToKafka(OrderItem orderItem) {
        var orderMessage = new OrderMessage();
        orderMessage.setItemName(orderItem.getItemName());
        orderMessage.setPrice(orderItem.getPrice());
        orderMessage.setQuantity(orderItem.getQuantity());

        var order = orderItem.getOrder();
        orderMessage.setOrderDateTime(order.getOrderDateTime());
        orderMessage.setOrderLocation(order.getOrderLocation());
        orderMessage.setOrderNumber(order.getOrderNumber());
        orderMessage.setCreditCardNumber(order.getCreditCardNumber());

        // send message to kafka!
        orderProducer.publish(orderMessage);
    }
}
OrderService.java — Publish message to kafka!
/**
 * Orchestrates order creation: convert the request, persist it, then publish one
 * Kafka message per order item.
 */
@Service
public class OrderService {

    @Autowired
    private OrderAction orderAction;

    /**
     * Creates and stores an order from the request and publishes its items.
     *
     * @param request the incoming order request
     * @return the order number of the created order
     */
    public String saveOrder(OrderRequest request) {
        final Order createdOrder = orderAction.convertToOrder(request);
        orderAction.saveToDB(createdOrder);

        // flatten message & publish
        for (var item : createdOrder.getItems()) {
            orderAction.publishToKafka(item);
        }

        return createdOrder.getOrderNumber();
    }
}
OrderApi.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import com.course.kafka.api.request.OrderRequest;
import com.course.kafka.api.response.OrderResponse;
import com.course.kafka.command.service.OrderService;
/**
 * REST endpoint for orders, mapped under {@code /api/order}.
 */
@RestController
@RequestMapping("/api/order")
public class OrderApi {

    @Autowired
    private OrderService orderService;

    /**
     * Handles {@code POST /api/order}: delegates to the order service and wraps the
     * resulting order number in a 200 OK response.
     *
     * @param orderRequest the order payload from the request body
     * @return the response carrying the new order number
     */
    @PostMapping("")
    public ResponseEntity<OrderResponse> createOrder(@RequestBody OrderRequest orderRequest) {
        final String orderNumber = orderService.saveOrder(orderRequest);
        return ResponseEntity.ok().body(new OrderResponse(orderNumber));
    }
}
KafkaMsOrderApplication.java — Spring Boot entry point of the order application.
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the order application.
 */
@SpringBootApplication
public class KafkaMsOrderApplication {

    /**
     * Starts the Spring application with this class as its primary source.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(final String[] args) {
        SpringApplication.run(KafkaMsOrderApplication.class, args);
    }
}
Make one more request.
========
consumer
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the kafka-ms-patterns application (the order-consuming service). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from spring-boot-starter-parent 2.7.0. None of the dependencies below declare a
     version of their own; presumably they are resolved through the parent's dependency management. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>kafka-ms-patterns</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-ms-patterns</name>
<description>Demo project for Spring Boot</description>
<properties>
<!-- Java language level; the sources rely on `var`, so Java 10 or later is required. -->
<java.version>11</java.version>
</properties>
<dependencies>
<!-- Core Spring Boot starter only; this application declares no web or JPA starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring for Apache Kafka: @KafkaListener and the JSON (de)serializers. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Jackson: ObjectMapper is configured in JsonConfig. -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Jackson java.time module; presumably picked up by JsonConfig's findAndRegisterModules()
     so that OrderMessage.orderDateTime (LocalDateTime) can be handled. -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<!-- Lombok annotations (@Data, @Builder, @Slf4j, ...); marked optional. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test-scoped dependencies. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!-- Lombok is listed under the plugin's excludes; presumably so that it is not packaged
     into the executable jar (TODO confirm against the plugin documentation). -->
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
application.yml
# Configuration for the consumer ("patterns") application.
# Nesting follows the Spring Boot property names (spring.main.*,
# spring.kafka.listener/producer/consumer.*).
logging:
  pattern:
    # Console log layout; every line is prefixed with "[Kafka Pattern]".
    console: "[Kafka Pattern] %clr(%d{HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:%5p}) %clr(---){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:%wEx}"

spring:
  main:
    banner-mode: OFF
    log-startup-info: false
  kafka:
    listener:
      missing-topics-fatal: false
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: kafka-pattern-cg
      enable-auto-commit: true
      auto-offset-reset: earliest
      bootstrap-servers:
        - localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Message values are read from the topic as JSON.
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              # Package the JsonDeserializer is allowed to instantiate classes from.
              packages: com.course.kafka.broker.message
JsonConfig.java — JSON configuration for the consumer. OrderMessage remains the same as in the producer.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
/**
 * Provides the application's Jackson {@link ObjectMapper} bean.
 */
@Configuration
public class JsonConfig {

    /**
     * Creates an {@link ObjectMapper} that registers the Jackson modules it can find
     * and has {@code WRITE_DATES_AS_TIMESTAMPS} switched off.
     *
     * @return the configured mapper
     */
    @Bean
    public ObjectMapper objectMapper() {
        return new ObjectMapper()
                .findAndRegisterModules()
                .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    }
}
OrderMessage.java
/**
 * Message consumed from Kafka for one order item. Order-level fields (location, number,
 * credit card, timestamp) sit alongside the item fields.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderMessage {

    // ----- order-level fields -----
    private String orderLocation;
    private String orderNumber;
    private String creditCardNumber;

    /** Order timestamp, formatted as {@code yyyy-MM-dd'T'HH:mm:ss}. */
    @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss")
    private LocalDateTime orderDateTime;

    // ----- item-level fields -----
    private String itemName;
    private int price;
    private int quantity;
}
OrderConsumer.java
/**
 * Listens on the {@code t-commodity-order} topic and logs each received order item
 * together with its total amount.
 */
@Service
@Slf4j
public class OrderConsumer {

    /** Number of trailing credit card characters left readable in log output. */
    private static final int VISIBLE_CARD_CHARS = 4;

    /**
     * Handles one order item message.
     *
     * @param message the deserialized order item message
     */
    @KafkaListener(topics = "t-commodity-order")
    public void listen(OrderMessage message) {
        // Widen before multiplying: price * quantity as int can silently overflow.
        var totalItemAmount = (long) message.getPrice() * message.getQuantity();
        // Only a masked form of the card number is logged, never the full value.
        log.info("Processing order {}, item {}, credit card number {}. Total amount for this item is {}",
                message.getOrderNumber(), message.getItemName(),
                maskCreditCardNumber(message.getCreditCardNumber()), totalItemAmount);
    }

    /**
     * Replaces all but the last {@value #VISIBLE_CARD_CHARS} characters with {@code *}.
     * Values that short or shorter are masked entirely; {@code null} is returned as-is.
     */
    private static String maskCreditCardNumber(String creditCardNumber) {
        if (creditCardNumber == null) {
            return null;
        }
        int length = creditCardNumber.length();
        if (length <= VISIBLE_CARD_CHARS) {
            return "*".repeat(length);
        }
        return "*".repeat(length - VISIBLE_CARD_CHARS) + creditCardNumber.substring(length - VISIBLE_CARD_CHARS);
    }
}
KafkaMsPatternsApplication.java — Spring Boot entry point of the consumer application.
/**
 * Entry point of the consumer application.
 */
@SpringBootApplication
public class KafkaMsPatternsApplication {

    /**
     * Starts the Spring application with this class as its primary source.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(final String[] args) {
        SpringApplication.run(KafkaMsPatternsApplication.class, args);
    }
}