Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (in the form of managing “backpressure”). It integrates directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration. It offers composable asynchronous sequence APIs, Flux (for [N] elements) and Mono (for [0|1] elements), and extensively implements the Reactive Streams specification.
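As a rough illustration of the two types and the Java 8 integration described above, the sketch below builds a Flux from a Stream and bridges a Mono to a CompletableFuture. The class and method names are invented for this example, and reactor-core is assumed to be on the classpath.

```java
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class name, used only for this illustration.
class FluxAndMonoSketch {

    // Flux: a sequence of N elements, here sourced from a java.util.stream.Stream.
    static List<Integer> tenTimes() {
        return Flux.fromStream(Stream.of(1, 2, 3))
                .map(i -> i * 10)
                .collectList()   // Mono<List<Integer>>
                .block();        // blocking keeps the demo short; avoid it in reactive code
    }

    // Mono: at most one element, here bridged to a CompletableFuture.
    static String viaFuture() {
        return Mono.just("reactor")
                .map(String::toUpperCase)
                .toFuture()
                .join();
    }

    public static void main(String[] args) {
        System.out.println(tenTimes());   // [10, 20, 30]
        System.out.println(viaFuture());  // REACTOR
    }
}
```

Both helpers block only to hand a plain value back to the caller; in a real pipeline the Flux or Mono would normally be returned as is.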
Reactor also supports non-blocking inter-process communication with the reactor-netty project. Suited for Microservices Architecture, Reactor Netty offers backpressure-ready network engines for HTTP (including Websockets), TCP, and UDP. Reactive encoding and decoding are fully supported.
Worth going through: https://projectreactor.io/docs/core/release/reference/#intro-reactive
Mono.just:
import reactor.core.publisher.Mono;

public class MonoJust {
    public static void main(String[] args) {
        Mono.just(1)
            .log() // logs each signal: onSubscribe, request, onNext, onComplete
            .subscribe(i -> System.out.println("Item Received :" + i));
    }
}
Output:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
Item Received :1
[ INFO] (main) | onComplete()
— — — — — — — — — — — — — — — — — — — — — — — — — — —
Mono.subscribe:
Subscribes Consumers to this Mono that will respectively consume the element in the sequence, handle errors, and react to completion. The subscription requests unbounded demand (Long.MAX_VALUE).
For a passive version that observes and forwards incoming data, see doOnSuccess(Consumer) and doOnError(Consumer).
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
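To make the point about control returning immediately more concrete, here is a minimal sketch that assumes a source which emits on another thread (Mono.delay is used as a stand-in). The class name and the CountDownLatch-based wait are choices made for this illustration only.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Mono;

// Hypothetical class name, used only for this illustration.
class AsyncSubscribeSketch {

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // Mono.delay emits from a timer thread, not from the caller's thread.
        Mono.delay(Duration.ofMillis(200))
                .map(tick -> "late value")
                .subscribe(
                        v -> events.add("received " + v),
                        e -> done.countDown(),
                        done::countDown);

        // subscribe() has already returned; nothing has been received yet.
        events.add("subscribe returned");

        try {
            // Without this wait, main could exit before the value arrives.
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [subscribe returned, received late value]
    }
}
```

In a unit test the same effect is usually handled by blocking on the result or by a test utility, instead of sleeping for an arbitrary time.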
import reactor.core.publisher.Mono;

public class MonoSubscribe {
    public static void main(String[] args) {
        Mono.just("Football")
            .map(String::length)
            .log()
            .subscribe(o -> System.out.println("## Item Received : " + o),            // value consumer
                       e -> System.out.println("## ERROR Consumer : " + e.getMessage()), // error consumer
                       () -> System.out.println("## Completed"));                      // completion callback
    }
}
Output:
[ INFO] (main) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(8)
## Item Received : 8
## Completed
[ INFO] (main) | onComplete()
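The passive hooks mentioned earlier, doOnSuccess and doOnError, can be sketched as follows. This is only an illustration with an invented class name; it records the order in which the hooks and the subscribe consumers fire for a synchronous source.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

// Hypothetical class name, used only for this illustration.
class PassiveHooksSketch {

    // Subscribes to the source and records which callbacks ran, in order.
    static List<String> observe(Mono<String> source) {
        List<String> events = new ArrayList<>();
        source
                .doOnSuccess(v -> events.add("doOnSuccess " + v))          // observes, then forwards
                .doOnError(e -> events.add("doOnError " + e.getMessage())) // observes, then forwards
                .subscribe(
                        v -> events.add("consumer " + v),
                        e -> events.add("error consumer " + e.getMessage()),
                        () -> events.add("complete consumer"));
        return events;
    }

    public static void main(String[] args) {
        // [doOnSuccess Football, consumer Football, complete consumer]
        System.out.println(observe(Mono.just("Football")));
        // [doOnError boom, error consumer boom]
        System.out.println(observe(Mono.error(new RuntimeException("boom"))));
    }
}
```

The hooks do not replace the subscriber: the signal still reaches the consumers passed to subscribe.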
— — — — — — — — — — — — — — — — — — — — — — — — — — — — —
MonoEmptyOrError: userDataService returns a value, an empty Mono, or an error depending on userId. Try different userId values to see each outcome.
import com.rp.courseutil.Util;
import reactor.core.publisher.Mono;

public class MonoEmptyOrError {
    public static void main(String[] args) {
        userDataService(5) // 5 falls into the error branch; try 1 (value) or 2 (empty)
            .log()
            .subscribe(o -> System.out.println("Item Received : " + o),
                       e -> System.out.println("ERROR Consumer : " + e.getMessage()),
                       () -> System.out.println("Complete Consumer"));
    }

    private static Mono<String> userDataService(int userId) {
        if (userId == 1) {
            return Mono.just(Util.faker().name().firstName());
        } else if (userId == 2) {
            return Mono.empty();
        } else {
            return Mono.error(new RuntimeException("Not in the allowed range"));
        }
    }
}
Output:
[ INFO] (main) onSubscribe([Fuseable] Operators.EmptySubscription)
[ INFO] (main) request(unbounded)
[ERROR] (main) onError(java.lang.RuntimeException: Not in the allowed range)
[ERROR] (main) — java.lang.RuntimeException: Not in the allowed range
java.lang.RuntimeException: Not in the allowed range
at com.rp.sec01.Lec04MonoEmptyOrError.userDataService(Lec04MonoEmptyOrError.java:22)
at com.rp.sec01.Lec04MonoEmptyOrError.main(Lec04MonoEmptyOrError.java:9)
ERROR Consumer : Not in the allowed range
— — — — — — — — — — — — — — — — — — — — — — — — — — — —
Mono from Supplier & Callable:
Supplier: Create a Mono, producing its value using the provided Supplier. If the Supplier resolves to null, the resulting Mono completes empty.
Callable: Create a Mono, producing its value using the provided Callable. If the Callable resolves to null, the resulting Mono completes empty.
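A small sketch of the null-handling rule described above, and of one practical difference between the two factories: a Callable may throw a checked exception, which surfaces as an error signal. The class name and the outcome helper are invented for this example.

```java
import java.io.IOException;
import reactor.core.publisher.Mono;

// Hypothetical class name, used only for this illustration.
class NullAndErrorSketch {

    // Collapses the three possible outcomes of a Mono into one String.
    static String outcome(Mono<String> mono) {
        return mono.map(v -> "value: " + v)
                .defaultIfEmpty("empty")
                .onErrorResume(e -> Mono.just("error: " + e.getMessage()))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(outcome(Mono.fromSupplier(() -> "Sam")));         // value: Sam
        System.out.println(outcome(Mono.<String>fromSupplier(() -> null))); // empty
        System.out.println(outcome(Mono.<String>fromCallable(() -> null))); // empty
        // Callable.call() declares "throws Exception", so checked exceptions are allowed.
        System.out.println(outcome(Mono.<String>fromCallable(() -> {
            throw new IOException("disk unavailable");
        })));                                                                // error: disk unavailable
    }
}
```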
import com.rp.courseutil.Util;
import reactor.core.publisher.Mono;
import java.util.concurrent.Callable;
import java.util.function.Supplier;

public class Lec05MonoFromSupplier {
    public static void main(String[] args) {
        System.out.println("-------------------------");
        Mono.fromSupplier(() -> getName()).subscribe(o -> System.out.println("Received : " + o));
        Mono.fromCallable(() -> getName()).subscribe(o -> System.out.println("Received : " + o));
        Mono.fromSupplier(() -> "Neha Parate").subscribe(o -> System.out.println("Received : " + o));
    }

    private static String getName() {
        System.out.println("Generating name..");
        return Util.faker().name().fullName();
    }
}
Output:
Generating name..
Received : Alise Raynor
Generating name..
Received : Pauletta Metz
Received : Neha Parate
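The output above shows "Generating name.." once per subscription, which hints at why fromSupplier is useful: the work is deferred until someone subscribes. The sketch below contrasts that with Mono.just, whose argument is evaluated while the pipeline is being built. The counter and class name are invented for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

// Hypothetical class name, used only for this illustration.
class LazySupplierSketch {

    static final AtomicInteger CALLS = new AtomicInteger();

    // Stand-in for the article's getName(); counts how often it is invoked.
    static String getName() {
        CALLS.incrementAndGet();
        return "some name";
    }

    // Returns the call count observed after each step.
    static int[] run() {
        CALLS.set(0);
        int[] counts = new int[4];

        Mono<String> eager = Mono.just(getName());              // getName() runs right here
        counts[0] = CALLS.get();

        Mono<String> lazy = Mono.fromSupplier(() -> getName()); // nothing runs yet
        counts[1] = CALLS.get();

        lazy.subscribe();                                       // the Supplier runs now
        counts[2] = CALLS.get();

        lazy.subscribe();                                       // and again for each new subscriber
        eager.subscribe();                                      // just() only replays the captured value
        counts[3] = CALLS.get();
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [1, 1, 2, 3]
    }
}
```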
— — — — — — — — — — — — — — — — — — — — — — — — — — -
Supplier refactoring with Schedulers.boundedElastic():
public static reactor.core.scheduler.Scheduler boundedElastic()
Scheduler that dynamically creates a bounded number of ExecutorService-based Workers, reusing them once the Workers have been shut down. The underlying daemon threads can be evicted if idle for more than 60 seconds.
The maximum number of created threads is bounded by a cap (by default ten times the number of available CPU cores, see DEFAULT_BOUNDED_ELASTIC_SIZE). The maximum number of task submissions that can be enqueued and deferred on each of these backing threads is bounded (by default 100K additional tasks, see DEFAULT_BOUNDED_ELASTIC_QUEUESIZE). Past that point, a RejectedExecutionException is thrown.
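import com.rp.courseutil.Util;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SupplierRefactoring {
    public static void main(String[] args) {
        String name = getName()
                .subscribeOn(Schedulers.boundedElastic()) // run the slow Supplier on a boundedElastic worker
                .block();                                 // wait on the main thread for the result
        System.out.println(name);
    }

    private static Mono<String> getName() {
        System.out.println("entered getName method");
        // Only the pipeline is built here; the Supplier runs on subscription.
        return Mono.fromSupplier(() -> {
            System.out.println("Generating name..");
            Util.sleepSeconds(3);
            return Util.faker().name().fullName();
        }).map(String::toUpperCase);
    }
}
Output: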
entered getName method
Generating name..
MS. LEEANNE WINTHEISER
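To see where the Supplier actually executes, the following sketch returns the name of the executing thread with and without subscribeOn. It is a minimal illustration with an invented class name; the exact worker number in the thread name will vary.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical class name, used only for this illustration.
class SubscribeOnSketch {

    // Reports which thread executed the Supplier.
    static String supplierThread(boolean offload) {
        Mono<String> mono = Mono.fromSupplier(() -> Thread.currentThread().getName());
        if (offload) {
            mono = mono.subscribeOn(Schedulers.boundedElastic());
        }
        return mono.block();
    }

    public static void main(String[] args) {
        System.out.println(supplierThread(false)); // the calling thread
        System.out.println(supplierThread(true));  // a worker whose name starts with "boundedElastic"
        // Thread cap described in the Javadoc quoted above.
        System.out.println(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE);
    }
}
```

Offloading like this is mainly useful for blocking calls such as the three-second sleep in the example above; cheap, non-blocking suppliers rarely need it.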
— — — — — — — — — — — — — — — — — — — — — — -
fromFuture: Create a Mono, producing its value using the provided CompletableFuture.
Note that the future is not cancelled when that Mono is cancelled, but that behavior can be obtained by using a doFinally(Consumer) that checks for a SignalType.CANCEL and calls CompletableFuture.cancel(boolean).
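import com.rp.courseutil.Util;
import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;

public class MonoFromFuture {
    public static void main(String[] args) {
        Mono.fromFuture(getName())
            .subscribe(Util.onNext());
        // supplyAsync completes on another thread; keep main alive until the value arrives
        Util.sleepSeconds(1);
    }

    private static CompletableFuture<String> getName() {
        return CompletableFuture.supplyAsync(() -> Util.faker().name().fullName());
    }
}
Output: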
Received : Flora Steuber
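The cancellation note above can be sketched as follows: a doFinally hook checks for SignalType.CANCEL and cancels the underlying future. The class and method names are invented for this illustration. Newer Reactor releases may already propagate cancellation to the future on their own, so it is worth checking the Javadoc of the version in use; the hook is harmless either way.

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

// Hypothetical class name, used only for this illustration.
class CancelFutureSketch {

    static boolean futureCancelledAfterDispose() {
        // A future that never completes by itself, so the Mono stays pending.
        CompletableFuture<String> future = new CompletableFuture<>();

        Disposable subscription = Mono.fromFuture(future)
                .doFinally(signal -> {
                    if (signal == SignalType.CANCEL) {
                        future.cancel(true); // propagate the Mono's cancellation to the future
                    }
                })
                .subscribe();

        subscription.dispose(); // cancels the Mono, which triggers doFinally(CANCEL)
        return future.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println(futureCancelledAfterDispose()); // true
    }
}
```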
— — — — — — — — —
fromRunnable: Create a Mono that completes empty once the provided Runnable has been executed.
import com.rp.courseutil.Util;
import reactor.core.publisher.Mono;

public class MonoFromRunnable {
    public static void main(String[] args) {
        Mono.fromRunnable(timeConsumingProcess())
            // onNext is never called: the Mono completes empty once the Runnable finishes
            .subscribe(Util.onNext(), Util.onError(), () -> {
                System.out.println("process is done. Sending emails...");
            });
    }

    private static Runnable timeConsumingProcess() {
        return () -> {
            Util.sleepSeconds(3);
            System.out.println("Operation completed");
        };
    }
}
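As a rough check of the behaviour described for fromRunnable, the sketch below records which callbacks fire when the Runnable succeeds and when it throws. The class name and the fail flag are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

// Hypothetical class name, used only for this illustration.
class FromRunnableSketch {

    static List<String> run(boolean fail) {
        List<String> events = new ArrayList<>();

        Mono<Object> mono = Mono.fromRunnable(() -> {
            events.add("runnable ran");
            if (fail) {
                throw new IllegalStateException("failed");
            }
        });
        events.add("assembled"); // the Runnable has not run yet

        mono.subscribe(
                v -> events.add("onNext " + v),               // never called
                e -> events.add("onError " + e.getMessage()),
                () -> events.add("onComplete"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [assembled, runnable ran, onComplete]
        System.out.println(run(true));  // [assembled, runnable ran, onError failed]
    }
}
```

This makes fromRunnable a reasonable fit for "do this, then notify" steps, where only completion or failure matters.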