Techwondoe

Reactive Programming - Part 2.

29 Nov 2022

To summarise, the first part of this blog series covered the following topics:

  • Introduction to reactive programming

  • Differences between reactive programming and multi-threaded applications

  • Different core reactive programming libraries used in Java

We also made a start on a sample application in Java that uses the concepts of reactive programming. If you haven't read part 1 of this series, I strongly advise you to stop here, read that first, and then come back.

Continuing with our sample application

In the last blog we did the initial project setup and the database setup for our User CRUD service. Now let's set up the different layers of MVC.

Setting up the model

  • As we are using Lombok, a lot of boilerplate code gets auto-generated when we use the right annotations. Let us first set up our user model as below

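import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Table("users")
public class User {
    @Id
    private Integer id;
    private String name;
    private int age;
    private double salary;
}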
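To make the Lombok annotations a little less magical, here is a hand-written sketch of roughly what they generate for this class. Treat it as an approximation for illustration only: Lombok's real equals and hashCode differ in detail, the persistence annotations are left out so that it compiles with the JDK alone, and the class name UserByHand is hypothetical.

```java
import java.util.Objects;

// Hand-written approximation of what @Data, @AllArgsConstructor and
// @NoArgsConstructor generate. UserByHand is a hypothetical name.
class UserByHand {
    private Integer id;
    private String name;
    private int age;
    private double salary;

    // @NoArgsConstructor
    public UserByHand() { }

    // @AllArgsConstructor: one parameter per field, in declaration order
    public UserByHand(Integer id, String name, int age, double salary) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    // @Data: a getter and a setter for every non-final field
    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public double getSalary() { return salary; }
    public void setSalary(double salary) { this.salary = salary; }

    // @Data: value-based equals and hashCode over all fields
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof UserByHand)) return false;
        UserByHand other = (UserByHand) o;
        return age == other.age
                && Double.compare(salary, other.salary) == 0
                && Objects.equals(id, other.id)
                && Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, name, age, salary);
    }

    // @Data: toString in the form ClassName(field=value, ...)
    @Override
    public String toString() {
        return "UserByHand(id=" + id + ", name=" + name
                + ", age=" + age + ", salary=" + salary + ")";
    }

    public static void main(String[] args) {
        // The int literal 10000 is widened to double, just as in the test data later on
        UserByHand user = new UserByHand(null, "Utsav Sharma", 30, 10000);
        user.setId(2);
        System.out.println(user);
    }
}
```

That is around fifty lines of code replaced by three annotations, which is the main reason the model class above stays so short.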

Repository Layer

  • Next, to perform all database operations we need to create a repository layer that will take care of interactions with the underlying DB

  • If you are familiar with Spring Data, you will know that we just need to create an interface that extends CrudRepository or PagingAndSortingRepository; basic functions such as findAll and save are then already implemented for the table.

  • With reactive programming, there is one slight difference. Instead of extending either of the above repositories, we extend ReactiveCrudRepository, and instead of List or Optional the return types are the Flux and Mono publishers.

  • With the usual CrudRepository

public interface UserRepository extends CrudRepository<User,Integer> {
    List<User> findUsersByAge(int age);
}
  • With reactive programming

public interface UserRepository extends ReactiveCrudRepository<User,Integer> {
    Flux<User> findUsersByAge(int age);
}

Service Layer

  • In a layered application, the repository layer takes care of interaction with the database, whereas the service layer takes care of business logic

  • So let’s go ahead and implement a simple UserService that will hold any additional logic the application needs

  • Let’s first auto-wire UserRepository into the service layer through its constructor

private final UserRepository userRepository;
public UserService(UserRepository userRepository) {
    this.userRepository = userRepository;
}
  • Next, let’s go ahead and implement the basic CRUD operations, i.e. Create, Read, Update and Delete, as shown below

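// Create: save emits the persisted user, including its generated id
public Mono<User> createUser(User user){
    return userRepository.save(user);
}
// Read: 0..n users
public Flux<User> getAllUsers(){
    return userRepository.findAll();
}
// Read: 0..1 user; the Mono completes empty if the id does not exist
public Mono<User> findById(Integer userId){
    return userRepository.findById(userId);
}
// Update: copies age and salary onto the stored user; empty if the id does not exist
public Mono<User> updateUser(Integer userId, User user){
    return userRepository.findById(userId)
            .flatMap(dbUser -> {
                dbUser.setAge(user.getAge());
                dbUser.setSalary(user.getSalary());
                return userRepository.save(dbUser);
            });
}
// Delete: emits the deleted user, or completes empty if the id does not exist
public Mono<User> deleteUser(Integer userId){
    return userRepository.findById(userId)
            .flatMap(existingUser -> userRepository.delete(existingUser)
                    .then(Mono.just(existingUser)));
}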
  • You may have noticed that wherever I want to return 0..1 results I have used Mono, and wherever I want to return 0..n results I have used Flux.

  • Let’s also add the custom method we implemented in the repository layer

public Flux<User> findUsersByAge(int age){
    return userRepository.findUsersByAge(age);
}
  • No business logic is required for this example, so this all looks pretty straightforward. If we do need business logic, though, this is the layer where it should go as a matter of best practice.

  • Next, let’s implement one more method and discuss it in a bit more detail

public Flux<User> fetchUsers(List<Integer> userIds) {
    return Flux.fromIterable(userIds)
            .parallel()
            .runOn(Schedulers.boundedElastic())
            .flatMap(this::findById)
            .ordered((u1, u2) -> u2.getId() - u1.getId());
}
  • So what did we do here?

    • We are trying to fetch the details of multiple users concurrently and return the results as a single Flux of users.

    • First, we created a Flux from the list of userIds and called the parallel method, which splits the work across several rails as a ParallelFlux.

    • Next, we chose a scheduler with runOn. In this case we have used boundedElastic, which is considered good for long-lived tasks and spawns threads on demand, with a ceiling on the number of threads it creates. That ceiling is basically the only difference between elastic and boundedElastic.

    • You can find more details here to help decide which configuration would work best for your use case

    • Next, we invoke flatMap to run the findById method for each id; the result is still a ParallelFlux. Finally, to convert the ParallelFlux back to a regular Flux, we have added the ordered method with a custom comparator that puts higher ids first. As far as I understand the Reactor Javadoc, ordered merges the rails by comparing the next item from each rail, so the output is only guaranteed to be fully sorted if each rail is already sorted; the sorted method sorts the rails before merging.
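The ordered step in fetchUsers is easier to reason about with a small model. The sketch below is plain Java, not Reactor code: it deals ids out to rails in turn and then merges them by always taking whichever rail head the comparator ranks first, which is my reading of how ordered behaves. The class and method names (OrderedMergeSketch, toRails, mergeByHead, sortThenMerge) are hypothetical, and real rails running on a scheduler would not have such a predictable internal order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

// Plain-Java model (not Reactor code) of merging parallel "rails" by comparator.
class OrderedMergeSketch {

    // Deals the ids out to the rails in turn
    static List<List<Integer>> toRails(List<Integer> ids, int railCount) {
        List<List<Integer>> rails = new ArrayList<>();
        for (int i = 0; i < railCount; i++) {
            rails.add(new ArrayList<>());
        }
        for (int i = 0; i < ids.size(); i++) {
            rails.get(i % railCount).add(ids.get(i));
        }
        return rails;
    }

    // Repeatedly emits whichever rail head the comparator ranks first.
    // Items inside a rail are never reordered.
    static List<Integer> mergeByHead(List<List<Integer>> rails, Comparator<Integer> order) {
        List<Deque<Integer>> queues = new ArrayList<>();
        for (List<Integer> rail : rails) {
            queues.add(new ArrayDeque<>(rail));
        }
        List<Integer> merged = new ArrayList<>();
        while (true) {
            Deque<Integer> best = null;
            for (Deque<Integer> queue : queues) {
                if (!queue.isEmpty()
                        && (best == null || order.compare(queue.peek(), best.peek()) < 0)) {
                    best = queue;
                }
            }
            if (best == null) {
                return merged; // every rail is drained
            }
            merged.add(best.poll());
        }
    }

    // Same merge, but each rail is sorted first
    static List<Integer> sortThenMerge(List<List<Integer>> rails, Comparator<Integer> order) {
        List<List<Integer>> sortedRails = new ArrayList<>();
        for (List<Integer> rail : rails) {
            List<Integer> copy = new ArrayList<>(rail);
            copy.sort(order);
            sortedRails.add(copy);
        }
        return mergeByHead(sortedRails, order);
    }

    public static void main(String[] args) {
        Comparator<Integer> highestIdFirst = Comparator.reverseOrder();
        List<List<Integer>> rails = toRails(Arrays.asList(1, 5, 3, 2), 2); // [1, 3] and [5, 2]
        System.out.println(mergeByHead(rails, highestIdFirst));   // [5, 2, 1, 3]
        System.out.println(sortThenMerge(rails, highestIdFirst)); // [5, 3, 2, 1]
    }
}
```

With only one id per rail, as in the two-id request used later in this post, both approaches give the same result. For the comparator itself, Comparator.comparing(User::getId).reversed() expresses the same "highest id first" order as the subtraction in fetchUsers without any risk of integer overflow.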

  • Now that we have added all these methods, our basic service layer is ready to hook up with the controller
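To get a feel for what "threads on demand with a ceiling" means for the boundedElastic scheduler used in fetchUsers, here is a rough JDK-only analogy built on ThreadPoolExecutor. It is not how Reactor implements the scheduler, and the names BoundedPoolSketch and peakThreads as well as the numbers used are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// JDK analogy for a scheduler that creates threads on demand up to a ceiling.
class BoundedPoolSketch {

    // Runs taskCount short blocking tasks on a pool capped at maxThreads
    // and returns the largest number of threads the pool ever held.
    static int peakThreads(int maxThreads, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads,        // threads are added per task, up to this ceiling
                60, TimeUnit.SECONDS,          // idle threads are released after 60 seconds
                new LinkedBlockingQueue<>());  // further tasks wait in a queue
        pool.allowCoreThreadTimeOut(true);

        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(10);          // stands in for a blocking call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await();                      // wait until every task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            pool.shutdown();
        }
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) {
        System.out.println("peak threads: " + peakThreads(4, 20));
    }
}
```

Twenty tasks are submitted, yet the pool never grows beyond four threads; the rest simply wait their turn. An unbounded pool would instead keep creating threads as long as tasks arrive faster than they finish.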

Controller Layer

  • Let’s get started by setting up a base controller and auto-wiring our service

@RestController
@RequestMapping("/users")
public class UserController {
    private final UserService userService;
    public UserController(UserService userService) {
        this.userService = userService;
    }
}
  • Now let’s expose all the methods we added to our service layer, so that when a user makes a call the request is passed to our service layer, which passes it to our repository, which in turn queries our database

@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<User> create(@RequestBody User user){
    return userService.createUser(user);
}

@GetMapping
public Flux<User> getAllUsers(){
    return userService.getAllUsers();
}

@GetMapping("/{userId}")
public Mono<ResponseEntity<User>> getUserById(@PathVariable Integer userId){
    Mono<User> user = userService.findById(userId);
    return user.map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
}

@PutMapping("/{userId}")
public Mono<ResponseEntity<User>> updateUserById(@PathVariable Integer userId, @RequestBody User user){
    return userService.updateUser(userId,user)
        .map(ResponseEntity::ok)
        .defaultIfEmpty(ResponseEntity.badRequest().build());
}

@DeleteMapping("/{userId}")
public Mono<ResponseEntity<Void>> deleteUserById(@PathVariable Integer userId){
    return userService.deleteUser(userId)
        .map(r -> ResponseEntity.ok().<Void>build())
        .defaultIfEmpty(ResponseEntity.notFound().build());
}

@GetMapping("/age/{age}")
public Flux<User> getUsersByAge(@PathVariable int age) {
    return userService.findUsersByAge(age);
}

@PostMapping("/search/id")
public Flux<User> fetchUsersByIds(@RequestBody List<Integer> ids) {
    return userService.fetchUsers(ids);
}
  • Once you have added all these methods, you are ready to test your application
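The getUserById, updateUserById and deleteUserById handlers above share one shape: map the value if there is one, otherwise fall back to an error status. If Mono is still new to you, the same shape with java.util.Optional may look more familiar. This is only an analogy, since Optional is evaluated immediately whereas a Mono does nothing until it is subscribed to, and the class EmptyToStatusSketch with its in-memory map is hypothetical.

```java
import java.util.Map;
import java.util.Optional;

// Optional-based analogy for the map(...) / defaultIfEmpty(...) pattern.
class EmptyToStatusSketch {

    // Hypothetical in-memory stand-in for the users table
    static final Map<Integer, String> USERS = Map.of(2, "Utsav Sharma");

    // Plays the role of userService.findById: empty when the id is unknown
    static Optional<String> findById(int userId) {
        return Optional.ofNullable(USERS.get(userId));
    }

    // Strings stand in for ResponseEntity objects
    static String getUserById(int userId) {
        return findById(userId)
                .map(name -> "200 OK: " + name)  // like .map(ResponseEntity::ok)
                .orElse("404 Not Found");        // like .defaultIfEmpty(ResponseEntity.notFound().build())
    }

    public static void main(String[] args) {
        System.out.println(getUserById(2));   // 200 OK: Utsav Sharma
        System.out.println(getUserById(99));  // 404 Not Found
    }
}
```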

Testing using Postman

Creating New User
  • To create a new user, send a POST request with the following JSON body

  • E.g. http://localhost:8080/users

{
    "name": "Utsav Sharma",
    "age": 30,
    "salary": 300.00
}
  • In turn, you will get the following response on success

{
    "id": 2,
    "name": "Utsav Sharma",
    "age": 30,
    "salary": 300.0
}
Getting Users
  • To get all users, send a GET request to this URL

  • E.g. http://localhost:8080/users

  • You will get the following response

[
    {
        "id": 2,
        "name": "Utsav Sharma",
        "age": 30,
        "salary": 300.0
    },
    {
        "id": 3,
        "name": "Utsav Sharma",
        "age": 10,
        "salary": 300.0
    }
]
Getting A Specific User
  • To get a specific user, add the id of the user in the request path:

  • E.g. http://localhost:8080/users/2

  • You will get the following response

{
    "id": 2,
    "name": "Utsav Sharma",
    "age": 30,
    "salary": 300.0
}
Updating A Specific User
  • To update a specific user, send a PUT request with the id in the request path and the updated payload in the body

  • E.g. http://localhost:8080/users/3

{
    "id": 3,
    "name": "Utsav Sharma",
    "age": 34,
    "salary": 300.0
}
  • On success, you will get the updated response

{
    "id": 3,
    "name": "Utsav Sharma",
    "age": 34,
    "salary": 300.0
}
Deleting A Specific User
  • To delete a specific user, simply use the DELETE verb and pass the id in the request path

  • E.g. http://localhost:8080/users/1

  • On success, you will get a 200 OK status with an empty response body.

Getting Users By Age
  • Although we could have implemented this API using request params, for the sake of clarity I created a separate API to retrieve the list of users of a specific age

  • Pass the subresource URL along with the age that we are looking for.

  • E.g. http://localhost:8080/users/age/34

  • In response, you will get all users who are aged 34

[
    {
        "id": 3,
        "name": "Utsav Sharma",
        "age": 34,
        "salary": 300.0
    }
]
Search By Id
  • Finally, this API calls the parallel Flux method and retrieves all users with the ids passed in the payload

  • We have created this API in this way just to keep the example simple and the focus on WebFlux. If we were implementing this in production, we would have made some changes

  • E.g. http://localhost:8080/users/search/id

[2, 3]
  • A sample response would look like this

[
    {
        "id": 3,
        "name": "Utsav Sharma",
        "age": 34,
        "salary": 300.0
    },
    {
        "id": 2,
        "name": "Utsav Sharma",
        "age": 30,
        "salary": 300.0
    }
]
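If you would rather script these calls than click through Postman, the same requests can be built with the JDK's own java.net.http API (Java 11 or later). The sketch below only constructs the requests so that it runs without the application being up; the class name UserApiRequests and its helper methods are hypothetical, and the base URL assumes the default port used throughout this post.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;

// Builds the same requests that were sent from Postman above.
class UserApiRequests {
    static final String BASE_URL = "http://localhost:8080/users";

    // POST /users with a JSON body
    static HttpRequest createUser(String json) {
        return HttpRequest.newBuilder(URI.create(BASE_URL))
                .header("Content-Type", "application/json")
                .POST(BodyPublishers.ofString(json))
                .build();
    }

    // GET /users/{id}
    static HttpRequest getUser(int id) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/" + id))
                .GET()
                .build();
    }

    // PUT /users/{id} with the updated JSON body
    static HttpRequest updateUser(int id, String json) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/" + id))
                .header("Content-Type", "application/json")
                .PUT(BodyPublishers.ofString(json))
                .build();
    }

    // DELETE /users/{id}
    static HttpRequest deleteUser(int id) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/" + id))
                .DELETE()
                .build();
    }

    // POST /users/search/id with a JSON array of ids
    static HttpRequest searchByIds(String jsonArray) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/search/id"))
                .header("Content-Type", "application/json")
                .POST(BodyPublishers.ofString(jsonArray))
                .build();
    }

    public static void main(String[] args) {
        String json = "{\"name\": \"Utsav Sharma\", \"age\": 30, \"salary\": 300.00}";
        HttpRequest[] requests = {
            createUser(json), getUser(2), updateUser(3, json), deleteUser(1), searchByIds("[2, 3]")
        };
        for (HttpRequest request : requests) {
            System.out.println(request.method() + " " + request.uri());
        }
    }
}
```

To actually send one of these while the application is running, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and inspect the status code and body of the response.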

Reactive WebClient

WebClient, introduced in Spring 5, is a non-blocking client with support for reactive streams. Its test counterpart, WebTestClient, lets us call the endpoints provided by the UserController and verify the responses.

  • Let’s start by setting up the base skeleton of UserControllerTest

@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
@AutoConfigureWebTestClient
@Slf4j
class UserControllerTest {
    @Autowired
    private WebTestClient webTestClient;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private DatabaseClient databaseClient;
}
  • Next, we need to set up our database with some raw test data, which is needed for each test case we execute. We can do that simply by using @BeforeEach

private List<User> initData(){
    return Arrays.asList(new User(null,"Utsav Sharma",30,10000),
        new User(null,"Anshul Agarwal",5,100000),
        new User(null,"Jaskaran Singh",40,10000000));
}

@BeforeEach
public void setup(){
    // Each statement is a single Java string built from concatenated literals
    List<String> statements = Arrays.asList("DROP TABLE IF EXISTS users;",
        "CREATE TABLE users ( "
            + "id INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, "
            + "name VARCHAR(100) NOT NULL, "
            + "age integer, "
            + "salary decimal);"
        );

    // Recreate the table before every test; blocking is acceptable in test setup
    statements.forEach(it -> databaseClient.sql(it)
            .fetch()
            .rowsUpdated()
            .block());

    // Clear the table, insert the seed users and wait for the last save to finish
    userRepository.deleteAll()
            .thenMany(Flux.fromIterable(initData()))
            .flatMap(userRepository::save)
            .doOnNext(user -> System.out.println("User added: " + user))
            .blockLast();
}
  • Now that we have our test data, we can use WebTestClient to call our APIs and verify the Flux and Mono responses they return. Below are a few examples to understand how this is done

// Test user creation
@Test
void createUser(){
    User user = new User(null,"Balram Rathore",45,5555555);
    webTestClient.post().uri("/users")
            .contentType(MediaType.APPLICATION_JSON)
            .body(Mono.just(user),User.class)
            .exchange()
            .expectStatus().isCreated()
            .expectBody()
            .jsonPath("$.id").isNotEmpty()
            .jsonPath("$.name").isEqualTo("Balram Rathore");
}

// Get all users test
@Test
void getAllUsersValidateResponse(){
    Flux<User> userFlux = webTestClient.get().uri("/users").exchange()
            .expectStatus().isOk()
            .expectHeader().contentType(MediaType.APPLICATION_JSON_VALUE)
            .returnResult(User.class)
            .getResponseBody();
    // setup() inserts three users, so exactly three items are expected
    StepVerifier.create(userFlux.log("Values received"))
            .expectNextCount(3)
            .verifyComplete();
}
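
// Get specific user test
@Test
void getUserById(){
    webTestClient.get().uri("/users/{userId}", 1)
            .exchange().expectStatus().isOk()
            .expectBody()
            // jsonPath(...) only selects a value; an assertion such as isEqualTo must follow it
            .jsonPath("$.id").isEqualTo(1)
            .jsonPath("$.name").isNotEmpty();
}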
  • You can find more test cases in the GitHub repository

Summary

So we just looked at how to use reactive programming with Java. There are more complex examples and scenarios in which we can use reactive programming, but I will cover those later. For now, look back, establish a basic understanding of this programming model, and see how you can use this approach in the work that you do.
