转载

How to take control over the execution of Mono and Flux ?

温馨提示:
本文最后更新于 2019年12月04日,已超过 1,872 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

Reactor is a Java library for creating reactive non-blocking applications on the JVM based on the Reactive Streams Specification.

This article is the forth of a series which goal is to guide you through the process of creating, manipulating and managing the execution of the Reactive Streams that offer Reactor through the Mono and Flux classes.

In the first three articles we covered how to create Mono and Flux, how to apply transformations to the data they hold and how they behave.

In this forth article, we will see how to take control over the way Flux and Mono are executed, either sequentially or in parallel. And also how to configure on which thread pool to run all the operations, or only one operation.

Schedulers

In computingscheduling is the method by which work specified by some means is assigned to resources that complete the work. The work may be virtual computation elements such as threadsprocesses or data flows, which are in turn scheduled onto hardware resources such as processorsnetwork links or expansion cards.
— Wikipedia

Reactor uses schedulers to manage on which thread pool the computing of your stream should be processed. The library embed four already configured schedulers that you can use to control the execution of your streams.

The 4 schedulers offered by Reactor are :

  • single: a one worker thread scheduler
  • immediate: a scheduler that computes the stream in the thread where the call to the method configuring it is done.
  • parallel: a scheduler that has as many workers as your CPU has cores (or threads if supporting hyper threading). The method it uses to get the amount of workers to use is Runtime.getRuntime().availableProcessors()
  • elastic: a scheduler that dynamically creates threads when needed, with no up limit. A thread is released after 60 non-working seconds.

These 4 schedulers are accessible through the Schedulers class via static methods.

More options are available for the schedulers. If you want to know more about this, I recommand you dive into the documentation.

How to configure a Mono or Flux’s scheduler

Mono and Flux have two methods to configure the scheduler to use. They are subscribeOn(Scheduler scheduler) and publishOn(Scheduler scheduler) . These two methods are pretty different.

SubscribeOn

subscribeOn(...) sets the scheduler for all the operations of your stream. It doesn’t matter when you call the method. So the below two codes do exactly the same, they are fully executed on a single scheduler:

Mono.just("1").
map(Integer::valueOf).
subscribeOn(Schedulers.single()).
subscribe(System.out::println)
Mono.just("1").
subscribeOn(Schedulers.single()).
map(Integer::valueOf).
subscribe(System.out::println)

A word about a counter intuitive thing with subscribeOn(...) . If the method get called multiple times on a stream, only the first one will be taken in account. So, the below code will run on the single scheduler and not the elastic one. This is important to know when working with streams that are modified along several methods or with libraries that return instances of Mono andFlux .

Mono.just(“1”).
subscribeOn(Schedulers.single()).
map(Integer::valueOf).
subscribeOn(Schedulers.elastic()).
subscribe(System.out::println)

PublishOn

publishOn(...) sets the scheduler for all the operations that follow this method call. It will override the scheduler set with the subscribeOn method.

Mono.just(“1”).
subscribeOn(Schedulers.single()).
map(Integer::valueOf). // -> on single
publishOn(Schedulers.parallel()). // -> on single
map(x -> x * x). // -> on parallel
subscribe(System.out::println) // -> on parallel

The flatMap case

Remember, when doing a flatMap, you return a Flux or Mono. This stream can have its own scheduler set. This scheduler won’t be overriden by any of the publishOn or subscribeOn method of the parent stream, but by default, it will run on the same scheduler as the parent.

This allow you to set different schedulers depending on what happens in your flatMap. For example, in case of error, you might want the Mono.error(...) to run on the same scheduler as the parent, but in case of success, to run on a different scheduler than the parent. This configuration is up to you.

Parallel Flux

Flux runs sequentially by default. To make a Flux parallel, there exists a .parallel() method in the class. This method returns an instance of ParallelFlux .

Conversely, it is possible to turn a parallel flux into a sequential flux with the method .sequential() .

ParallelFlux has no method subscribeOn or publishOn . Instead, it has a runOn(Scheduler scheduler) method that acts exactly as publishOn. By default, not calling runOn(...) will run the stream in the thread where you subscribed to it.

Combining Flux, Mono and ParallelFlux, by examples

For the following examples, I will very often use the following function:

private <T> T identityWithThreadLogging(T el, String operation) {
System.out.println(operation + " -- " + el + " -- " +
Thread.currentThread().getName());
return el;
}

First example

flatMap without changing the scheduler:

