Futures with Akka¶
Introduction¶
Scala’s documentation on futures introduces them in this way:
Futures provide a nice way to reason about performing many operations in parallel – in an efficient and non-blocking way. The idea is simple, a Future is a sort of a placeholder object that you can create for a result that does not yet exist. Generally, the result of the Future is computed concurrently and can be later collected. Composing concurrent tasks in this way tends to result in faster, asynchronous, non-blocking parallel code.
The rest of that page is well worth reading to get an overview of how futures work and what you can do with them.
In Akka, one of the standard patterns for communication between actors is the
ask pattern, in which you send a message to an actor and you expect a
reply. When you call the ask function (which can be written as a question
mark, ?, which acts as an infix operator), it immediately returns a
Future, which will complete when the reply is sent. As the Akka
documentation explains in Use with Actors, it is possible to block the
calling thread until the future completes, using Await.result. However,
it warns: ‘Blocking is discouraged though as it will cause performance
problems.’ In particular, by not blocking, you can make several ask
requests in parallel.
One way to avoid blocking is to register a callback on the future, which will be called when it completes (perhaps by another thread), like this:
future.onComplete {
    case Success(result) => println(result)
    case Failure(ex) => ex.printStackTrace()
}
But this won’t work if you’re writing a method that needs to return a value based on the result of a future. In this case, you can register a callback that transforms the result of a future into another future:
val newFuture = future.map(x => x + 1)
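As a minimal, self-contained sketch of the difference between the two styles, the following uses only the Scala standard library. The object CallbackExample, the method names and the use of the global execution context are assumptions made for illustration; in an actor you would use the execution context described under Execution Contexts.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object CallbackExample {
  // Hypothetical stand-in for a future obtained from an actor.
  def fetchCount(): Future[Int] = Future(41)

  // onComplete returns Unit, so the result can only be used for side effects.
  def logCount(): Unit =
    fetchCount().onComplete {
      case Success(count) => println(s"count = $count")
      case Failure(ex)    => ex.printStackTrace()
    }

  // map returns a new future, so a method can hand the transformed
  // result back to its caller without blocking.
  def incrementedCount(): Future[Int] =
    fetchCount().map(count => count + 1)
}
```

A caller of incrementedCount() receives a Future[Int] and can keep composing it, whereas logCount() gives the caller nothing to work with.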
However, registering callbacks explicitly gets cumbersome when you need
to work with several futures together. In this case, the most convenient
alternative to blocking is to use Future as a monad. The links above
explain what this means in detail, but the basic idea is that a special
syntax, called a for-comprehension, allows you to write code that
uses futures as if they were complete, without blocking. In reality, a
for-comprehension is syntactic sugar for calling methods like
map, but it’s much easier to write and to read. You can do things
like this:
val fooFuture = (fooActor ? GetFoo("foo")).mapTo[Foo]
val barFuture = (barActor ? GetBar("bar")).mapTo[Bar]

val totalFuture = for {
    foo: Foo <- fooFuture
    bar: Bar <- barFuture
    total = foo.getCount + bar.getCount
} yield total
Here the messages to fooActor and barActor are sent and processed in
parallel, but you’re guaranteed that total won’t be calculated until the
values it needs are available. Note that if you construct fooFuture and
barFuture inside the for-comprehension, they won’t be run in parallel,
because barFuture isn’t created until fooFuture has completed
(see Scala for-comprehension with concurrently running futures).
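The difference between constructing futures outside and inside the for-comprehension can be sketched with plain Scala futures. Here slowValue is a hypothetical stand-in for an ask to another actor, and the global execution context is used only to keep the example self-contained.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object ParallelFuturesExample {
  // Hypothetical slow operation standing in for an ask to another actor.
  def slowValue(value: Int): Future[Int] = Future {
    Thread.sleep(200)
    value
  }

  // Both futures are created before the for-comprehension,
  // so they can run at the same time.
  def parallelTotal(): Future[Int] = {
    val fooFuture = slowValue(1)
    val barFuture = slowValue(2)
    for {
      foo <- fooFuture
      bar <- barFuture
    } yield foo + bar
  }

  // The second future is created only after the first has completed,
  // so the two operations run one after the other.
  def sequentialTotal(): Future[Int] =
    for {
      foo <- slowValue(1)
      bar <- slowValue(2)
    } yield foo + bar
}
```

Both methods produce the same total; only the elapsed time differs, since sequentialTotal() has to wait for the two delays in turn.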
With one line of code, you can even make a list of messages to be sent
to actors, send them all in parallel, get back a list of futures, and
convert it to a single future which will complete when all the results
are available; see org.knora.webapi.util.ActorUtils.parallelAsk.
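The implementation of parallelAsk is not reproduced here. As a sketch of the general technique it relies on, Future.sequence from the Scala standard library converts a list of futures into a single future of a list. The lookup function below is a hypothetical stand-in for sending one message with ask.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object SequenceExample {
  // Hypothetical stand-in for sending one message to an actor with ask.
  def lookup(name: String): Future[Int] = Future(name.length)

  // names.map(lookup) starts all the futures; Future.sequence combines them
  // into one future that completes when every result is available,
  // keeping the results in the same order as the inputs.
  def lookupAll(names: List[String]): Future[List[Int]] =
    Future.sequence(names.map(lookup))
}
```

If any of the individual futures fails, the combined future fails as well.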
Handling Errors with Futures¶
The constructors and methods of Future (like those of Try) catch
exceptions, which cause the future to fail. This very useful property of
futures means that you usually don’t need try-catch blocks when
using the Future monad (although it is sometimes helpful to include
them, in order to catch low-level exceptions and wrap them in
higher-level ones). Any exception thrown in code that’s being run
asynchronously by Future (including in the yield expression of a
for comprehension) will be caught, and the result will be a
Future containing a Failure. Also, in the previous example, if
fooActor or barActor returns a Status.Failure message, the
for-comprehension will also yield a failed future.
However, you need to be careful with the first line of the
for-comprehension. For example, this code doesn’t handle exceptions
correctly:
private def doFooQuery(iri: IRI): Future[String] = {
    for {
        queryResponse <- (storeManager ? SparqlSelectRequest(queries.sparql.v1.txt.getFoo(iri).toString())).mapTo[SparqlSelectResponse]
        ...
    } yield ...
}
The getFoo() method calls a
Twirl template function to
generate SPARQL. The ? operator returns a Future. However, the
template function is not run asynchronously, because it is called
before the Future constructor is called. So if the template function
throws an exception, it won’t be caught here. Instead, you can do this:
private def doFooQuery(iri: IRI): Future[String] = {
    for {
        queryString <- Future(queries.sparql.v1.txt.getFoo(iri).toString())
        queryResponse <- (storeManager ? SparqlSelectRequest(queryString)).mapTo[SparqlSelectResponse]
        ...
    } yield ...
}
Here the Future constructor will call the template function
asynchronously, and catch any exceptions it throws. This is only
necessary if you need to call the template function at the very
beginning of a for-comprehension. In the rest of the for
comprehension, you’ll already implicitly have a Future object.
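The effect of the first line can be reproduced with plain Scala futures. In this sketch, generateQuery and runQuery are hypothetical stand-ins for the template function and for the request to the store manager.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FirstLineExample {
  // Hypothetical stand-in for a template function that can throw.
  def generateQuery(iri: String): String =
    if (iri.isEmpty) throw new IllegalArgumentException("empty IRI")
    else s"SELECT * WHERE { <$iri> ?p ?o }"

  // Hypothetical stand-in for sending the query to another actor.
  def runQuery(query: String): Future[Int] = Future(query.length)

  // generateQuery is evaluated before any future exists, so an exception
  // escapes to the caller instead of producing a failed future.
  def unsafeQuery(iri: String): Future[Int] =
    for {
      result <- runQuery(generateQuery(iri))
    } yield result

  // generateQuery runs inside Future(...), so an exception
  // becomes a failed future.
  def safeQuery(iri: String): Future[Int] =
    for {
      query <- Future(generateQuery(iri))
      result <- runQuery(query)
    } yield result
}
```

Calling unsafeQuery("") throws IllegalArgumentException in the calling thread, while safeQuery("") returns normally with a future that has failed with the same exception.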
Using recover on Futures¶
Calling recover on a Future lets you replace the exception with one that carries a more appropriate error message if the Future fails. This is particularly useful when an error message needs to be made clearer for the context in which the Future is used.
For example, suppose we ask the resources responder to query for a certain resource in order to process it in a special way. The client does not know that a request is sent to the resources responder, so if the resource cannot be found, the error sent back by the resources responder (NotFoundException) would not make sense to the client. Instead, we would like to report the error in a way that makes sense for the operation the client actually requested. We can do this by calling recover on a Future.
private def mySpecialResourceRequest(iri: IRI, userProfile: UserProfileV1): Future[...] = {

    val resourceRequestFuture = for {
        resResponse: ResourceFullResponseV1 <- (responderManager ? ResourceFullGetRequestV1(iri = iri, userProfile = userProfile, getIncoming = false)).mapTo[ResourceFullResponseV1]
    } yield resResponse

    val resourceRequestFutureRecovered = resourceRequestFuture.recover {
        case notFound: NotFoundException => throw BadRequestException(s"Special resource handling failed because the resource could not be found: ${notFound.message}")
    }

    for {
        res <- resourceRequestFutureRecovered
        ...
    } yield ...
}
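Please note that the recovered Future (here resourceRequestFutureRecovered) has to be used in a generator (with <-) in the for-comprehension that follows for this to work correctly. Otherwise its content, including the replaced exception, will never be looked at.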
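The same pattern can be tried out with plain Scala futures. In the following sketch, NotFound and BadRequest are hypothetical exception types standing in for NotFoundException and BadRequestException, and findResource is a hypothetical stand-in for the request to the resources responder.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object RecoverExample {
  // Hypothetical exception types used only for this illustration.
  case class NotFound(message: String) extends Exception(message)
  case class BadRequest(message: String) extends Exception(message)

  // Hypothetical lookup that fails when the resource does not exist.
  def findResource(iri: String): Future[String] =
    if (iri == "known") Future.successful("resource data")
    else Future.failed(NotFound(s"$iri not found"))

  def specialRequest(iri: String): Future[String] = {
    // Throwing inside recover replaces the original failure
    // with the new exception.
    val recovered = findResource(iri).recover {
      case notFound: NotFound =>
        throw BadRequest(s"Special handling failed: ${notFound.message}")
    }

    // Continue from the recovered future, not from the original one.
    for {
      data <- recovered
    } yield data.toUpperCase
  }
}
```

A successful lookup passes through recover unchanged, while a NotFound failure reaches the caller as a BadRequest.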
Designing with Futures¶
In the current design, the Knora API Server almost never blocks to wait for a future to complete. The normal flow of control works like this:
1. Incoming HTTP requests are handled by an actor called KnoraHttpService, which delegates them to routing functions (in the routing package).
2. For each request, a routing function gets a spray-http RequestContext, and calls RouteUtils.runJsonRoute to send a message to a supervisor actor to fulfil the request. Having sent the message, runJsonRoute gets a future in return. It does not block to wait for the future to complete, but instead registers a callback to process the result of the future when it becomes available.
3. The supervisor forwards the message to be handled by the next available actor in a pool of responder actors that are able to handle that type of message.
4. The responder’s receive method receives the message, and calls some private method that produces a reply message inside a future. This usually involves sending messages to other actors using ask, getting futures back, and combining them into a single future containing the reply message.
5. The responder passes that future to ActorUtils.future2Message, which registers a callback on it. When the future completes (perhaps in another thread), the callback sends the reply message. In the meantime, the responder doesn’t block, so it can start handling the next request.
6. When the responder’s reply becomes available (causing the future created by RouteUtils.runJsonRoute to complete), the callback registered in (2) calls complete on the RequestContext, which sends an HTTP response to the client.
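The idea of replying from a callback rather than blocking can be sketched without Akka. This is not the implementation of ActorUtils.future2Message, which is not shown here; the reply types and the sendReply parameter are hypothetical stand-ins for a responder's reply messages and for sending a message back to the original sender.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object ReplyCallbackExample {
  // Hypothetical reply messages used only for this illustration.
  sealed trait Reply
  case class ResultReply(value: String) extends Reply
  case class ErrorReply(cause: Throwable) extends Reply

  // Registers a callback and returns immediately, so the caller is free
  // to handle its next request while the future is still running.
  def replyWhenComplete(future: Future[String])(sendReply: Reply => Unit): Unit =
    future.onComplete {
      case Success(value) => sendReply(ResultReply(value))
      case Failure(cause) => sendReply(ErrorReply(cause))
    }
}
```

Whichever thread completes the future ends up running the callback, which is why the method itself never needs to wait.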
The basic rule of thumb is this: if you’re writing a method in an actor,
and anything in the method needs to come from a future (e.g. because you
need to use ask to get some information from another actor), have
the method return a future.
Mixing Futures with non-Futures¶
If you have a match ... case or if expression, and one branch
obtains some data in a future, but another branch can produce the data
immediately, you can wrap the result of the latter branch in a future,
so that both branches have the same type:
def getTotalOfFooAndBar(howToGetFoo: String): Future[Int] = {
    for {
        foo <- howToGetFoo match {
            case "askForIt" => (fooActor ? GetFoo("foo")).mapTo[Foo]
            case "createIt" => Future(new Foo())
        }

        bar <- (barActor ? GetBar("bar")).mapTo[Bar]

        total = foo.getCount + bar.getCount
    } yield total
}
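When the immediate value is already computed and producing it cannot throw, the standard library also offers Future.successful, which wraps the value in an already completed future without scheduling any work, and Future.failed for reporting an error in the same type. The following sketch assumes a hypothetical askForCount in place of an ask to an actor. Future(...) remains the better choice when constructing the value might throw, since it catches the exception.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object MixingExample {
  // Hypothetical stand-in for asking an actor for a value.
  def askForCount(): Future[Int] = Future(10)

  def getCount(howToGetIt: String): Future[Int] =
    howToGetIt match {
      case "askForIt" => askForCount()
      // The value is already available, so it is wrapped in a completed future.
      case "useDefault" => Future.successful(0)
      // An unexpected input is reported as a failed future rather than thrown.
      case other => Future.failed(new IllegalArgumentException(s"Unknown option: $other"))
    }
}
```

All three branches have the type Future[Int], so the caller can treat them uniformly.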
How to Write For-Comprehensions¶
Here are some basic rules for writing for-comprehensions:
- The first line of a for-comprehension has to be a “generator”, i.e. it has to use the <- operator. If you want to write an assignment (using =) as the first line, the workaround is to wrap the right-hand side in a monad (like Future) and use <- instead.
- Assignments (using =) are written without val.
- You’re not allowed to write statements that throw away their return values, so if you want to call something like println that returns Unit, you have to assign its return value to _.
The yield produces a value wrapped in the same type as the generators,
which all have to use the same type (e.g. Future).
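These rules can be seen together in one short sketch. The object and method names are made up for illustration, and the global execution context is used only so that the example is self-contained.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object ForComprehensionRules {
  def describe(count: Int): Future[String] =
    for {
      // The first line must be a generator, so the plain value
      // is wrapped in a Future.
      start <- Future(count)

      // Assignments are written without val.
      doubled = start * 2

      // The result of a call that returns Unit is assigned to _.
      _ = println(s"doubled = $doubled")

      // Later generators must also produce futures.
      label <- Future(s"result: $doubled")
    } yield label
}
```

Because every generator is a Future, the whole expression has the type Future[String].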
Execution Contexts¶
Whenever you use a future, there has to be an implicit ‘execution context’ in scope. Scala’s documentation on futures says, ‘you can think of execution contexts as thread pools’.
If you don’t have an execution context in scope, you’ll get a compile error
asking you to include one, and suggesting that you could use
import scala.concurrent.ExecutionContext.Implicits.global. Don’t do this, because
the global Scala execution context is not the most efficient option. Instead,
you can use the one provided by the Akka ActorSystem:
import scala.concurrent.ExecutionContext

implicit val executionContext: ExecutionContext = system.dispatcher
Akka’s execution contexts can be configured (see Dispatchers). The default settings are given in the Listing of the Reference Configuration.
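To make the ‘thread pool’ description concrete, the following sketch builds an execution context from a fixed-size Java thread pool using only the standard library. It illustrates what an execution context is and is not a recommendation to replace the Akka dispatcher; the object name, the pool size and the method threadName are assumptions made for the example.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}

object ExecutionContextExample {
  // A dedicated pool of two threads, wrapped as an execution context.
  val pool: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  // The execution context is passed implicitly to Future.apply,
  // which runs the body on one of its threads.
  def threadName()(implicit ec: ExecutionContext): Future[String] =
    Future(Thread.currentThread().getName)
}
```

A caller can pass the pool explicitly, as in ExecutionContextExample.threadName()(ExecutionContextExample.pool), or bring it into implicit scope. A pool created this way should be shut down with pool.shutdown() when it is no longer needed, since its threads would otherwise keep the JVM running.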