Spring WebFlux — Functional API

In this tutorial, we’ll learn how to develop a functional API with Spring WebFlux.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the product-api-functional demo project. -->
<modelVersion>4.0.0</modelVersion>
<!-- Parent POM: spring-boot-starter-parent 2.6.4. The Spring Boot dependencies below declare
     no version of their own; presumably they are resolved through this parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates of this project. -->
<groupId>com.example</groupId>
<artifactId>product-api-functional</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>product-api-functional</name>
<description>Demo project for Spring Boot</description>
<properties>
<!-- Java version the project is built for. -->
<java.version>11</java.version>
</properties>
<dependencies>
<!-- Reactive MongoDB support (provides ReactiveMongoRepository used by ProductRepository). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<!-- Spring WebFlux (router functions, ServerRequest/ServerResponse, WebClient). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- springdoc OpenAPI UI for WebFlux. Unlike the Spring Boot starters, its version is pinned
     explicitly here. NOTE(review): confirm 1.5.8 is the intended version for Boot 2.6.4. -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-webflux-ui</artifactId>
<version>1.5.8</version>
</dependency>
<!-- Lombok annotations used by the model classes; marked optional. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test-scoped dependencies. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Spring Boot Maven plugin. Lombok is listed under its excludes; presumably this keeps the
     compile-time-only library out of the packaged application. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>

Product.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

/**
 * MongoDB document describing a product.
 *
 * <p>Lombok supplies the boilerplate: {@code @Data} for accessors, {@code equals},
 * {@code hashCode} and {@code toString}; {@code @Builder} for a fluent builder; and both a
 * no-argument constructor and an all-arguments constructor taking
 * {@code (id, name, price)} in field declaration order.
 */
@Document
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Product {

    /** Document identifier. */
    @Id
    private String id;

    /** Product name. */
    private String name;

    /** Product price. */
    private Double price;
}

ProductEvent.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Simple event payload consisting of a numeric id and a textual type.
 *
 * <p>Lombok generates accessors, {@code equals}/{@code hashCode}/{@code toString}, a builder,
 * a no-argument constructor and an all-arguments constructor taking
 * {@code (eventId, eventType)}.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProductEvent {

    /** Identifier of this event. */
    private Long eventId;

    /** Free-form label describing the kind of event. */
    private String eventType;
}

ProductRepository.java

import com.example.productapifunctional.model.Product;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

/**
 * Reactive repository for {@link Product} documents keyed by a {@code String} id.
 *
 * <p>Declares no methods of its own; every operation used by callers is inherited from
 * {@link ReactiveMongoRepository}.
 */
public interface ProductRepository
        extends ReactiveMongoRepository<Product, String> {
}

ProductHandler.java

import com.example.productapifunctional.model.Product;
import com.example.productapifunctional.model.ProductEvent;
import com.example.productapifunctional.repository.ProductRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

@Component
public class ProductHandler {
@Autowired
private ProductRepository productRepository;

public Mono<ServerResponse> getAllProducts(ServerRequest serverRequest) {
Flux<Product> products = this.productRepository.findAll();
return ServerResponse
.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(products, Product.class);
}

public Mono<ServerResponse> getProduct(ServerRequest serverRequest) {
String id = serverRequest.pathVariable("id");
Mono<Product> productMono = this.productRepository.findById(id);

return productMono.flatMap(product -> ServerResponse
.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(product)))
.switchIfEmpty(ServerResponse.notFound().build());
}

public Mono<ServerResponse> saveProduct(ServerRequest request) {
Mono<Product> productMono = request.bodyToMono(Product.class);

return productMono.flatMap(product -> ServerResponse
.status(HttpStatus.CREATED)
.contentType(MediaType.APPLICATION_JSON)
.body(productRepository.save(product), Product.class));
}


public Mono<ServerResponse> updateProduct(ServerRequest request) {
String id = request.pathVariable("id");
Mono<Product> existingProductMono = this.productRepository.findById(id);
Mono<Product> productMono = request.bodyToMono(Product.class);

Mono<ServerResponse> notFound = ServerResponse.notFound().build();

return productMono.zipWith(existingProductMono,
(product, existingProduct) -> new Product(existingProduct.getId(), product.getName(),
product.getPrice()))
.flatMap(product -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(productRepository.save(product),
Product.class))
.switchIfEmpty(notFound);
}

