ProjectReactor — Flux create & generate

Prateek
2 min readMar 2, 2022

public static <T> Flux<T> create( @NotNull java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>> emitter )
Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.

public class FluxCreate {
public static void main(String[] args) {
Flux.create(fluxSink -> {
String country;
do {
country = Util.faker().country().name();
fluxSink.next(country);
} while (!country.equalsIgnoreCase("india"));
fluxSink.complete();
})
.subscribe(Util.subscriber());
}
}

Received : Algeria
Received : Iceland
Received : Saint Kitts and Nevis
Received : New Zealand
Received : Micronesia (Federated States of)
Received : Lesotho
Received : India
Completed

public class FluxCreateIssueFix {

public static void main(String[] args) {
Flux.create(fluxSink -> {
String country;
int counter = 0;
do {
country = Util.faker().country().name();
System.out.println("emitting : " + country);
fluxSink.next(country);
counter++;
} while (!country.equalsIgnoreCase("canada") && !fluxSink.isCancelled() && counter < 10);
fluxSink.complete();
})
.take(3)
.subscribe(Util.subscriber());
}
}

emitting : Mauritius
Received : Mauritius
emitting : Mongolia
Received : Mongolia
emitting : Mauritius
Received : Mauritius
Completed

public static <T> Flux<T> generate( @NotNull java.util.function.Consumer<reactor.core.publisher.SynchronousSink<T>> generator )
Programmatically create a Flux by generating signals one-by-one via a consumer callback.

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;

public class Lec05FluxGenerate {

public static void main(String[] args) {
Flux.generate(synchronousSink -> {
System.out.println("emitting");
synchronousSink.next(Util.faker().country().name());
})
.take(2)
.subscribe(Util.subscriber());
}
}

emitting
Received : Vanuatu
emitting
Received : Nicaragua
Completed

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;

public class Lec06FluxGenerateAssignment {
public static void main(String[] args) {
Flux.generate(synchronousSink -> {
String country = Util.faker().country().name();
System.out.println("emitting " + country);
synchronousSink.next(country);
if (country.equalsIgnoreCase("india"))
synchronousSink.complete();
})
.take(7)
.subscribe(Util.subscriber());
}
}
import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;

public class Lec07FluxGenerateCounter {
public static void main(String[] args) {
Flux.generate(() -> 1, (counter, sink) -> {
String country = Util.faker().country().name();
sink.next(country);
if (counter >= 10 || country.equalsIgnoreCase("canada")) {
sink.complete();
}
return counter + 1;
}
)
.take(4)
.subscribe(Util.subscriber());
}
}

--

--