1. Overview
Spring Cloud Data Flow is a cloud-native toolkit for building real-time data pipelines and batch processes. Spring Cloud Data Flow is ready to be used for a range of data processing use cases like simple import/export, ETL processing, event streaming, and predictive analytics.
In this tutorial, we'll walk through an example of real-time Extract, Transform, and Load (ETL) using a stream pipeline that extracts data from a JDBC database, transforms it into simple POJOs, and loads it into MongoDB.
2. ETL and Event-Stream Processing
ETL (extract, transform, and load) has commonly referred to a process that batch-loads data from several databases and systems into a common data warehouse. In this data warehouse, it's possible to do heavy data analysis processing without compromising the overall performance of the system.
However, new trends are changing how this is done. ETL still has a role in transferring data to data warehouses and data lakes.
Nowadays, this can be done with streams in an event-stream architecture with the help of Spring Cloud Data Flow.
3. Spring Cloud Data Flow
With Spring Cloud Data Flow (SCDF), developers can create data pipelines in two flavors:
- Long-lived real-time stream applications using Spring Cloud Stream
- Short-lived batched task applications using Spring Cloud Task
In this article, we'll cover the first: a long-lived streaming application based on Spring Cloud Stream.
3.1. Spring Cloud Stream Applications
The SCDF Stream pipelines are composed of steps, where each step is an application built in Spring Boot style using the Spring Cloud Stream micro-framework. These applications are integrated by a messaging middleware like Apache Kafka or RabbitMQ.
These applications are classified into sources, processors, and sinks. Compared with the ETL process, the source is the "extract" step, the processor is the "transform" step, and the sink is the "load" step.
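The three roles can be pictured with plain Java functional interfaces. This is only a minimal sketch using the JDK: the class EtlRolesSketch and its methods are hypothetical, and no messaging middleware or Spring Cloud Stream API is involved. The hard-coded list stands in for a database, and the upper-casing stands in for a real transformation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class EtlRolesSketch {

    // Source ("extract"): produces records. A hard-coded list stands in for a database.
    static Supplier<List<String>> source() {
        return () -> List.of("john doe", "jane roe");
    }

    // Processor ("transform"): turns one record into another.
    static Function<String, String> processor() {
        return String::toUpperCase;
    }

    // Sink ("load"): consumes records. An in-memory list stands in for the target store.
    static Consumer<String> sink(List<String> target) {
        return target::add;
    }

    // Wires source -> processor -> sink for a single pass over the data.
    static List<String> runOnce() {
        List<String> target = new ArrayList<>();
        Consumer<String> sink = sink(target);
        for (String record : source().get()) {
            sink.accept(processor().apply(record));
        }
        return target;
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints [JOHN DOE, JANE ROE]
    }
}
```

In a real SCDF stream, each of these roles is a separate Spring Boot application, and the records travel between them through the messaging middleware instead of direct method calls.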
In some cases, we can use an application starter in one or more steps of the pipeline. This means that we wouldn't need to implement a new application for a step, but could instead configure an existing application starter.
A list of application starters can be found here.
3.2. Spring Cloud Data Flow Server
The last piece of the architecture is the Spring Cloud Data Flow Server. The SCDF Server deploys the applications and the pipeline stream using the Spring Cloud Deployer specification. This specification supports the SCDF cloud-native flavor by deploying to a range of modern runtimes, such as Kubernetes, Apache Mesos, YARN, and Cloud Foundry.
Also, we can run the stream as a local deployment.
More information about the SCDF architecture can be found here.
4. Environment Setup
Before we start, we need to choose the pieces of this complex deployment. The first piece to define is the SCDF Server.
For testing, we'll use SCDF Server Local for local development. For the production deployment, we can later choose a cloud-native runtime, like SCDF Server Kubernetes. We can find the list of server runtimes here.
Now, let's check the system requirements to run this server.
4.1. System Requirements
To run the SCDF Server, we'll have to define and set up two dependencies:
- the messaging middleware, and
- the RDBMS.
For the messaging middleware, we'll work with RabbitMQ, and we'll use PostgreSQL as the RDBMS for storing our pipeline stream definitions.
To run RabbitMQ, download the latest version here and start an instance using the default configuration, or run the following Docker command:
docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
As the last setup step, install and run the PostgreSQL RDBMS on the default port 5432. After this, create a database where SCDF can store its stream definitions using the following script:
CREATE DATABASE dataflow;
4.2. Spring Cloud Data Flow Server Local
To run the SCDF Server Local, we can either start the server using docker-compose or start it as a Java application.
Here, we'll run the SCDF Server Local as a Java application. To configure it, we pass the settings as Java application parameters. We'll need Java 8 on the system path.
To host the jars and dependencies, we need to create a home folder for our SCDF Server and download the SCDF Server Local distribution into this folder. You can download the most recent distribution of SCDF Server Local here.
Also, we need to create a lib folder and put a JDBC driver there. The latest version of the PostgreSQL driver is available here.
Finally, let's run the SCDF local server:
$ java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \
--spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \
--spring.datasource.username=postgres_username \
--spring.datasource.password=postgres_password \
--spring.datasource.driver-class-name=org.postgresql.Driver \
--spring.rabbitmq.host=127.0.0.1 \
--spring.rabbitmq.port=5672 \
--spring.rabbitmq.username=guest \
--spring.rabbitmq.password=guest
We can check if it's running by opening this URL:
http://localhost:9393/dashboard
4.3. Spring Cloud Data Flow Shell
The SCDF Shell is a command line tool that makes it easy to compose and deploy our applications and pipelines. These Shell commands run over the Spring Cloud Data Flow Server REST API.
Download the latest version of the jar into your SCDF home folder, available here. Once it is done, run the following command (update the version as needed):
$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar
____ ____ _ __
/ ___| _ __ _ __(_)_ __ __ _ / ___| | ___ _ _ __| |
\___ \| '_ \| '__| | '_ \ / _` | | | | |/ _ \| | | |/ _` |
___) | |_) | | | | | | | (_| | | |___| | (_) | |_| | (_| |
|____/| .__/|_| |_|_| |_|\__, | \____|_|\___/ \__,_|\__,_|
____ |_| _ __|___/ __________
| _ \ __ _| |_ __ _ | ___| | _____ __ \ \ \ \ \ \
| | | |/ _` | __/ _` | | |_ | |/ _ \ \ /\ / / \ \ \ \ \ \
| |_| | (_| | || (_| | | _| | | (_) \ V V / / / / / / /
|____/ \__,_|\__\__,_| |_| |_|\___/ \_/\_/ /_/_/_/_/_/
Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
If, instead of "dataflow:>", the last line shows "server-unknown:>", the SCDF Server isn't running at localhost. In this case, run the following command to connect to another host:
server-unknown:>dataflow config server http://{host}
Now the Shell is connected to the SCDF Server, and we can run our commands.
The first thing we need to do in the Shell is import the application starters. Find the latest version here for RabbitMQ+Maven in Spring Boot 2.0.x, and run the following command (again, update the version, here "Darwin-SR1", as needed):
dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven
To check the installed applications, run the following Shell command:
dataflow:>app list
As a result, we should see a table containing all the installed applications.
SCDF also offers a graphical interface, named Flo, that we can access at this address: http://localhost:9393/dashboard. However, its use isn't in the scope of this article.
5. Composing an ETL Pipeline
Let's now create our stream pipeline. To do this, we'll use the JDBC Source application starter to extract information from our relational database.
We'll also create a custom processor for transforming the information structure and a custom sink to load our data into MongoDB.
5.1. Extract - Preparing a Relational Database for Extraction
Let's create a database named crm and a table named customer:
CREATE DATABASE crm;
-- Connect to the crm database before creating the table (for example, \c crm in psql)
CREATE TABLE customer (
    id bigint NOT NULL,
    imported boolean DEFAULT false,
    customer_name character varying(50),
    PRIMARY KEY(id)
);
Note that we're using an imported flag, which records whether a row has already been imported. We could also store this information in another table, if necessary.
Now, let's insert some data:
INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);
5.2. Transform - Mapping JDBC Fields to the MongoDB Fields Structure
For the transformation step, we'll do a simple translation of the field customer_name from the source table to a new field, name. Other transformations could be done here, but let's keep the example short.
To do this, we'll create a new project with the name customer-transform. The easiest way to do this is by using the Spring Initializr site to create the project. After reaching the website, choose a Group and an Artifact name. We'll use com.customer and customer-transform, respectively.
Once this is done, click the "Generate Project" button to download the project. Then unzip the project, import it into your favorite IDE, and add the following dependency to the pom.xml:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
Now we're set to start coding the field name conversion. To do this, we'll create the Customer class to act as an adapter. This class will receive the customer_name via the setName() method and will output its value via the getName() method.
The @JsonProperty annotations will do the transformation while deserializing from JSON to Java:
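import com.fasterxml.jackson.annotation.JsonProperty;
public class Customer {
    private Long id;
    private String name;
    // Used when reading JSON: the "customer_name" field is passed to this setter
    @JsonProperty("customer_name")
    public void setName(String name) {
        this.name = name;
    }
    // Used when writing JSON: the value is emitted under the "name" field
    @JsonProperty("name")
    public String getName() {
        return name;
    }
    // Getters and Setters
}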
The processor needs to receive data from an input, do the transformation, and bind the outcome to an output channel. Let's create a class to do this:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;
@EnableBinding(Processor.class)
public class CustomerProcessorConfiguration {
    // No explicit mapping code is needed: the rename happens while the payload is converted
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Customer convertToPojo(Customer payload) {
        return payload;
    }
}
In the above code, we can observe that the transformation occurs automatically. The input receives the data as JSON, and Jackson deserializes it into a Customer object using the setter methods.
The opposite happens on the output: the data is serialized to JSON using the getter methods.
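The net effect of this round trip on a single record can be pictured without Jackson at all. The following is a minimal sketch using only the JDK: the class FieldRenameSketch and its rename helper are hypothetical and aren't part of the customer-transform project, and a plain map stands in for the JSON payload.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldRenameSketch {

    // Hypothetical helper: copies the record and moves the value stored under
    // "customer_name" to the key "name", leaving every other entry untouched.
    static Map<String, Object> rename(Map<String, Object> input) {
        Map<String, Object> output = new LinkedHashMap<>(input);
        if (output.containsKey("customer_name")) {
            output.put("name", output.remove("customer_name"));
        }
        return output;
    }

    public static void main(String[] args) {
        // The shape of a row as it might arrive from the source
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1L);
        row.put("customer_name", "John Doe");

        System.out.println(rename(row)); // prints {id=1, name=John Doe}
    }
}
```

In the actual processor, the same renaming falls out of the two @JsonProperty annotations, so no mapping code like this has to be written by hand.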
5.3. Load - Sink in MongoDB
As with the transform step, we'll create another Maven project, now with the name customer-mongodb-sink. Again, access the Spring Initializr; for the Group, choose com.customer, and for the Artifact, choose customer-mongodb-sink. Then, type "MongoDB" in the dependencies search box and download the project.
Next, unzip and import it to your favorite IDE.
Then, add the same extra dependency as in the customer-transform project.
Now we'll create another Customer class for receiving input in this step:
import org.springframework.data.mongodb.core.mapping.Document;
@Document(collection="customer")
public class Customer {
    private Long id;
    private String name;
    // Getters and Setters
}
To sink the Customer, we'll create a listener class that saves the customer entity using the CustomerRepository:
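import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
public class CustomerListener {
    @Autowired
    private CustomerRepository repository;
    // Called for every message that arrives on the sink's input channel
    @StreamListener(Sink.INPUT)
    public void save(Customer customer) {
        repository.save(customer);
    }
}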
And the CustomerRepository, in this case, is a MongoRepository from Spring Data:
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface CustomerRepository extends MongoRepository<Customer, Long> {
}
5.4. Stream Definition
Now, both custom applications are ready to be registered on SCDF Server. To accomplish this, compile both projects using the Maven command mvn install.
We then register them using the Spring Cloud Data Flow Shell:
app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT
Finally, let's check that the applications are stored in SCDF by running the application list command in the Shell:
app list
As a result, we should see both applications in the resulting table.
5.4.1. Stream Pipeline Domain-Specific Language - DSL
A DSL defines the configuration and data flow between the applications. The SCDF DSL is simple. In the first word, we define the name of the application, followed by the configurations.
The syntax is a Unix-inspired pipeline syntax that uses vertical bars, also known as "pipes", to connect multiple applications:
http --port=8181 | log
This creates an HTTP application listening on port 8181, which sends any received body payload to a log.
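To make the "first word is the application name" rule concrete, here is a deliberately naive sketch that splits a definition on the pipe character and reads the first word of each segment. It uses only the JDK, the class PipeSyntaxSketch is hypothetical, and it is not how SCDF parses definitions: in particular, it would break on a quoted option value that itself contains a vertical bar.

```java
import java.util.ArrayList;
import java.util.List;

public class PipeSyntaxSketch {

    // Naive illustration only: splits on '|' and takes the first word of each
    // segment as the application name. Quoted values containing '|' are not handled.
    static List<String> appNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String segment : definition.split("\\|")) {
            String trimmed = segment.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty segments such as a trailing pipe
            }
            names.add(trimmed.split("\\s+")[0]);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(appNames("http --port=8181 | log")); // prints [http, log]
    }
}
```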
Now, let's see how to create the DSL stream definition of the JDBC Source.
5.4.2. JDBC Source Stream Definition
The key configurations for the JDBC Source are query and update. query will select unread records while update will change a flag to prevent the current records from being reread.
We'll also configure the JDBC Source to poll with a fixed delay of 30 seconds and to read a maximum of 1000 rows per poll. Finally, we'll define the connection settings, such as the driver, username, password, and connection URL:
jdbc
--query='SELECT id, customer_name FROM public.customer WHERE imported = false'
--update='UPDATE public.customer SET imported = true WHERE id in (:id)'
--max-rows-per-poll=1000
--fixed-delay=30 --time-unit=SECONDS
--driver-class-name=org.postgresql.Driver
--url=jdbc:postgresql://localhost:5432/crm
--username=postgres
--password=postgres
More JDBC Source configuration properties can be found here.
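How query, update, and max-rows-per-poll interact can be simulated in memory. This is a minimal sketch under stated assumptions, not the JDBC Source implementation: the class JdbcPollSketch, its Row type, and the poll method are hypothetical, a list stands in for the customer table, and only the JDK is used.

```java
import java.util.ArrayList;
import java.util.List;

public class JdbcPollSketch {

    // In-memory stand-in for one row of the customer table
    static final class Row {
        final long id;
        final String customerName;
        boolean imported;

        Row(long id, String customerName) {
            this.id = id;
            this.customerName = customerName;
        }
    }

    // One poll cycle: the "query" part selects at most maxRows rows that are not
    // yet imported, and the "update" part then flags exactly those rows.
    static List<Row> poll(List<Row> table, int maxRows) {
        List<Row> batch = new ArrayList<>();
        for (Row row : table) {
            if (!row.imported && batch.size() < maxRows) {
                batch.add(row);
            }
        }
        for (Row row : batch) {
            row.imported = true;
        }
        return batch;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, "John Doe"));
        table.add(new Row(2, "Jane Roe"));
        table.add(new Row(3, "Sam Poe"));

        System.out.println(poll(table, 2).size()); // prints 2
        System.out.println(poll(table, 2).size()); // prints 1
        System.out.println(poll(table, 2).size()); // prints 0
    }
}
```

Because each poll flags the rows it returned, the next poll only sees rows that were added or left over, which is the behavior the imported column is there to provide.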
5.4.3. Customer MongoDB Sink Stream Definition
As we didn't define the connection configuration in the application.properties of customer-mongodb-sink, we'll configure it through DSL parameters.
Our application is fully based on the MongoDataAutoConfiguration. You can check out the other possible configurations here. Basically, we'll define the spring.data.mongodb.uri:
customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main
5.4.4. Create and Deploy the Stream
First, to create the final stream definition, go back to the Shell and execute the following command (as a single line; the line breaks are inserted here only for readability):
stream create --name jdbc-to-mongodb
--definition "jdbc
--query='SELECT id, customer_name FROM public.customer WHERE imported=false'
--fixed-delay=30
--max-rows-per-poll=1000
--update='UPDATE customer SET imported=true WHERE id in (:id)'
--time-unit=SECONDS
--password=postgres
--driver-class-name=org.postgresql.Driver
--username=postgres
--url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink
--spring.data.mongodb.uri=mongodb://localhost/main"
This stream DSL defines a stream named jdbc-to-mongodb. Next, we'll deploy the stream by its name:
stream deploy --name jdbc-to-mongodb
Finally, we should see the locations of all available logs in the log output:
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc
6. Conclusion
In this article, we've seen a full example of an ETL data pipeline using Spring Cloud Data Flow.
Most notably, we configured an application starter for reading the data, implemented custom applications for transforming and loading it, and created an ETL stream pipeline using the Spring Cloud Data Flow Shell.
