import java.util.Random
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
val pool = Executors.newSingleThreadExecutor
val fut = CompletableFuture.supplyAsync([
    new Random().nextInt(1000)
], pool).then [ // thenApply, since the lambda has an input and an output value
    it / 10.0
].then [ // thenAccept, since the lambda has an input, but the expression does not return a value
    System.out.println('''Random percent: «it»''')
].then [| // thenRun, since the lambda does not take an input
    System.out.println("The end.")
]
Extensions to CompletableFuture
Some might complain that the java.util.concurrent.CompletionStage / java.util.concurrent.CompletableFuture
API surface is too large and difficult to wrap your head around. In fact, many methods are similar, and certain
use cases are verbose to express with the given methods. Therefore we provide a couple of extension methods that
make certain actions on CompletableFuture more convenient. These extension methods are provided via the class
de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.
The first thing one usually notices is that CompletableFuture has three methods to handle the success case:
thenApply, thenAccept, and thenRun. These methods only have different names because the Java compiler cannot
figure out which functional interface a lambda conforms to if a method is overloaded with two or more versions
taking different functional interface parameters. Interestingly, Xtend does not have this restriction and can
figure out pretty well which overloaded version of a method is called, based on inspection of the lambda passed
to the method. Therefore the CompletableFutureExtensions class provides then methods that simply redirect to the
JDK methods.
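For comparison, here is a minimal plain-Java sketch of the three differently named JDK methods that the then extension methods redirect to. The class name ThenVariants and the fixed input value are illustrative assumptions, not part of the library.

```java
import java.util.concurrent.CompletableFuture;

public class ThenVariants {
    /** Chains the three JDK success callbacks and returns the collected output. */
    static String run(int value) {
        StringBuilder log = new StringBuilder();
        CompletableFuture.completedFuture(value)
            .thenApply(n -> n / 10.0)   // Function: takes an input, returns an output
            .thenAccept(p -> {          // Consumer: takes an input, returns nothing
                log.append("Random percent: ").append(p);
            })
            .thenRun(() -> {            // Runnable: neither input nor output
                log.append("; The end.");
            })
            .join();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(250));
    }
}
```

In Java each lambda shape needs its own method name, whereas the Xtend then methods pick the matching JDK method from the shape of the lambda.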
Example (see the listing at the top of this section):
Tip: You may have noticed that the syntax for spawning a supplier via CompletableFuture#supplyAsync on a custom executor does not look elegant, since the pool parameter is the last one, so the lambda cannot be written behind the closing parenthesis of the parameter list. Have a look at the section Async Computations for a more Xtend-style API.
Note: Currently there are no thenAsync versions of the then methods implemented, but they are
planned to be provided in the future.
The extension methods starting with when register a callback on a CompletableFuture which is invoked
when the future is completed and in a certain state, depending on the method. The returned future will always be
completed with the original value (successfully or exceptionally), unless the callback throws an exception. In this
case the returned future will be completed exceptionally with the exception thrown by the callback. If the callback
is registered before completion of the future, the callback is invoked on the thread completing the future. If
the callback is registered after completion of the future, the callback is invoked on the thread registering
the callback. The async versions of the when methods always invoke the callback on the executor passed to the
method, or on the common ForkJoinPool for the async versions that do not take an executor as an argument.
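The JDK method whenComplete follows the same completion rules, so it can serve as a point of reference. The following plain-Java sketch only demonstrates the JDK behavior; whether the when extension methods are built on whenComplete is an assumption, and the class name WhenCallbackSemantics is made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class WhenCallbackSemantics {
    /** The returned future carries the original value if the callback returns normally. */
    static String passThrough() {
        CompletableFuture<String> original = CompletableFuture.completedFuture("value");
        CompletableFuture<String> returned = original.whenComplete((v, e) -> { /* observe only */ });
        return returned.join();
    }

    /** The returned future fails with the callback's exception if the callback throws. */
    static String throwingCallback() {
        CompletableFuture<String> original = CompletableFuture.completedFuture("value");
        CompletableFuture<String> returned = original.whenComplete((v, e) -> {
            throw new IllegalStateException("callback failed");
        });
        try {
            returned.join();
            return "no failure";
        } catch (CompletionException ex) {
            return ex.getCause().getMessage(); // join() wraps the callback's exception
        }
    }

    /** A callback registered after completion runs on the registering thread. */
    static boolean runsOnRegisteringThread() {
        Thread[] seen = new Thread[1];
        CompletableFuture.completedFuture("v")
            .whenComplete((v, e) -> seen[0] = Thread.currentThread());
        return seen[0] == Thread.currentThread();
    }
}
```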
The extension method whenCancelled allows registering a callback on a CompletableFuture. The callback is
invoked when the future was completed via cancellation.
Example:
import java.util.concurrent.CompletableFuture
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
val toCancel = new CompletableFuture
toCancel.whenCancelled [|
    println("I've been canceled")
]
toCancel.cancel
The method whenException registers a callback which is invoked when the future is completed exceptionally.
Example:
import java.util.concurrent.CompletableFuture
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
CompletableFuture.supplyAsync [
    throw new IllegalStateException
].whenException [
    println('''failed with «it.class» and cause «it.cause.class»''')
]
The recoverWith extension method is similar to the thenCompose method, but for the exceptional case.
The registered callback of type (Throwable)⇒CompletionStage<? extends R> will be invoked if the future
the callback is registered on completes exceptionally. The callback will be called with the exception the
original future was completed with. The future returned from the callback will be used to
complete the future returned from the recoverWith extension method. This means that if the original future
completes successfully, its result will be used to complete the future returned from the recoverWith
method. Otherwise the result of the recovery callback will be forwarded to the overall result future
(no matter if the result is successful or exceptional).
Example:
import java.util.concurrent.CompletableFuture
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
CompletableFuture.supplyAsync [
    throw new IllegalStateException("Boom!")
].recoverWith [
    if(it.cause instanceof IllegalStateException)
        CompletableFuture.supplyAsync [
            "I was expecting you! Here is your asynchronous backup value."
        ]
    else
        throw new IllegalArgumentException("Did not expect this!", it)
].thenAccept [
    println(it)
]
There are also recoverWithAsync versions where the recovery callback will always be executed on a given
executor.
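The behavior described for recoverWith can be approximated in plain Java with the JDK API alone. The following is a minimal sketch under the assumption that the semantics are exactly as described above; it is not the library's implementation, and the class name RecoverWithSketch and the simplified callback type are chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class RecoverWithSketch {
    /** Passes a successful result through; on failure, delegates to the recovery future. */
    static <R> CompletableFuture<R> recoverWith(
            CompletableFuture<R> source,
            Function<Throwable, CompletionStage<R>> recovery) {
        CompletableFuture<R> result = new CompletableFuture<>();
        source.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value); // success: forward the original value
                return;
            }
            try {
                // failure: forward whatever the recovery future produces
                recovery.apply(error).whenComplete((recovered, recoveryError) -> {
                    if (recoveryError == null) result.complete(recovered);
                    else result.completeExceptionally(recoveryError);
                });
            } catch (Throwable thrown) {
                result.completeExceptionally(thrown); // the recovery callback itself threw
            }
        });
        return result;
    }

    static String demo() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("Boom!"));
        return recoverWith(failed,
            error -> CompletableFuture.completedFuture("backup after " + error.getMessage())).join();
    }
}
```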
It may be useful to abort a computation and get a default value instead. This can be done using the
handleCancellation extension method and canceling the original future.
The handleCancellation extension method is called with a supplier function which provides a result
value when the source future is cancelled. If the original future completes successfully, the returned
future will simply be completed with the same value. If the original future was cancelled (or completed
with a java.util.concurrent.CancellationException), the given callback is called. If the callback
completes successfully, the result will be set on the resulting future. If the callback throws an exception,
this exception will be set as exceptional result to the resulting future. If the original future was completed
exceptionally with a different exception, the same exception will be set as the exceptional result
to the returned future.
import java.util.concurrent.CompletableFuture
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
val lateVal = CompletableFuture.supplyAsync [
    // Do not do this at home!
    // We are blocking the common pool
    Thread.sleep(1000)
    "here is some belated value."
]
lateVal.handleCancellation [
    "Here is some default value."
].thenAccept [
    println(it)
]
// let's be impatient
lateVal.cancel
The handleCancellationAsync variant always executes the given handler on a provided executor.
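The four cases described for handleCancellation can be sketched in plain Java as follows. This is an approximation built on the JDK API under the assumption that the description above is complete; the class name HandleCancellationSketch is hypothetical and the code is not taken from the library.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class HandleCancellationSketch {
    static <R> CompletableFuture<R> handleCancellation(
            CompletableFuture<R> source, Supplier<? extends R> fallback) {
        CompletableFuture<R> result = new CompletableFuture<>();
        source.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);               // success: same value
            } else if (error instanceof CancellationException) {
                try {
                    result.complete(fallback.get());  // cancelled: value from the supplier
                } catch (Throwable thrown) {
                    result.completeExceptionally(thrown); // supplier failed
                }
            } else {
                result.completeExceptionally(error);  // other failure: same exception
            }
        });
        return result;
    }

    static String demo() {
        CompletableFuture<String> lateVal = new CompletableFuture<>();
        CompletableFuture<String> withDefault =
            handleCancellation(lateVal, () -> "Here is some default value.");
        lateVal.cancel(false); // let's be impatient
        return withDefault.join();
    }
}
```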
Sometimes it is necessary to take the result of one CompletableFuture and forward it to another
future. This can, for example, be needed when a function is handed a future to complete and gets the actual result
from a method returning a future. For cases like this the forwardTo extension method can be used.
Example:
import java.util.concurrent.CompletableFuture
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
def void completeWithResult(CompletableFuture<String> res, boolean heavy) {
    if(heavy) {
        doSomeHeavyWork().forwardTo(res)
    } else {
        res.complete("Some light work")
    }
}
def CompletableFuture<String> doSomeHeavyWork() {
    CompletableFuture.supplyAsync [
        "Did some heavy lifting"
    ]
}
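Conceptually, forwarding a result amounts to completing the target future with whatever outcome the source future had. A minimal plain-Java sketch of this idea, assuming nothing beyond the JDK API (the class name ForwardToSketch is made up and this is not the library's code):

```java
import java.util.concurrent.CompletableFuture;

public class ForwardToSketch {
    /** Completes target with the outcome of source, successful or exceptional. */
    static <T> void forwardTo(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((value, error) -> {
            if (error == null) target.complete(value);
            else target.completeExceptionally(error);
        });
    }

    static String demo() {
        CompletableFuture<String> target = new CompletableFuture<>();
        forwardTo(CompletableFuture.completedFuture("Did some heavy lifting"), target);
        return target.join();
    }
}
```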
When returning a CompletableFuture from a method it may make sense to not return the future itself,
but a copy, which will be completed
When returning a CompletableFuture from a method which is decoupled from one or more internal
futures (e.g. using the copy or forwardTo extension method), it may still make sense to forward
cancellation from the returned future to the futures used internally to abort sub-tasks.
Example:
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
def CompletableFuture<String> someCancellableComposition(Executor executor) {
    val result = new CompletableFuture<String>
    val CompletableFuture<String> firstStep = firstStep(executor)
    result.forwardCancellation(firstStep)
    firstStep.thenCompose [
        val secondStep = secondStep(executor, it)
        result.forwardCancellation(secondStep)
        secondStep
    ].forwardTo(result)
    result
}
def CompletableFuture<String> firstStep(Executor executor) {
    val result = new CompletableFuture<String>
    executor.execute [|
        Thread.sleep(10) // evil!
        if(result.cancelled) {
            println("cancelled in first step")
        } else {
            result.complete("Some result")
        }
    ]
    result
}
def CompletableFuture<String> secondStep(Executor executor, String input) {
    val result = new CompletableFuture<String>
    executor.execute [|
        if(result.cancelled) {
            println("cancelled in second step")
        } else {
            val output = input.toUpperCase
            result.complete(output)
        }
    ]
    result
}
As you see in the example, the cancellation is forwarded to the two futures that are composed to calculate the overall result. Yet the returned future cannot be used to complete any internal future with a bogus result value.
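The one-way nature of this coupling can be illustrated in plain Java. The sketch below assumes that forwarding cancellation means "cancel the target if and only if the source was cancelled"; it uses only JDK calls, the class name ForwardCancellationSketch is hypothetical, and it is not the library's implementation.

```java
import java.util.concurrent.CompletableFuture;

public class ForwardCancellationSketch {
    /** Cancels 'to' when 'from' is cancelled; other outcomes of 'from' are ignored. */
    static void forwardCancellation(CompletableFuture<?> from, CompletableFuture<?> to) {
        from.whenComplete((value, error) -> {
            if (from.isCancelled()) to.cancel(false);
        });
    }

    /** Cancelling the outer future cancels the internal one. */
    static boolean cancellationIsForwarded() {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture<String> internal = new CompletableFuture<>();
        forwardCancellation(result, internal);
        result.cancel(false);
        return internal.isCancelled();
    }

    /** Completing the outer future with a value leaves the internal one untouched. */
    static boolean valueIsNotForwarded() {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture<String> internal = new CompletableFuture<>();
        forwardCancellation(result, internal);
        result.complete("bogus");
        return !internal.isDone();
    }
}
```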
The extension method cancelOnTimeout cancels a given CompletableFuture when a timeout occurs.
Note that this method returns the same future that is passed in, not a new one. If a new future
is needed, consider the complex form of orTimeout (see below).
Example:
CompletableFuture.supplyAsync [
    Thread.sleep(100) // Never actually do this!
    "Wow, so late"
].cancelOnTimeout(50, TimeUnit.MILLISECONDS)
.whenCancelled [|
    println("Oh no! It took too long.")
]
Alternatively, a version of cancelOnTimeout is provided taking a java.time.Duration as parameter.
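A timeout-driven cancellation of this kind can be sketched in plain Java with a scheduled task. The following is an illustration under assumptions: the daemon scheduler, the class name CancelOnTimeoutSketch, and the clean-up of the timer are choices made for this sketch and say nothing about how the library schedules its timeouts.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelOnTimeoutSketch {
    // Daemon thread, so the scheduler does not keep the JVM alive (assumption of this sketch)
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "timeout-scheduler");
            t.setDaemon(true);
            return t;
        });

    /** Cancels the given future after the timeout and returns that same future. */
    static <T> CompletableFuture<T> cancelOnTimeout(
            CompletableFuture<T> future, long time, TimeUnit unit) {
        ScheduledFuture<?> timer = SCHEDULER.schedule(() -> { future.cancel(false); }, time, unit);
        // Drop the pending timer once the future completes in any way
        future.whenComplete((value, error) -> timer.cancel(false));
        return future;
    }

    static boolean demo() {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed by anyone
        CompletableFuture<String> same = cancelOnTimeout(never, 50, TimeUnit.MILLISECONDS);
        try {
            same.get(2, TimeUnit.SECONDS);
        } catch (CancellationException expected) {
            // the timeout cancelled the future
        } catch (ExecutionException | TimeoutException | InterruptedException unexpected) {
            return false;
        }
        return same == never && never.isCancelled();
    }
}
```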
Sometimes blocking APIs have to be used, but a future based API should be provided to the user.
In this case it may be desirable that the user can cancel the future to interrupt the thread
performing a blocking operation. This is tricky when running the blocking operations
using a thread pool, since the thread should only be interrupted as long as the operation
associated with the future is running. To support this use case the whenCancelledInterrupt method is provided.
Example:
val blockOpPool = Executors.newCachedThreadPool // pool for running blocking operations
// ...
val sleepy = blockOpPool.asyncRun [ CompletableFuture<?> it | // (1)
    it.whenCancelledInterrupt [|
        try {
            Thread.sleep(100) // perform blocking operation
        } catch (InterruptedException e) {
            println("Hey, I was cancelled")
        }
    ]
]
// ...
sleepy.cancel // may interrupt Thread.sleep
(1) Here an extension method described in Async Computations is used.
The following functions introduced in JDK 9 on CompletableFuture have been back-ported
in class de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions as extension methods:
Note that there is also an overloaded version of orTimeout which allows more fine-grained control over the
behavior of this method. Here is an example of the configuration options:
val slowFut = CompletableFuture.supplyAsync [
    Thread.sleep(100) // Never actually do this!
    "Phew, so late"
]
val withTimeout = slowFut.orTimeout [
    backwardPropagateCancel = false // do not cancel slowFut if withTimeout is cancelled
    cancelOriginalOnTimeout = false // do not cancel slowFut on timeout
    exceptionProvider = [new TimeoutException] // exception used to complete withTimeout on timeout
    scheduler = new ScheduledThreadPoolExecutor(1) // scheduler used for timeout
    timeout = (50L -> TimeUnit.MILLISECONDS) // time after which withTimeout is completed exceptionally
    tryShutdownScheduler = true // if true tries to shutdown the given scheduler when slowFut completes
]
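For reference, this is how the original JDK method orTimeout behaves in plain Java (JDK 9 or later): it completes the future it is called on exceptionally with a TimeoutException and returns that same future. The sketch only shows the JDK behavior for comparison with the configurable overload above, which hands back a separate future (withTimeout); the class name JdkOrTimeoutDemo is made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

public class JdkOrTimeoutDemo {
    /** Returns the simple name of the exception the timed-out future failed with. */
    static String failureOnTimeout() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed by anyone
        CompletableFuture<String> withTimeout = slow.orTimeout(50, TimeUnit.MILLISECONDS);
        try {
            withTimeout.join();
            return "completed";
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    /** The JDK method returns the very future it was called on. */
    static boolean returnsSameFuture() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        boolean same = slow.orTimeout(1, TimeUnit.SECONDS) == slow;
        slow.complete("done"); // completing the future cancels the pending timeout task
        return same;
    }
}
```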