Project Reactor —

Prateek
6 min read · Jun 15, 2021

Reactor is a fourth-generation reactive library, based on the Reactive Streams specification, for building non-blocking applications on the JVM.

  • Reactive Core: Reactor is fully non-blocking and provides efficient demand management. It interacts directly with Java’s functional API, CompletableFuture, Stream and Duration.
  • Typed [0|1|N]: Reactor offers two reactive and composable APIs, Flux [N] and Mono [0|1], which extensively implement Reactive Extensions.
  • Non-Blocking IO: Well suited for a microservices architecture, Reactor offers backpressure-ready network engines for HTTP (including WebSockets), TCP and UDP.

The official reference guide is worth going through: https://projectreactor.io/docs/core/release/reference/

Flux

[Image: how a Flux transforms items]

A Flux<T> is a standard Publisher<T> that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.

With this large scope of possible signals, Flux is the general-purpose reactive type. Note that all events, even terminating ones, are optional: no onNext event but an onComplete event represents an empty finite sequence, but remove the onComplete and you have an infinite empty sequence (not particularly useful, except for tests around cancellation). Similarly, infinite sequences are not necessarily empty. For example, Flux.interval(Duration) produces a Flux<Long> that is infinite and emits regular ticks from a clock.
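To make the signal vocabulary concrete without any dependency, here is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. It is not how Reactor implements Flux; the fixed helper and the SignalRecorder class are hypothetical names invented for this illustration. It shows a sequence with items, an empty finite sequence, and a failing one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class SignalDemo {

    /** Hypothetical helper: emits the items in order, then onError if failure is non-null, else onComplete. */
    static Flow.Publisher<String> fixed(List<String> items, Throwable failure) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private long demand = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return; // simplification: the spec asks for onError when n <= 0
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) {
                    return; // re-entrant request from onNext; the loop below picks it up
                }
                emitting = true;
                while (demand > 0 && !done && index < items.size()) {
                    demand--;
                    subscriber.onNext(items.get(index++));
                }
                emitting = false;
                if (!done && index == items.size()) {
                    done = true;
                    if (failure != null) {
                        subscriber.onError(failure);
                    } else {
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    public static void main(String[] args) {
        SignalRecorder values = new SignalRecorder();
        fixed(List.of("A", "B", "C"), null).subscribe(values);
        System.out.println(values.signals); // [onNext(A), onNext(B), onNext(C), onComplete]

        SignalRecorder empty = new SignalRecorder();
        fixed(List.of(), null).subscribe(empty);
        System.out.println(empty.signals); // [onComplete]

        SignalRecorder failed = new SignalRecorder();
        fixed(List.of("A"), new IllegalStateException("boom")).subscribe(failed);
        System.out.println(failed.signals); // [onNext(A), onError(boom)]
    }
}

/** Records each signal it receives, in order. */
final class SignalRecorder implements Flow.Subscriber<String> {
    final List<String> signals = new ArrayList<>();

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // unbounded demand
    }

    @Override
    public void onNext(String item) {
        signals.add("onNext(" + item + ")");
    }

    @Override
    public void onError(Throwable error) {
        signals.add("onError(" + error.getMessage() + ")");
    }

    @Override
    public void onComplete() {
        signals.add("onComplete");
    }
}
```

The empty case is the "no onNext but an onComplete" sequence described above; dropping the terminal call in the helper would give the infinite empty variant.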

Mono

[Image: how a Mono transforms an item]

A Mono<T> is a specialized Publisher<T> that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).

Most Mono implementations are expected to immediately call onComplete on their Subscriber after having called onNext. Mono.never() is an outlier: it doesn’t emit any signal, which is not technically forbidden although not terribly useful outside of tests. On the other hand, a combination of onNext and onError is explicitly forbidden.
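The rules above can be written down as a small check. This is only an illustrative sketch: MonoContract and its Signal enum are made-up names, not Reactor types, and the check looks at a complete signal history rather than a live stream.

```java
import java.util.List;

final class MonoContract {
    enum Signal { ON_NEXT, ON_COMPLETE, ON_ERROR }

    /** Returns true if a complete signal history is one a Mono may produce. */
    static boolean isAllowed(List<Signal> history) {
        return history.isEmpty()                                            // like Mono.never(): no signal
            || history.equals(List.of(Signal.ON_COMPLETE))                  // empty Mono
            || history.equals(List.of(Signal.ON_NEXT, Signal.ON_COMPLETE))  // Mono with a value
            || history.equals(List.of(Signal.ON_ERROR));                    // failed Mono
    }

    public static void main(String[] args) {
        System.out.println(isAllowed(List.of(Signal.ON_NEXT, Signal.ON_COMPLETE))); // true
        System.out.println(isAllowed(List.of(Signal.ON_NEXT, Signal.ON_ERROR)));    // false
        System.out.println(isAllowed(List.of(Signal.ON_NEXT, Signal.ON_NEXT)));     // false
    }
}
```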

Mono offers only a subset of the operators that are available for a Flux, and some operators (notably those that combine the Mono with another Publisher) switch to a Flux. For example, Mono#concatWith(Publisher) returns a Flux while Mono#then(Mono) returns another Mono.

Note that you can use a Mono to represent no-value asynchronous processes that only have the concept of completion (similar to a Runnable). To create one, you can use an empty Mono<Void>.
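As a rough illustration of a completion-only process, the sketch below wraps a Runnable in a JDK Flow.Publisher<Void> that never calls onNext. The fromRunnable and run helpers here are hypothetical stand-ins written for this example; Reactor ships its own Mono.fromRunnable(Runnable), which returns a Mono that completes empty after running the task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class CompletionOnlyDemo {

    /** Hypothetical helper: runs the task on first request, then signals only completion or error. */
    static Flow.Publisher<Void> fromRunnable(Runnable task) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                try {
                    task.run();
                } catch (RuntimeException e) {
                    subscriber.onError(e);
                    return;
                }
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes to the wrapped task and returns what was observed, in order. */
    static List<String> run(Runnable task) {
        List<String> log = new ArrayList<>();
        Runnable traced = () -> {
            log.add("task ran");
            task.run();
        };
        fromRunnable(traced).subscribe(new Flow.Subscriber<Void>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(Void ignored) { log.add("onNext"); } // never called
            @Override public void onError(Throwable t) { log.add("onError(" + t.getMessage() + ")"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(() -> { }));                                           // [task ran, onComplete]
        System.out.println(run(() -> { throw new IllegalStateException("boom"); })); // [task ran, onError(boom)]
    }
}
```

Nothing runs until a subscriber arrives and requests, which matches the lazy behaviour seen in the Mono examples that follow.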

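// Assumes JUnit 5 and reactor-core on the test classpath.
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

@Test
void firstMono() {
    Mono.just("A")
        .log()         // trace every signal
        .subscribe();  // nothing is emitted until someone subscribes
}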

[DEBUG] (main) Using Console logging
[ INFO] (main) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(A)
[ INFO] (main) | onComplete()

@Test
void monoWithConsumer() {
    Mono.just("A")
        .log()
        .subscribe(s -> System.out.println(s)); // the consumer receives the onNext value
}

[DEBUG] (main) Using Console logging
[ INFO] (main) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(A)
A
[ INFO] (main) | onComplete()

@Test
void monoWithDoOn() {
    Mono.just("A")
        .log()
        // doOn* hooks observe signals without changing them
        .doOnSubscribe(subs -> System.out.println("## Subscribed: " + subs))
        .doOnRequest(request -> System.out.println("## Request: " + request))
        // doOnSuccess receives the emitted value (null for an empty Mono)
        .doOnSuccess(complete -> System.out.println("## Complete: " + complete))
        .subscribe(System.out::println);
}

[DEBUG] (main) Using Console logging
[ INFO] (main) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
## Subscribed: reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber@5a63f509
## Request: 9223372036854775807
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(A)
## Complete: A
A
[ INFO] (main) | onComplete()

@Test
void emptyMono() {
    Mono.empty()
        .log()
        .subscribe(System.out::println); // no onNext, so the consumer is never called
}

[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe([Fuseable] Operators.EmptySubscription)
[ INFO] (main) request(unbounded)
[ INFO] (main) onComplete()

@Test
void emptyCompleteConsumerMono() {
    Mono.empty()
        .log()
        .subscribe(System.out::println,               // value consumer (never called here)
                   null,                               // no error consumer
                   () -> System.out.println("Done"));  // runs on onComplete
}

[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe([Fuseable] Operators.EmptySubscription)
[ INFO] (main) request(unbounded)
[ INFO] (main) onComplete()
Done

@Test
void errorRuntimeExceptionMono() {
    Mono.error(new RuntimeException())
        .log()
        .subscribe(); // no error consumer, so the error goes unhandled
}

@Test
void errorConsumerMono() {
    Mono.error(new Exception())
        .log()
        .subscribe(System.out::println,
                   e -> System.out.println("Error: " + e)); // second argument handles onError
}

@Test
void errorDoOnErrorMono() {
    Mono.error(new Exception())
        .doOnError(e -> System.out.println("Error: " + e)) // side effect only; the error still propagates
        .log()
        .subscribe();
}

@Test
void errorOnErrorResumeMono() {
    Mono.error(new Exception())
        .onErrorResume(e -> {
            // switch to a fallback publisher when the source fails
            System.out.println("Caught: " + e);
            return Mono.just("B");
        })
        .log()
        .subscribe();
}

[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxOnErrorResume.ResumeSubscriber)
[ INFO] (main) request(unbounded)
Caught: java.lang.Exception
[ INFO] (main) onNext(B)
[ INFO] (main) onComplete()

@Test
void errorOnErrorReturnMono() {
    Mono.error(new Exception())
        .onErrorReturn("B") // replace the error with a fixed fallback value
        .log()
        .subscribe();
}

[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxOnErrorResume.ResumeSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(B)
[ INFO] (main) onComplete()

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

import reactor.core.publisher.Flux;

@Test
void firstFlux() {
    Flux.just("A", "B", "C") // emits each argument in order, then completes
        .log()
        .subscribe();
}

[DEBUG] (main) Using Console logging
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(A)
[ INFO] (main) | onNext(B)
[ INFO] (main) | onNext(C)
[ INFO] (main) | onComplete()

import java.util.Arrays;

@Test
void fluxFromIterable() {
    Flux.fromIterable(Arrays.asList("A", "B", "C")) // one onNext per element
        .log()
        .subscribe();
}

[DEBUG] (main) Using Console logging
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(A)
[ INFO] (main) | onNext(B)
[ INFO] (main) | onNext(C)
[ INFO] (main) | onComplete()

@Test
void fluxFromRange() {
    Flux.range(10, 5) // range(start, count): emits 10 through 14
        .log()
        .subscribe();
}

[DEBUG] (main) Using Console logging
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(10)
[ INFO] (main) | onNext(11)
[ INFO] (main) | onNext(12)
[ INFO] (main) | onNext(13)
[ INFO] (main) | onNext(14)
[ INFO] (main) | onComplete()

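import java.time.Duration;

@Test
void fluxFromInterval() throws Exception {
    Flux.interval(Duration.ofSeconds(1)) // ticks on a parallel scheduler thread, not on main
        .log()
        .take(10)
        .subscribe();
    // Keep the test thread alive; only about 3 of the 10 ticks fit into 3 seconds.
    Thread.sleep(3000);
}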

[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxInterval.IntervalRunnable)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(0)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-1) onNext(2)

@Test
void fluxRequest() {
    Flux.range(1, 10)
        .log()
        // the fourth argument receives the Subscription; request 20 although only 10 items exist
        .subscribe(null, null, null, s -> s.request(20));
}

[DEBUG] (main) Using Console logging
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(20)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onNext(5)
[ INFO] (main) | onNext(6)
[ INFO] (main) | onNext(7)
[ INFO] (main) | onNext(8)
[ INFO] (main) | onNext(9)
[ INFO] (main) | onNext(10)
[ INFO] (main) | onComplete()

import java.util.Random;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

@Test
void fluxCustomSubscriber() {
    Flux.range(1, 10)
        .log()
        .subscribe(new BaseSubscriber<Integer>() {
            private final Random random = new Random();
            int elementsToProcess = 3;
            int counter = 0;

            @Override
            public void hookOnSubscribe(Subscription subscription) {
                System.out.println("Subscribed!");
                request(elementsToProcess); // initial demand
            }

            @Override
            public void hookOnNext(Integer value) {
                counter++;
                if (counter == elementsToProcess) {
                    counter = 0;
                    // batch consumed: ask for a new random batch of 1 to 3 items
                    elementsToProcess = random.nextInt(3) + 1;
                    request(elementsToProcess);
                }
            }
        });
}

[DEBUG] (main) Using Console logging
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
Subscribed!
[ INFO] (main) | request(3)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(4)
[ INFO] (main) | request(3)
[ INFO] (main) | onNext(5)
[ INFO] (main) | onNext(6)
[ INFO] (main) | onNext(7)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(8)
[ INFO] (main) | request(2)
[ INFO] (main) | onNext(9)
[ INFO] (main) | onNext(10)
[ INFO] (main) | request(3)
[ INFO] (main) | onComplete()

@Test
void fluxLimitRate() {
    Flux.range(1, 10)
        .log()
        .limitRate(3) // split the downstream's unbounded demand into upstream requests of 3
        .subscribe();
}

[DEBUG] (main) Using Console logging
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(3)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | request(3)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onNext(5)
[ INFO] (main) | onNext(6)
[ INFO] (main) | request(3)
[ INFO] (main) | onNext(7)
[ INFO] (main) | onNext(8)
[ INFO] (main) | onNext(9)
[ INFO] (main) | request(3)
[ INFO] (main) | onNext(10)
[ INFO] (main) | onComplete()

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

@Test
void map() {
    Flux.range(1, 5)
        .map(i -> i * 10) // synchronous one-to-one transformation
        .subscribe(System.out::println);
}

[DEBUG] (main) Using Console logging
10
20
30
40
50

@Test
void flatMap() {
    Flux.range(1, 5)
        .flatMap(i -> Flux.range(i * 10, 2)) // each item becomes an inner Flux, flattened into one sequence
        .subscribe(System.out::println);
}

10
11
20
21
30
31
40
41
50
51

@Test
void flatMapMany() {
    Mono.just(3)
        .flatMapMany(i -> Flux.range(1, i)) // turns a Mono into a Flux
        .subscribe(System.out::println);
}

[DEBUG] (main) Using Console logging
1
2
3

@Test
void concat() throws InterruptedException {
    Flux<Integer> oneToFive = Flux.range(1, 5)
        .delayElements(Duration.ofMillis(200));
    Flux<Integer> sixToTen = Flux.range(6, 5)
        .delayElements(Duration.ofMillis(400));

    // concat subscribes to sixToTen only after oneToFive completes, so order is preserved
    Flux.concat(oneToFive, sixToTen)
        .subscribe(System.out::println);

    // oneToFive.concatWith(sixToTen)
    //     .subscribe(System.out::println);

    Thread.sleep(4000); // delayElements runs on another thread; wait for it
}

[DEBUG] (main) Using Console logging
1
2
3
4
5
6
7
8
9
10

@Test
void merge() throws InterruptedException {
    Flux<Integer> oneToFive = Flux.range(1, 5)
        .delayElements(Duration.ofMillis(200));
    Flux<Integer> sixToTen = Flux.range(6, 5)
        .delayElements(Duration.ofMillis(400));

    // merge subscribes to both sources eagerly and emits values as they arrive
    Flux.merge(oneToFive, sixToTen)
        .subscribe(System.out::println);

    // oneToFive.mergeWith(sixToTen)
    //     .subscribe(System.out::println);

    Thread.sleep(4000);
}

@Test
void zip() {
    Flux<Integer> oneToFive = Flux.range(1, 5);
    Flux<Integer> sixToTen = Flux.range(6, 5);

    // zip pairs the nth item of each source and combines them
    Flux.zip(oneToFive, sixToTen,
            (item1, item2) -> item1 + ", " + item2)
        .subscribe(System.out::println);

    // oneToFive.zipWith(sixToTen)
    //     .subscribe(System.out::println);
}

[DEBUG] (main) Using Console logging
1, 6
2, 7
3, 8
4, 9
5, 10
