Spring 5 added support for reactive programming with the Spring WebFlux module, which has been improved upon ever since. Get started with the Reactor project basics and reactive programming in Spring Boot:
Mocking is an essential part of unit testing, and the Mockito library makes it easy to write clean and intuitive unit tests for your Java code.
Get started with mocking and improve your application tests using our Mockito guide:
Handling concurrency in an application can be a tricky process with many potential pitfalls. A solid grasp of the fundamentals will go a long way to help minimize these issues.
Get started with understanding multi-threaded applications with our Java Concurrency guide:
Spring 5 added support for reactive programming with the Spring WebFlux module, which has been improved upon ever since. Get started with the Reactor project basics and reactive programming in Spring Boot:
Since its introduction in Java 8, the Stream API has become a staple of Java development. The basic operations like iterating, filtering, mapping sequences of elements are deceptively simple to use.
But these can also be overused and fall into some common pitfalls.
To get a better understanding on how Streams work and how to combine them with other language features, check out our guide to Java Streams:
Get started with Spring and Spring Boot, through the Learn Spring course:
>> LEARN SPRINGExplore Spring Boot 3 and Spring 6 in-depth through building a full REST API with the framework:
Yes, Spring Security can be complex, from the more advanced functionality within the Core to the deep OAuth support in the framework.
I built the security material as two full courses - Core and OAuth, to get practical with these more complex scenarios. We explore when and how to use each feature and code through it on the backing project.
You can explore the course here:
Spring Data JPA is a great way to handle the complexity of JPA with the powerful simplicity of Spring Boot.
Get started with Spring Data JPA through the guided reference course:
Refactor Java code safely β and automatically β with OpenRewrite.
Refactoring big codebases by hand is slow, risky, and easy to put off. Thatβs where OpenRewrite comes in. The open-source framework for large-scale, automated code transformations helps teams modernize safely and consistently.
Each month, the creators and maintainers of OpenRewrite at Moderne run live, hands-on training sessions β one for newcomers and one for experienced users. Youβll see how recipes work, how to apply them across projects, and how to modernize code with confidence.
Join the next session, bring your questions, and learn how to automate the kind of work that usually eats your sprint time.
1. Overview
In this article, weβll be looking at the Java 9 Reactive Streams. Simply put, weβll be able to use the Flow class, which encloses the primary building blocks for building reactive stream processing logic.
Reactive Streams is a standard for asynchronous stream processing with non-blocking back pressure. This specification is defined in the Reactive Manifesto, and there are various implementations of it, for example, RxJava or Akka-Streams.
2. Reactive API Overview
To build a Flow, we can use three main abstractions and compose them into asynchronous processing logic.
Every Flow needs to process events that are published to it by a Publisher instance; the Publisher has one method β subscribe().
If any of the subscribers want to receive events published by it, they need to subscribe to the given Publisher.
The receiver of messages needs to implement the Subscriber interface. Typically this is the end for every Flow processing because the instance of it does not send messages further.
We can think about Subscriber as a Sink. This has four methods that need to be overridden β onSubscribe(), onNext(), onError(), and onComplete(). Weβll be looking at those in the next section.
If we want to transform incoming message and pass it further to the next Subscriber, we need to implement the Processor interface. This acts both as a Subscriber because it receives messages, and as the Publisher because it processes those messages and sends them for further processing.
3. Publishing and Consuming Messages
Letβs say we want to create a simple Flow, in which we have a Publisher publishing messages, and a simple Subscriber consuming messages as they arrive β one at the time.
Letβs create an EndSubscriber class. We need to implement the Subscriber interface. Next, weβll override the required methods.
The onSubscribe() method is called before processing starts. The instance of the Subscription is passed as the argument. It is a class that is used to control the flow of messages between Subscriber and the Publisher:
public class EndSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
}
We also initialized an empty List of consumedElements thatβll be utilized in the tests.
Now, we need to implement the remaining methods from the Subscriber interface. The main method here is onNext() β this is called whenever the Publisher publishes a new message:
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
consumedElements.add(item);
subscription.request(1);
}
Note that when we started the subscription in the onSubscribe() method and when we processed a message we need to call the request() method on the Subscription to signal that the current Subscriber is ready to consume more messages.
Lastly, we need to implement onError() β which is called whenever some exception will be thrown in the processing, as well as onComplete() β called when the Publisher is closed:
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
Letβs write a test for the Processing Flow. Weβll be using the SubmissionPublisher class β a construct from the java.util.concurrent β which implements the Publisher interface.
Weβre going to be submitting N elements to the Publisher β which our EndSubscriber will be receiving:
@Test
public void whenSubscribeToIt_thenShouldConsumeAll()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>();
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
// when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(
() -> assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(items)
);
}
Note, that weβre calling the close() method on the instance of the EndSubscriber. It will invoke onComplete() callback underneath on every Subscriber of the given Publisher.
Running that program will produce the following output:
Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done
4. Transformation of Messages
Letβs say that we want to build similar logic between a Publisher and a Subscriber, but also apply some transformation.
Weβll create the TransformProcessor class that implements Processor and extends SubmissionPublisher β as this will be both Publisher and Subscriber.
Weβll pass in a Function that will transform inputs into outputs:
public class TransformProcessor<T, R>
extends SubmissionPublisher<R>
implements Flow.Processor<T, R> {
private Function<T, R> function;
private Flow.Subscription subscription;
public TransformProcessor(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit(function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
close();
}
}
Letβs now write a quick test with a processing flow in which the Publisher is publishing String elements.
Our TransformProcessor will be parsing the String as Integer β which means a conversion needs to happen here:
@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TransformProcessor<String, Integer> transformProcessor
= new TransformProcessor<>(Integer::parseInt);
EndSubscriber<Integer> subscriber = new EndSubscriber<>();
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);
// when
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expectedResult)
);
}
Note, that calling the close() method on the base Publisher will cause the onComplete() method on the TransformProcessor to be invoked.
Keep in mind that all publishers in the processing chain need to be closed this way.
5. Controlling Demand for Messages Using the Subscription
Letβs say that we want to consume only the first element from the Subscription, apply some logic and finish processing. We can use the request() method to achieve this.
Letβs modify our EndSubscriber to consume only N number of messages. Weβll be passing that number as the howMuchMessagesConsume constructor argument:
public class EndSubscriber<T> implements Subscriber<T> {
private AtomicInteger howMuchMessagesConsume;
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
public EndSubscriber(Integer howMuchMessagesConsume) {
this.howMuchMessagesConsume
= new AtomicInteger(howMuchMessagesConsume);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
howMuchMessagesConsume.decrementAndGet();
System.out.println("Got : " + item);
consumedElements.add(item);
if (howMuchMessagesConsume.get() > 0) {
subscription.request(1);
}
}
//...
}
We can request elements as long we want to.
Letβs write a test in which we only want to consume one element from the given Subscription:
@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(1);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
List<String> expected = List.of("1");
// when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expected)
);
}
Although the publisher is publishing six elements, our EndSubscriber will consume only one element because it signals demand for processing only that single one.
By using the request() method on the Subscription, we can implement a more sophisticated back-pressure mechanism to control the speed of the message consumption.
6. Conclusion
In this article, we had a look at the Java 9 Reactive Streams.
We saw how to create a processing Flow consisting of a Publisher and a Subscriber. We created a more complex processing flow with the transformation of elements using Processors.
Finally, we used the Subscription to control the demand for elements by the Subscriber.
