In this tutorial, I will demonstrate Spring WebFlux streaming responses (a.k.a. Server-Sent Events) — a mechanism for pushing notifications or messages from the back-end application to browsers in real time. These messages can also be consumed by other services.
Spring WebFlux Streaming:
In a traditional architecture, browsers or other services have to make a request to the server to fetch the latest information. When we poll periodically like this, most of the time there might not be any updates! When multiple clients call the server endpoint every second and there are few updates, this simply wastes resources and makes unnecessary network calls.
Server-Sent Events (SSE) is a standard for transmitting data to client applications over a persistent connection established between the client and the server. With the Server-Sent Events (SSE / event stream) approach, our server notifies the browser whenever it has updates, which is far more efficient than polling.
This can be easily achieved by using Spring WebFlux Streaming response.
Product API: Each call to this endpoint returns the details of a random product.
{
"productId": 5,
"productName": "Lightweight Aluminum Wallet",
"color": "orchid",
"promotionCode": "PremiumPrice585480",
"price": "24.23"
}
DataGenerator.java
package com.vinsguru.api;
import com.github.javafaker.Faker;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Builds an in-memory list of fake products once at startup and serves random
 * entries from it.
 */
@Service
public class DataGenerator {

    /** Number of fake products created by {@link #getProducts()}. */
    private static final int PRODUCT_COUNT = 1000;

    private final List<Product> products = new ArrayList<>(PRODUCT_COUNT);

    /**
     * Fills the product list with {@value #PRODUCT_COUNT} generated products whose
     * ids run from 1 to {@value #PRODUCT_COUNT}.
     *
     * <p>Despite its name this method returns nothing; it is the
     * {@code @PostConstruct} initializer of this bean. The name is kept so that
     * existing callers keep compiling.
     */
    @PostConstruct
    public void getProducts() {
        Faker faker = new Faker();
        for (int i = 1; i <= PRODUCT_COUNT; i++) {
            // Faker calls stay in the original order: name, color, promotion code, price.
            String productName = faker.commerce().productName();
            String color = faker.commerce().color();
            String promotionCode = faker.commerce().promotionCode();
            String price = faker.commerce().price();
            products.add(Product.builder()
                    .productId(i)
                    .productName(productName)
                    .color(color)
                    .promotionCode(promotionCode)
                    .price(price)
                    .build());
        }
    }

    /**
     * Returns a uniformly random product from the generated list.
     *
     * @return one of the generated products
     * @throws IllegalStateException if the list has not been populated yet
     */
    public Product findProduct() {
        if (products.isEmpty()) {
            throw new IllegalStateException("No products have been generated yet");
        }
        // Bound by the actual list size so every index, including 0, can be picked.
        // The previous nextInt(1, 1000) could never return the first product.
        int index = ThreadLocalRandom.current().nextInt(products.size());
        return products.get(index);
    }
}
Product.java
/**
 * Data holder describing a single product as exposed by the product API and
 * the product stream.
 *
 * <p>Constructors, accessors and the builder are generated by the Lombok
 * annotations below; the field declaration order determines the parameter
 * order of the all-args constructor, so do not reorder fields casually.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Product {
// Identifier of the product.
private Integer productId;
// Display name of the product.
private String productName;
// Color name of the product.
private String color;
// Promotion code attached to the product.
private String promotionCode;
// Price kept as text (a String, not a numeric type).
private String price;
}
ProductController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
/**
 * REST endpoint under {@code /product} that returns a single product per request.
 */
@RestController
@RequestMapping("product")
public class ProductController {

    @Autowired
    private ProductService service;

    /**
     * Handles {@code GET product}.
     *
     * @return a 200 response wrapping the product supplied by the service, or a
     *         404 response when the service completes without a value
     */
    @GetMapping
    public Mono<ResponseEntity<Product>> getProducts() {
        final Mono<Product> product = service.getProduct();
        final ResponseEntity<Product> notFound = ResponseEntity.notFound().build();
        return product
                .map(ResponseEntity::ok)
                .defaultIfEmpty(notFound);
    }
}
ProductService.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
/**
 * Reactive facade over {@link DataGenerator}.
 */
@Service
public class ProductService {

    @Autowired
    private DataGenerator dataGenerator;

