ProjectReactor — Handle

Prateek
4 min readMar 2, 2022

--

public static Flux<Integer> range( int start, int count )
Build a Flux that will only emit a sequence of count incrementing integers, starting from start. That is, emit integers between start (included) and start + count (excluded) then complete.

public final <R> Flux<R> handle( @NotNull java.util.function.BiConsumer<? super T, reactor.core.publisher.SynchronousSink<R>> handler )
Handle the items emitted by this Flux by calling a biconsumer with the output sink for each onNext. At most one SynchronousSink.next(Object) call must be performed and/or 0 or 1 SynchronousSink.error(Throwable) or SynchronousSink.complete().

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;

public class Handle {
public static void main(String[] args) {
Flux.range(1, 20)
.handle((integer, synchronousSink) -> {
if(integer == 9)
synchronousSink.complete();
else
synchronousSink.next(integer);

})
.subscribe(Util.subscriber());
}
}

Received : 1
Received : 2
Received : 3
Received : 4
Received : 5
Received : 6
Received : 7
Received : 8
Completed

— — — — — — — — — — — — — — — — — — — — — — — —

public static <T> Flux<T> generate(
@NotNulljava.util.function.Consumer<reactor.core.publisher.SynchronousSink<T>> generator)

Programmatically create a Flux by generating signals one-by-one via a consumer callback.

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;

public class HandleAssignment {

public static void main(String[] args) {
Flux.generate(synchronousSink -> synchronousSink.next(Util.faker().country().name()))
.map(Object::toString)
.handle((s, synchronousSink) -> {
synchronousSink.next(s);
if(s.equalsIgnoreCase("india"))
synchronousSink.complete();
})
.subscribe(Util.subscriber());

}
}

Received : Congo, Democratic Republic of the
Received : Ghana
Received : Burkina Faso
Received : Papua New Guinea
Received : Tunisia
Received : San Marino
Received : India
Completed

— — — — — — — — — — — — — — — — — — — — — — — — — — — —

public final Flux<T> limitRate( int highTide, int lowTide )
Ensure that backpressure signals from downstream subscribers are split into batches capped at the provided highTide first, then replenishing at the provided lowTide, effectively rate limiting the upstream Publisher.
Note that this is an upper bound, and that this operator uses a prefetch-and-replenish strategy, requesting a replenishing amount when 75% of the prefetch amount has been emitted.
Typically used for scenarios where consumer(s) request a large amount of data (eg. Long.MAX_VALUE) but the data source behaves better or can be optimized with smaller requests (eg. database paging, etc…). All data is still processed, unlike with limitRequest(long) which will cap the grand total request amount.
Similar to flux.publishOn(Schedulers.immediate(),prefetchRate).subscribe() , except with a customized “low tide” instead of the default 75%. Note that the smaller the lowTide is, the higher the potential for concurrency between request and data production. And thus the more extraneous replenishment requests this operator could make. For example, for a global downstream request of 14, with a highTide of 10 and a lowTide of 2, the operator would perform low tide requests (request(2)) seven times in a row, whereas with the default lowTide of 8 it would only perform one low tide request (request(8)). Using a lowTide equal to highTide reverts to the default 75% strategy, while using a lowTide of 0 disables the lowTide, resulting in all requests strictly adhering to the highTide.

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;

public class LimitRate {
public static void main(String[] args) {
Flux.range(1, 1000) // 175
.log()
.limitRate(100, 10) // 75%
.subscribe(Util.subscriber());
}
}

— — — — — — — — — — — — — — — — — — — — — — — — —

public final Flux<T> onErrorReturn( @NotNull T fallbackValue )
Simply emit a captured fallback value when any error is observed on this Flux.

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OnError {

public static void main(String[] args) {
Flux.range(1, 10)
.map(i -> 10 / (5 - i))
.onErrorReturn(-1)
.onErrorResume(e -> fallback())
// .onErrorContinue((err, obj) -> {
// System.out.println(err);
// })
.subscribe(Util.subscriber());
}

private static Mono<Integer> fallback() {
return Mono.fromSupplier(() -> Util.faker().random().nextInt(100, 200));
}
}

Received : 2
Received : 3
Received : 5
Received : 10
Received : -1
Completed

— — — —

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;

import java.time.Duration;

public class Timeout {
public static void main(String[] args) {
getOrderNumbers()
.timeout(Duration.ofSeconds(2), fallback())
.subscribe(Util.subscriber());
Util.sleepSeconds(60);
}

private static Flux<Integer> getOrderNumbers() {
return Flux.range(1, 10)
.delayElements(Duration.ofSeconds(5));
}

private static Flux<Integer> fallback() {
return Flux.range(100, 10)
.delayElements(Duration.ofSeconds(5));
}
}

Received : 100
Received : 101
Received : 102
Received : 103
Received : 104
Received : 105
Received : 106

— — — — — — — —

public final Flux<T> defaultIfEmpty(@NotNull T defaultV )
Provide a default unique value if this sequence is completed without any data.

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;

public class DefaultIfEmpty {
public static void main(String[] args) {
getOrderNumbers()
.filter(i -> i > 10)
.defaultIfEmpty(-100)
.subscribe(Util.subscriber());
}

private static Flux<Integer> getOrderNumbers() {
return Flux.range(1, 12);
}
}

Received : 11
Received : 12
Completed

— — — — — — — — — — — — — — — — — — — — — — — — — — —

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;

public class Lec09SwitchIfEmpty {
public static void main(String[] args) {
getOrderNumbers()
.filter(i -> i > 10)
.switchIfEmpty(fallback())
.subscribe(Util.subscriber());

}

// redis cache / db
private static Flux<Integer> getOrderNumbers(){
return Flux.range(1, 10);
}

// db // cache
private static Flux<Integer> fallback(){
return Flux.range(20, 5);
}
}

— — — — — — — — — — — — — — — — — — — — — — — — — — — -

public final <V> Flux<V> transform( @NotNull java.util.function.Function<? super Flux<T>, ? extends org.reactivestreams.Publisher<V>> transformer )
Transform this Flux in order to generate a target Flux. Unlike transformDeferred(Function), the provided function is executed as part of assembly.
Function applySchedulers = flux -> flux.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel());
flux.transform(applySchedulers).map(v -> v * v).subscribe();

import com.rp.courseutil.Util;
import com.rp.sec04.helper.Person;
import reactor.core.publisher.Flux;

import java.util.function.Function;

public class Transform {
public static void main(String[] args) {
getPerson()
.transform(applyFilterMap())
.subscribe(Util.subscriber());
}

public static Flux<Person> getPerson() {
return Flux.range(1, 10)
.map(i -> new Person());
}

public static Function<Flux<Person>, Flux<Person>> applyFilterMap() {
return flux -> flux
.filter(p -> p.getAge() > 10)
.doOnNext(p -> p.setName(p.getName().toUpperCase()))
.doOnDiscard(Person.class, p -> System.out.println("Not allowing : " + p));
}
}

Received : Person(name=HIROKO, age=14)
Received : Person(name=REFUGIO, age=22)
Received : Person(name=KOREY, age=12)
Not allowing : Person(name=Manual, age=3)
Received : Person(name=JULIEANN, age=29)
Not allowing : Person(name=Sheri, age=1)
Received : Person(name=TOMMYE, age=17)
Received : Person(name=ZITA, age=26)
Not allowing : Person(name=Jamey, age=5)
Not allowing : Person(name=Bob, age=8)
Completed

--

--

Prateek
Prateek

Written by Prateek

Java Developer and enthusiast

No responses yet