How do Mono and Flux behave ?
Reactor is a Java library for creating reactive non-blocking applications on the JVM based on the Reactive Streams Specification.
This article is the third of a series which goal is to guide you through the process of creating, manipulating and managing the execution of the Reactive Streams that offer Reactor through the Mono and Flux classes.
In the first two articles we covered how to create Mono and Flux, and how to apply transformations to the data they hold.
In this third article, I’ll show you how these two classes behave.
Laziness
By definition, every stream is lazy. This means that nothing is executed until you consume the stream. With Mono and Flux, the method to consume them is subscribe(...)
.
The biggest benefits of this approach is that it consumes less memory than doing transformations in an imperative way, and all the operations are Thread safe.
public void fluxAreLazy() {
Instant beginning = Instant.now();
Flux<Integer> flux = Flux.range(1, 5).
flatMap(n -> {
try { Thread.sleep(100); return Mono.just(n * n); }
catch (InterruptedException e) { return Mono.error(e); }
});
System.out.println("After step1, program runs for : " + Utils.timeDifference(beginning, Instant.now()));
flux.subscribe(System.out::println);
System.out.println("The whole test last : " + Utils.timePastFrom(beginning));
}
Immutability
Flux and Mono are immutable. This means that an instance of any of them can not be modified at all. Calling any method on them will return a new instance of Flux or Mono.
If you are not familiar with immutability, you can read this article from MIT.
public void fluxAreImmutable() {
Flux<Integer> flux = Flux.range(1, 5);
flux.flatMap(n -> {
try { Thread.sleep(100); return Mono.just(n * n); }
catch (InterruptedException e) { return Mono.error(e); }
});
flux.subscribe(System.out::println);
// Logs : 1 2 3 4 5
// Means that it is not the flux we modified.
// In order to execute the flux we modified, we should write :
Flux<Integer> flux2 = Flux.range(1, 5);
Flux<Integer> flux3 = flux2.flatMap(n -> {
try { Thread.sleep(100); return Mono.just(n * n); }
catch (InterruptedException e) { return Mono.error(e); }
});
flux3.subscribe(System.out::println);
// Logs : 1 4 9 16 25
// Success !
}
Flux can be infinite
public void fluxCanBeInfinite() {
/* ---
This example demonstrates that flux can be infinite.
To do that, we just need to make the flux ticking, as if it was a clock.
*/
Flux.interval(Duration.ofMillis(100)).
map(i -> "Tick : " + i).
subscribe(System.out::println);
// This flux will never finish emitting values.
}
Infinite Flux can be stopped
public void infiniteFluxCanBeStopped() {
Disposable disposable = Flux.interval(Duration.ofMillis(100)).
map(i -> "Tick : " + i).
subscribe(System.out::println);
try { Thread.sleep(1000); }
catch (InterruptedException e) { e.printStackTrace(); }
disposable.dispose();
System.out.println("Stopped flux");
}
Flux run sequentially
public void fluxRunSequentially() {
Flux.range(1, 3).
flatMap(n -> {
System.out.println("In flatMap n=" + n + " --- Thread is : " + Thread.currentThread().getName());
try {
Thread.sleep(100);
System.out.println("After Thread.sleep n=" + n);
return Mono.just(n);
} catch (InterruptedException e) { return Mono.error(e); }
}).
map(n -> { System.out.println("In map n=" + n); return n; }).
subscribe(System.out::println);
}
Conclusion
This third article covered some of the key properties of how Mono and Flux behave.
In the next article, we will cover how to take control over the execution of your Mono and Flux. This will enable you to take full advantage of the power this technology offers.
Article #4:https://www.v8en.com/article/227