1. Overview
Spring Cloud Stream is a framework built on top of Spring Boot and Spring Integration that helps in creating event-driven or message-driven microservices.
In this article, we'll introduce concepts and constructs of Spring Cloud Stream with some simple examples.
2. Maven Dependencies
To get started, we'll add the Spring Cloud Stream starter for RabbitMQ, our messaging middleware, to the pom.xml:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
And we'll add the spring-cloud-stream-test-binder dependency to enable JUnit support as well:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-binder</artifactId>
    <scope>test</scope>
</dependency>
Lastly, we will use spring-cloud-dependencies for dependency management and select the appropriate version based on the compatibility matrix:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2024.0.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
3. Main Concepts
Microservices architecture follows the "smart endpoints and dumb pipes" principle. Communication between endpoints is handled by messaging middleware such as RabbitMQ or Apache Kafka. Services communicate by publishing domain events via these endpoints or channels.
Let's walk through the concepts that make up the Spring Cloud Stream framework, along with the essential paradigms that we must be aware of to build message-driven services.
3.1. Constructs
By adding the spring-cloud-stream dependencies to the classpath, we automatically connect to a message broker through a Spring Cloud Stream "binder". Additionally, we can define a Java function as a Spring bean to process incoming messages:
@SpringBootApplication
public class LogEnricherApplication {

    public static void main(String[] args) {
        SpringApplication.run(LogEnricherApplication.class, args);
    }

    @Bean
    public Function<String, String> enrichLogMessage() {
        return value -> "[%s] - %s".formatted("Baeldung", value);
    }
}
Let's take a look at the definition of all these concepts:
- Bindings: a collection of Java functions to process, transform, or send messages
- Binder: messaging-middleware implementation such as Kafka or RabbitMQ
- Channel: represents the communication pipe between messaging-middleware and the application
- Message Schemas: used for serialization and deserialization of messages; these schemas can be statically read from a location or loaded dynamically, supporting the evolution of domain object types
- StreamListeners: message-handling methods in beans that are automatically invoked on a message from the channel after the MessageConverter has converted between middleware-specific events and domain object types or POJOs. Note that the annotation-based @StreamListener model was deprecated in Spring Cloud Stream 3.1 and removed in 4.0, so with current versions this role is played by functional beans such as enrichLogMessage
3.2. Communication Patterns
Messages are delivered to destinations using the publish-subscribe messaging pattern. Publishers categorize messages into topics, each identified by a name. Subscribers express interest in one or more topics. The middleware filters the messages, delivering those from the topics of interest to the subscribers.
Subscribers can also be grouped. A consumer group is a set of subscribers or consumers, identified by a group id, within which messages from a topic or a topic's partition are delivered in a load-balanced manner.
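To make the difference between plain publish-subscribe and consumer groups concrete, here's a minimal plain-Java sketch. It doesn't use Spring Cloud Stream or a real broker: the ToyTopic class and its round-robin delivery policy are assumptions made purely for illustration. Every group sees each message, but within a group only one member receives it:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a topic; not part of Spring Cloud Stream.
class ToyTopic {

    // group id -> inboxes of the consumers that joined that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group id -> number of messages delivered so far (drives the round-robin)
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    // Registers a new consumer in the given group and returns its inbox.
    List<String> subscribe(String groupId) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(groupId, id -> new ArrayList<>()).add(inbox);
        delivered.putIfAbsent(groupId, 0);
        return inbox;
    }

    // Every group gets the message, but only one member per group receives it.
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> group : groups.entrySet()) {
            List<List<String>> members = group.getValue();
            int count = delivered.get(group.getKey());
            members.get(count % members.size()).add(message);
            delivered.put(group.getKey(), count + 1);
        }
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        List<String> billingA = topic.subscribe("billing");
        List<String> billingB = topic.subscribe("billing");
        List<String> audit = topic.subscribe("audit");

        topic.publish("m1");
        topic.publish("m2");

        // The two "billing" members share the load; "audit" sees everything.
        System.out.println(billingA + " " + billingB + " " + audit);
    }
}
```

Real brokers decide which group member receives a message using their own policies, so the round-robin here is only a stand-in for "load-balanced".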
4. Programming Model
This section describes the basics of building Spring Cloud Stream applications.
4.1. Functional Testing
The test support allows us to leverage a binder implementation to interact with our Spring Cloud Stream channels:
@EnableTestBinder
@SpringBootTest
class LogEnricherApplicationUnitTest {

    @Autowired
    private InputDestination input;

    @Autowired
    private OutputDestination output;

    @Test
    void whenSendingLogMessage_thenMessageIsEnrichedWithPrefix() {
        // ...
    }
}
Let's send a message to the enrichLogMessage function above and check that the response starts with the "[Baeldung]" prefix:
@Test
void whenSendingLogMessage_thenItsEnrichedWithPrefix() {
    input.send(MessageBuilder.withPayload("hello world").build(), "enrichLogMessage-in-0");

    Message<byte[]> message = output.receive(1000L, "enrichLogMessage-out-0");

    assertThat(message.getPayload())
      .asString()
      .isEqualTo("[Baeldung] - hello world");
}
As we can see, input bindings are named using the function name followed by "-in-" and an index. Similarly, output bindings use "-out-" and an index. The "in" and "out" indicate the type of binding. The index is 0 for functions with a single input and output, and only becomes relevant for functions with multiple inputs or outputs.
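The naming convention described above can be written down as a tiny plain-Java helper. The BindingNames class is hypothetical and exists only to illustrate the pattern; Spring Cloud Stream derives these names itself:

```java
// Hypothetical helper mirroring the binding naming convention; not a Spring API.
class BindingNames {

    // Input binding: <functionName>-in-<index>
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Output binding: <functionName>-out-<index>
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(input("enrichLogMessage", 0));
        System.out.println(output("enrichLogMessage", 0));
    }
}
```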
4.2. Binding Destinations
We can use the Spring Cloud Stream configuration to map the bindings to custom destinations, such as topics or queue names. For instance, let's update the application.yml and use the queues "queue.log.messages" and "queue.pretty.log.messages" as input and output for our service:
spring:
  cloud:
    stream:
      bindings:
        enrichLogMessage-in-0:
          destination: queue.log.messages
        enrichLogMessage-out-0:
          destination: queue.pretty.log.messages
Now, we can update our test and start using the destination names instead of the binding names:
@Test
void whenSendingLogMessage_thenItsEnrichedWithPrefix() {
    input.send(MessageBuilder.withPayload("hello world").build(), "queue.log.messages");

    Message<byte[]> message = output.receive(1000L, "queue.pretty.log.messages");

    assertThat(message.getPayload())
      .asString()
      .isEqualTo("[Baeldung] - hello world");
}
As a result, our application will listen to "queue.log.messages", process the messages, and publish the results to "queue.pretty.log.messages".
4.3. Event Routing
In Spring Cloud Stream, Event Routing involves managing the flow of events between sources and destinations. Its key purpose is to route events to a specific subscriber or a designated destination based on the event producer.
For instance, suppose we want to enrich only log messages longer than ten characters, leaving shorter ones unchanged. To achieve this, let's define another function that returns a Message<String>, allowing us to customize the message metadata:
@Bean
public Function<String, Message<String>> processLogs() {
    return log -> {
        boolean shouldBeEnriched = log.length() > 10;
        // ...
    };
}
After that, we'll specify the new destination in the message header using the spring.cloud.stream.sendto.destination key. In our case, if the log needs enrichment, we will route it to the enrichLogMessage-in-0 binding. Otherwise, we'll send the message directly to the output queue:
@Bean
public Function<String, Message<String>> processLogs() {
    return log -> {
        boolean shouldBeEnriched = log.length() > 10;
        String destination = shouldBeEnriched ? "enrichLogMessage-in-0" : "queue.pretty.log.messages";

        return MessageBuilder.withPayload(log)
          .setHeader("spring.cloud.stream.sendto.destination", destination)
          .build();
    };
}
Now, we need to update the configuration to enable event routing. Since we have more than one function declared as a bean, let's also include both via the spring.cloud.function.definition property:
spring:
  cloud:
    function:
      definition: enrichLogMessage;processLogs
    stream:
      function.routing.enabled: true
      bindings:
        # ...
That's it! Let's test our code: if we send the "hello world" string to the processLogs-in-0 binding, we'll expect it to be enriched and published to "queue.pretty.log.messages":
@Test
void whenProcessingLongLogMessage_thenItsEnrichedWithPrefix() {
    input.send(MessageBuilder.withPayload("hello world").build(), "processLogs-in-0");

    Message<byte[]> message = output.receive(1000L, "queue.pretty.log.messages");

    assertThat(message.getPayload())
      .asString()
      .isEqualTo("[Baeldung] - hello world");
}
On the other hand, if we send a string of ten characters or fewer, we'll expect it to be published to "queue.pretty.log.messages" without any modification:
@Test
void whenProcessingShortLogMessage_thenItsNotEnrichedWithPrefix() {
    input.send(MessageBuilder.withPayload("hello").build(), "processLogs-in-0");

    Message<byte[]> message = output.receive(1000L, "queue.pretty.log.messages");

    assertThat(message.getPayload())
      .asString()
      .isEqualTo("hello");
}
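The routing rule used by processLogs can also be looked at in isolation. The following plain-Java sketch extracts only the destination choice, without any Spring types. The LogRouting class and its constant names are hypothetical, introduced here to show where the boundary at ten characters falls:

```java
// Hypothetical extraction of the routing rule; names are for illustration only.
class LogRouting {

    static final String ENRICH_BINDING = "enrichLogMessage-in-0";
    static final String OUTPUT_QUEUE = "queue.pretty.log.messages";

    // Logs longer than ten characters go through enrichment first;
    // everything else goes straight to the output queue.
    static String destinationFor(String log) {
        return log.length() > 10 ? ENRICH_BINDING : OUTPUT_QUEUE;
    }

    public static void main(String[] args) {
        System.out.println(destinationFor("hello world")); // 11 characters
        System.out.println(destinationFor("hello"));       // 5 characters
        System.out.println(destinationFor("0123456789"));  // exactly 10 characters
    }
}
```

Note that a message of exactly ten characters isn't enriched, since the comparison is strict.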
5. Setup
Let's set up the application that will process the message from the RabbitMQ broker.
5.1. Binder Configuration
As previously discussed, we can add the binder library for RabbitMQ to the classpath by including the spring-cloud-starter-stream-rabbit dependency. However, if no binder implementation is provided, Spring will use direct message communication between the channels.
5.2. RabbitMQ Configuration
To configure the example in section 3.1 to use the RabbitMQ binder, we need to update the application.yml located at src/main/resources. Since the bindings are derived from the enrichLogMessage function, we use the enrichLogMessage-in-0 and enrichLogMessage-out-0 binding names:
spring:
  cloud:
    stream:
      bindings:
        enrichLogMessage-in-0:
          destination: queue.log.messages
          binder: local_rabbit
        enrichLogMessage-out-0:
          destination: queue.pretty.log.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host>
                port: 5672
                username: <username>
                password: <password>
                virtual-host: /
The input binding will use the exchange called "queue.log.messages", and the output binding will use the exchange "queue.pretty.log.messages". Both bindings will use the binder called local_rabbit.
Note that we don't need to create the RabbitMQ exchanges or queues in advance. When running the application, both exchanges are automatically created.
To test the application, we can use the RabbitMQ management site to publish a message. In the Publish Message panel of the exchange "queue.log.messages", we need to enter the request in JSON format.
5.3. Customizing Message Conversion
For this code example, let's add another binding, which we can call highlightLogs:
@Bean
Function<LogMessage, String> highlightLogs() {
    return logMsg -> logMsg.message().toUpperCase();
}
This time, our function takes a Java object called LogMessage as input:
record LogMessage(String message) {
}
For such use cases, Spring Cloud Stream allows us to apply custom message conversion for specific content types. Let's define a custom message converter to be used to deserialize LogMessage objects when the contentType header is set to text/plain:
@Component
class TextPlainMessageConverter extends AbstractMessageConverter {

    public TextPlainMessageConverter() {
        super(new MimeType("text", "plain"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (LogMessage.class == clazz);
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        // decode byte payloads with an explicit charset instead of the platform default
        String text = payload instanceof String ? (String) payload : new String((byte[]) payload, StandardCharsets.UTF_8);
        return new LogMessage(text);
    }
}
Finally, we can test our converter by sending a message with the text/plain content type. When we run the test, we expect the message to be processed and the uppercase string to be returned as the binding output:
@Test
void whenHighlightingLogMessage_thenItsTransformedToUppercase() {
    Message<String> msgIn = MessageBuilder.withPayload("hello")
      .setHeader("contentType", "text/plain")
      .build();
    input.send(msgIn, "highlightLogs-in-0");

    Message<byte[]> msgOut = output.receive(1000L, "highlightLogs-out-0");

    assertThat(msgOut.getPayload())
      .asString()
      .isEqualTo("HELLO");
}
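The payload handling inside convertFromInternal is the part most likely to cause surprises, because a payload may arrive either as a String or as raw bytes. Here's a small plain-Java sketch of that step on its own. The PayloadText helper is hypothetical, and decoding bytes as UTF-8 is an assumption about how the producer encoded the text:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper isolating the payload handling; not a Spring API.
class PayloadText {

    // Returns the payload as text, decoding byte arrays as UTF-8 (an assumption).
    static String asText(Object payload) {
        if (payload == null) {
            throw new IllegalArgumentException("payload must not be null");
        }
        if (payload instanceof String) {
            return (String) payload;
        }
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("Unsupported payload type: " + payload.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(asText("hello"));
        System.out.println(asText("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Rejecting unsupported payload types explicitly, rather than casting blindly, turns a ClassCastException into a clearer error message.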
5.4. Consumer Groups
When running multiple instances of our application, every time there is a new message in an input channel, all subscribers will be notified.
Most of the time, we need the message to be processed only once. Spring Cloud Stream implements this behavior via consumer groups.
To enable this behavior, each consumer binding can use the spring.cloud.stream.bindings.<CHANNEL>.group property to specify a group name:
spring:
  cloud:
    stream:
      bindings:
        enrichLogMessage-in-0:
          destination: queue.log.messages
          group: test-group
        # ...
6. Message-Driven Microservices
In this section, we introduce all the required features for running our Spring Cloud Stream applications in a microservices context.
6.1. Scaling Up
When multiple instances of the application are running, it's important to ensure the data is split properly across consumers. To do so, Spring Cloud Stream provides two properties:
- spring.cloud.stream.instanceCount: the number of running instances
- spring.cloud.stream.instanceIndex: the index of the current instance
For example, if we've deployed two instances of the above LogEnricherApplication, the property spring.cloud.stream.instanceCount should be 2 for both instances, and the property spring.cloud.stream.instanceIndex should be 0 and 1, respectively.
These properties are automatically set if we deploy the Spring Cloud Stream applications using Spring Cloud Data Flow, as described in this article.
6.2. Partitioning
Domain events can be partitioned. This helps when we're scaling up storage and improving application performance.
A domain event usually has a partition key so that it ends up in the same partition as related messages.
Let's say that we want the log messages to be partitioned by the first letter in the message, which would be the partition key, and grouped into two partitions.
There would be one partition for the log messages that start with A-M and another partition for N-Z. This can be configured using two properties:
- spring.cloud.stream.bindings.output.producer.partitionKeyExpression: the expression used to extract the partition key from the payload
- spring.cloud.stream.bindings.output.producer.partitionCount: the number of partitions
Sometimes the partitioning expression is too complex to write in a single line. For these cases, we can write a custom partition strategy and configure it using the property spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.
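To illustrate the A-M / N-Z example, here's a plain-Java sketch of how the partition key and the target partition could be derived. The FirstLetterPartitioner class is hypothetical and doesn't implement any Spring Cloud Stream interface. It only shows the logic a custom strategy would need, and it assumes non-empty log messages:

```java
// Hypothetical sketch of the A-M / N-Z partitioning rule; not a Spring Cloud Stream API.
class FirstLetterPartitioner {

    // Partition key: the upper-cased first letter of the log message.
    // Assumes the message is non-empty.
    static char partitionKey(String logMessage) {
        return Character.toUpperCase(logMessage.charAt(0));
    }

    // Partition 0 for keys A-M, partition 1 for everything else (N-Z in our example).
    static int selectPartition(char key) {
        return (key >= 'A' && key <= 'M') ? 0 : 1;
    }

    public static void main(String[] args) {
        String[] logs = { "error: disk full", "warning: low memory" };
        for (String log : logs) {
            char key = partitionKey(log);
            System.out.println(key + " -> partition " + selectPartition(key));
        }
    }
}
```

Because related messages share a key, they always land in the same partition, which is what keeps them together on a single consumer.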
6.3. Health Indicator
In a microservices context, we also need to detect when a service is down or starts failing. Spring Cloud Stream provides the property management.health.binders.enabled to enable the health indicators for binders.
When running the application, we can query the health status at http://<host>:<port>/actuator/health, which is the default Spring Boot Actuator path since Spring Boot 2.
7. Conclusion
In this tutorial, we presented the main concepts of Spring Cloud Stream and showed how to use it through some simple examples over RabbitMQ. More information about Spring Cloud Stream can be found in the official reference documentation.
