Reactive Programming - Part 2.
29 Nov 2022
Just to summarise in the first part of this blog series these are the following topics we touched upon
Introduction to reactive programming
Difference between reactive programming vs multi-threaded applications
Different core reactive programming libraries used in Java
Apart from this we also made a start on writing a sample application in Java using the concepts of reactive programming. I strongly advise if you haven't read part 1 of this series; stop here and go read that first and then come back.
Continuing with our sample application
In the last blog we did the initial project setup and also the database setup for our User Crud service. Now let's setup the different layers of MVC
Setting up the model
As we are using Lombok a lot of boilerplate code will get auto-generated when we use the right annotations, let us first set up our user model as below
@Data @AllArgsConstructor @NoArgsConstructor @Table("users") public class User { @Id private Integer id; private String name; private int age; private double salary; }
Repository Layer
Next, to perform all database operations we need to create a repository layer that will take care of interactions with the underlying DB
If you are familiar with Spring Data you would know that we just need to create an interface and extend
CrudRepository
orPagingAndSortingRepository
and then we would be able to interact with the table with basic functions already implemented likefindAll
,save
etc.With reactive programming, there is one slight difference. Instead of extending the above two repositories, we extend the
ReactiveCrudRepository
and instead of the return type beingList
orOptional
, useFlux
andMono
publishers.With usual CrudRepository
public interface UserRepository extends CrudRepository<User,Integer> { List<User> findUsersByAge(int age); }
With Reactive programming
public interface UserRepository extends ReactiveCrudRepository<User,Integer> { Flux<User> findUsersByAge(int age); }
Service Layer
Anyone familiar with Java would know that the Repository layer takes care of interaction with the database, whereas the service layer takes care of business logic
So let’s go ahead and implement a simple
UserService
that would contain any additional logic if needed by the userLet’s first auto-wire
UserRepository
in the service layer
private final UserRepository userRepository; public UserService(UserRepository userRepository) { this.userRepository = userRepository; }
Next, let’s go ahead and implement basic Crud operation i.e Create, Read, Update, Delete as show below
public Mono<User> createUser(User user){ return userRepository.save(user); } public Flux<User> getAllUsers(){ return userRepository.findAll(); } public Mono<User> findById(Integer userId){ return userRepository.findById(userId); } public Mono<User> updateUser(Integer userId, User user){ return userRepository.findById(userId) .flatMap(dbUser -> { dbUser.setAge(user.getAge()); dbUser.setSalary(user.getSalary()); return userRepository.save(dbUser); }); } public Mono<User> deleteUser(Integer userId){ return userRepository.findById(userId) .flatMap(existingUser -> userRepository.delete(existingUser) .then(Mono.just(existingUser))); }
You may have noticed where ever I want to return 0..1 results I have used mono and where ever I want to return 0..n results I have used Flux.
Let’s also add the custom method we implemented in the repository layer
public Flux<User> findUsersByAge(int age){ return userRepository.findUsersByAge(age); }
There is no business logic required for this example, hence this all looks pretty straightforward but if at all we need to add some sort of business logic this is the layer where you should be adding it, as part of best practices.
Next, let’s implement one more method and discuss that in a bit detail
public Flux<User> fetchUsers(List<Integer> userIds) { return Flux.fromIterable(userIds) .parallel() .runOn(Schedulers.boundedElastic()) .flatMap(this::findById) .ordered((u1, u2) -> u2.getId() - u1.getId()); }
So what did we do here?
We are trying to fetch the details of multiple users simultaneously and return the result as a list of users.
First, we created a Flux from list of
userIds
and called the parallel method to indicate parallel execution using ParallelFluxNext, we decided a runOn config for schedulers, in this case, we have used
boundedElastic
which is considered good for long-lived tasks and spawns thread on-demand with a ceiling on the number of created threads. Which is basically the only difference betweenelastic
andboundedElastic
You can find more details here to decide which config would work best for your use case here
Next, we invoke flatMap to run the
findById
method which returns aParallelFlux
and then finally to convert fromParallelFlux
to a regularFlux
we have added an ordered method with a custom comparator.
Now that we have added all these methods our basic service layer is now ready to hook up with the controller
Controller Layer
Let’s get started by setting up a base controller and auto-wiring our service
@RestController @RequestMapping("/users") public class UserController { private final UserService userService; public UserController(UserService userService) { this.userService = userService; } }
Now let’s call out all the methods we added to our service layer so that when a user makes a call the required action is sent to our service layer, which in turn sends it to our repository and that in turn sends it to our database
@PostMapping @ResponseStatus(HttpStatus.CREATED) public Mono<User> create(@RequestBody User user){ return userService.createUser(user); } @GetMapping public Flux<User> getAllUsers(){ return userService.getAllUsers(); } @GetMapping("/{userId}") public Mono<ResponseEntity<User>> getUserById(@PathVariable Integer userId){ Mono<User> user = userService.findById(userId); return user.map(ResponseEntity::ok) .defaultIfEmpty(ResponseEntity.notFound().build()); } @PutMapping("/{userId}") public Mono<ResponseEntity<User>> updateUserById(@PathVariable Integer userId, @RequestBody User user){ return userService.updateUser(userId,user) .map(ResponseEntity::ok) .defaultIfEmpty(ResponseEntity.badRequest().build()); } @DeleteMapping("/{userId}") public Mono<ResponseEntity<Void>> deleteUserById(@PathVariable Integer userId){ return userService.deleteUser(userId) .map( r -> ResponseEntity.ok().<Void>build()) .defaultIfEmpty(ResponseEntity.notFound().build()); } @GetMapping("/age/{age}") public Flux<User> getUsersByAge(@PathVariable int age) { return userService.findUsersByAge(age); } @PostMapping("/search/id") public Flux<User> fetchUsersByIds(@RequestBody List<Integer> ids) { return userService.fetchUsers(ids); }
Once you have all these methods added you are ready to test out your application
Testing using Postman
Creating New User
To create a new user simply pass in the following info
E.g.
http://localhost:8080/users
{ "name": "Utsav Sharma", "age": 30, "salary": 300.00 }
In turn, you will get the following response on success
{ "id": 2, "name": "Utsav Sharma", "age": 30, "salary": 300.0 }
Getting Users
To get all users, pass this request URL
E.g.
http://localhost:8080/users
You will get the following response in the result
[ { "id": 2, "name": "Utsav Sharma", "age": 30, "salary": 300.0 }, { "id": 3, "name": "Utsav Sharma", "age": 10, "salary": 300.0 } ]
Getting A Specific User
To get a specific user, add the id of the user in the request path:
E.g.
http://localhost:8080/users/2
You will get the following response
{ "id": 2, "name": "Utsav Sharma", "age": 30, "salary": 300.0 }
Updating A Specific User
To update a specific user, pass the
id
in the request path and also pass the updated request payloadE.g.
http://localhost:8080/users/3
{ "id": 3, "name": "Utsav Sharma", "age": 34, "salary": 300.0 }
On success, you will get the updated response
{ "id": 3, "name": "Utsav Sharma", "age": 34, "salary": 300.0 }
Deleting A Specific User
To delete a specific user, simply use the
DELETE
verb and pass theid
in the request pathE.g.
http://localhost:8080/users/1
On success, you won’t be getting any responses.
Getting Users By Age
Though we would have implemented this API using request params, just for sake of clarity I created a new API to retrieve the list of users by a specific age
Pass the subresource URL along with the age that we are looking for.
E.g
http://localhost:8080/users/age/34
In response, you will get all users who are aged
34
[ { "id": 3, "name": "Utsav Sharma", "age": 34, "salary": 300.0 } ]
Search By Id
Finally, this API calls in the parallel flux method and retrieves all users with the ids passed in the payload
Just to keep the example simple and the focus on WebFlux, we have created this API in this way. However, we were implementing this in production we would have made some changes
E.g
http://localhost:8080/users /search/id
["2","3"]
Sample response would look like this
[ { "id": 3, "name": "Utsav Sharma", "age": 34, "salary": 300.0 }, { "id": 2, "name": "Utsav Sharma", "age": 30, "salary": 300.0 } ]
Reactive WebClient
WebClient introduced in Spring 5, is a non-blocking client with support for reactive streams. We can use WebClient to create a client to retrieve data from the endpoints provided by the UserController
Let’s start by setting up the base skeleton
UserTestController
@ExtendWith(SpringExtension.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @DirtiesContext @AutoConfigureWebTestClient @Slf4j class UserControllerTest { @Autowired private WebTestClient webTestClient; @Autowired private UserRepository userRepository; @Autowired private DatabaseClient databaseClient; }
Next, we need to set up our database with some raw test data in it, which would be needed for each test case we execute, we can do that simply by using
@BeforeEach
private List<User> initData(){ return Arrays.asList(new User(null,"Utsav Sharma",30,10000), new User(null,"Anshul Agarwal",5,100000), new User(null,"Jaskaran Singh",40,10000000)); } @BeforeEach public void setup(){ List<String> statements = Arrays.asList("DROP TABLE IF EXISTS users ;", "CREATE TABLE users ( id INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, name VARCHAR(100) NOT NULL, age integer, salary decimal);" ); statements.forEach(it -> databaseClient.sql(it) .fetch() .rowsUpdated() .block()); userRepository.deleteAll() .thenMany(Flux.fromIterable(initData())) .flatMap(userRepository::save) .doOnNext(user ->{ System.out.println("User added: " + user); }) .blockLast(); }
Now that we have our test, we can make use of WebClient to call our APIs which gives back responses in
Flux
andMono
. Below are a few examples to understand how this is done
//Test user creation @Test void createUser(){ User user = new User(null,"Balram Rathore",45,5555555); webTestClient.post().uri("/users").contentType(MediaType.valueOf(MediaType.APPLICATION_JSON_VALUE)) .body(Mono.just(user),User.class) .exchange() .expectStatus().isCreated() .expectBody() .jsonPath("$.id").isNotEmpty() .jsonPath("$.name").isEqualTo("Balram Rathore"); }
//Get all users test @Test void getAllUsersValidateResponse(){ Flux<User> userFlux = webTestClient.get().uri("/users").exchange() .expectStatus().isOk() .expectHeader().contentType(MediaType.APPLICATION_JSON_VALUE) .returnResult(User.class) .getResponseBody(); StepVerifier.create(userFlux.log("Values received")) .expectNextCount(3) .verifyComplete(); }
//Get specific user test @Test void getUserById(){ webTestClient.get().uri("/users".concat("/{userId}"),"1") .exchange().expectStatus().isOk() .expectBody() .jsonPath("$.name","Suman Das"); }
You can find more test cases in the Github repository
Summary
So we just looked at how to use reactive programming with Java. There are even further more complex examples and scenarios where-in we can use reactive programming but I will cover that later. For now, just look back and establish a basic understanding of this programming model and see how you can further use this approach in the work that you do.