Spring WebFlux — WebSocket

Prateek
2 min read · May 5, 2022

In this example, we'll look at real-time client-server communication over WebSocket with Spring WebFlux.

The client sends one or more messages to the server over a WebSocket connection. The server receives each string message, modifies it (e.g., converts it to uppercase), and sends the result back to the client.

Spring WebClient Test: chrome-extension://fgponpodhbmadfljofbimhhlengambbn/index.html

WebFluxConfig.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;

import java.util.Map;

@Configuration
public class WebFluxConfig {

    @Autowired
    private MyWebFluxWebSocketHandler myWebFluxWebSocketHandler;

    // Maps the "/uppercase" path to the WebSocket handler, with order 1.
    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, MyWebFluxWebSocketHandler> handlerMap = Map.of("/uppercase", myWebFluxWebSocketHandler);
        return new SimpleUrlHandlerMapping(handlerMap, 1);
    }
}

WebSocketHandler: A WebSocketHandler must compose the inbound and outbound streams into a unified flow and return a Mono that reflects the completion of that flow. That means there is no need to check if the connection is open, since Reactive Streams signals will terminate activity. The inbound stream receives a completion/error signal, and the outbound stream receives a cancellation signal.
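The "completion signal ends the flow" idea can be tried out without Spring at all. The sketch below is only an analogy built on the JDK's own java.util.concurrent.SubmissionPublisher, not the WebFlux API: the hypothetical handle method stands in for WebSocketHandler.handle, the inbound publisher stands in for session.receive(), a plain list stands in for the outbound side, and the returned CompletableFuture<Void> plays the role of the Mono<Void>.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class UnifiedFlowSketch {

    // Analogy for handle(session): wire inbound text to an outbound sink and return
    // a future that completes only when the inbound stream signals completion.
    static CompletableFuture<Void> handle(SubmissionPublisher<String> inbound, List<String> outbound) {
        return inbound.consume(text -> outbound.add(text.toUpperCase()));
    }

    public static void main(String[] args) {
        List<String> outbound = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> inbound = new SubmissionPublisher<>();

        CompletableFuture<Void> done = handle(inbound, outbound);

        inbound.submit("hello");
        inbound.submit("webflux");
        inbound.close(); // completion signal: the analogue of the client closing the connection

        done.join();     // no "is the connection still open?" check is needed
        System.out.println(outbound); // prints [HELLO, WEBFLUX]
    }
}
```

Nothing in the sketch polls for connection state: closing the inbound publisher is what completes the returned future, which is the same shape the handler in this post relies on.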

WebSocketSession: Represents a WebSocket session.
Use session.receive() to compose on the inbound message stream, and session.send(publisher) to provide the outbound message stream.

WebSocketMessage: Representation of a WebSocket message.
See static factory methods in WebSocketSession for creating messages with the org.springframework.core.io.buffer.DataBufferFactory for the session.

MyWebFluxWebSocketHandler.java

import org.springframework.stereotype.Service;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class MyWebFluxWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Inbound stream: read each message as text, uppercase it, wrap it in a new text message.
        Flux<WebSocketMessage> webSocketMessageFlux = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(String::toUpperCase)
                .map(session::textMessage);

        // Outbound stream: the returned Mono completes when the inbound stream does.
        return session.send(webSocketMessageFlux);
    }
}

SpringWebfluxWebsocketApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringWebfluxWebsocketApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringWebfluxWebsocketApplication.class, args);
    }

}

WebClientTest.java

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Flux;

import java.net.URI;
import java.time.Duration;

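// DEFINED_PORT starts the real server on the configured port (8080 by default),
// so the ws://localhost:8080 URI below is reachable from the test.
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class WebClientTest {
    @Test
    public void testWebClientEndpoint() {
        WebSocketClient client = new ReactorNettyWebSocketClient();
        URI uri = URI.create("ws://localhost:8080/uppercase");

        // Send three messages, one per second, then complete the outbound stream.
        Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).take(3);
        client.execute(uri, webSocketSession -> webSocketSession
                .send(longFlux.map(i -> webSocketSession.textMessage("Laxmi " + i)))
                // Read the three uppercased replies; take(3) lets the inbound side complete as well.
                .and(webSocketSession.receive()
                        .take(3)
                        .map(WebSocketMessage::getPayloadAsText)
                        .doOnNext(System.out::println))
        ).block(Duration.ofSeconds(10));
    }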
}
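
For a quick manual check outside the Spring test context, the JDK's built-in java.net.http.WebSocket client (Java 11+) can talk to the same endpoint. This is a minimal sketch, not part of the original project: the class names UppercaseClientSketch and CollectingListener and the accept helper are made up for illustration, and it assumes the application above is already running on localhost:8080.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class UppercaseClientSketch {

    // Collects text messages; one message may arrive split across several frames.
    static class CollectingListener implements WebSocket.Listener {
        final BlockingQueue<String> messages = new LinkedBlockingQueue<>();
        private final StringBuilder partial = new StringBuilder();

        // Appends a fragment; returns the whole message once the last fragment arrives, else null.
        String accept(CharSequence data, boolean last) {
            partial.append(data);
            if (!last) {
                return null;
            }
            String message = partial.toString();
            partial.setLength(0);
            messages.add(message);
            return message;
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            accept(data, last);
            webSocket.request(1); // ask for the next frame
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        CollectingListener listener = new CollectingListener();
        WebSocket ws = HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(URI.create("ws://localhost:8080/uppercase"), listener)
                .get(5, TimeUnit.SECONDS);

        ws.sendText("hello", true).get(5, TimeUnit.SECONDS);
        // Wait up to five seconds for the server's reply and print it.
        System.out.println(listener.messages.poll(5, TimeUnit.SECONDS));
        ws.sendClose(WebSocket.NORMAL_CLOSURE, "done").get(5, TimeUnit.SECONDS);
    }
}
```

With the server from this post running, the reply should be the uppercased form of the text that was sent. Closing the socket from the client side is what completes the server's inbound stream, and with it the Mono returned by the handler.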
