Mocking is an essential part of unit testing, and the Mockito library makes it easy to write clean and intuitive unit tests for your Java code.
Get started with mocking and improve your application tests using our Mockito guide:
Handling concurrency in an application can be a tricky process with many potential pitfalls. A solid grasp of the fundamentals will go a long way to help minimize these issues.
Get started with understanding multi-threaded applications with our Java Concurrency guide:
Spring 5 added support for reactive programming with the Spring WebFlux module, which has been improved upon ever since. Get started with the Reactor project basics and reactive programming in Spring Boot:
Since its introduction in Java 8, the Stream API has become a staple of Java development. The basic operations like iterating, filtering, mapping sequences of elements are deceptively simple to use.
But these can also be overused and fall into some common pitfalls.
To get a better understanding on how Streams work and how to combine them with other language features, check out our guide to Java Streams:
Get started with Spring and Spring Boot, through the Learn Spring course:
>> LEARN SPRINGExplore Spring Boot 3 and Spring 6 in-depth through building a full REST API with the framework:
Yes, Spring Security can be complex, from the more advanced functionality within the Core to the deep OAuth support in the framework.
I built the security material as two full courses - Core and OAuth, to get practical with these more complex scenarios. We explore when and how to use each feature and code through it on the backing project.
You can explore the course here:
Spring Data JPA is a great way to handle the complexity of JPA with the powerful simplicity of Spring Boot.
Get started with Spring Data JPA through the guided reference course:
Refactor Java code safely β and automatically β with OpenRewrite.
Refactoring big codebases by hand is slow, risky, and easy to put off. Thatβs where OpenRewrite comes in. The open-source framework for large-scale, automated code transformations helps teams modernize safely and consistently.
Each month, the creators and maintainers of OpenRewrite at Moderne run live, hands-on training sessions β one for newcomers and one for experienced users. Youβll see how recipes work, how to apply them across projects, and how to modernize code with confidence.
Join the next session, bring your questions, and learn how to automate the kind of work that usually eats your sprint time.
1. Introduction
Spring Cloud Data Flow is a cloud-native programming and operating model for composable data microservices.
With Spring Cloud Data Flow, developers can create and orchestrate data pipelines for common use cases such as data ingest, real-time analytics, and data import/export.
These data pipelines come in two flavors, streaming and batch data pipelines.
In the first case, an unbounded amount of data is consumed or produced via messaging middleware. In the second case, the short-lived task processes a finite set of data and then terminates.
This article will focus on streaming processing.
2. Architectural Overview
The key components of this type of architecture are Applications, the Data Flow Server, the Skipper Server, and the target runtime.
Also in addition to these key components, we usually have a Data Flow Shell and a message broker within the architecture.
Letβs see all these components in more detail.
2.1. Applications
Typically, a streaming data pipeline includes consuming events from external systems, data processing, and polyglot persistence. These phases are commonly referred to as Source, Processor, and Sink in Spring Cloud terminology:
- Source: is the application that consumes events
- Processor: consumes data from the Source, does some processing on it, and emits the processed data to the next application in the pipeline
- Sink: either consumes from a Source or Processor and writes the data to the desired persistence layer
These applications can be packaged in two ways:
- Spring Boot uber-jar that is hosted in a maven repository, file, http or any other Spring resource implementation (this method will be used in this article)
- Docker
Many sources, processor, and sink applications for common use-cases (e.g. jdbc, hdfs, http, router) are already provided and ready to use by the Spring Cloud Data Flow team.
2.2. Runtime
Also, a runtime is needed for these applications to execute. The supported runtimes are:
- Cloud Foundry
- Kubernetes
- Local Server for development (which will be used in this article)
2.3. Data Flow Server
The component that is responsible for deploying applications to a runtime is the Data Flow Server. There is a Data Flow Server executable jar provided for each of the target runtimes.
The Data Flow Server is responsible for interpreting:
- A stream DSL that describes the logical flow of data through multiple applications.
- A deployment manifest that describes the mapping of applications onto the runtime.
2.4. Skipper Server
The Skipper Server is responsible for:
- Deploying streams to one or more platforms.
- Upgrading and rolling back streams on one or more platforms by using a state machine-based blue/green update strategy.
- Storing the history of each streamβs manifest file
2.5. Data Flow Shell
The Data Flow Shell is a client for the Data Flow Server. The shell allows us to perform the DSL command needed to interact with the server.
As an example, the DSL to describe the flow of data from an http source to a jdbc sink would be written as βhttp | jdbcβ. These names in the DSL are registered with the Data Flow Server and map onto application artifacts that can be hosted in Maven or Docker repositories.
Spring also offers a graphical interface, named Flo, for creating and monitoring streaming data pipelines. However, its use is outside the discussion of this article.
2.6. Message Broker
As weβve seen in the example of the previous section, we have used the pipe symbol in the definition of the flow of data. The pipe symbol represents the communication between the two applications via messaging middleware.
This means that we need a message broker up and running in the target environment.
The two messaging middleware brokers that are supported are:
- Apache Kafka
- RabbitMQ
And so, now that we have an overview of the architectural components β itβs time to build our first stream processing pipeline.
3. Install a Message Broker
As we have seen, the applications in the pipeline need a messaging middleware to communicate. For this article, weβll go with RabbitMQ.
For the full details of the installation, you can follow the instructions on the official site.
4. The Local Data Flow Server and Skipper Server
Download the Spring Cloud Data Flow Server and Spring Cloud Skipper Server by using the following commands:
wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-server/2.11.2/spring-cloud-dataflow-server-2.11.2.jar
wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-skipper-server/2.11.2/spring-cloud-skipper-server-2.11.2.jar
Now you need to start the applications that comprise the server:
Skipper :
In the directory where you downloaded Skipper, run the server by using java -jar, as follows:
java -jar spring-cloud-skipper-server-2.11.2.jar
The application will boot up on port 7577.
Dataflow:
In a different terminal window and in the directory where you downloaded Data Flow, run the server by using java -jar, as follows:
java -jar spring-cloud-dataflow-server-2.11.2.jar
The application will boot up on port 9393.
5. The Data Flow Shell
Download the Spring Cloud Data Shell by using the following command:
wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-shell/2.11.2/spring-cloud-dataflow-shell-2.11.2.jar
Now start the Spring Cloud Data Flow Shell with the following command:
java -jar spring-cloud-dataflow-shell-2.11.2.jar
After the shell is running, we can type the help command in the prompt to see a complete list of commands that we can perform.
6. The Source Application
On Initializr, weβll now create a simple application and add a Stream Rabbit dependency called spring-cloud-starter-stream-rabbit:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Weβll then add the @EnableBinding(Source.class) annotation to the Spring Boot main class:
@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowTimeSourceApplication.class, args);
}
}
Now we need to define the source of the data that must be processed. This source could be any potentially endless workload (internet-of-things sensor data, 24/7 event processing, online transaction data ingest).
In our sample application, we produce one event (for simplicity a new timestamp) every 10 seconds with a Poller.
The @InboundChannelAdapter annotation sends a message to the sourceβs output channel, using the return value as the payload of the message:
@Bean
@InboundChannelAdapter(
value = Source.OUTPUT,
poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}
Our data source is ready.
7. The Processor Application
Next- weβll create an application and add a Stream Rabbit dependency.
Weβll then add the @EnableBinding(Processor.class) annotation to the Spring Boot main class:
@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowTimeProcessorApplication.class, args);
}
}
Next, we need to define a method to process the data that coming from the source application.
To define a transformer, we need to annotate this method with @Transformer annotation:
@Transformer(inputChannel = Processor.INPUT,
outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
String date = dateFormat.format(timestamp);
return date;
}
It converts a timestamp from the βinputβ channel to a formatted date which will be sent to the βoutputβ channel.
8. The Sink Application
The last application to create is the Sink application.
Again, go to the Spring Initializr and choose a Group, an Artifact name. After downloading the project letβs add a Stream Rabbit dependency.
Then add the @EnableBinding(Sink.class) annotation to the Spring Boot main class:
@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowLoggingSinkApplication.class, args);
}
}
Now we need a method to intercept the messages coming from the processor application.
To do this, we need to add the @StreamListener(Sink.INPUT) annotation to our method:
@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
logger.info("Received: " + date);
}
The method simply prints the timestamp transformed in a formatted date to a log file.
9. Register a Stream App
The Spring Cloud Data Flow Shell allows us to Register a Stream App with the App Registry using the app register command.
We must provide a unique name, application type, and a URI that can be resolved to the app artifact. For the type, specify βsourceβ, βprocessorβ, or βsinkβ.
When providing a URI with the maven scheme, the format should conform to the following:
maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
To register the Source, Processor, and Sink applications previously created, go to the Spring Cloud Data Flow Shell and issue the following commands from the prompt:
app register --name time-source --type source --uri file://local machine path to/time-source-0.0.1-SNAPSHOT.jar
app register --name time-processor --type processor --uri file://local machine path to/time-processor-0.0.1-SNAPSHOT.jar
app register --name log-sink --type sink --uri file://local machine path to/log-sink-0.0.1-SNAPSHOT.jar
10. Create and Deploy the Stream
To create a new stream definition go to the Spring Cloud Data Flow Shell and execute the following shell command:
stream create --name time-to-log --definition 'time-source | time-processor | log-sink'
This defines a stream named time-to-log based on the DSL expression βtime-source | time-processor | log-sinkβ.
Then to deploy the stream execute the following shell command:
stream deploy --name time-to-log
The Data Flow Server resolves time-source, time-processor, and log-sink to maven coordinates and uses those to launch the time-source, time-processor, and log-sink applications of the stream.
If the stream is correctly deployed youβll see in the Data Flow Server logs that the modules have been started and tied together:
2024-04-15 15:15:27.153 INFO 23568 β [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager : Getting status for org.springframework.cloud.skipper.domain.Release@4de8f9c3 using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
2024-04-15 15:15:32.156 INFO 23568 β [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager : Getting status for org.springframework.cloud.skipper.domain.Release@4d72121e using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
2024-04-15 15:15:37.157 INFO 23568 β [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager : Getting status for org.springframework.cloud.skipper.domain.Release@9a35173 using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
2024-04-15 15:15:42.160 INFO 23568 β [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager : Getting status for org.springframework.cloud.skipper.domain.Release@1ec5d911 using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
11. Reviewing the Result
In this example, the source sends the current timestamp as a message each second, the processor formats it and the log sink outputs the formatted timestamp using the logging framework.
The log files are located within the directory displayed in the Data Flow Serverβs log output, as shown above. To see the result, we can tail the log:
tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log
2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21
12. Conclusion
This article shows how to build a data pipeline for stream processing using Spring Cloud Data Flow.
Also, we saw the role of Source, Processor, and Sink applications inside the stream and how to plug and tie this module inside a Data Flow Server using Data Flow Shell.
