1. Overview
Debugging reactive streams is probably one of the main challenges we'll have to face once we start using these data structures.
Since reactive streams have been gaining popularity over the last few years, it's a good idea to know how we can carry out this task efficiently.
Let's start by setting up a project using a reactive stack to see why this is often troublesome.
2. Scenario with Bugs
We want to simulate a real-world scenario, where several asynchronous processes are running, and where we've introduced some defects in the code that will eventually trigger exceptions.
To understand the big picture, we'll mention that our application will be consuming and processing streams of simple Foo objects which contain only an id, a formattedName, and a quantity field.
2.1. Analyzing the Log Output
Now, let's examine a snippet and the output it generates when an unhandled error shows up:
public void processFoo(Flux<Foo> flux) {
    flux.map(FooNameHelper::concatFooName)
      .map(FooNameHelper::substringFooName)
      .map(FooReporter::reportResult)
      .subscribe();
}

public void processFooInAnotherScenario(Flux<Foo> flux) {
    flux.map(FooNameHelper::substringFooName)
      .map(FooQuantityHelper::divideFooQuantity)
      .subscribe();
}
After running our application for a few seconds, we'll realize that it's logging exceptions from time to time.
Having a close look at one of the errors, we'll find something similar to this:
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
at j.l.String.substring(String.java:1963)
at com.baeldung.debugging.consumer.service.FooNameHelper
.lambda$1(FooNameHelper.java:38)
at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at r.c.p.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)
at r.c.p.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)
at r.c.p.Operators$MonoSubscriber.complete(Operators.java:1476)
at r.c.p.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:211)
at r.c.p.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:290)
at r.c.p.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:118)
at r.c.s.SchedulerTask.call(SchedulerTask.java:50)
at r.c.s.SchedulerTask.call(SchedulerTask.java:27)
at j.u.c.FutureTask.run(FutureTask.java:266)
at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
.access$201(ScheduledThreadPoolExecutor.java:180)
at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
.run(ScheduledThreadPoolExecutor.java:293)
at j.u.c.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at j.l.Thread.run(Thread.java:748)
Based on the root cause, and noticing the FooNameHelper class mentioned in the stack trace, we can imagine that on some occasions, our Foo objects are being processed with a formattedName value that is shorter than expected.
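To make the suspected root cause concrete, here is a minimal plain-Java sketch that reproduces the same kind of exception outside any reactive pipeline. The Foo class and the body of substringFooName are assumptions made for illustration, since the text only gives us the names and the three fields; the real helper may cut the name differently.

```java
// Hypothetical reconstruction: only the class name and its three fields come from the article.
final class Foo {
    private final int id;
    private String formattedName;
    private final int quantity;

    Foo(int id, String formattedName, int quantity) {
        this.id = id;
        this.formattedName = formattedName;
        this.quantity = quantity;
    }

    int getId() { return id; }
    String getFormattedName() { return formattedName; }
    void setFormattedName(String formattedName) { this.formattedName = formattedName; }
    int getQuantity() { return quantity; }
}

final class FooNameDemo {

    // Assumed behavior: keep only the first 15 characters of the name.
    static Foo substringFooName(Foo foo) {
        // substring(0, 15) throws StringIndexOutOfBoundsException
        // whenever the name has fewer than 15 characters.
        foo.setFormattedName(foo.getFormattedName().substring(0, 15));
        return foo;
    }

    public static void main(String[] args) {
        // Long enough: processed without problems.
        substringFooName(new Foo(0, "a-sufficiently-long-name", 8));

        // Too short: fails in the same way as the logged exception.
        try {
            substringFooName(new Foo(1, "theFo", 3));
        } catch (StringIndexOutOfBoundsException e) {
            System.out.println("Failed for a short name: " + e.getMessage());
        }
    }
}
```

Run on its own, the failure is easy to trace. The difficulty described next comes from the fact that, inside a reactive chain, the stack trace no longer tells us which pipeline called the helper.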
Of course, this is just a simplified case, and the solution seems rather obvious.
But let's imagine this was a real-world scenario where the exception itself doesn't help us solve the issue without some context information.
Was the exception triggered as a part of the processFoo, or of the processFooInAnotherScenario method?
Did other previous steps affect the formattedName field before arriving at this stage?
The log entry wouldn't help us answer these questions.
To make things worse, sometimes the exception isn't even thrown from within our functionality.
For example, imagine we rely on a reactive repository to persist our Foo objects. If an error arises at that point, we might not even have a clue where to start debugging our code.
We need tools to debug reactive streams efficiently.
3. Using a Debug Session
One option to figure out what's going on with our application is to start a debugging session using our favorite IDE.
We'll have to set up a couple of conditional breakpoints and analyze the flow of data when each step in the stream gets executed.
Indeed, this might be a cumbersome task, especially when we've got a lot of reactive processes running and sharing resources.
Additionally, there are many circumstances where we can't start a debugging session for security reasons.
4. Logging Information With the doOnError Method or Using the Subscribe Parameter
Sometimes, we can add useful context information by providing a Consumer as a second parameter of the subscribe method:
public void processFoo(Flux<Foo> flux) {
    // ...
    flux.subscribe(foo -> {
        logger.debug("Finished processing Foo with Id {}", foo.getId());
    }, error -> {
        logger.error(
          "The following error happened on processFoo method!",
          error);
    });
}
Note: It's worth mentioning that if we don't need to carry out further processing on the subscribe method, we can chain the doOnError function on our publisher:
flux.doOnError(error -> {
    logger.error("The following error happened on processFoo method!", error);
}).subscribe();
Now we'll have some guidance on where the error might be coming from, even though we still don't have much information about the actual element that generated the exception.
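One way to narrow that gap is to attach the offending element to the exception before it reaches the error callback. The following is a minimal sketch in plain Java; ErrorContext and describingElement are hypothetical names introduced here, not part of Reactor, and the step label is just a free-form string.

```java
import java.util.function.Function;

// Hypothetical helper, not a Reactor API.
final class ErrorContext {

    private ErrorContext() {
    }

    // Wraps a mapping function so that a failure is rethrown with the step name
    // and the element that caused it; the original exception is kept as the cause.
    static <T, R> Function<T, R> describingElement(String step, Function<T, R> mapper) {
        return element -> {
            try {
                return mapper.apply(element);
            } catch (RuntimeException e) {
                throw new IllegalStateException(
                  "Step '" + step + "' failed for element: " + element, e);
            }
        };
    }

    public static void main(String[] args) {
        Function<String, String> cut = describingElement("cut", s -> s.substring(0, 15));
        try {
            cut.apply("theFo");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " / cause: " + e.getCause());
        }
    }
}
```

Since the result is an ordinary Function, it could be passed to map in place of the bare method reference, for instance flux.map(ErrorContext.describingElement("substringFooName", FooNameHelper::substringFooName)). The trade-off is that the error callback now receives the wrapper exception, with the original one available as its cause.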
5. Activating Reactor's Global Debug Configuration
The Reactor library provides a Hooks class that lets us configure the behavior of Flux and Mono operators.
By just adding the following statement, our application will instrument the calls to the publishers' methods, wrap the construction of the operator, and capture a stack trace:
Hooks.onOperatorDebug();
After the debug mode gets activated, our exception logs will include some helpful information:
16:06:35.334 [parallel-1] ERROR c.b.d.consumer.service.FooService
- The following error happened on processFoo method!
java.lang.StringIndexOutOfBoundsException: String index out of range: 15
at j.l.String.substring(String.java:1963)
at c.b.d.c.s.FooNameHelper.lambda$1(FooNameHelper.java:38)
...
at j.l.Thread.run(Thread.java:748)
Suppressed: r.c.p.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
reactor.core.publisher.Flux.map(Flux.java:5653)
c.b.d.c.s.FooNameHelper.substringFooName(FooNameHelper.java:32)
c.b.d.c.s.FooService.processFoo(FooService.java:24)
c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
o.s.s.s.DelegatingErrorHandlingRunnable
.run(DelegatingErrorHandlingRunnable.java:54)
j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
|_ Flux.map → c.b.d.c.s.FooNameHelper
.substringFooName(FooNameHelper.java:32)
|_ Flux.map → c.b.d.c.s.FooReporter.reportResult(FooReporter.java:15)
As we can see, the first section remains largely the same, but the following sections provide information about:
- The assembly trace of the publisher: here we can confirm that the error was first generated in the processFoo method.
- The operators that observed the error after it was first triggered, with the user class where they were chained.
Note: In this example, we're placing the operations in different classes, mainly so that this is easy to see.
We can toggle the debug mode on or off at any time, but it won't affect Flux and Mono objects that have already been instantiated.
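The effect of toggling can be sketched with two identical pipelines, one assembled before the hook is activated and one after. This is a minimal example under a few assumptions: DebugToggleDemo, firstError and failingPipeline are hypothetical names, the pipeline is synchronous so the error is available right after subscribe returns, and we rely on Reactor attaching the assembly information as a suppressed exception, as in the log above.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

final class DebugToggleDemo {

    // Subscribes and returns the error the (synchronous) pipeline ended with, or null.
    static Throwable firstError(Flux<String> flux) {
        AtomicReference<Throwable> captured = new AtomicReference<>();
        flux.subscribe(value -> { }, captured::set);
        return captured.get();
    }

    // Fails because the input is shorter than 15 characters.
    static Flux<String> failingPipeline() {
        return Flux.just("theFo").map(name -> name.substring(0, 15));
    }

    public static void main(String[] args) {
        // Assembled while the debug mode is still off: never instrumented.
        Flux<String> assembledBefore = failingPipeline();

        Hooks.onOperatorDebug();
        try {
            // Assembled while the debug mode is on: carries an assembly trace.
            Flux<String> assembledAfter = failingPipeline();

            System.out.println("suppressed (assembled before): "
              + firstError(assembledBefore).getSuppressed().length);
            System.out.println("suppressed (assembled after): "
              + firstError(assembledAfter).getSuppressed().length);
        } finally {
            // Switch the instrumentation off again for anything assembled later.
            Hooks.resetOnOperatorDebug();
        }
    }
}
```

What matters is when a chain is assembled, not when it's subscribed to, so the hook should be activated before the publishers of interest are created.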
5.1. Executing Operators on Different Threads
One other aspect to keep in mind is that the assembly trace is generated properly even if there are different threads operating on the stream.
Let's have a look at the following example:
public void processFoo(Flux<Foo> flux) {
    flux.publishOn(Schedulers.newSingle("foo-thread"))
      // ...
      .publishOn(Schedulers.newSingle("bar-thread"))
      .map(FooReporter::reportResult)
      .subscribeOn(Schedulers.newSingle("starter-thread"))
      .subscribe();
}
Now if we check the logs, we'll see that in this case the first section might change a little, but the last two remain largely the same.
The first part is the thread stack trace, so it'll show only the operations carried out by a particular thread.
As we've seen, that's not the most important section when we're debugging the application, so this change is acceptable.
6. Activating the Debug Output on a Single Process
Instrumenting and generating a stack trace in every single reactive process is costly.
Thus, we should use the previous approach only in critical cases.
Fortunately, Reactor provides a way to enable the debug mode on single crucial processes, which consumes less memory.
This is the checkpoint operator:
public void processFoo(Flux<Foo> flux) {
    // ...
    flux.checkpoint("Observed error on processFoo", true)
      .subscribe();
}
Note that in this manner, the assembly trace will be logged at the checkpoint stage:
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
...
Assembly trace from producer [reactor.core.publisher.FluxMap],
described as [Observed error on processFoo] :
r.c.p.Flux.checkpoint(Flux.java:3096)
c.b.d.c.s.FooService.processFoo(FooService.java:26)
c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
o.s.s.s.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
|_ Flux.checkpoint → c.b.d.c.s.FooService.processFoo(FooService.java:26)
We should place the checkpoint operator towards the end of the reactive chain.
Otherwise, the operator won't be able to observe errors occurring downstream.
Also, let's note that the library offers overloaded versions of the method. We can avoid:
- specifying a description for the observed error if we use the no-args option
- generating a filled stack trace (which is the most costly operation), by providing just the custom description
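The variants and the placement rule can be put side by side in a short sketch. CheckpointDemo, failing and firstError are hypothetical names, the pipelines are synchronous, and we assume that an error observed by a checkpoint shows up with a suppressed exception attached, as in the log above.

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;

final class CheckpointDemo {

    // Fails because the input is shorter than 15 characters.
    static Flux<String> failing() {
        return Flux.just("theFo").map(name -> name.substring(0, 15));
    }

    // Subscribes and returns the error the (synchronous) pipeline ended with, or null.
    static Throwable firstError(Flux<String> flux) {
        AtomicReference<Throwable> captured = new AtomicReference<>();
        flux.subscribe(value -> { }, captured::set);
        return captured.get();
    }

    // No description: a full assembly stack trace is captured (the costly variant).
    static Flux<String> fullTrace() {
        return failing().checkpoint();
    }

    // Description only: no stack trace is captured, the description marks the spot.
    static Flux<String> descriptionOnly() {
        return failing().checkpoint("Observed error on processFoo");
    }

    // Description plus a forced stack trace, as in the processFoo example.
    static Flux<String> descriptionAndTrace() {
        return failing().checkpoint("Observed error on processFoo", true);
    }

    // Checkpoint placed too early: the failing map is downstream of it,
    // so the checkpoint never sees the error.
    static Flux<String> tooEarly() {
        return Flux.just("theFo")
          .checkpoint("before the failing step")
          .map(name -> name.substring(0, 15));
    }

    public static void main(String[] args) {
        System.out.println("full trace: " + firstError(fullTrace()).getSuppressed().length);
        System.out.println("description only: " + firstError(descriptionOnly()).getSuppressed().length);
        System.out.println("description and trace: " + firstError(descriptionAndTrace()).getSuppressed().length);
        System.out.println("too early: " + firstError(tooEarly()).getSuppressed().length);
    }
}
```

The description-only form is usually the cheapest one to leave in place, as long as the description is unique enough to identify the pipeline in the logs.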
7. Logging a Sequence of Elements
Finally, Reactor publishers offer one more method that could potentially come in handy in some cases.
By calling the log method in our reactive chain, the application will log each element in the flow with the state that it has at that stage.
Let's try it out in our example:
public void processFoo(Flux<Foo> flux) {
    flux.map(FooNameHelper::concatFooName)
      .map(FooNameHelper::substringFooName)
      .log()
      .map(FooReporter::reportResult)
      .doOnError(error -> {
          logger.error("The following error happened on processFoo method!", error);
      })
      .subscribe();
}
And check the logs:
INFO reactor.Flux.OnAssembly.1 - onSubscribe(FluxMap.MapSubscriber)
INFO reactor.Flux.OnAssembly.1 - request(unbounded)
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=0, formattedName=theFo, quantity=8))
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=1, formattedName=theFo, quantity=3))
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=2, formattedName=theFo, quantity=5))
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=3, formattedName=theFo, quantity=6))
INFO reactor.Flux.OnAssembly.1 - onNext(Foo(id=4, formattedName=theFo, quantity=6))
INFO reactor.Flux.OnAssembly.1 - cancel()
ERROR c.b.d.consumer.service.FooService
- The following error happened on processFoo method!
...
We can easily see the state of each Foo object at this stage, and how the framework cancels the flow when an exception happens.
Of course, this approach is also costly, and we'll have to use it in moderation.
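One way to keep the overhead and the noise down is to log only selected signals, under a dedicated category and at a lower level. The sketch below uses the log overload that takes a category, a java.util.logging Level and a list of SignalType values; SelectiveLogDemo, withSelectiveLog and the category name "foo.processing" are assumptions chosen for illustration.

```java
import java.util.logging.Level;

import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

final class SelectiveLogDemo {

    // Logs only onNext and onError signals, at FINE level (debug in most
    // logging backends), under the hypothetical category "foo.processing".
    static <T> Flux<T> withSelectiveLog(Flux<T> source) {
        return source.log(
          "foo.processing",
          Level.FINE,
          SignalType.ON_NEXT,
          SignalType.ON_ERROR);
    }

    public static void main(String[] args) {
        // Whether anything is printed depends on the level configured
        // for the "foo.processing" logger.
        withSelectiveLog(Flux.just("theFo", "another-foo"))
          .subscribe(value -> System.out.println("received " + value));
    }
}
```

With this setup, the element logging can stay in the chain and be switched on through the logging configuration only while we're investigating an issue.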
8. Conclusion
We can spend a lot of time and effort troubleshooting problems if we don't know the tools and mechanisms to debug our application properly.
This is especially true if we're not used to handling reactive and asynchronous data structures, and we need extra help to figure out how things work.
