Spring WebFlux — Functional API

In this tutorial, we’ll learn how to develop a functional API with Spring WebFlux.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the product-api-functional demo project. -->
<modelVersion>4.0.0</modelVersion>
<!-- Spring Boot parent POM; dependencies below that declare no version rely on it. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>product-api-functional</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>product-api-functional</name>
<description>Demo project for Spring Boot</description>
<properties>
<!-- Java language level used for compilation. -->
<java.version>11</java.version>
</properties>
<dependencies>
<!-- Reactive MongoDB data access (used by ProductRepository). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<!-- Reactive web stack (WebFlux router functions, handlers and WebClient). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- springdoc OpenAPI UI for WebFlux; the version is pinned explicitly here. -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-webflux-ui</artifactId>
<version>1.5.8</version>
</dependency>
<!-- Lombok annotations used by the model classes; declared optional. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test-scoped dependencies. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Spring Boot Maven plugin, configured to exclude Lombok. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>

Product.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

/**
 * Product document persisted through the reactive MongoDB repository.
 *
 * <p>Constructors, accessors and the builder are supplied by the Lombok
 * annotations on the class. The field order is significant: the all-args
 * constructor is invoked positionally as {@code (id, name, price)}.
 */
@Document
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Product {

    /** Document identifier; callers pass {@code null} when creating a new product. */
    @Id
    private String id;

    /** Display name of the product. */
    private String name;

    /** Price of the product. */
    private Double price;
}

ProductEvent.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Payload emitted on the product event stream.
 *
 * <p>Constructors, accessors and the builder are supplied by the Lombok
 * annotations on the class. The all-args constructor is invoked positionally
 * as {@code (eventId, eventType)}.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProductEvent {

    /** Numeric identifier of the event. */
    private Long eventId;

    /** Free-form label describing the event. */
    private String eventType;
}

ProductRepository.java

import com.example.productapifunctional.model.Product;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

/**
 * Reactive repository for {@link Product} documents keyed by a {@code String} id.
 *
 * <p>Declares no query methods of its own; every operation used by the
 * application is inherited from {@link ReactiveMongoRepository}.
 */
public interface ProductRepository extends ReactiveMongoRepository<Product, String> {
}

ProductHandler.java

import com.example.productapifunctional.model.Product;
import com.example.productapifunctional.model.ProductEvent;
import com.example.productapifunctional.repository.ProductRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

/**
 * Handler functions for the functional product endpoints.
 *
 * <p>Each method takes the incoming {@link ServerRequest} and returns a
 * {@code Mono<ServerResponse>}; persistence is delegated to the injected
 * {@link ProductRepository}.
 */
@Component
public class ProductHandler {

    @Autowired
    private ProductRepository productRepository;

