paint-brush
Streaming Data From a REST API Using Spring Webfluxby@mad0gre
12,012 reads
12,012 reads

Streaming Data From a REST API Using Spring Webflux

by Victor BoaventuraApril 26th, 2023
Read on Terminal Reader
Read this story w/o Javascript
tldt arrow

Too Long; Didn't Read

In this article, we have explored the power of Spring Webflux for handling large volumes of data from REST APIs, using the 4chan public API as our data source. We learned how to fetch, stream, and process data using Spring Webflux, and examined the project structure, dependencies, and various features such as Gradle project configuration, starting and stopping threads, and custom Spring configuration. We also discussed the challenges of testing Webflux applications and provided examples from our project's unit tests to overcome these challenges. We highlighted the importance of handling asynchronous behavior, error handling, and timeouts in testing and provided tips for writing effective tests for Webflux applications. By understanding and applying the concepts presented in this article, you will be better equipped to harness the capabilities of Spring Webflux in your own applications. As a result, you can build efficient, data-driven applications that effectively process and analyze large volumes of data from REST APIs.
featured image - Streaming Data From a REST API Using Spring Webflux
Victor Boaventura HackerNoon profile picture


In today's fast-paced and data-driven world, efficiently handling and processing large volumes of data can be crucial. Spring Webflux, a part of the Spring framework, provides a non-blocking, reactive approach to handling data streams, ensuring low resource consumption and high throughput. This article explores the power of Spring Webflux and its practical applications.


We will use the 4chan public API as our data source in this tutorial. The choice of the 4chan API is not an endorsement of the platform but a practical decision based on its public availability, lack of authentication requirements, and the nature of its data, which is suitable for demonstrating the capabilities of Spring Webflux.


Throughout the article, you will learn how to fetch data from the 4chan API, stream it continuously using Spring Webflux, and process the data to gain insights into the platform's metrics.


The git repository for this guide can be found here.


Dependencies and Project Structure


The 4chan Metrics Collector project is organized into two Gradle submodules: app and api-client. The settings.gradle.kts file is responsible for including these submodules in the project.


The build.gradle.kts file serves as the project's main build configuration. It defines the plugins, Java compatibility, configurations, repositories, and common dependencies for all submodules.


Notable dependencies for this project include:

  • Spring Boot Actuator
  • Spring Boot WebFlux
  • Spring Boot AOP
  • Lombok
  • Micrometer Prometheus Registry


The app submodule, configured in app/build.gradle.kts, has a dependency on the api-client submodule. The app submodule is a Spring Boot application responsible for running the metrics collector. The api-client submodule, configured in api-client/build.gradle.kts, is a library containing the implementation of the API client used by the app submodule to access 4chan public API.


While the primary focus of this article is on using Spring Webflux for streaming data from the 4chan API, the project also demonstrates several other valuable features, such as:


  • Gradle project with Kotlin-based configuration
  • Starting and stopping threads at application startup and shutdown
  • Custom Spring configuration
  • Using Docker Compose to start the application, Prometheus, and Grafana together
  • Publishing metrics to Prometheus
  • Using Grafana to graph published metrics


Fetching, Streaming, and Processing Data with Spring Webflux

In this section, we will explore how the application fetches data from the 4chan API, streams the data using Spring Webflux, and processes it to collect and publish metrics.

Overview

The process starts with the MetricsCollector class, which schedules periodic requests to the 4chan API based on the configured boards in the application.yml file. For each board, a Flux is created to emit data, and these Flux instances are merged into a single Flux that the application subscribes to. As the data streams through the subscription, the application publishes metrics to Prometheus.


The diagram below describes the flow of the application - Each configured board continuously emits messages containing metadata about their active threads obtained from the 4chan public API, those streams are merged into a single continuous stream for all active threads across the configured boards, and as each message is processed in the subscription, metrics are published to Prometheus.




Create Flux and Subscribe to Data Stream

@PostConstruct
public void produce() {
  boards.stream()
      .map(board -> Flux.create(emit(board)))
      .reduce(Flux::merge)
      .orElse(Flux.empty())
      .flatMap(Function.identity())
      .doOnNext(this::publishThreadMetrics)
      .subscribe();
}


The entry point of the application is the produce method in the MetricsCollector class. Thanks to the @PostConstruct annotation, this method executes when the application starts up. In this method, all configured boards are streamed and each is mapped into a Flux responsible for emitting active thread data for the board.


At this point, we have a stream of separated Flux objects, that is then reduced into a single Flux containing all active threads for all configured boards. Finally, it subscribes to start the process of data consumption, and as each next element goes through the stream, thread metrics are published accordingly.


Emitting Elements to the Flux

The emit method in the MetricsCollector class configures a scheduler to execute requests periodically:


private Consumer<FluxSink<Flux<FourChanThread>>> emit(final String board) {
  return emitter -> scheduler.scheduleAtFixedRate(
      execute(emitter, board),
      random.nextInt(REQUEST_DELAY_SECONDS),
      REQUEST_INTERVAL_SECONDS,
      TimeUnit.SECONDS);
}


The random delay ensures that not all threads issue requests to the 4chan API simultaneously.


The execute method invokes the ApiClient's getThreads method to fetch data from the 4chan API:


private Runnable execute(final FluxSink<Flux<FourChanThread>> emitter, final String board) {
  return () -> {
    emitter.next(apiClient.getThreads(board));
  };
}


Fetching Data from a Public API