    /**
     * Supplies one random product per subscription.
     *
     * <p>The lookup is deferred with {@code Mono.fromSupplier} so the product is
     * chosen when the Mono is subscribed to, not when this method is called. A
     * {@code null} result becomes an empty Mono instead of a
     * {@code NullPointerException}, which callers can handle with
     * {@code defaultIfEmpty}.
     *
     * @return a Mono emitting a random product
     */
    public Mono<Product> getProduct() {
        return Mono.fromSupplier(dataGenerator::findProduct);
    }
}
So far, we have set up an endpoint that generates the data; next, we have to call this endpoint and stream the data it returns.
AppConfig.java
package com.vinsguru.sink;
import com.vinsguru.api.Product;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
@Configuration
public class AppConfig {
private static final String API_ENDPOINT = "http://localhost:8080/product";
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl(API_ENDPOINT)
.build();
}
@Bean
public Sinks.Many<Product> sink() {
return Sinks.many().replay().latest();
}
@Bean
public Flux<Product> flux(Sinks.Many<Product> sink){
return sink.asFlux();
}
}
ProductPublisher.java
package com.vinsguru.sink;
import com.vinsguru.api.Product;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Sinks;
/**
 * Periodically fetches a product from the product API and pushes it into the
 * shared sink.
 *
 * <p>NOTE(review): {@code @Scheduled} only fires when scheduling is enabled
 * (for example via {@code @EnableScheduling}); that configuration is not
 * visible here, so confirm it exists.
 */
@Service
public class ProductPublisher {

    @Autowired
    private WebClient webClient;

    @Autowired
    private Sinks.Many<Product> sink;

    /**
     * Runs at a fixed rate of 3000 ms: issues a GET through the configured
     * WebClient, logs the received product and emits it into the sink.
     *
     * <p>Fetch failures are reported through an explicit error consumer. A failed
     * call (for instance, the API not being reachable yet) would otherwise hit
     * Reactor's "error callback not implemented" path.
     */
    @Scheduled(fixedRate = 3000)
    public void publish() {
        this.webClient
                .get()
                .retrieve()
                .bodyToMono(Product.class)
                .doOnNext(e -> System.out.println("Received : " + e))
                .subscribe(
                        this::emit,
                        error -> System.err.println("Failed to fetch product : " + error));
    }

    /** Emits the product into the sink and reports a rejected emission instead of dropping it silently. */
    private void emit(Product product) {
        Sinks.EmitResult result = this.sink.tryEmitNext(product);
        if (result.isFailure()) {
            System.err.println("Failed to emit product : " + result);
        }
    }
}
StreamController.java
package com.vinsguru.sink;
import com.vinsguru.api.Product;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
/**
 * Exposes the shared product Flux as a Server-Sent Events endpoint.
 */
@RestController
public class StreamController {

    @Autowired
    private Flux<Product> flux;

    /**
     * Handles {@code GET product-stream} and responds with content type
     * {@code text/event-stream}.
     *
     * <p>NOTE(review): the method name mentions a joke although it streams
     * products; it looks like a leftover and is kept only to leave the class
     * interface unchanged.
     *
     * @return the injected product Flux
     */
    @GetMapping(
            value = "product-stream",
            produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Product> getJoke() {
        return this.flux;
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the spring-sinks demo: a Spring Boot WebFlux application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Dependency and plugin versions without an explicit <version> are managed by this parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.vinsguru</groupId>
    <artifactId>spring-sinks</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-sinks</name>
    <description>Demo project for Spring WebFlux streaming responses with Reactor Sinks</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <!-- Reactive web stack used by the controllers and the WebClient. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <!-- Fake data generation for DataGenerator. -->
        <dependency>
            <groupId>com.github.javafaker</groupId>
            <artifactId>javafaker</artifactId>
            <version>1.0.2</version>
        </dependency>
        <!-- Annotation processor behind the Product annotations; also excluded from the repackaged jar below. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- Keep Lombok out of the executable jar. -->
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
When we call the streaming endpoint, it keeps emitting items as they are published.
http://localhost:8080/product-stream
data:{"productId":172,"productName":"Awesome Iron Watch","color":"red","promotionCode":"GoodPromotion436640","price":"5.03"}
data:{"productId":965,"productName":"Durable Bronze Coat","color":"purple","promotionCode":"KillerSavings580237","price":"47.44"}
data:{"productId":930,"productName":"Practical Paper Shoes","color":"cyan","promotionCode":"PremiumPrice213627","price":"9.52"}
data:{"productId":696,"productName":"Incredible Granite Coat","color":"silver","promotionCode":"AmazingPromotion116038","price":"80.57"}
data:{"productId":69,"productName":"Incredible Steel Pants","color":"sky blue","promotionCode":"PremiumSale037340","price":"7.44"}
data:{"productId":912,"productName":"Sleek Marble Computer","color":"red","promotionCode":"AmazingDiscount022232","price":"24.95"}