    /**
     * Streams every stored product as JSON with status 200.
     *
     * @param serverRequest the incoming request (not inspected)
     * @return the response publisher
     */
    public Mono<ServerResponse> getAllProducts(ServerRequest serverRequest) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(this.productRepository.findAll(), Product.class);
    }

    /**
     * Looks up a single product by the {@code id} path variable.
     *
     * @param serverRequest the incoming request carrying the {@code id} path variable
     * @return 200 with the product as JSON, or 404 when the lookup completes empty
     */
    public Mono<ServerResponse> getProduct(ServerRequest serverRequest) {
        String productId = serverRequest.pathVariable("id");

        return this.productRepository.findById(productId)
                .flatMap(found -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(BodyInserters.fromValue(found)))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    /**
     * Persists the product decoded from the request body.
     *
     * @param request the incoming request whose body is read as a {@link Product}
     * @return 201 with the saved product as JSON
     */
    public Mono<ServerResponse> saveProduct(ServerRequest request) {
        return request.bodyToMono(Product.class)
                .flatMap(incoming -> {
                    Mono<Product> saved = productRepository.save(incoming);
                    return ServerResponse.status(HttpStatus.CREATED)
                            .contentType(MediaType.APPLICATION_JSON)
                            .body(saved, Product.class);
                });
    }

    /**
     * Replaces the name and price of an existing product.
     *
     * <p>The stored product only contributes its id; name and price are taken
     * from the request body.
     *
     * @param request the incoming request carrying the {@code id} path variable and a product body
     * @return 200 with the saved product as JSON, or 404 when the zipped lookup yields nothing
     */
    public Mono<ServerResponse> updateProduct(ServerRequest request) {
        String productId = request.pathVariable("id");
        Mono<Product> stored = this.productRepository.findById(productId);
        Mono<Product> payload = request.bodyToMono(Product.class);

        return payload
                .zipWith(stored, (incoming, current) ->
                        new Product(current.getId(), incoming.getName(), incoming.getPrice()))
                .flatMap(merged -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(productRepository.save(merged), Product.class))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    /**
     * Deletes the product identified by the {@code id} path variable.
     *
     * @param request the incoming request carrying the {@code id} path variable
     * @return 200 built from the delete publisher, or 404 when the lookup completes empty
     */
    public Mono<ServerResponse> deleteProduct(ServerRequest request) {
        String productId = request.pathVariable("id");

        return this.productRepository.findById(productId)
                .flatMap(current -> ServerResponse.ok().build(productRepository.delete(current)))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    /**
     * Deletes every stored product.
     *
     * @param request the incoming request (not inspected)
     * @return 200 built from the delete-all publisher
     */
    public Mono<ServerResponse> deleteAllProducts(ServerRequest request) {
        Mono<Void> purge = productRepository.deleteAll();
        return ServerResponse.ok().build(purge);
    }

    /**
     * Emits one {@link ProductEvent} per second as a server-sent event stream.
     *
     * <p>The interval counter is used as the event id.
     *
     * @param request the incoming request (not inspected)
     * @return 200 with content type {@code text/event-stream}
     */
    public Mono<ServerResponse> getProductEvents(ServerRequest request) {
        Flux<ProductEvent> ticker = Flux.interval(Duration.ofSeconds(1))
                .map(tick -> new ProductEvent(tick, "Product Event"));

        return ServerResponse.ok()
                .contentType(MediaType.TEXT_EVENT_STREAM)
                .body(ticker, ProductEvent.class);
    }
}

ProductApiFunctionalApplication.java

import com.example.productapifunctional.handler.ProductHandler;
import com.example.productapifunctional.model.Product;
import com.example.productapifunctional.repository.ProductRepository;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;

import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

/**
 * Application entry point: boots Spring, seeds the product collection and
 * registers the functional routes served by {@link ProductHandler}.
 */
@SpringBootApplication
public class ProductApiFunctionalApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments forwarded to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(ProductApiFunctionalApplication.class, args);
    }

    /**
     * Startup runner that wipes the repository, saves three sample products and
     * prints whatever the repository then contains.
     *
     * @param productRepository repository to reset and seed
     * @return the runner executed by Spring Boot at startup
     */
    @Bean
    CommandLineRunner init(ProductRepository productRepository) {
        return args -> {
            Flux<Product> seedProducts = Flux
                    .just(
                            new Product(null, "Big Latte", 2.99),
                            new Product(null, "Big Decaf", 2.49),
                            new Product(null, "Green team", 1.99))
                    .flatMap(productRepository::save);

            // Order matters: clear first, then insert, then read back.
            productRepository.deleteAll()
                    .thenMany(seedProducts)
                    .thenMany(productRepository.findAll())
                    .subscribe(System.out::println);
        };
    }

    /**
     * Builds the router for everything under {@code /products}.
     *
     * <p>The routes are declared in nested form; a flat alternative is kept
     * below for reference.
     *
     * @param handler handler whose methods serve the individual routes
     * @return the composed router function
     */
    @Bean
    RouterFunction<ServerResponse> routes(ProductHandler handler) {
        // Flat alternative to the nested definition below:
        // return route(GET("/products").and(accept(APPLICATION_JSON)),
        //         handler::getAllProducts)
        //     .andRoute(POST("/products").and(contentType(APPLICATION_JSON)),
        //         handler::saveProduct)
        //     .andRoute(DELETE("/products").and(accept(APPLICATION_JSON)),
        //         handler::deleteAllProducts)
        //     .andRoute(GET("/products/events").and(accept(TEXT_EVENT_STREAM)),
        //         handler::getProductEvents)
        //     .andRoute(GET("/products/{id}").and(accept(APPLICATION_JSON)),
        //         handler::getProduct)
        //     .andRoute(PUT("/products/{id}").and(contentType(APPLICATION_JSON)),
        //         handler::updateProduct)
        //     .andRoute(DELETE("/products/{id}").and(accept(APPLICATION_JSON)),
        //         handler::deleteProduct);

        // Routes addressed at a single product: /products/{id}
        RouterFunction<ServerResponse> singleProductRoutes =
                route(method(HttpMethod.GET), handler::getProduct)
                        .andRoute(method(HttpMethod.PUT), handler::updateProduct)
                        .andRoute(method(HttpMethod.DELETE), handler::deleteProduct);

        // Collection-level routes; /events is declared before /{id} so it is tried first.
        RouterFunction<ServerResponse> productRoutes =
                route(GET("/"), handler::getAllProducts)
                        .andRoute(method(HttpMethod.POST), handler::saveProduct)
                        .andRoute(DELETE("/"), handler::deleteAllProducts)
                        .andRoute(GET("/events"), handler::getProductEvents)
                        .andNest(path("/{id}"), singleProductRoutes);

        return nest(path("/products"),
                nest(accept(APPLICATION_JSON).or(contentType(APPLICATION_JSON)).or(accept(TEXT_EVENT_STREAM)),
                        productRoutes));
    }
}
  • Add Product
  • Get All Products Again
  • Update Product
  • Delete Product