The ApiClient class uses a WebClient instance to make reactive HTTP requests to the 4chan Public API. The getThreads method returns a Flux<FourChanThread> that represents the data stream of active threads for a specific board.


Each call to the threads endpoint returns a list of pages, and each page contains a list of threads. The call to bodyToMono parses the response JSON into a Publisher of a single list of pages. The subsequent mapping calls are used to unpack the threads from the lists that contain them, generating a Flux of threads. The last mapping is used to enrich the thread metadata with the specific board to which the thread belongs.


public Flux<FourChanThread> getThreads(String board) {
  log.debug("Issuing request to get threads for /{} board", board);

  Endpoint endpoint = Endpoint.THREADS; // Endpoint is an enum that defines the API endpoints and is used to build the endpoint path
  ParameterizedTypeReference<List<FourChanThreadList>> typeReference =
      new ParameterizedTypeReference<>() {
      };

  return webClient.get()
      .uri(endpoint.getEndpoint(board))
      .retrieve()
      .bodyToMono(typeReference)
      .timeout(TIMEOUT)
      .doOnError(throwable -> log.warn("Error when issuing request to get threads for /{} board", board, throwable))
      .flatMapMany(Flux::fromIterable)
      .flatMap(threadList -> Flux.fromIterable(threadList.getThreads()))
      .flatMap(thread -> Mono.just(thread.withBoard(board)));
}



Testing Webflux Applications

Testing Webflux applications can be quite challenging due to the asynchronous and non-blocking nature of reactive programming. However, with the right approach and tools, you can write effective tests that ensure the proper functionality and performance of your application. In this section, we will discuss the challenges of testing Webflux applications, use our project's unit tests as examples, and provide tips for writing effective tests.


Challenges to Testing Webflux Applications

Some of the challenges when testing Webflux applications include:


  1. Asynchronous behavior: Since Webflux applications are built using reactive programming, handling asynchronous behavior in tests can be tricky. You need to ensure that your test cases properly account for the asynchronous nature of the code they're testing.
  2. Error handling: Reactive applications can have complex error-handling scenarios. It's essential to test various error conditions to ensure that your application gracefully handles failures.
  3. Timeouts: Timeouts are a common concern in Webflux applications, and testing how your application handles them is crucial for ensuring reliable operation.
  4. Web Client: It is complicated to mock the reactive WebClient used to make HTTP calls. Without proper care, it is easy to get tangled in a complex structure of methods that need to be stubbed and subsequent objects that need to be mocked.

Unit Testing

In our project, we have two test classes, MetricsCollectorTest and ApiClientTest, that cover different aspects of the application's functionality. These tests serve as examples of how to handle the challenges mentioned above:


Asynchronous behavior

The MetricsCollectorTest class tests the asynchronous behavior of the produce() method using Thread.sleep() to wait for the expected execution time before making assertions.


metricsCollector.produce();
Thread.sleep(TimeUnit.SECONDS.toMillis(REQUEST_DELAY_SECONDS) + 1L);


The ApiClientTest class uses the StepVerifier from the reactor-test library to ensure the expected behavior of the getThreads() method.


Flux<FourChanThread> result = apiClient.getThreads(board);
StepVerifier.create(result)
    .expectNext(...)
    .verifyComplete();


Error handling

The ApiClientTest class tests the error handling of the getThreads() method by simulating a missing board and ensuring that a WebClientResponseException is thrown.


Flux<FourChanThread> result = apiClient.getThreads(board);
StepVerifier.create(result)
    .expectErrorMatches(throwable -> throwable instanceof WebClientResponseException)
    .verify();


Timeouts

The ApiClientTest class tests the timeout handling of the getThreads() method by simulating a delayed response, ensuring that a TimeoutException is thrown.


Flux<FourChanThread> result = apiClient.getThreads(board);
StepVerifier.create(result)
    .expectErrorMatches(throwable -> throwable instanceof TimeoutException)
    .verify();


WebClient

In the ApiClientTest class, we inject a tailored WebClient in the ApiClient instance being tested with a specified Exchange Function. This allows us to control the behavior of the WebClient and create test cases suitable for different scenarios.


private WebClient createWebClient(HttpStatus httpStatus, Duration delay, String body, String contentType) {
  ClientResponse clientResponse = ClientResponse.create(httpStatus, ExchangeStrategies.withDefaults())
      .header(HttpHeaders.CONTENT_TYPE, contentType)
      .body(body)
      .build();

  ExchangeFunction exchangeFunction = request -> Mono
      .just(clientResponse)
      .delayElement(delay);

  return WebClient.builder()
      .exchangeFunction(exchangeFunction)
      .build();
}


Conclusion

In this article, we have explored the power of Spring Webflux for handling large volumes of data from REST APIs, using the 4chan public API as our data source. We learned how to fetch, stream, and process data using Spring Webflux, and examined the project structure, dependencies, and various features such as Gradle project configuration, starting and stopping threads, and custom Spring configuration.


We also discussed the challenges of testing Webflux applications and provided examples from our project's unit tests to overcome these challenges. We highlighted the importance of handling asynchronous behavior, error handling, and timeouts in testing and provided tips for writing effective tests for Webflux applications.


By understanding and applying the concepts presented in this article, you will be better equipped to harness the capabilities of Spring Webflux in your own applications. As a result, you can build efficient, data-driven applications that effectively process and analyze large volumes of data from REST APIs.



Featured image by Snow White