public Mono<ServerResponse> deleteProduct(ServerRequest request) {
String id = request.pathVariable("id");

Mono<Product> productMono = this.productRepository.findById(id);
Mono<ServerResponse> notFound = ServerResponse.notFound().build();

return productMono.flatMap(existingProduct -> ServerResponse.ok().build(productRepository.delete(existingProduct)))
.switchIfEmpty(notFound);
}

public Mono<ServerResponse> deleteAllProducts(ServerRequest request) {
return ServerResponse.ok().build(productRepository.deleteAll());
}

public Mono<ServerResponse> getProductEvents(ServerRequest request) {
Flux<ProductEvent> eventsFlux = Flux.interval(Duration.ofSeconds(1))
.map(val -> new ProductEvent(val, "Product Event"));

return ServerResponse
.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(eventsFlux, ProductEvent.class);
}
}

ProductApiFunctionalApplication.java

import com.example.productapifunctional.handler.ProductHandler;
import com.example.productapifunctional.model.Product;
import com.example.productapifunctional.repository.ProductRepository;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;

import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

/**
 * Spring Boot entry point of the product API.
 *
 * <p>In addition to starting the application it contributes two beans: a startup runner that
 * replaces the stored products with three samples, and the router function that maps
 * requests under {@code /products} to {@link ProductHandler} methods.
 */
@SpringBootApplication
public class ProductApiFunctionalApplication {

    /** Starts the Spring Boot application. */
    public static void main(String[] args) {
        SpringApplication.run(ProductApiFunctionalApplication.class, args);
    }

    /**
     * Startup runner that clears the repository, saves three sample products and then prints
     * every stored product to standard output.
     */
    @Bean
    CommandLineRunner init(ProductRepository productRepository) {
        return args -> {
            Product latte = new Product(null, "Big Latte", 2.99);
            Product decaf = new Product(null, "Big Decaf", 2.49);
            Product tea = new Product(null, "Green team", 1.99);

            Flux<Product> seedData = Flux
                    .just(latte, decaf, tea)
                    .flatMap(productRepository::save);

            // Each step starts only after the previous one has completed:
            // wipe, then seed, then read everything back.
            productRepository
                    .deleteAll()
                    .thenMany(seedData)
                    .thenMany(productRepository.findAll())
                    .subscribe(System.out::println);
        };
    }

    /**
     * Router for the product endpoints, built from nested router functions.
     *
     * <p>Everything is nested under the {@code /products} path and guarded by a predicate that
     * accepts JSON requests (by {@code Accept} or {@code Content-Type}) or event-stream
     * requests (by {@code Accept}).
     */
    @Bean
    RouterFunction<ServerResponse> routes(ProductHandler handler) {
        // Equivalent flat (non-nested) definition, kept for comparison:
        //
        // return route(GET("/products").and(accept(APPLICATION_JSON)),
        //         handler::getAllProducts)
        //     .andRoute(POST("/products").and(contentType(APPLICATION_JSON)),
        //         handler::saveProduct)
        //     .andRoute(DELETE("/products").and(accept(APPLICATION_JSON)),
        //         handler::deleteAllProducts)
        //     .andRoute(GET("/products/events").and(accept(TEXT_EVENT_STREAM)),
        //         handler::getProductEvents)
        //     .andRoute(GET("/products/{id}").and(accept(APPLICATION_JSON)),
        //         handler::getProduct)
        //     .andRoute(PUT("/products/{id}").and(contentType(APPLICATION_JSON)),
        //         handler::updateProduct)
        //     .andRoute(DELETE("/products/{id}").and(accept(APPLICATION_JSON)),
        //         handler::deleteProduct);

        // Routes addressing one product; nested below under "/{id}".
        RouterFunction<ServerResponse> singleProductRoutes =
                route(method(HttpMethod.GET), handler::getProduct)
                        .andRoute(method(HttpMethod.PUT), handler::updateProduct)
                        .andRoute(method(HttpMethod.DELETE), handler::deleteProduct);

        // Collection-level routes. "/events" is registered before "/{id}" so it is tried first.
        RouterFunction<ServerResponse> productRoutes =
                route(GET("/"), handler::getAllProducts)
                        .andRoute(method(HttpMethod.POST), handler::saveProduct)
                        .andRoute(DELETE("/"), handler::deleteAllProducts)
                        .andRoute(GET("/events"), handler::getProductEvents)
                        .andNest(path("/{id}"), singleProductRoutes);

        return nest(
                path("/products"),
                nest(
                        accept(APPLICATION_JSON)
                                .or(contentType(APPLICATION_JSON))
                                .or(accept(TEXT_EVENT_STREAM)),
                        productRoutes));
    }
}
  • Add Product
  • Get All Products Again
  • Update Product
  • Delete Product