Get Product Events:

http://localhost:8080/products/events

data:{"eventId":0,"eventType":"Product Event"}

data:{"eventId":1,"eventType":"Product Event"}

data:{"eventId":2,"eventType":"Product Event"}

data:{"eventId":3,"eventType":"Product Event"}

data:{"eventId":4,"eventType":"Product Event"}

data:{"eventId":5,"eventType":"Product Event"}

data:{"eventId":6,"eventType":"Product Event"}

Output —

package com.example.productapifunctional;

import com.example.productapifunctional.model.Product;
import com.example.productapifunctional.model.ProductEvent;
import org.springframework.http.ResponseEntity;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Small command-line client that exercises the product endpoints through a
 * reactive {@link WebClient} bound to {@code http://localhost:8080/products/}.
 */
class WebClientAPI {

    private final WebClient webClient;

    /** Creates the client against the local product API base URL. */
    public WebClientAPI() {
        // WebClient.create already returns a ready-to-use client; no mutate()/build() round trip needed.
        this.webClient = WebClient.create("http://localhost:8080/products/");
    }

    /**
     * Posts a new sample product and logs the resulting response entity.
     *
     * @return the response entity wrapping the product returned by the server
     */
    private Mono<ResponseEntity<Product>> postNewProduct() {
        return webClient.post()
                .body(Mono.just(new Product(null, "Basundi", 4.99)), Product.class)
                .exchangeToMono(clientResponse -> clientResponse.toEntity(Product.class))
                .doOnSuccess(o -> System.out.println("****** POST " + o));
    }

    /**
     * Fetches all products, logging each one as it arrives.
     *
     * @return the stream of products decoded from the response body
     */
    private Flux<Product> getAllProducts() {
        return webClient.get()
                .retrieve()
                .bodyToFlux(Product.class)
                .doOnNext(o -> System.out.println("**** GET " + o));
    }

    /**
     * Sends a PUT for the given product id with a new name and price.
     *
     * @param id    id of the product to update
     * @param name  new product name
     * @param price new product price
     * @return the product decoded from the response body
     */
    private Mono<Product> updateProduct(String id, String name, double price) {
        return webClient
                .put()
                .uri("/{id}", id)
                .body(Mono.just(new Product(id, name, price)), Product.class)
                .retrieve()
                .bodyToMono(Product.class)
                .doOnSuccess(o -> System.out.println("***** UPDATE : " + o));
    }

    /**
     * Sends a DELETE for the given product id.
     *
     * @param id id of the product to delete
     * @return a publisher that completes when the response has been received
     */
    private Mono<Void> deleteProduct(String id) {
        return webClient
                .delete()
                .uri("/{id}", id)
                .retrieve()
                .bodyToMono(Void.class)
                .doOnSuccess(o -> System.out.println("***** DELETE : " + o));
    }

    /**
     * Subscribes to the {@code /events} endpoint.
     *
     * @return the stream of events decoded from the response body
     */
    private Flux<ProductEvent> getAllEvents() {
        return webClient
                .get()
                .uri("/events")
                .retrieve()
                .bodyToFlux(ProductEvent.class);
    }

    /**
     * Runs the demo sequence: post a product, list products, update and then
     * delete the first one listed, list again, and finally print events.
     *
     * <p>The pipeline is subscribed without blocking, so the main thread
     * sleeps for five seconds to give the calls time to run.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        WebClientAPI webClientAPI = new WebClientAPI();
        webClientAPI
                .postNewProduct()
                .thenMany(webClientAPI.getAllProducts())
                .take(1)
                .flatMap(p -> webClientAPI.updateProduct(p.getId(), "White Tea", 0.99))
                .flatMap(p -> webClientAPI.deleteProduct(p.getId()))
                .thenMany(webClientAPI.getAllProducts())
                .thenMany(webClientAPI.getAllEvents())
                .subscribe(System.out::println);

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt status rather than swallowing it, so the
            // interruption remains visible to whatever runs after this point.
            Thread.currentThread().interrupt();
        }
    }
}

--

--

--

Java Developer and enthusiast

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

5 mistakes that beginner software developers face when learning to code.

Story Of “Hello World”

A Holographic SDK to Rule Them All

3 Tips to Control the Cost of AWS Lambda

Install the Fastai Requirements on Windows

Pyramid Maker

Python — Quick Search

The Number 1 Attribute For Front End Web Developers!

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
PA

PA

Java Developer and enthusiast

More from Medium

Spring Data JDBC: Implementing Domain Driven Design Aggregate

Java — Overview

How to Get and Validate User Token Issued by Azure AD B2C via Java

Spring WebFlux — Rest API with Annotated Controllers