1. Introduction
Reactor Core is a Java 8 library that implements the reactive programming model. It's built on top of the Reactive Streams specification, a standard for building reactive applications.
Coming from a background in non-reactive Java development, going reactive can involve quite a steep learning curve. Comparing it with the Java 8 Stream API makes this harder, as the two can easily be mistaken for the same high-level abstraction.
In this article, we'll attempt to demystify this paradigm. We'll take small steps through Reactor until we've built a picture of how to compose reactive code, laying the foundation for more advanced articles to come in a later series.
2. Reactive Streams Specification
Before we look at Reactor, we should look at the Reactive Streams Specification. This is what Reactor implements, and it lays the groundwork for the library.
Essentially, Reactive Streams is a specification for asynchronous stream processing.
In other words, it describes a system in which lots of events are produced and consumed asynchronously. Think of a stream of thousands of stock updates per second coming into a financial application, which has to respond to those updates in a timely manner.
One of the main goals of the specification is to address the problem of backpressure. If a producer emits events faster than a consumer can process them, the consumer will eventually be overwhelmed and run out of system resources.
Backpressure means that our consumer should be able to tell the producer how much data to send in order to prevent this, and this is what is laid out in the specification.
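To make this request-driven contract concrete, here's a minimal sketch that doesn't need Reactor on the classpath. It uses the java.util.concurrent.Flow interfaces, which have mirrored the Reactive Streams interfaces in the JDK since Java 9. The RequestDrivenSketch class and its fromList() and take() helpers are hypothetical names introduced for illustration, and the publisher is deliberately simplified (single-threaded, no validation of request arguments), so it shouldn't be read as a spec-complete implementation:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class RequestDrivenSketch {

    // Hypothetical helper: a simple, single-threaded publisher over a list.
    // It emits only as many items as the subscriber has requested.
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final Iterator<T> source = items.iterator();
            private long demand;       // items requested but not yet delivered
            private boolean emitting;  // true while the delivery loop is running
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                // Add the new demand, capping at Long.MAX_VALUE to avoid overflow
                demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                if (emitting) {
                    return; // called from inside onNext(); the running loop sees the new demand
                }
                emitting = true;
                while (!done && demand > 0 && source.hasNext()) {
                    demand--;
                    subscriber.onNext(source.next());
                }
                emitting = false;
                if (!done && !source.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Hypothetical helper: subscribes, requests n elements exactly once,
    // and returns whatever arrived.
    static <T> List<T> take(List<T> items, long n) {
        List<T> received = new ArrayList<>();
        fromList(items).subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(n);
            }

            @Override
            public void onNext(T item) {
                received.add(item);
            }

            @Override
            public void onError(Throwable throwable) {}

            @Override
            public void onComplete() {}
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(take(List.of(1, 2, 3, 4), 2));  // [1, 2]
        System.out.println(take(List.of(1, 2, 3, 4), 10)); // [1, 2, 3, 4]
    }
}
```

A subscriber that asks for two elements receives exactly two, however many the producer holds. The consumer, not the producer, sets the pace.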
3. Maven Dependencies
Before we get started, let's add our Maven dependencies for reactor-core and logback-classic:
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.6.0</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.4.4</version>
</dependency>
We're adding Logback because we'll be logging Reactor's output in order to better understand the flow of data.
4. Producing a Stream of Data
In order for an application to be reactive, the first thing it must be able to do is to produce a stream of data.
This could be something like the stock update example that we gave earlier. Without this data, we wouldn't have anything to react to, which is why this is a logical first step.
Reactor Core gives us two data types that enable us to do this.
4.1. Flux
The first way of doing this is with Flux. It's a stream that can emit 0..n elements. Let's try creating a simple one:
Flux<Integer> just = Flux.just(1, 2, 3, 4);
In this case, we have a static stream of four elements.
4.2. Mono
The second way of doing this is with a Mono, which is a stream of 0..1 elements. Let's try instantiating one:
Mono<Integer> just = Mono.just(1);
This looks and behaves almost exactly the same as the Flux, only this time we are limited to no more than one element.
4.3. Why Not Only Flux?
Before experimenting further, it's worth highlighting why we have these two data types.
First, it should be noted that both Flux and Mono are implementations of the Reactive Streams Publisher interface. Both classes are compliant with the specification, and we could use this interface in their place:
Publisher<String> just = Mono.just("foo");
But really, knowing this cardinality is useful. This is because a few operations only make sense for one of the two types and because it can be more expressive (imagine findOne() in a repository).
5. Subscribing to a Stream
Now that we have a high-level overview of how to produce a stream of data, we need to subscribe to it in order for it to emit the elements.
5.1. Collecting Elements
Let's use the subscribe() method to collect all the elements in a stream:
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
  .log()
  .subscribe(elements::add);
assertThat(elements).containsExactly(1, 2, 3, 4);
The data won't start flowing until we subscribe. Notice that we've added some logging as well; this will be helpful when we look at what's happening behind the scenes.
5.2. The Flow of Elements
With logging in place, we can use it to visualize how the data is flowing through our stream:
20:25:19.550 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onComplete()
First of all, everything is running on the main thread. Let's not go into any details about this, as we'll be taking a further look at concurrency later on in this article. It does make things simple, though, as we can deal with everything in order.
Now let's go through the sequence that we have logged one by one:
- onSubscribe(): This is called when we subscribe to our stream
- request(unbounded): When we call subscribe(), behind the scenes we are creating a Subscription. This subscription requests elements from the stream. In this case, it defaults to unbounded, meaning it requests every single element available
- onNext(): This is called on every single element
- onComplete(): This is called last, after receiving the last element. There's actually an onError() as well, which would be called if there is an exception, but in this case, there isn't
This is the flow laid out in the Subscriber interface as part of the Reactive Streams Specification, and in reality, that's what's been instantiated behind the scenes in our call to subscribe(). It's a useful method, but to better understand what's happening, let's provide a Subscriber implementation directly:
Flux.just(1, 2, 3, 4)
  .log()
  .subscribe(new Subscriber<Integer>() {
      @Override
      public void onSubscribe(Subscription s) {
          s.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(Integer integer) {
          elements.add(integer);
      }

      @Override
      public void onError(Throwable t) {}

      @Override
      public void onComplete() {}
  });
We can see that each possible stage in the above flow maps to a method in the Subscriber implementation. It just happens that Flux has provided us with a helper method to reduce this verbosity.
5.3. Comparison to Java 8 Streams
It still might appear that we have something equivalent to a Java 8 Stream doing collect:
List<Integer> collected = Stream.of(1, 2, 3, 4)
  .collect(toList());
Only we don't.
The core difference is that Reactive is a push model, whereas Java 8 Streams are a pull model. In a reactive approach, events are pushed to the subscribers as they come in.
The next thing to notice is that a Stream's terminal operator is just that, terminal, pulling all the data and returning a result. With Reactive, we could have an infinite stream coming in from an external resource, with multiple subscribers attached and removed on an ad hoc basis. We can also do things like combine streams, throttle streams, and apply backpressure, which we will cover next.
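The contrast can be sketched with JDK classes alone. The example below assumes Java 9 or later and uses SubmissionPublisher, the JDK's own Flow.Publisher implementation, as a stand-in for a reactive source. It is not Reactor's API, and the class and method names are hypothetical. The first method shows that a Stream is drained once by its terminal operation; the second shows one publisher pushing every item to two attached subscribers:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PushVersusPullSketch {

    // Pull: the terminal operation drains the stream, and a second one is rejected.
    static boolean streamIsSingleUse() {
        Stream<Integer> stream = Stream.of(1, 2, 3, 4);
        stream.collect(Collectors.toList());
        try {
            stream.collect(Collectors.toList());
            return false;
        } catch (IllegalStateException alreadyConsumed) {
            return true;
        }
    }

    // Push: the publisher hands every submitted item to each attached subscriber.
    static List<List<Integer>> pushToTwoSubscribers() {
        List<Integer> first = new CopyOnWriteArrayList<>();
        List<Integer> second = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        // consume() subscribes a callback and returns a future that
        // completes once the publisher signals onComplete
        CompletableFuture<Void> firstDone = publisher.consume(first::add);
        CompletableFuture<Void> secondDone = publisher.consume(second::add);

        for (int i = 1; i <= 4; i++) {
            publisher.submit(i);
        }
        publisher.close(); // signals onComplete to both subscribers

        // Delivery is asynchronous, so wait for both subscribers to finish
        firstDone.join();
        secondDone.join();
        return List.of(List.copyOf(first), List.copyOf(second));
    }

    public static void main(String[] args) {
        System.out.println(streamIsSingleUse());    // true
        System.out.println(pushToTwoSubscribers()); // [[1, 2, 3, 4], [1, 2, 3, 4]]
    }
}
```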
6. Backpressure
The next thing we should consider is backpressure. In our example, the subscriber is telling the producer to push every single element at once. This could end up becoming overwhelming for the subscriber, consuming all of its resources.
Backpressure is when a downstream can tell an upstream to send it less data in order to prevent it from being overwhelmed.
We can modify our Subscriber implementation to apply backpressure. Let's tell the upstream to only send two elements at a time by using request():
Flux.just(1, 2, 3, 4)
  .log()
  .subscribe(new Subscriber<Integer>() {
      private Subscription s;
      int onNextAmount;

      @Override
      public void onSubscribe(Subscription s) {
          this.s = s;
          s.request(2);
      }

      @Override
      public void onNext(Integer integer) {
          elements.add(integer);
          onNextAmount++;
          if (onNextAmount % 2 == 0) {
              s.request(2);
          }
      }

      @Override
      public void onError(Throwable t) {}

      @Override
      public void onComplete() {}
  });
Now if we run our code again, we'll see that request(2) is called, followed by two onNext() calls, then request(2) again:
23:31:15.395 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:31:15.397 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.397 [main] INFO reactor.Flux.Array.1 - | onNext(1)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(3)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(4)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onComplete()
Essentially, this is reactive pull backpressure. We are requesting the upstream to only push a certain number of elements, and only when we are ready.
If we imagine we were being streamed tweets from Twitter, it would then be up to the upstream to decide what to do. If tweets were coming in but there were no requests from the downstream, the upstream could drop items, store them in a buffer, or apply some other strategy.
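The difference between dropping and buffering can be illustrated with a small hand-rolled model. This is a toy sketch in plain Java, not Reactor code: the Upstream class, the Strategy enum, and the run() scenario are all hypothetical names made up for the example. Reactor provides operators for this, such as onBackpressureDrop() and onBackpressureBuffer(), which the sketch does not use:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class OverflowStrategySketch {

    enum Strategy { DROP, BUFFER }

    // Toy model of an upstream that receives items it cannot slow down.
    static final class Upstream {
        private final Strategy strategy;
        private final Deque<Integer> buffer = new ArrayDeque<>();
        private final List<Integer> delivered = new ArrayList<>();
        private long demand; // items the downstream is still willing to accept

        Upstream(Strategy strategy) {
            this.strategy = strategy;
        }

        // The downstream signals that it can take n more items.
        void request(long n) {
            demand += n;
            // Buffered items are delivered first, in arrival order
            while (demand > 0 && !buffer.isEmpty()) {
                demand--;
                delivered.add(buffer.poll());
            }
        }

        // A new item arrives from the outside world.
        void emit(int item) {
            if (demand > 0) {
                demand--;
                delivered.add(item);
            } else if (strategy == Strategy.BUFFER) {
                buffer.add(item); // keep it until the downstream asks again
            }
            // DROP: with no outstanding demand, the item is simply discarded
        }

        List<Integer> delivered() {
            return List.copyOf(delivered);
        }
    }

    // Scenario: the downstream asks for 1 item, three arrive, it then asks
    // for 2 more, and a fourth item arrives.
    static List<Integer> run(Strategy strategy) {
        Upstream upstream = new Upstream(strategy);
        upstream.request(1);
        upstream.emit(1);
        upstream.emit(2);
        upstream.emit(3);
        upstream.request(2);
        upstream.emit(4);
        return upstream.delivered();
    }

    public static void main(String[] args) {
        System.out.println(run(Strategy.DROP));   // [1, 4]
        System.out.println(run(Strategy.BUFFER)); // [1, 2, 3]
    }
}
```

With DROP, items 2 and 3 are lost because they arrived while there was no demand. With BUFFER, they are held back and delivered on the next request, while item 4 waits in the buffer for a later one.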
7. Operating on a Stream
We can also perform operations on the data in our stream, responding to events as we see fit.
7.1. Mapping Data in a Stream
A simple operation that we can perform is applying a transformation. In this case, let's just double all the numbers in our stream:
Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> {
      LOGGER.debug("{}:{}", i, Thread.currentThread());
      return i * 2;
  })
  .subscribe(elements::add);
map() will be applied when onNext() is called.
7.2. Combining Two Streams
We can then make things more interesting by combining another stream with this one. Let's try this by using the zipWith() function:
List<String> elements = new ArrayList<>(); // a fresh list, since the zipped values are Strings
Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .zipWith(Flux.range(0, Integer.MAX_VALUE),
    (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
  .subscribe(elements::add);
assertThat(elements).containsExactly(
  "First Flux: 2, Second Flux: 0",
  "First Flux: 4, Second Flux: 1",
  "First Flux: 6, Second Flux: 2",
  "First Flux: 8, Second Flux: 3");
Here, we are creating another Flux that keeps incrementing by one and streaming it together with our original one. We can see how these work together by inspecting the logs:
20:04:38.064 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:04:38.065 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onNext(0)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(1)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(2)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(3)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onComplete()
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | cancel()
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | cancel()
Note how we now have one subscription per Flux. The onNext() calls also alternate, so the index of each element in the stream will match when we apply the zipWith() function.
8. Hot Streams
So far, we've focused primarily on cold streams. These are static, fixed-length streams that are easy to deal with. A more realistic use case for reactive might be something that happens infinitely.
For example, we could have a stream of mouse movements that constantly needs to be reacted to or a Twitter feed. These types of streams are called hot streams, as they are always running and can be subscribed to at any point in time, missing the start of the data.
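The "missing the start of the data" behavior can be sketched without Reactor, using the JDK's SubmissionPublisher (Java 9 or later) as a stand-in for a hot source. This is an illustration under that assumption rather than Reactor's API, and LateSubscriberSketch is a hypothetical name. One subscriber attaches before any data is published and another joins halfway through:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class LateSubscriberSketch {

    static List<List<Integer>> run() {
        List<Integer> early = new CopyOnWriteArrayList<>();
        List<Integer> late = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        // The early subscriber is attached before anything is published
        CompletableFuture<Void> earlyDone = publisher.consume(early::add);
        publisher.submit(1);
        publisher.submit(2);

        // The late subscriber joins mid-stream: items 1 and 2 are not replayed
        CompletableFuture<Void> lateDone = publisher.consume(late::add);
        publisher.submit(3);
        publisher.submit(4);

        publisher.close(); // signals onComplete to both subscribers
        earlyDone.join();  // delivery is asynchronous, so wait for completion
        lateDone.join();
        return List.of(List.copyOf(early), List.copyOf(late));
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[1, 2, 3, 4], [3, 4]]
    }
}
```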
8.1. Creating a ConnectableFlux
One way to create a hot stream is by converting a cold stream into one. Let's create a Flux that lasts forever, outputting the results to the console, which would simulate an infinite stream of data coming from an external resource:
ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
    while (true) {
        fluxSink.next(System.currentTimeMillis());
    }
})
  .publish();
By calling publish(), we are given a ConnectableFlux. This means that calling subscribe() won't cause it to start emitting, allowing us to add multiple subscriptions:
publish.subscribe(System.out::println);
publish.subscribe(System.out::println);
If we try running this code, nothing will happen. It's not until we call connect() that the Flux will start emitting:
publish.connect();
8.2. Throttling
If we run our code, our console will be overwhelmed with logging. This is simulating a situation where too much data is being passed to our consumers. Let's try getting around this with throttling:
ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
    while (true) {
        fluxSink.next(System.currentTimeMillis());
    }
})
  .sample(Duration.ofSeconds(2))
  .publish();
Here, we've introduced a sample() method with an interval of two seconds. Now values will only be pushed to our subscriber every two seconds, meaning the console will be a lot less hectic.
Of course, there are multiple strategies to reduce the amount of data sent downstream, such as windowing and buffering, but they will be left out of scope for this article.
9. Concurrency
All of our examples so far have run on the main thread. However, we can control which thread our code runs on if we want. The Scheduler interface provides an abstraction around asynchronous code, for which many implementations are provided for us. Let's try subscribing on a thread other than main:
Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .subscribeOn(Schedulers.parallel())
  .subscribe(elements::add);
The Parallel scheduler will cause our subscription to be run on a different thread, which we can prove by looking at the logs. We see the first entry comes from the main thread and the Flux is running in another thread called parallel-1.
20:03:27.505 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:03:27.529 [parallel-1] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | request(unbounded)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(1)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(2)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(3)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(4)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onComplete()
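The idea of handing delivery over to another thread can also be shown with the JDK alone. In the sketch below, a plain Executor plays a role loosely comparable to a Reactor Scheduler, and SubmissionPublisher (Java 9 or later) delivers items on it. This is an analogy, not Reactor's subscribeOn() semantics, and the class name and the "worker-1" thread name are made up for the example:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class DeliveryThreadSketch {

    // Returns the names of all threads on which items were delivered.
    static Set<String> deliveryThreads() {
        // A single named worker thread, so it is easy to recognize
        ExecutorService worker = Executors.newSingleThreadExecutor(
            runnable -> new Thread(runnable, "worker-1"));
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        try {
            SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(worker, Flow.defaultBufferSize());

            // Record which thread each item is delivered on
            CompletableFuture<Void> done = publisher.consume(
                item -> threadNames.add(Thread.currentThread().getName()));

            for (int i = 1; i <= 4; i++) {
                publisher.submit(i); // called from the current thread
            }
            publisher.close();
            done.join(); // submit() returns immediately, so wait for delivery
        } finally {
            worker.shutdown();
        }
        return Set.copyOf(threadNames);
    }

    public static void main(String[] args) {
        System.out.println(deliveryThreads()); // [worker-1]
    }
}
```

Note the explicit join(): once work moves off the calling thread, the call that starts it returns before the elements have arrived, so any assertion on the collected results has to wait for completion first.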
Concurrency gets more interesting than this, and it will be worth exploring in another article.
10. Conclusion
In this article, we've given a high-level, end-to-end overview of Reactor Core. We've explained how we can publish and subscribe to streams, apply backpressure, operate on streams, and also handle data asynchronously. This should hopefully lay a foundation for us to write reactive applications.
Later articles in this series will cover more advanced concurrency and other reactive concepts. There's also another article covering Reactor with Spring.