Get Product Events:

http://localhost:8080/products/events

data:{"eventId":0,"eventType":"Product Event"}

data:{"eventId":1,"eventType":"Product Event"}

data:{"eventId":2,"eventType":"Product Event"}

data:{"eventId":3,"eventType":"Product Event"}

data:{"eventId":4,"eventType":"Product Event"}

data:{"eventId":5,"eventType":"Product Event"}

data:{"eventId":6,"eventType":"Product Event"}

Output —

package com.example.productapifunctional;

import com.example.productapifunctional.model.Product;
import com.example.productapifunctional.model.ProductEvent;
import org.springframework.http.ResponseEntity;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Small command-line client that exercises the product API at
 * {@code http://localhost:8080/products/} through {@link WebClient}.
 */
class WebClientAPI {

    private final WebClient webClient;

    /** Creates a client whose base URL is {@code http://localhost:8080/products/}. */
    public WebClientAPI() {
        // WebClient.create already returns a fully built client, so no mutate().build() is needed.
        this.webClient = WebClient.create("http://localhost:8080/products/");
    }

    /**
     * POSTs a new product named "Basundi" and logs the response entity.
     *
     * @return the response converted to a {@link ResponseEntity} holding the returned product
     */
    private Mono<ResponseEntity<Product>> postNewProduct() {
        return webClient.post()
                .body(Mono.just(new Product(null, "Basundi", 4.99)), Product.class)
                .exchangeToMono(clientResponse -> clientResponse.toEntity(Product.class))
                .doOnSuccess(o -> System.out.println("****** POST " + o));
    }

    /**
     * GETs all products from the base URL, logging each one as it arrives.
     *
     * @return the products decoded from the response body
     */
    private Flux<Product> getAllProducts() {
        return webClient.get()
                .retrieve()
                .bodyToFlux(Product.class)
                .doOnNext(o -> System.out.println("**** GET " + o));
    }

    /**
     * PUTs new values for the product with the given id and logs the result.
     *
     * @param id    id of the product to update; also used as the {@code {id}} URI variable
     * @param name  new product name
     * @param price new product price
     * @return the product decoded from the response body
     */
    private Mono<Product> updateProduct(String id, String name, double price) {
        return webClient
                .put()
                .uri("/{id}", id)
                .body(Mono.just(new Product(id, name, price)), Product.class)
                .retrieve()
                .bodyToMono(Product.class)
                .doOnSuccess(o -> System.out.println("***** UPDATE : " + o));
    }

    /**
     * DELETEs the product with the given id and logs completion.
     *
     * @param id id of the product to delete, used as the {@code {id}} URI variable
     * @return a {@link Mono} that completes once the response has been received
     */
    private Mono<Void> deleteProduct(String id) {
        return webClient
                .delete()
                .uri("/{id}", id)
                .retrieve()
                .bodyToMono(Void.class)
                .doOnSuccess(o -> System.out.println("***** DELETE : " + o));
    }

    /**
     * GETs the {@code /events} endpoint.
     *
     * @return the events decoded from the response body
     */
    private Flux<ProductEvent> getAllEvents() {
        return webClient
                .get()
                .uri("/events")
                .retrieve()
                .bodyToFlux(ProductEvent.class);
    }

    /**
     * Runs the demo sequence: create a product, list products, update and then delete the
     * first one listed, list products again, and finally print events from the event endpoint.
     */
    public static void main(String[] args) {
        WebClientAPI webClientAPI = new WebClientAPI();
        webClientAPI
                .postNewProduct()
                .thenMany(webClientAPI.getAllProducts())
                .take(1)
                .flatMap(p -> webClientAPI.updateProduct(p.getId(), "White Tea", 0.99))
                .flatMap(p -> webClientAPI.deleteProduct(p.getId()))
                .thenMany(webClientAPI.getAllProducts())
                .thenMany(webClientAPI.getAllEvents())
                .subscribe(System.out::println);

        // Keep the main thread alive for five seconds; presumably this gives the asynchronous
        // pipeline above time to run before the JVM exits.
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of dumping a stack trace, so the
            // interruption stays visible to whatever runs this thread.
            Thread.currentThread().interrupt();
        }
    }
}

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store