ProjectReactor — Sinks

Prateek
4 min readMar 2, 2022

--

public static <T> Sinks.One<T> one()
A Sinks.One that works like a conceptual promise: it can be completed with or without a value at any time, but only once. This completion is replayed to late subscribers. Calling Sinks.One.tryEmitValue(Object) (or Sinks.One.emitValue(Object, Sinks.EmitFailureHandler)) is enough and will implicitly produce a Subscriber.onComplete() signal as well.
Use Sinks.One.asMono() to expose the Mono view of the sink to downstream consumers.

import com.rp.courseutil.Util;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class SinkOne {

public static void main(String[] args) {
Sinks.One<Object> sink = Sinks.one();
Mono<Object> mono = sink.asMono();

mono.subscribe(Util.subscriber("Sam"));
mono.subscribe(Util.subscriber("Mike"));
sink.tryEmitValue("Hello");
sink.tryEmitValue("Hi");
}
}

Output —

Sam — Received : Hello
Sam — Completed
Mike — Received : Hello
Mike — Completed

— — — — — — — — — — — — — — — — — — — — — — — — — — -

public static Sinks.ManySpec many()
Help building Sinks.Many sinks that will broadcast multiple signals to one or more Subscriber.
Use Sinks.Many.asFlux() to expose the Flux view of the sink to the downstream consumers.

public abstract Sinks.UnicastSpec unicast() — Help building Sinks.Many that will broadcast signals to a single Subscriber.

public abstract Sinks.MulticastSpec multicast() — Help building Sinks.Many that will broadcast signals to multiple Subscriber

public abstract Sinks.MulticastReplaySpec replay() — Help building Sinks.Many that will broadcast signals to multiple Subscriber with the ability to retain and replay all or an arbitrary number of elements.

public abstract <T> Sinks.Many<T> onBackpressureBuffer()
A Sinks.Many with the following characteristics:
- Unicast: contrary to most other Sinks.Many, the Flux view rejects subscribers past the first one.
- Backpressure : this sink honors downstream demand of its single Subscriber.
- Replaying: non-applicable, since only one Subscriber can register.
- Without Subscriber: all elements pushed to this sink are remembered and will be replayed once the Subscriber subscribes.

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class SinkUnicast {

public static void main(String[] args) {

// handle through which we would push items
Sinks.Many<Object> sink = Sinks.many().unicast().onBackpressureBuffer();

// handle through which subscribers will receive items
Flux<Object> flux = sink.asFlux();

flux.subscribe(Util.subscriber("sam"));
flux.subscribe(Util.subscriber("mike"));

sink.tryEmitNext("hi");
sink.tryEmitNext("how are you");
sink.tryEmitNext("?");

sink.tryEmitNext("hello");
sink.tryEmitNext("howlly");
sink.tryEmitNext("Neha");
}
}

mike — ERROR : UnicastProcessor allows only a single Subscriber
sam — Received : hi
sam — Received : how are you
sam — Received : ?
sam — Received : hello
sam — Received : howlly
sam — Received : Neha

Note — Since sam has already subscribe hence mike will not received data because it’s a unicast processor

— — — — — — — — — — — — — — — — — — — — — — — — — — -

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class Lec03SinkThreadSafety {
public static void main(String[] args) {
// handle through which we would push items
Sinks.Many<Object> sink = Sinks.many().unicast().onBackpressureBuffer();

// handle through which subscribers will receive items
Flux<Object> flux = sink.asFlux();

List<Object> list = new ArrayList<>();
flux.subscribe(list::add);

for (int i = 0; i < 1000; i++) {
final int j = i;
CompletableFuture.runAsync(() -> {
sink.emitNext(j, (s, e) -> true);
});

}
Util.sleepSeconds(5);
System.out.println(list.size());
}
}

1000

— — — -

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class Lec04SinkMulti {

public static void main(String[] args) {
// handle through which we would push items
Sinks.Many<Object> sink = Sinks.many().multicast().onBackpressureBuffer();

// handle through which subscribers will receive items
Flux<Object> flux = sink.asFlux();
sink.tryEmitNext("hi");
sink.tryEmitNext("how are you");

flux.subscribe(Util.subscriber("sam"));
flux.subscribe(Util.subscriber("mike"));

sink.tryEmitNext("?");

// Only after subscribe you'll be able to get the element
flux.subscribe(Util.subscriber("jake"));

sink.tryEmitNext("new msg");
}
}

sam — Received : hi
sam — Received : how are you
sam — Received : ?
mike — Received : ?
sam — Received : new msg
mike — Received : new msg
jake — Received : new msg

— — — — — — — — — — — — — — — — — — — — — — — — — — -

import com.rp.courseutil.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class Lec06SinkReplay {

public static void main(String[] args) {
// handle through which we would push items
Sinks.Many<Object> sink = Sinks.many().replay().all();

// handle through which subscribers will receive items
Flux<Object> flux = sink.asFlux();

sink.tryEmitNext("hi");
sink.tryEmitNext("how are you");

flux.subscribe(Util.subscriber("sam"));
flux.subscribe(Util.subscriber("mike"));

sink.tryEmitNext("?");

flux.subscribe(Util.subscriber("jake"));

sink.tryEmitNext("new msg");
}
}

sam — Received : hi
sam — Received : how are you
mike — Received : hi
mike — Received : how are you
sam — Received : ?
mike — Received : ?
jake — Received : hi
jake — Received : how are you
jake — Received : ?
sam — Received : new msg
mike — Received : new msg
jake — Received : new msg

— — — — — — — — — — — — — — — — — — — — — — — — — — —

--

--

Prateek
Prateek

Written by Prateek

Java Developer and enthusiast

No responses yet