ProjectReactor — Flux Examples

Prateek
3 min readFeb 28, 2022

--

I’ve explain mono here: https://prateek-ashtikar512.medium.com/projectreactor-examples-7ca9f3611636 and will try to play around the Flux now.

Flux Intro: Subscribe a Consumer to this Flux that will consume all the elements in the sequence. It will request an unbounded demand (Long.MAX_VALUE). For a passive version that observe and forward incoming data see doOnNext(Consumer). For a version that gives you more control over backpressure and the request, see subscribe(Subscriber) with a BaseSubscriber.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;

public class FluxIntro {

public static void main(String[] args) {
Flux<Object> flux = Flux.just(1, 2, 3, "a", Util.faker().name().fullName());
flux.subscribe(
o -> System.out.println("Received : " + o),
e -> System.out.println("ERROR : " + e.getMessage()),
() -> System.out.println("Completed"));
}
}

Received : 1
Received : 2
Received : 3
Received : a
Received : Ivory Greenholt
Completed

import reactor.core.publisher.Flux;

public class Lec02MultipleSubscribers {

public static void main(String[] args) {
// Integer Flux
Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4);

// Even Flux
Flux<Integer> evenFlux = integerFlux.filter(i -> i % 2 == 0);

integerFlux
.log()
.subscribe(i -> System.out.println("Subscriber 1 : " + i));

System.out.println("---------------");
evenFlux
.log()
.subscribe(i -> System.out.println("Subscriber 2 : " + i));
}
}

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
Subscriber 1 : 1
[ INFO] (main) | onNext(2)
Subscriber 1 : 2
[ INFO] (main) | onNext(3)
Subscriber 1 : 3
[ INFO] (main) | onNext(4)
Subscriber 1 : 4
[ INFO] (main) | onComplete()
— — — — — — — -
[ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(2)
Subscriber 2 : 2
[ INFO] (main) | onNext(4)
Subscriber 2 : 4
[ INFO] (main) | onComplete()

— — — — — — — — — — — -

fromIterable — Create a Flux that emits the items contained in the provided Iterable. The Iterable.iterator() method will be invoked at least once and at most twice for each subscriber.

fromArray — Create a Flux that emits the items contained in the provided array.

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.List;

public class Lec03FluxFromArrayOrList {

public static void main(String[] args) {
List<String> values = Arrays.asList("a", "b", "c");
Flux.fromIterable(values)
.log()
.subscribe(Util.onNext());

System.out.println("-----------------");
Integer[] arr = {2, 5, 7, 8};
Flux.fromArray(arr)
.log()
.subscribe(o -> System.out.println("Received : " + o));
}
}

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(a)
Received : a
[ INFO] (main) | onNext(b)
Received : b
[ INFO] (main) | onNext(c)
Received : c
[ INFO] (main) | onComplete()
— — — — — — — — -
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(2)
Received : 2
[ INFO] (main) | onNext(5)
Received : 5
[ INFO] (main) | onNext(7)
Received : 7
[ INFO] (main) | onNext(8)
Received : 8
[ INFO] (main) | onComplete()

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — -

Subscribe — Subscribe Consumer to this Flux that will respectively consume all the elements in the sequence, handle errors and react to completion. The subscription will request unbounded demand (Long.MAX_VALUE).
For a passive version that observe and forward incoming data see doOnNext(Consumer), doOnError(Consumer) and doOnComplete(Runnable).
For a version that gives you more control over backpressure and the request, see subscribe(Subscriber) with a BaseSubscriber.
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.stream.Stream;

public class FluxFromStream {

public static void main(String[] args) {
Stream<Integer> stream = List.of(1, 2, 3, 4, 5).stream();
Flux<Integer> integerFlux = Flux.fromStream(() -> stream);

integerFlux
.subscribe(Util.onNext(), Util.onError(), Util.onComplete());

integerFlux
.subscribe(Util.onNext(), Util.onError(), Util.onComplete());
}
}

— — — — — — — — — — — — — — — — — — — — — — — — — —

FluxRange — Build a Flux that will only emit a sequence of count incrementing integers, starting from start. That is, emit integers between start (included) and start + count (excluded) then complete.

import com.rp.courseutil.Util;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class Lec05FluxRange {

public static void main(String[] args) {
Disposable disposable = Flux.range(3, 10)
.map(i -> Util.faker().name().fullName())
.subscribe(Util.onNext(), Util.onError(), Util.onComplete());
System.out.println(disposable);
}
}

--

--

Prateek
Prateek

Written by Prateek

Java Developer and enthusiast

Responses (1)