@Test
public void flatMapWithoutChangingScheduler() {
Flux.range(1, 3).
map(n -> identityWithThreadLogging(n, "map1").
flatMap(n -> Mono.just(n).map(nn -> identityWithThreadLogging(nn, "mono")).
subscribeOn(Schedulers.parallel()).
subscribe(n -> {
this.identityWithThreadLogging(n, "subscribe");
System.out.println(n);
});
}


Output of flatMapWithoutChangingScheduler

Second example

Same as above, but subscribing the Mono to another scheduler. So the Mono.just(n).map(this::identityWithThreadLogging) is suffixed with .subscribeOn(Schedulers.elastic()) . The new output is:

Output of flatMapWithoutChangingScheduler with a .subscribeOn(Schedulers.single()) added to the mono in flatMap.

As you might have noticed, each Mono was executed on a different thread. which was not the case for without adding subscribeOn(...) to the Mono . This made a part of the execution of the Flux parallel. That would also have worked with the parallel scheduler on the monos.

Also, the subscription of the Flux itself runned on the scheduler configured on each Mono .

Third example

With the same basis as the above examples, let’s make things more complicated. The code will speak for itself:

@Test
public void complexCase() {
Flux.range(1, 4).
subscribeOn(Schedulers.immediate()).
map(n -> identityWithThreadLogging(n, "map1")).
flatMap(n -> {
if (n == 1) return createMonoOnScheduler(n, Schedulers.parallel());
if (n == 2) return createMonoOnScheduler(n, Schedulers.elastic());
if (n == 3) return createMonoOnScheduler(n, Schedulers.single());
return Mono.error(new Exception("error")).subscribeOn(Schedulers.newSingle("error-thread"));
}).
map(n -> identityWithThreadLogging(n, "map2")).
subscribe(
success -> System.out.println(identityWithThreadLogging(success, "subscribe")),
error -> System.err.println(identityWithThreadLogging(error, "subscribe, err").getMessage())
);
}


Output of complexCase()

Fourth example

In this fourth example, I want to run all the second map and the subscribe on a single thread. Any idea how to do that ?

The solution is to add a .publishOn(Schedulers.single()) right before the second map. The result is:

Fifth example

This example’s goal is to show you that when working with ParallelFlux, the position where you call .runOn(...) matters.

@Test
public void runOnMethodCallOnParallelFluxMatters() {
// FIRST TEST : .runOn(parallel) after the sleeping map
ParallelFlux<Integer> fluxAfter = Flux.range(1, 100).parallel().
map(Object::toString).
map(Integer::valueOf).
map(i -> {
try { Thread.sleep(12); }
catch(InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName());
return i;
}).
runOn(elastic).
map(i -> i % 2 == 0 ? i : i + 10);

long afterSleepingResult = TestUtils.mesureParallelFluxOnScheduler(fluxAfter, elastic);

// SECOND TEST : .runOn(parallel) before the sleeping map
ParallelFlux<Integer> fluxBefore = Flux.range(1, 100).parallel().
map(Object::toString).
map(Integer::valueOf).
runOn(elastic).
map(i -> {
try { Thread.sleep(12); }
catch(InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName());
return i;
}).
map(i -> i % 2 == 0 ? i : i + 10);

long beforeSleepingResult = TestUtils.mesureParallelFluxOnScheduler(fluxBefore, elastic);

System.out.println("After sleeping result in an execution of " + afterSleepingResult + "ms");
System.out.println("Before sleeping result in an execution of " + beforeSleepingResult + "ms");

/* ---
Running this method gave me the following results :
after = 1369ms
before = 156ms
*/
}


Sixth example

Combining parallel and sequential flux.

@Test
public void combiningParallelAndSequentialFlux() {
Flux.range(1, 4).
subscribeOn(Schedulers.parallel()).
map(n -> identityWithThreadLogging(n, "map1")).
parallel().
runOn(Schedulers.elastic()).
map(n -> identityWithThreadLogging(n, "parallelFlux")).
sequential().
map(n -> identityWithThreadLogging(n, "map2")).
subscribe(n -> identityWithThreadLogging(n, "subscribe"));
}


Output of combiningParallelAndSequentialFlux()

What is interesting with this example is to observe that even though the sequential Flux subscribe on the elastic scheduler, it uses only one thread to perform this operation. On the opposite, the ParallelFlux uses 4 threads.

In addition to show how schedulers are used, it shows how ParallelFlux and Flux handle tasks, and that it is possible to control the execution of a Flux very precisely.

Conclusion

In this fourth example, we took a look at how to control the execution of the streams that Reactor offers, leveraging the possibilities offered by the methods subscribeOn , publishOn ,runOn and the very complex compositions that can be made using the flatMap method properly.

In the next article, we will take a look at a few benchmarks I made. They intend to help you pick the right scheduler depending on the kind of work your program is doing.

正文到此结束
本文目录