Spring Boot Microservices — Part 7 — Event-Driven Using RabbitMQ

Prateek
4 min read · May 8, 2022

In this example, I will show you how to publish a message through RabbitMQ whenever an order is placed successfully, and how a separate notification-service consumes it.

notification-service

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<!-- Maven build descriptor for the notification-service module (com.example:notification-service). -->
<modelVersion>4.0.0</modelVersion>
<!-- Inherits dependency and plugin management from the Spring Boot 2.6.7 parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.7</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>notification-service</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>notification-service</name>
<description>Demo project for Spring Boot</description>

<!-- Java level and Spring Cloud release train; spring-cloud.version is consumed by the BOM import in dependencyManagement. -->
<properties>
<java.version>11</java.version>
<spring-cloud.version>2021.0.2</spring-cloud.version>
</properties>

<dependencies>
<!-- Web starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Eureka client, used to register this service with the discovery server. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- Spring Cloud Stream core: provides the functional binding model used by the Consumer bean. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- RabbitMQ binder implementation for Spring Cloud Stream. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

<!-- Lombok (used for @Slf4j); marked optional so it is not passed on to dependents. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Test-scoped test-binder artifact of spring-cloud-stream; not a duplicate of the compile-scope entry above (different classifier and type). -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<scope>test</scope>
<classifier>test-binder</classifier>
<type>test-jar</type>
</dependency>
</dependencies>
<!-- Imports the Spring Cloud BOM, which is why the spring-cloud dependencies above declare no version. -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<!-- Spring Boot plugin; Lombok is excluded from the artifact it packages. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

EmailSender.java

package com.example;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * Stand-in for a real e-mail gateway: it only writes two log lines for the
 * order it is handed.
 */
@Slf4j
@Service
public class EmailSender {

    /**
     * Logs that the order was placed and that its notification e-mail was "sent".
     *
     * @param orderNumber value to include in both log lines
     */
    public void sendEmail(final String orderNumber) {
        log.info("Order Placed Successfully - Order Number is {}", orderNumber);
        log.info("Email Sent For Order Id {}", orderNumber);
    }
}

NotificationServiceApplication.java

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;

import java.util.function.Consumer;

/**
 * Boot entry point of the notification-service. Besides starting the
 * application it declares the Spring Cloud Stream consumer that reacts to
 * incoming notification events.
 */
@SpringBootApplication
@EnableEurekaClient
public class NotificationServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(NotificationServiceApplication.class, args);
    }

    /**
     * Functional consumer for notification events.
     *
     * <p>The bean name is part of the binding contract: the input binding
     * {@code notificationEventSupplier-in-0} in application.properties is
     * derived from it, so the method must not be renamed without updating
     * that property.
     *
     * @param emailSender the container-managed {@link EmailSender}; injected
     *                    rather than instantiated with {@code new} so the
     *                    {@code @Service} bean is actually the one being used
     * @return a consumer that forwards each message payload to
     *         {@link EmailSender#sendEmail(String)}
     */
    @Bean
    public Consumer<Message<String>> notificationEventSupplier(EmailSender emailSender) {
        return stringMessage -> emailSender.sendEmail(stringMessage.getPayload());
    }
}

application.properties

# Logical service name; it is also reused in the Eureka instance id on the next line.
spring.application.name=notification-service
# Instance id = application name plus a random UUID.
eureka.instance.instance-id=${spring.application.name}:${random.uuid}
# NOTE(review): port 0 presumably lets Spring Boot choose a free port at startup - confirm.
server.port=0

# Input binding of the Consumer bean "notificationEventSupplier" (binding name = bean name + "-in-0"),
# pointed at the "notification-events" destination.
# NOTE(review): no consumer group is set for this binding - confirm that is intended when
# several instances of this service run at the same time.
spring.cloud.stream.bindings.notificationEventSupplier-in-0.destination=notification-events

Eureka Dashboard — http://localhost:8761/

order-service

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<!-- Maven build descriptor for the order-service module (com.example:order-service). -->
<modelVersion>4.0.0</modelVersion>
<!-- Inherits dependency and plugin management from the Spring Boot 2.6.7 parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.7</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>order-service</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>order-service</name>
<description>Demo project for Spring Boot</description>

<!-- Java level and Spring Cloud release train; spring-cloud.version is consumed by the BOM import in dependencyManagement. -->
<properties>
<java.version>11</java.version>
<spring-cloud.version>2021.0.2</spring-cloud.version>
</properties>


<dependencies>
<!-- Persistence (OrderRepository / Order entity). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- Web starter for the REST controller. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Eureka client, used to register this service with the discovery server. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- Config client; application.properties imports a configserver location. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<!-- Spring Cloud Bus over AMQP. -->
<!-- NOTE(review): spring-cloud-stream and its Rabbit binder are not declared directly in this POM; -->
<!-- StreamBridge in OrderController is presumably available transitively through this starter. Verify with mvn dependency:tree. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<!-- Declarative HTTP client used by InventoryClient. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!-- Resilience4j circuit breaker used around the inventory check. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
</dependency>
<!-- Actuator endpoints (application.properties exposes all of them over the web). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- MySQL JDBC driver, needed at runtime only. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>

<!-- Lombok; marked optional so it is not passed on to dependents. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<!-- Imports the Spring Cloud BOM, which is why the spring-cloud dependencies above declare no version. -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<!-- Spring Boot plugin; Lombok is excluded from the artifact it packages. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>

application.properties

# MySQL DB credentials.
# NOTE(review): credentials are hard-coded in plain text and no spring.datasource.url is set here;
# the URL is presumably supplied by the config server - confirm.
spring.datasource.username=root
spring.datasource.password=password

