Extensions to CompletableFuture

Some might complain that the java.util.concurrent.CompletionStage/java.util.concurrent.CompletableFuture API surface is too large and difficult to wrap your head around. But actually many methods are similar and certain use cases are verbose to express with the given methods. Therefore we provide a couple of extension methods to make certain actions more convenient on CompletableFuture. These extension methods are provided via the class de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.

The first thing one usually notices is that there are three methods that to handle the success case case on CompletableFuture: thenApply, thenAccept, and thenRun. These methods are only named differently, because the Java compiler cannot figure out which functional interface a lambda is conforming to if a method is overloaded with two or more versions with different functional interface parameters. Interestingly Xtend does not have this restrictions and can figure out pretty well which overloaded version of a method is called, based on inspection of the lambda passed to the method.
Therefore the CompletableFutureExtensions class provides then methods simply redirecting to the JDK methods.

Example:

import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
// ...
val pool = Executors.newSingleThreadExecutor
val fut = CompletableFuture.supplyAsync([
	new Random().nextInt(1000)
],pool).then [ // thenApply, since has input and output value
	it / 10.0
].then [ // thenAccept, since has input, but expression does not return value
	System.out.println('''Random percent: «it»''')
].then [| // thenRun, since lambda does not take input
	System.out.println("The end.")
]
Tip
You may have noticed that the syntax for spawning a supplier via CompletableFuture#supplyAsync on a custom executor does not look elegant, since the pool parameter is the last one. So the lambda cannot be written behind the closing parenthesis of the parameter list. Have a look at the section Async Computations for a more Xtend style API.
Note
Currently there there are no thenAsync versions of the then methods implemented, but they are planned to be provided in the future.

The extension methods starting with when register a callback on a CompletableFuture which is invoked when it is completed an in a certain state, depending on the method. The returned future will always be completed with the original value (successfully or exceptionally), except if the callback throws an exception. In this case the returned future will be completed exceptionally with the exception thrown by the callback. If the callback is registered before completion of the future, the callback is invoked on the thread completing the future. If the callback is registered after completion of the future, the callback is invoked on the thread registering the callback. The async version of the when methods are always completed on the executor passed to the method, or on the common ForkJoinPool for the async version which does not take an executor as argument.

The extension method whenCancelled allows registering a callback on a CompletableFuture. The callback is invoked when the future was completed via cancellation.

Example:

import java.util.concurrent.CompletableFuture
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
val toCancel = new CompletableFuture
toCancel.whenCancelled [|
	println("I've been canceled")
]
toCancel.cancel

The method whenException registers a callback which is invoked when the future is completed exceptionally.

Example:

import java.util.concurrent.CompletableFuture
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
CompletableFuture.supplyAsync [
	throw new IllegalStateException
].whenException [
	println('''failed with «it.class» and cause «it.cause.class»''')
]

The recoverWith extension method is similar to the thenCompose method, but for the exceptional case. The registered callback of type (Throwable)⇒CompletionStage<? extends R> will be invoked if the future the callback is registered on completes exceptionally. The callback will be called with the exception the original future was completed with exceptionally. The future returned from the callback will be used to complete the future returned from the recoverWith extension method. This means if the original future completes successfully, the result will be used to complete the future returned from the recoverWith method. Otherwise the result of the recovery callback will be forwarded to the overall result future (no matter if the result is successful or exceptional).

Example:

import java.util.concurrent.CompletableFuture
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
CompletableFuture.supplyAsync [
	throw new IllegalStateException("Boom!")
].recoverWith [
	if(it.cause instanceof IllegalStateException)
		CompletableFuture.supplyAsync [
			"I was expecting you! Here is your asynchronous backup value."
		]
	else
		throw new IllegalArgumentException("Did not expect this!", it)
].thenAccept [
	println(it)
]

There are also recoverWithAsync versions where the recovery callback will always be executed on a given executor.

It may be useful to abort a computation and get a default value instead. This can be done using the handleCancellation extension method and canceling the original future.
The handleCancellation extension method is called with a supplier function which provides a result value when the source future is cancelled. If the original future completes successfully, the returned future will simply be completed with the same value. If the original future was cancelled (or completed with a java.util.concurrent.CancellationException), the given callback is called. If the callback completes successfully, the result will be set on the resulting future. If the callback throws an exception, this exception will be set as exceptional result to the resulting future. If the original future was completed exceptionally with a different exception, the same exception will be set as the exceptional result to the returned future.

import java.util.concurrent.CompletableFuture
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
val lateVal = CompletableFuture.supplyAsync [
	// Do not do this at home!
	// We are blocking the common pool
	Thread.sleep(1000)
	"here is some belated value."
]

lateVal.handleCancellation [
	"Here is some default value."
].thenAccept [
	println(it)
]

