
Understanding Reactive Programming with Spring WebFlux: Benefits and Use Cases
Reactive programming has become increasingly important in modern backend development, especially when dealing with high-concurrency applications and I/O-intensive operations. Spring WebFlux brings reactive programming to the Spring ecosystem, offering a non-blocking alternative to traditional servlet-based applications.
What is Reactive Programming?
Reactive programming is a programming paradigm that deals with asynchronous data streams and the propagation of changes. Instead of blocking threads while waiting for I/O operations, reactive systems process data as it becomes available, making efficient use of system resources.
Key Concepts
Asynchronous and Non-blocking Traditional blocking I/O keeps threads waiting:
// Blocking approach
public User getUserById(Long id) {
// Thread waits here until database responds
return userRepository.findById(id);
}
Reactive approach frees up threads:
// Reactive approach
public Mono<User> getUserById(Long id) {
// Thread is freed immediately, result comes later
return userRepository.findById(id);
}
Back Pressure Reactive systems can handle situations where data producers are faster than consumers:
@Service
public class UserService {
public Flux<User> getAllUsers() {
return userRepository.findAll()
.limitRate(100) // Process max 100 items at a time
.delayElements(Duration.ofMillis(10)); // Add small delay
}
}
Spring WebFlux vs. Spring MVC
When to Choose WebFlux
WebFlux is ideal for:
- High-concurrency applications (thousands of simultaneous connections)
- I/O-intensive operations (database queries, external API calls)
- Streaming data applications
- Microservices with many external dependencies
Stick with MVC for:
- CPU-intensive operations
- Blocking dependencies that can’t be made reactive
- Teams unfamiliar with reactive programming
- Simple CRUD applications with low concurrency
Performance Comparison
// MVC Controller (blocking)
@RestController
public class UserController {
@GetMapping("/users/{id}")
public ResponseEntity<User> getUser(@PathVariable Long id) {
User user = userService.findById(id); // Blocks thread
return ResponseEntity.ok(user);
}
}
// WebFlux Controller (non-blocking)
@RestController
public class ReactiveUserController {
@GetMapping("/users/{id}")
public Mono<ResponseEntity<User>> getUser(@PathVariable Long id) {
return userService.findById(id)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
}
Core Reactive Types in WebFlux
Mono<T> - Single Value or Empty
Mono represents 0 or 1 element:
@Service
public class UserService {
public Mono<User> createUser(User user) {
return userRepository.save(user)
.doOnSuccess(savedUser ->
log.info("User created: {}", savedUser.getId()))
.doOnError(error ->
log.error("Failed to create user", error));
}
public Mono<User> getUserByEmail(String email) {
return userRepository.findByEmail(email)
.switchIfEmpty(Mono.error(
new UserNotFoundException("User not found: " + email)));
}
}
Flux<T> - Multiple Values
Flux represents 0 to N elements:
@Service
public class UserService {
public Flux<User> getUsersByDepartment(String department) {
return userRepository.findByDepartment(department)
.filter(user -> user.isActive())
.map(this::enrichUserData)
.sort(Comparator.comparing(User::getLastName));
}
public Flux<UserDto> streamUsers() {
return userRepository.findAll()
.map(this::convertToDto)
.delayElements(Duration.ofMillis(100)); // Simulate streaming
}
}
Practical Implementation Examples
1. Reactive REST API
@RestController
@RequestMapping("/api/users")
public class ReactiveUserController {
private final UserService userService;
@GetMapping
public Flux<UserDto> getAllUsers(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size) {
return userService.findAllUsers(page, size)
.map(this::toDto);
}
@GetMapping("/{id}")
public Mono<ResponseEntity<UserDto>> getUser(@PathVariable Long id) {
return userService.findById(id)
.map(this::toDto)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@PostMapping
public Mono<ResponseEntity<UserDto>> createUser(
@Valid @RequestBody CreateUserRequest request) {
return userService.createUser(request)
.map(this::toDto)
.map(user -> ResponseEntity.status(HttpStatus.CREATED).body(user))
.onErrorResume(ValidationException.class,
ex -> Mono.just(ResponseEntity.badRequest().build()));
}
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<UserDto> streamUsers() {
return userService.streamAllUsers()
.map(this::toDto);
}
}
2. Reactive Database Access with R2DBC
@Repository
public class ReactiveUserRepository {
private final R2dbcEntityTemplate template;
public Mono<User> save(User user) {
return template.insert(user);
}
public Mono<User> findById(Long id) {
return template.selectOne(
Query.query(where("id").is(id)),
User.class
);
}
public Flux<User> findByDepartment(String department) {
return template.select(
Query.query(where("department").is(department)),
User.class
);
}
public Mono<Long> countActiveUsers() {
return template.count(
Query.query(where("active").is(true)),
User.class
);
}
}
3. Combining Multiple Reactive Streams
@Service
public class UserAggregationService {
public Mono<UserProfileDto> getUserProfile(Long userId) {
Mono<User> userMono = userService.findById(userId);
Mono<List<Order>> ordersMono = orderService.findByUserId(userId);
Mono<UserPreferences> preferencesMono =
preferencesService.findByUserId(userId);
return Mono.zip(userMono, ordersMono, preferencesMono)
.map(tuple -> UserProfileDto.builder()
.user(tuple.getT1())
.orders(tuple.getT2())
.preferences(tuple.getT3())
.build());
}
public Flux<UserActivity> getUserActivityStream(Long userId) {
Flux<Order> orders = orderService.findByUserIdStream(userId);
Flux<LoginEvent> logins = loginService.findByUserIdStream(userId);
return Flux.merge(
orders.map(order -> UserActivity.fromOrder(order)),
logins.map(login -> UserActivity.fromLogin(login))
).sort(Comparator.comparing(UserActivity::getTimestamp));
}
}
4. Error Handling in Reactive Streams
@Service
public class RobustUserService {
public Mono<User> findUserWithFallback(Long id) {
return primaryUserService.findById(id)
.timeout(Duration.ofSeconds(3))
.onErrorResume(TimeoutException.class,
ex -> secondaryUserService.findById(id))
.onErrorResume(Exception.class,
ex -> cacheService.getCachedUser(id))
.switchIfEmpty(Mono.error(
new UserNotFoundException("User not found: " + id)));
}
public Mono<User> createUserWithRetry(User user) {
return userRepository.save(user)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(throwable -> throwable instanceof DataIntegrityViolationException)
.doBeforeRetry(signal ->
log.warn("Retrying user creation, attempt: {}",
signal.totalRetries() + 1)));
}
}
Testing Reactive Applications
Unit Testing with StepVerifier
@ExtendWith(SpringExtension.class)
class UserServiceTest {
@Mock
private UserRepository userRepository;
@InjectMocks
private UserService userService;
@Test
void shouldReturnUserWhenFound() {
// Given
User user = new User(1L, "john@example.com", "John Doe");
when(userRepository.findById(1L)).thenReturn(Mono.just(user));
// When & Then
StepVerifier.create(userService.findById(1L))
.expectNext(user)
.verifyComplete();
}
@Test
void shouldReturnEmptyWhenUserNotFound() {
// Given
when(userRepository.findById(999L)).thenReturn(Mono.empty());
// When & Then
StepVerifier.create(userService.findById(999L))
.verifyComplete();
}
@Test
void shouldHandleMultipleUsers() {
// Given
List<User> users = Arrays.asList(
new User(1L, "john@example.com", "John"),
new User(2L, "jane@example.com", "Jane")
);
when(userRepository.findAll()).thenReturn(Flux.fromIterable(users));
// When & Then
StepVerifier.create(userService.findAll())
.expectNext(users.get(0))
.expectNext(users.get(1))
.verifyComplete();
}
}
Integration Testing
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = "spring.r2dbc.url=r2dbc:h2:mem:///testdb")
class UserControllerIntegrationTest {
@Autowired
private WebTestClient webTestClient;
@Autowired
private UserRepository userRepository;
@Test
void shouldCreateUser() {
CreateUserRequest request = new CreateUserRequest("john@example.com", "John Doe");
webTestClient.post()
.uri("/api/users")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(request)
.exchange()
.expectStatus().isCreated()
.expectBody(UserDto.class)
.value(user -> {
assertThat(user.getEmail()).isEqualTo("john@example.com");
assertThat(user.getName()).isEqualTo("John Doe");
});
}
@Test
void shouldStreamUsers() {
// Given
userRepository.saveAll(Arrays.asList(
new User("user1@example.com", "User 1"),
new User("user2@example.com", "User 2")
)).blockLast();
// When & Then
webTestClient.get()
.uri("/api/users/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.expectStatus().isOk()
.returnResult(UserDto.class)
.getResponseBody()
.take(2)
.as(StepVerifier::create)
.expectNextCount(2)
.verifyComplete();
}
}
Performance Considerations and Best Practices
1. Thread Pool Configuration
# application.yml
spring:
reactor:
netty:
pool:
max-connections: 1000
max-idle-time: 30s
r2dbc:
pool:
initial-size: 10
max-size: 50
max-idle-time: 30m
2. Avoid Blocking Operations
// ❌ Bad: Blocking in reactive stream
public Mono<User> processUser(User user) {
return Mono.just(user)
.map(u -> {
// This blocks the reactive thread!
Thread.sleep(1000);
return u;
});
}
// ✅ Good: Use reactive alternatives
public Mono<User> processUser(User user) {
return Mono.just(user)
.delayElement(Duration.ofSeconds(1))
.map(u -> {
// Non-blocking transformation
return enrichUser(u);
});
}
3. Memory Management
@Service
public class OptimizedUserService {
public Flux<UserDto> getAllUsersOptimized() {
return userRepository.findAll()
.buffer(100) // Process in batches
.flatMap(users ->
Flux.fromIterable(users)
.map(this::toDto)
.subscribeOn(Schedulers.parallel())
);
}
public Flux<User> streamLargeDataset() {
return userRepository.findAll()
.limitRate(50) // Control backpressure
.onBackpressureBuffer(1000) // Buffer up to 1000 items
.share(); // Share among multiple subscribers
}
}
Common Pitfalls and How to Avoid Them
1. Blocking in Reactive Chains
// ❌ Wrong
public Mono<String> processData(String data) {
return webClient.get()
.retrieve()
.bodyToMono(String.class)
.map(result -> {
// This blocks!
return blockingService.process(result);
});
}
// ✅ Correct
public Mono<String> processData(String data) {
return webClient.get()
.retrieve()
.bodyToMono(String.class)
.flatMap(result ->
reactiveService.process(result)
);
}
2. Not Subscribing to Reactive Streams
// ❌ Wrong: Nothing happens without subscription
public void updateUser(User user) {
userRepository.save(user); // This does nothing!
}
// ✅ Correct: Subscribe to execute
public Mono<User> updateUser(User user) {
return userRepository.save(user); // Return Mono for subscription
}
3. Improper Error Handling
// ❌ Wrong: Errors kill the stream
public Flux<User> processUsers() {
return userRepository.findAll()
.map(user -> {
if (user.isInvalid()) {
throw new IllegalArgumentException("Invalid user");
}
return user;
});
}
// ✅ Correct: Handle errors gracefully
public Flux<User> processUsers() {
return userRepository.findAll()
.flatMap(user -> {
if (user.isInvalid()) {
return Mono.empty(); // Skip invalid users
}
return Mono.just(user);
})
.onErrorContinue((error, user) ->
log.warn("Skipping invalid user: {}", user, error));
}
Real-World Use Cases
1. High-Frequency Trading System
@Service
public class TradingService {
public Flux<Trade> streamTrades() {
return marketDataService.getPriceStream()
.buffer(Duration.ofMillis(100))
.flatMap(prices ->
tradingAlgorithm.generateTrades(prices))
.flatMap(trade ->
tradeExecutor.execute(trade))
.share();
}
}
2. Real-Time Chat Application
@Controller
public class ChatController {
@MessageMapping("/chat")
public Flux<ChatMessage> handleChat(Flux<ChatMessage> messages) {
return messages
.flatMap(message ->
chatService.processMessage(message))
.share();
}
}
3. IoT Data Processing
@Service
public class IoTDataProcessor {
public Flux<ProcessedData> processIoTStream() {
return sensorDataRepository.streamData()
.window(Duration.ofMinutes(1))
.flatMap(window ->
window.reduce(this::aggregateData))
.flatMap(aggregated ->
anomalyDetector.detect(aggregated));
}
}
Conclusion
Spring WebFlux and reactive programming offer powerful tools for building high-performance, scalable applications. While the learning curve can be steep, the benefits in terms of resource utilization and scalability are significant for the right use cases.
Key takeaways:
- Use reactive programming for I/O-intensive, high-concurrency applications
- Master Mono and Flux operations for effective stream processing
- Avoid blocking operations in reactive chains
- Test thoroughly with StepVerifier and WebTestClient
- Monitor performance and tune thread pools appropriately
When to consider WebFlux:
- Building microservices with many external dependencies
- Processing streaming data
- Creating real-time applications
- Scaling to thousands of concurrent connections
Start small with reactive programming - convert one service at a time and measure the performance impact. With proper understanding and implementation, reactive programming can significantly improve your application’s scalability and responsiveness.
Ready to go reactive? Start by identifying the most I/O-intensive parts of your current application and consider converting them to reactive patterns. The investment in learning will pay dividends in application performance and scalability.
Share this article
Get more insights delivered to your inbox
Join the discussion
Comments coming soon...