# TODO(review): the original note here was left unfinished ("Need to look at the ways to ...").

# Logical service name; it is also reused in the Eureka instance id below.
spring.application.name=order-service
# NOTE(review): port 0 presumably lets Spring Boot choose a free port at startup - confirm.
server.port=0
# Instance id = application name plus a random UUID.
eureka.instance.instance-id=${spring.application.name}:${random.uuid}
# Previous config-server-only import, kept for reference:
#spring.config.import=optional:configserver:http://localhost:8888?fail-fast=true&max-attempts=10&max-interval=1500
# NOTE(review): "&vault://secret/order-service" is appended as a query parameter of the configserver URL,
# not as a second import location. Multiple spring.config.import locations are normally separated by
# commas, and the POM declares no Vault dependency - confirm what was intended.
spring.config.import=optional:configserver:http://localhost:8888?fail-fast=true&max-attempts=10&max-interval=1500&vault://secret/order-service

# Expose every actuator endpoint over the web.
management.endpoints.web.exposure.include=*

# RabbitMQ broker connection.
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# Output binding that OrderController publishes to through StreamBridge; it targets the same
# "notification-events" destination the notification-service consumes from.
spring.cloud.stream.bindings.notificationEventSupplier-out-0.destination=notification-events
# Declares "notificationEventSupplier" as a source so the output binding exists without a Supplier bean.
# NOTE(review): confirm this property name is still current for the Spring Cloud Stream version in use.
spring.cloud.stream.source=notificationEventSupplier

OrderController.java

package com.example.controller;

import com.example.client.InventoryClient;
import com.example.dto.OrderDto;
import com.example.model.Order;
import com.example.repository.OrderRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.circuitbreaker.resilience4j.Resilience4JCircuitBreaker;
import org.springframework.cloud.circuitbreaker.resilience4j.Resilience4JCircuitBreakerFactory;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

@RestController
@RequestMapping("/api/order")
@RequiredArgsConstructor
@Slf4j
public class OrderController {
private final OrderRepository orderRepository;
private final InventoryClient inventoryClient;
private final Resilience4JCircuitBreakerFactory circuitBreakerFactory;
private final StreamBridge streamBridge;
// private final ExecutorService traceableExecutorService;

@PostMapping
public String placeOrder(@RequestBody OrderDto orderDto) {
Resilience4JCircuitBreaker circuitBreaker = circuitBreakerFactory.create("inventory");
Supplier<Boolean> booleanSupplier = () -> orderDto.getOrderLineItemsList()
.stream()
.allMatch(orderLineItems -> inventoryClient.checkStock(orderLineItems.getSkuCode()));

boolean isProductInStock = circuitBreaker.run(booleanSupplier, throwable -> handleErrorCase());

if (isProductInStock) {
Order order = new Order();
order.setOrderLineItems(orderDto.getOrderLineItemsList());
order.setOrderNumber(UUID.randomUUID().toString());

orderRepository.save(order);
log.info("Sending Order Details to Notification Service");

//Send the Notication via RabbitMQ
streamBridge.send("notificationEventSupplier-out-0",
MessageBuilder.withPayload(order.getId()).build());

return "Order Placed Successfully!";
}else{
return "Order Failed !, One of the products in the order is not in the stock!";
}
}

private Boolean handleErrorCase() {
System.out.println("fallback called !!");
return false;
}
}

Order.java

/**
 * JPA entity for a placed order, stored in table {@code t_orders}.
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "t_orders")
public class Order {

    /** Primary key, generated with the IDENTITY strategy. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /** Order number assigned by the caller that creates the order. */
    private String orderNumber;

    /** Line items of this order; all persistence operations cascade to them. */
    @OneToMany(cascade = CascadeType.ALL)
    private List<OrderLineItems> orderLineItems;
}

OrderLineItems.java

/**
 * JPA entity for a single line of an order, stored in table
 * {@code order_line_items}.
 */
@Entity
@Table(name = "order_line_items")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class OrderLineItems {

    /** Primary key, generated with the IDENTITY strategy. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /** SKU code of the ordered product. */
    private String skuCode;

    /** Price recorded for this line. */
    private BigDecimal price;

    /** Quantity recorded for this line. */
    private Integer quantity;
}

OrderDto.java

/**
 * Request payload for placing an order: just the list of line items.
 */
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderDto {

    /** Line items the client wants to order. */
    private List<OrderLineItems> orderLineItemsList;
}

InventoryClient.java

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

/**
 * Feign client for the service registered as {@code inventory-service}.
 */
@FeignClient(name = "inventory-service")
public interface InventoryClient {

    /**
     * Calls {@code GET /api/inventory/{skuCode}} on the inventory service.
     *
     * <p>The path variable name is given explicitly so the mapping does not
     * depend on the code being compiled with {@code -parameters}.
     *
     * @param skuCode SKU code substituted into the request path
     * @return the Boolean returned by the inventory service for that SKU
     */
    @GetMapping("/api/inventory/{skuCode}")
    Boolean checkStock(@PathVariable("skuCode") String skuCode);
}

OrderRepository.java

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@FeignClient(name = "inventory-service")
public interface InventoryClient {
@GetMapping("/api/inventory/{skuCode}")
Boolean checkStock(@PathVariable String skuCode);
}

OrderServiceApplication.java

/**
 * Boot entry point of the order-service, with Eureka client registration and
 * Feign client scanning switched on.
 */
@EnableFeignClients
@EnableEurekaClient
@SpringBootApplication
public class OrderServiceApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments handed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }
}

Now, if I place an order successfully, a notification event will be triggered.

As you can see from the logs, the "Email Sent" line shows that the event was consumed by the notification-service.

--

--