Most modern applications use one or more external resources while handling a request. By external resources I mean other services exposing endpoints (REST, for instance) or database engines for storing data. The type of external resource doesn't really matter here. What matters is that the application has to communicate with them over the network to obtain the data it needs to perform its computations and respond to the client.
The above diagram shows what such communication may look like. To fulfill a single request, the application has to communicate with PostgreSQL [2], then with Cassandra [4], and then with some other service through its REST endpoint [6]. It needs all 3 responses ([3], [5], and [7]) before it can return its own response to the client. Note that the flow is sequential: we wait for the response from the first external service before we query the next one. This, in turn, leads to a long execution time of the request from the client's perspective.
The remedy is to run all queries at once instead of one by one, and to wait for the data only at the point where it is really needed. This requires a mechanism that returns an object promising to hold the queried data once the query is fulfilled. Thankfully, Java already comes with a solution and provides the Future in the core library.
The above diagram looks deceptively similar to the previous one, but there are significant differences. Calls to remote services ([2], [4], and [6]) become asynchronous and non-blocking. Instead of results, the calls immediately return Futures, which will eventually hold the results.
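To make the difference concrete, here is a minimal sketch of the non-blocking flow using only the JDK. The `query` method is a hypothetical stand-in for a real remote call (it just sleeps), and the source names and latencies are assumptions made for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ParallelQueries {

    // Hypothetical stand-in for a remote call: sleeps to simulate latency, then returns a value.
    static String query(String source, long latencyMillis) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(latencyMillis);
        return source + "-data";
    }

    // Starts all three queries first and blocks only when the results are combined.
    static String handleRequest(ExecutorService pool) throws InterruptedException, ExecutionException {
        Future<String> postgres = pool.submit(() -> query("postgres", 200));
        Future<String> cassandra = pool.submit(() -> query("cassandra", 200));
        Future<String> rest = pool.submit(() -> query("rest", 200));
        // Each get() waits only for whatever time is still left on that query.
        return postgres.get() + ", " + cassandra.get() + ", " + rest.get();
    }

    public static void main(String[] args) throws Exception {
        // Three threads, so the three simulated calls can overlap.
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            long start = System.nanoTime();
            String response = handleRequest(pool);
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            System.out.println(response + " (took about " + elapsedMillis + " ms)");
        } finally {
            pool.shutdown();
        }
    }
}
```

With three pool threads the simulated calls overlap, so the total time should be close to the slowest single call rather than the sum of all three.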
What is the Future?
A Future represents the result of an asynchronous computation. It provides methods to:
check if the computation is complete,
wait for its completion, and
retrieve the result of the computation.
Here is the Future interface as found in the core Java library.
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
Future (Java Platform SE 8)
In principle, the Future is just a kind of wrapper over the expected result. The result can be retrieved using the get() method only once the computation has completed; if it has not, get() blocks until the computation is done.
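The methods of the interface can be tried out with a short JDK-only sketch. The `slowAnswer` helper and the delays are assumptions made for illustration; they only simulate a computation that takes a while.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureBasics {

    // Hypothetical slow computation: sleeps for the given time, then returns 42.
    static Future<Integer> slowAnswer(ExecutorService pool, long delayMillis) {
        return pool.submit(() -> {
            TimeUnit.MILLISECONDS.sleep(delayMillis);
            return 42;
        });
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> answer = slowAnswer(pool, 500);
            System.out.println("isDone right after submit: " + answer.isDone());

            try {
                // Bounded wait: gives up if the result is not ready in time.
                answer.get(50, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                System.out.println("no result within 50 ms, still computing");
            }

            // Unbounded wait: blocks until the computation completes.
            System.out.println("result: " + answer.get());
            System.out.println("isDone after get(): " + answer.isDone());

            // A Future can also be cancelled; get() then throws CancellationException.
            Future<Integer> abandoned = slowAnswer(pool, 10_000);
            abandoned.cancel(true);
            System.out.println("isCancelled: " + abandoned.isCancelled());
            try {
                abandoned.get();
            } catch (CancellationException e) {
                System.out.println("get() on a cancelled Future throws CancellationException");
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```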
Now that we have a bit of understanding of the Future, let's try to define a simple interface for a REST service that will provide asynchronous capabilities in our application.
interface RestService {
    Future<Results> execute();
}
RestService is a simple interface with one method, execute(), that returns a Future. The Future returns results once the computation is completed. All the magic needed to make the REST call is hidden behind this interface. But how do we make it asynchronous? The answer is straightforward: delegate the computation to a separate worker thread. The easiest way to achieve that is to use a thread pool, like ExecutorService.
ExecutorService (Java Platform SE 8)
Here is a naive implementation of the RestService interface, just to show how to submit tasks into the pool and obtain the Future.
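import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SimpleRestService implements RestService {
    private static final ExecutorService POOL = Executors.newFixedThreadPool(1);

    @Override
    public Future<Results> execute() {
        // submit() hands the task to a worker thread and returns immediately
        return POOL.submit(this::callRestEndpoint);
    }

    // Simulates a slow remote call
    private Results callRestEndpoint() throws InterruptedException {
        TimeUnit.SECONDS.sleep(5);
        return new Results();
    }
}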
The real implementation of such a service would need to make a call through the network, which makes it vulnerable to network outages or temporary unavailability of remote hosts. Therefore, our application has to be prepared to handle corner cases where the computation ends with an exception and calling the get() method delivers that exception, wrapped in an ExecutionException, to the main flow.
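private void processResponses(Future<Results> future) {
    try {
        Results results = future.get();
        compute(results);
    } catch (ExecutionException e) {
        // the computation failed; e.getCause() holds the original exception
        // maybe retry?
        ...
    } catch (InterruptedException e) {
        // restore the interrupt flag so callers can react to it
        Thread.currentThread().interrupt();
    }
}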
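How a failure travels from the worker thread to the caller of get() can be seen in a small JDK-only sketch. The failing task and its message are made up for illustration; the point is that get() throws an ExecutionException whose cause is the exception thrown inside the task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FailureDelivery {

    // Submits a task that always fails and returns the cause reported by get().
    static Throwable causeOfFailedTask(ExecutorService pool) throws InterruptedException {
        // Hypothetical failing call, standing in for a REST request that hits a dead host.
        Callable<String> failingCall = () -> {
            throw new IllegalStateException("remote host unavailable");
        };
        Future<String> future = pool.submit(failingCall);
        try {
            future.get();
            return null; // not reached here, because the task always throws
        } catch (ExecutionException e) {
            // The original exception from the worker thread is available as the cause.
            return e.getCause();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("cause: " + causeOfFailedTask(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```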
When we encounter such a situation, we likely want to retry. Here we have only one Future, but what if we have multiple services returning multiple Futures? The code starts getting complex and cumbersome. But maybe there is another way?
Possibly we can try decorating Futures, so they know whether they need to retry an operation. There is a helpful Java library for that: Google's Guava. It delivers futures that can be decorated or chained, called ListenableFuture.
ListenableFuture
A ListenableFuture allows you to register callbacks to be executed once the computation is complete, or if the computation is already complete, immediately. This simple addition makes it possible to efficiently support many operations that the basic Future interface cannot support.
ListenableFutureExplained
The basic operation added by ListenableFuture is addListener(Runnable, Executor), which specifies that when the computation represented by this Future is done, the specified Runnable will be run on the specified Executor.
The main purpose of ListenableFuture is to help chain together a graph of asynchronous operations.
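To get a feel for the listener contract without pulling in Guava, here is a simplified JDK-only sketch. `ListenableTask` is a hypothetical class written for this illustration and is not Guava's implementation; it only mimics the described behavior of addListener(Runnable, Executor): listeners run when the computation is done, or right away if it is already done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

// Hypothetical, simplified illustration of the listener idea; not Guava code.
class ListenableTask<V> extends FutureTask<V> {
    private final List<Runnable> pending = new ArrayList<>();
    private boolean completed = false;

    ListenableTask(Callable<V> callable) {
        super(callable);
    }

    // Runs the listener on the executor once the task is done, or immediately if it already is.
    void addListener(Runnable listener, Executor executor) {
        Runnable dispatch = () -> executor.execute(listener);
        synchronized (pending) {
            if (!completed) {
                pending.add(dispatch);
                return;
            }
        }
        dispatch.run();
    }

    // FutureTask calls done() when the task finishes normally, fails, or is cancelled.
    @Override
    protected void done() {
        List<Runnable> toRun;
        synchronized (pending) {
            completed = true;
            toRun = new ArrayList<>(pending);
            pending.clear();
        }
        toRun.forEach(Runnable::run);
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs each listener on the calling thread
        ListenableTask<String> task = new ListenableTask<>(() -> "results");
        task.addListener(() -> System.out.println("registered before completion"), direct);
        task.run(); // completes the task and fires the first listener
        task.addListener(() -> System.out.println("registered after completion"), direct);
    }
}
```

Because a listener fires without anyone blocking on get(), follow-up work such as a retry can be attached to the future itself, which is what makes chaining possible.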
Now, let's slightly modify the interface of the REST service to return ListenableFuture instead.
interface RestService {
    ListenableFuture<Results> execute();
}
We need to change the ExecutorService as well, wrapping it in a ListeningExecutorService, so the improved implementation of the REST service is as follows.
public class SimpleRestService implements RestService {
    private static final ListeningExecutorService POOL =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));

    @Override
    public ListenableFuture<Results> execute() {
        return POOL.submit(this::callRestEndpoint);
    }

    private Results callRestEndpoint() throws InterruptedException {
        TimeUnit.SECONDS.sleep(5);
        return new Results();
    }
}
With ListenableFuture and ListeningExecutorService in place, we can start thinking of a retry mechanism.
Retrying an operation
Modern software systems are mostly distributed and usually cloud-based. So there is no single call over the network, but rather dozens of them. To be resilient and highly available they need to implement mechanisms that will help in recovering when a network failure occurs. One such mechanism is to retry an operation.
In our particular case, Futures.catchingAsync(...) can be really helpful.
Futures (Guava: Google Core Libraries for Java 20.0 API)
So how can we achieve a retry mechanism for RestService? The answer is easy: let's decorate it and internally use catchingAsync(...). The retry mechanism will be characterized by the following:
a maximum number of retries - once it reaches a maximum, retrying is stopped, so we don't fall into an infinite loop,
back-off time - time that needs to pass before executing the next retry,
starting back-off time - an initial value,
back-off factor - a factor that influences how back-off time increases, for instance, in the case of factor 2 and the initial value of 1, we will get the following series: 1, 2, and 4 milliseconds.
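The parameters above can be turned into a concrete schedule with a few lines of plain Java. This is only a sketch of the arithmetic; the `BackoffSchedule` class and its `delays` method are hypothetical names, and the upper bound on the back-off time is an extra parameter that mirrors the MAX_BACKOFF constant used in the implementation that follows.

```java
import java.util.ArrayList;
import java.util.List;

class BackoffSchedule {

    // Returns the wait time before each retry, in the same unit as startingBackoff.
    static List<Integer> delays(int startingBackoff, int factor, int maxBackoff, int retries) {
        List<Integer> result = new ArrayList<>();
        int backoff = startingBackoff;
        for (int i = 0; i < retries; i++) {
            result.add(backoff);
            // Grow the delay by the factor, but never beyond the cap.
            backoff = Math.min(backoff * factor, maxBackoff);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(1, 2, 10, 3)); // prints [1, 2, 4]
        System.out.println(delays(1, 2, 10, 6)); // prints [1, 2, 4, 8, 10, 10]
    }
}
```

With three retries the cap is never reached, which matches the series 1, 2, and 4 milliseconds; it only starts to matter with more retries.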
Now, let's see what the potential implementation could look like.
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.concurrent.TimeUnit;

public class RetryableRestService implements RestService {
    // ThreadPool is not a JDK or Guava class; presumably a helper from the accompanying project
    private static final ListeningExecutorService POOL = ThreadPool.newFixedPool();
    private static final int STARTING_BACKOFF = 1; // milliseconds
    private static final int BACKOFF_FACTOR = 2;
    private static final int MAX_BACKOFF = 10;     // milliseconds
    private static final int NUM_OF_RETRIES = 3;

    private final RestService delegate;

    RetryableRestService(RestService service) {
        this.delegate = service;
    }

    @Override
    public ListenableFuture<Results> execute() {
        return executeWithRetry(STARTING_BACKOFF, NUM_OF_RETRIES);
    }

    private ListenableFuture<Results> executeWithRetry(int backoffTime, int retryCount) {
        return Futures.catchingAsync(delegate.execute(), RuntimeException.class, exception -> {
            if (retryCount > 0) {
                try {
                    // wait on a pool thread before the next attempt
                    TimeUnit.MILLISECONDS.sleep(backoffTime);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return Futures.immediateFailedFuture(exception);
                }
                int newBackoffTime = Math.min(backoffTime * BACKOFF_FACTOR, MAX_BACKOFF);
                int newRetryCount = retryCount - 1;
                return executeWithRetry(newBackoffTime, newRetryCount);
            }
            // out of retries: propagate the last failure
            return Futures.immediateFailedFuture(exception);
        }, POOL);
    }
}
RetryableRestService uses the decorator pattern to add a retry mechanism. We take a RestService in the constructor and store it to delegate calls. Once the execute() method is called, we call delegate.execute() to obtain a ListenableFuture. Then we apply catchingAsync(...) to register the behavior for the exceptional case, passing RuntimeException.class to indicate which exception we are interested in. In the passed lambda, we track the number of retries left. When we run out of retries, immediateFailedFuture is returned. If retries remain, we wait for the back-off time and call executeWithRetry with the newly computed parameters.
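private void processResponses(ListenableFuture<Results> future) {
    try {
        Results results = future.get();
        compute(results);
    } catch (ExecutionException e) {
        // retries failed, now have to handle unrecoverable case
        ...
    } catch (InterruptedException e) {
        // restore the interrupt flag so callers can react to it
        Thread.currentThread().interrupt();
    }
}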
When calling the get() method, we still need to take care of unrecoverable cases, but these are now only the cases where all retries have already failed.
Conclusion
This time we learned what Futures are and how to employ them to achieve asynchronous computations in our application. We understood the difference between a plain Future and a ListenableFuture. We have also discussed a real use case, a retry mechanism that can be easily implemented using ListenableFuture.
GitHub - jakub-k-slys/java-sandbox