Spring 5 added support for reactive programming with the Spring WebFlux module, which has been improved upon ever since. Get started with the Reactor project basics and reactive programming in Spring Boot:
Mocking is an essential part of unit testing, and the Mockito library makes it easy to write clean and intuitive unit tests for your Java code.
Get started with mocking and improve your application tests using our Mockito guide:
Handling concurrency in an application can be a tricky process with many potential pitfalls. A solid grasp of the fundamentals will go a long way to help minimize these issues.
Get started with understanding multi-threaded applications with our Java Concurrency guide:
Spring 5 added support for reactive programming with the Spring WebFlux module, which has been improved upon ever since. Get started with the Reactor project basics and reactive programming in Spring Boot:
Since its introduction in Java 8, the Stream API has become a staple of Java development. The basic operations like iterating, filtering, mapping sequences of elements are deceptively simple to use.
But these can also be overused and fall into some common pitfalls.
To get a better understanding on how Streams work and how to combine them with other language features, check out our guide to Java Streams:
Get started with Spring and Spring Boot, through the Learn Spring course:
>> LEARN SPRINGExplore Spring Boot 3 and Spring 6 in-depth through building a full REST API with the framework:
Yes, Spring Security can be complex, from the more advanced functionality within the Core to the deep OAuth support in the framework.
I built the security material as two full courses - Core and OAuth, to get practical with these more complex scenarios. We explore when and how to use each feature and code through it on the backing project.
You can explore the course here:
Spring Data JPA is a great way to handle the complexity of JPA with the powerful simplicity of Spring Boot.
Get started with Spring Data JPA through the guided reference course:
Refactor Java code safely β and automatically β with OpenRewrite.
Refactoring big codebases by hand is slow, risky, and easy to put off. Thatβs where OpenRewrite comes in. The open-source framework for large-scale, automated code transformations helps teams modernize safely and consistently.
Each month, the creators and maintainers of OpenRewrite at Moderne run live, hands-on training sessions β one for newcomers and one for experienced users. Youβll see how recipes work, how to apply them across projects, and how to modernize code with confidence.
Join the next session, bring your questions, and learn how to automate the kind of work that usually eats your sprint time.
1. Introduction
The popularity of RxJava has led to the creation of multiple third-party libraries that extend its functionality.
Many of those libraries were an answer to typical problems that developers were dealing with when using RxJava. RxRelay is one of these solutions.
2. Dealing With a Subject
Simply put, a Subject acts as a bridge between Observable and Observer. Since itβs an Observer, it can subscribe to one or more Observables and receive events from them.
Also, given itβs at the same time an Observable, it can reemit events or emit new events to its subscribers. More information about the Subject can be found in this article.
One of the issues with Subject is that after it receives onComplete() or onError() β itβs no longer able to move data. Sometimes itβs the desired behavior, but sometimes itβs not.
In cases when such behavior isnβt desired, we should consider using RxRelay.
3. Relay
A Relay is basically a Subject, but without the ability to call onComplete() and onError(), thus itβs constantly able to emit data.
This allows us to create bridges between different types of API without worrying about accidentally triggering the terminal state.
To use RxRelay we need to add the following dependency to our project:
<dependency>
<groupId>com.jakewharton.rxrelay2</groupId>
<artifactId>rxrelay</artifactId>
<version>1.2.0</version>
</dependency>
4. Types of Relay
Thereβre three different types of Relay available in the library. Weβll quickly explore all three here.
4.1. PublishRelay
This type of Relay will reemit all events once the Observer has subscribed to it.
The events will be emitted to all subscribers:
public void whenObserverSubscribedToPublishRelay_itReceivesEmittedEvents() {
PublishRelay<Integer> publishRelay = PublishRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
publishRelay.subscribe(firstObserver);
firstObserver.assertSubscribed();
publishRelay.accept(5);
publishRelay.accept(10);
publishRelay.subscribe(secondObserver);
secondObserver.assertSubscribed();
publishRelay.accept(15);
firstObserver.assertValues(5, 10, 15);
// second receives only the last event
secondObserver.assertValue(15);
}
Thereβs no buffering of events in this case, so this behavior is similar to a cold Observable.
4.2. BehaviorRelay
This type of Relay will reemit the most recent observed event and all subsequent events once the Observer has subscribed:
public void whenObserverSubscribedToBehaviorRelay_itReceivesEmittedEvents() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
behaviorRelay.accept(5);
behaviorRelay.subscribe(firstObserver);
behaviorRelay.accept(10);
behaviorRelay.subscribe(secondObserver);
behaviorRelay.accept(15);
firstObserver.assertValues(5, 10, 15);
secondObserver.assertValues(10, 15);
}
When weβre creating the BehaviorRelay we can specify the default value, which will be emitted, if thereβre no other events to emit.
To specify the default value we can use createDefault() method:
public void whenObserverSubscribedToBehaviorRelay_itReceivesDefaultValue() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
TestObserver<Integer> firstObserver = new TestObserver<>();
behaviorRelay.subscribe(firstObserver);
firstObserver.assertValue(1);
}
If we donβt want to specify the default value, we can use the create() method:
public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue_itIsEmpty() {
BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
TestObserver<Integer> firstObserver = new TestObserver<>();
behaviorRelay.subscribe(firstObserver);
firstObserver.assertEmpty();
}
4.3. ReplayRelay
This type of Relay buffers all events it has received and then reemits it to all subscribers that subscribe to it:
public void whenObserverSubscribedToReplayRelay_itReceivesEmittedEvents() {
ReplayRelay<Integer> replayRelay = ReplayRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
replayRelay.subscribe(firstObserver);
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.subscribe(secondObserver);
firstObserver.assertValues(5, 10, 15);
secondObserver.assertValues(5, 10, 15);
}
All elements are buffered and all subscribers will receive the same events, so this behavior is similar to the cold Observable.
When weβre creating the ReplayRelay we can provide maximal buffer size and time to live for events.
To create the Relay with limited buffer size we can use the createWithSize() method. When thereβre more events to be buffered than the set buffer size, previous elements will be discarded:
public void whenObserverSubscribedToReplayRelayWithLimitedSize_itReceivesEmittedEvents() {
ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
TestObserver<Integer> firstObserver = TestObserver.create();
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.accept(20);
replayRelay.subscribe(firstObserver);
firstObserver.assertValues(15, 20);
}
We can also create ReplayRelay with max time to leave for buffered events using the createWithTime() method:
public void whenObserverSubscribedToReplayRelayWithMaxAge_thenItReceivesEmittedEvents() {
SingleScheduler scheduler = new SingleScheduler();
ReplayRelay<Integer> replayRelay =
ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, scheduler);
long current = scheduler.now(TimeUnit.MILLISECONDS);
TestObserver<Integer> firstObserver = TestObserver.create();
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.accept(20);
Thread.sleep(3000);
replayRelay.subscribe(firstObserver);
firstObserver.assertEmpty();
}
5. Custom Relay
All types described above extend the common abstract class Relay, this gives us an ability to write our own custom Relay class.
To create a custom Relay we need to implement three methods: accept(), hasObservers() and subscribeActual().
Letβs write a simple Relay that will reemit event to one of the subscribers chosen at random:
public class RandomRelay extends Relay<Integer> {
Random random = new Random();
List<Observer<? super Integer>> observers = new ArrayList<>();
@Override
public void accept(Integer integer) {
int observerIndex = random.nextInt() % observers.size();
observers.get(observerIndex).onNext(integer);
}
@Override
public boolean hasObservers() {
return observers.isEmpty();
}
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observers.add(observer);
observer.onSubscribe(Disposables.fromRunnable(
() -> System.out.println("Disposed")));
}
}
We can now test that only one subscriber will receive the event:
public void whenTwoObserversSubscribedToRandomRelay_thenOnlyOneReceivesEvent() {
RandomRelay randomRelay = new RandomRelay();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
randomRelay.subscribe(firstObserver);
randomRelay.subscribe(secondObserver);
randomRelay.accept(5);
if(firstObserver.values().isEmpty()) {
secondObserver.assertValue(5);
} else {
firstObserver.assertValue(5);
secondObserver.assertEmpty();
}
}
6. Conclusion
In this tutorial, we had a look at RxRelay, a type similar to Subject but without the ability to trigger the terminal state.