// let's be impatient
lateVal.cancel

The handleCancellationAsync variant executes the given handler always on the a provided executor.

Sometimes it is needed to take the result of one CompletableFuture and forward the result to another future. This can e.g. be needed when a function is handed a future to complete and gets the actual result from a method returning a future. For cases like this the forwardTo extension method can be used.

Example:

import java.util.concurrent.CompletableFuture
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
def void completeWithResult(CompletableFuture<String> res, boolean heavy) {
	if(heavy){
		doSomeHeavyWork().forwardTo(res)
	} else {
		res.complete("Some light work")
	}
}

def CompletableFuture<String> doSomeHeavyWork() {
	CompletableFuture.supplyAsync [
		"Did some heavy lifting"
	]
}

When returning a CompletableFuture from a method it may make sense to not return the future itself, but a copy, which will be completed

When returning a CompletableFuture from a method which is decoupled from one ore more internal futures (e.g using the copy or forwardTo extension method) it may still make sense to forward cancellation from the returned future to the futures used internally to abort sub-tasks.

Example:

import java.util.concurrent.CompletableFuture
import static extension de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions.*
// ...
def CompletableFuture<String> someCancellableComposition(Executor executor) {
	val result = new CompletableFuture<String>
	val CompletableFuture<String> firstStep = firstStep(executor)
	result.forwardCancellation(firstStep)
	firstStep.thenCompose [
		val secondStep = secondStep(executor,it)
		result.forwardCancellation(secondStep)
		secondStep
	].forwardTo(result)

	result
}

def CompletableFuture<String> firstStep(Executor executor) {
	val result = new CompletableFuture<String>
	executor.execute [|
		Thread.sleep(10) // evil!
		if(result.cancelled) {
			println("cancelled in first step")
		} else {
			result.complete("Some result")
		}
	]
	result
}

def CompletableFuture<String> secondStep(Executor executor, String input) {
	val result = new CompletableFuture<String>
	executor.execute [|
		if(result.cancelled) {
			println("cancelled in first step")
		} else {
			val output = input.toUpperCase
			result.complete(output)
		}
	]
	result
}

As you see in the example, the cancellation is forwarded to the two futures that are composed to calculate the overall result. Yet the returned future cannot be used to complete any internal future with a bogus result value.

The extension method cancelOnTimeout is canceling a given CompletableFuture when a timeout occurs. Note that this method returns the same future that is passed in. This method does not return a new future, consider the complex form of orTimeout (see below) for this effect.

Example:

CompletableFuture.supplyAsync [
	Thread.sleep(100) // Never actually do this!
	"Wow, so late"
].cancelOnTimeout(50, TimeUnit.MILLISECONDS)
.whenCancelled[|
	println("Oh no! It took too long.")
]

Alternatively, a version of cancelOnTimeout is provided taking a java.time.Duration as parameter.

Sometimes blocking APIs have to be used, but a future based API should be provided to the user. In this case it may be desirable that the user can cancel the future to interrupt the thread performing a blocking operation. This is tricky when running the blocking operations using a thread pool, since the thread should only be interrupted as long as the operation associated with the future is running. To support this use case the whenCancelledInterrupt method is provided.

Example:

val blockOpPool = Executors.newCachedThreadPool // pool for running blocking operations
/// ...
val sleepy = blockOpPool.asyncRun [ CompletableFuture<?> it | (1)
	it.whenCancelledInterrupt [|
		try {
			Thread.sleep(100) // perform blocking operation
		} catch (InterruptedException e) {
			println("Hey, I was cancelled")
		}
	]
]
// ...
sleepy.cancel // may interrupt Thread.sleep
  1. Here an extension method described in Async Computations is used.

The following functions introduced in JDK 9 on CompletableFuture have been back-ported in class de.fhg.fokus.xtensions.concurrent.CompletableFutureExtensions as extension methods:

Note, there is also a overloaded version of orTimeout which allows more fine grained options on the behavior of this method. Here is an example for the configuration options:

val slowFut = CompletableFuture.supplyAsync [
	Thread.sleep(100) // Never actually do this!
	"Phew, so late"
]
val withTimeout = slowFut.orTimeout [
	backwardPropagateCancel = false // do not cancel slowFut if withTimeout is cancelled
	cancelOriginalOnTimeout = false // do not cancel slowFut on timeout
	exceptionProvider = [new TimeoutException] // exception used to complete withTimeout on timeout
	scheduler = new ScheduledThreadPoolExecutor(1) // scheduler used for timeout
	timeout = (50L -> TimeUnit.MILLISECONDS) // time after which withTimeout is completed exceptionally
	tryShutdownScheduler = true // if true tries to shutdown the given scheduler when slowFut completes
]
Tip

Related JavaDocs:

results matching ""

    No results matching ""