How I Learned to Stop Worrying and Love the EventLoopFuture
Low-level framework SwiftNIO developed by Apple corporation is a foundation for all major server-side Swift frameworks — Vapor, Kitura, Smoke.
However, none of these instruments comprehensively clarify main concepts and principles of usage of the most important part of NIO — EventLoopPromise
and EventLoopFuture
, which leads to fundamental misunderstanding and confusion, and therefore questions like “How to get value from Future?” are asked in all kinds of forums and chats.
This is what we're gonna talk about today in this small tutorial.
This article is polymorphic, it can show you examples of code for SwiftNIO versions 1 and 2. If you're unsure: Vapor 3 uses SwiftNIO 1, and Vapor 4 — SwiftNIO 2. Choose carefully:
First let's deal with the basic principles of SwiftNIO servers. Any server works on top of an EventLoopGroup
, that is a group of EventLoop
s. Most commonly, the number of event loops is equal to number of CPU cores. If we're speaking in queue theory terms, event loop is an abstract handler (machine) that processes a queue of incoming tasks.
Important: unlike classic blocking tasks processing model, in NIO model tasks owners don't “stand” next the handler waiting for the result, but instead “step out”, and handler sends a notification when the result is ready. Therefore the handler can gather a very large task queue, whereas in blocking model only one client can “stand” near the handler, which heavily reduces overall system bandwidth. Besides, all work is split into smaller tasks in NIO, and while waiting for one task to be done by external async service, handler can work on another request.
This definition is very important for understanding the concept of asynchronous calculations. It makes it quite obvious why is classic blocking request handling system so ineffective.
So in blocking system we have n
machines (nodes) (again, quite often by the number of CPU cores, which is reasonable), which completely block own thread during request processing, and new tasks would wait outside the server (it depends on stack implementation, nginx serving as reverse-proxy can pool requests itself). Therefore such service can handle only n
simultaneous requests.
A SwiftNIO server also has n
processing nodes by number of CPU cores (configurable, of course), but none of requests block their threads, and machines (that very event loops) can put tasks in queues. Such server can handle an enormous number of simultaneous requests, and it's generally capped by physical RAM which is required for storing client socket connections.
Let's study a general flow of a generic SwiftNIO based web application which returns a list of something from DB:
- (once upon a time) Server starts and listens to a socket (say, 8080), and a part of OS core called
epoll
(in BSD-systems like macOS —kevent
) starts notifying NIO every time something happens on socket. - (later) Client TCP/IP connection hits the system,
epoll
/kevent
notifies. - NIO chooses an event loop which will handle this request (just picks a new one from an endless carousel) and creates a
Channel
object which reads and writes I/O data (in this particular case — over network), as well as a pipeline of handlers for data travelling through this pipeline forth and back. - One handler decodes raw bytes from TLS into plain text (let's not overcomplicate and pretend that we're dealing with HTTP 1.1), other handler parses text as HTML, and in the very end of pipeline a high-level framework handler picks URL and request from parsed HTTP, dispatches the request and routes it into application according to Vapor/Kitura/Smoke config, therefore, finally, running the application code.
- The application validates the request (are all arguments correct? is JWT expired? whyuwannakillme?), then tells the DB “pls gimme a list of these thingies”, passes it an
EventLoopPromise
, and is left with pendingEventLoopFuture
from this promise (we will get back to it, don't worry), which eventually should have a list of our thingies. - While it doesn't have them thingies yet, our event loop can do other things (like, related to other requests).
- Once the DB returns the result,
epoll
/kevent
notifies NIO about it, and it would populate ourEventLoopFuture
with this result. In that exact moment our event loop returns to our request, forms the response, and returns it. - Acquired result is passed back through the pipeline: high-level response object is first packed to JSON, the JSON is packed into HTTP response body, HTTP response is reduced into bytes, bytes are encrypted into TLS and sent by that very channel (
Channel
object, remember?) back by the network (let's emphasize though that it's quite unnecessary that these bytes would be sent by network, because it might actually be a disk or RAM storage). - Channel is closed, roll credits.
Let's return to the fourth point, which is obviously the point of interest for us. We can even write a controller in pseudocode:
app.get("list") { (request: Request) -> EventLoopFuture<[ResponseItem]> in
guard someValidations else {
throw ValidationError
}
return ItemsService
.getMyItemsListFuture(on: request.eventLoop)
.map { (rawList: [MyItem]) -> [ResponseItem] in
var result = [ResponseItem]()
for rawItem in rawList {
result.append(ResponseItem(from: rawItem))
}
return result
}
}
It's very important to understand that we wrote only execution plan, but not the code that would execute all at once (it will execute once).
ItemsService.getMyItemsListFuture
method call (which returns EventLoopFuture<[MyItem]>
type) means that service sent a request to a DB, but the result is expected in some point of future, and when it's complete, we would want to do some transformations with the result (map
). Therefore the part with guard
will be executed immediately, once the request has entered the controller method, but inners of map
— sometime. This is the point of asynchronous computations by NIO model: we create chains of actions which will eventually be executed, when previous action promises are executed. Future chain execution is done immediately (so fast, in fact, that it's totally neglectable, like 0.00001 seconds).
Important: it is impossible to get a resulting value fromEventLoopFuture<T>
in the same runtime when it was created, because there is no result yet, and the only thing we can do — is to register an event handler on this future, which would execute when future is fullfilled and populated with aT
value. The result of event handler addition is a new future, and you can stack as many new handlers as you like. In the very end you will end up with a future which basically hold the whole execution chain inside.
Vapor expects a ResponseEncodable
as controller action response type, that is an instance of a class or a structure, which can return an EventLoopFuture<Response>
, and in cases we return simple non-future object, it's opaquely converted into EventLoopFuture
.
Kitura works on a lower level and demands the programmer to explicitly send a response to response
object and call a next
closure afterwards. it looks something like this:
router.get("list") { request, response, next in
guard someValidations else {
throw ValidationError
}
let future: EventLoopFuture<[ResponseItem]> = ItemsService
.getMyItemsListFuture(on: request.eventLoop)
.map { (rawList: [MyItem]) -> [ResponseItem] in ... }
future.whenSuccess { items in
response.send(items)
next()
}
}
Please pay attention to the line with future.whenSuccess
. This call doesn't return anything but just registers an event handler on a future, and this is what differs it from map
or other methods from this family, which return a new future and allow to chain handlers. You shouldn't use map
to do anything except result transformation. If you need to send your result to an external service or something like that, always use whenSuccess
.
Now, since we've dealt with the full example, let's get back to the basics.
Important:EventLoopPromise
, likeEventLoopFuture
are created only usingeventLoop.newPromise
,eventLoop.newSucceededFuture
,eventLoop.newFailedFuture
eventLoop.makePromise
,eventLoop.makeSucceededFuture
,eventLoop.makeFailedFuture
oreventLoop.submit
methods because each promise or future must work within an event loop.
EventLoopPromise<T>
— is an object in charge of succesfull or errored notification of asynchronous operation. This is a parent entity for future. Every promise holds within itself a futureResult
(that is not yet fullfilled EventLoopFuture<T>
with the same type as parent promise). Promise can be succeeded with method .succeed(result: resultValue)
or failed with .fail(error: error)
. Why would you need promises in this form? It's necessary for places which do not support SwiftNIO directly, for instance, databases drivers:
func get(key: String, on eventLoop: EventLoop) -> EventLoopFuture<Bytes?> {
let promise: EventLoopPromise<Bytes?> = eventLoop.newPromisemakePromise()
self.getInternalAsync(key) { maybeBytes in
promise.succeed(result: maybeBytes)
}
return promise.futureResult
}
Method getInternalAsync
goes into own async storage and anonymous func body will be executed somewhere sometime, that is this method is nonblocking, and that's fine, we don't need anything else because we control our asynchronous operations ourselves. We create a promise outside of async operation, pass it inside, it will be fullfilled once, and we return future result to the outside.
EventLoopFuture
— is a storage (holder, box, you name it) for a future result which will eventually appear inside that future. You can't use that result immediately because it might not be there yet. There are two ways of waiting for that result.
The first one is described above, using map
, a function which accepts concrete future value (when it's ready) and returns something new, a transformed value. Method map
always returns another future.
There are other methods. Like, when after one async operation we gotta do another async operation (say, send some data over network or load more data from DB), we can use method thenflatMap
.
Important: the difference betweenmap
andthenflatMap
ismap
expects a concrete value as result, whereasthenflatMap
expects a future. Thus,thenflatMap
, as you can see from naming, “waits” for a concrete value before continuing, or in other words, flattens the value from future.
There is an example of such code (I broke it up to different futures on purpose, for better understanding, just pay attention to constants naming):
/// For better readability here and below `Future` will be a short alias
/// for `EventLoopFuture`.
/// Not to be confused with similar `Future` named things
/// from other asynchronous frameworks (like Combine).
typealias Future = EventLoopFuture
let futureFoo: Future<Foo> = FooService.getFooFuture()
let futureBar: Future<Bar> = futureFoo.map { (foo: Foo) -> Bar in
return foo.getBar()
}
let futureBaz: Future<Baz> = futureBar.thenflatMap { (bar: Bar) -> Future<Baz> in
return BazService.getBazFuture(from: bar)
}
Of course, such code is much more readable as a chain, because it's quite easy to get lost in future declarations and mess up event order:
let futureBaz: Future<Baz> = FooService
.getFooFuture()
.map { (foo: Foo) -> Bar in
return foo.getBar()
}
.thenflatMap { (bar: Bar) -> Future<Baz> in
return BazService.getBazFuture(from: bar)
}
Final stage of future “waiting”, like I said, is always event handling with whenSuccess
or , whenFailurewhenFail
or whenComplete
(it accepts as input a monad type Result
which can be either result or an error).
future.whenSuccess { value in
print("Value is \(value)")
}
future.whenFailurewhenFail { error in
dump("Error: \(error)")
}
future.whenComplete { (result: Result<Success, Failure>) in
dump("Result is \(result)")
}
The other way of waiting for a result is blocking waiting:
let futureBaz: Future<Baz> = ...
let baz: Baz = try futureBaz.wait()
Therefore the whole thread will stop until all future chain is complete.
Important: you never call this method within anEventLoop
context, because it will stop the whole event loop thread. Worst case scenario is whenwait()
is called on two event loops waiting for results from each other. This situation is called an unsolvable deadlock and basically disables both event loops for good.
Important:wait()
can be called on threads which are unrelated to event loops. For example, in main thread, before server start. Or inDispatchQueue.async
context.
You might've noticed in previous example with wait()
that this call demands a try
. It means that there might be errors during future chain execution. Let's deal with it. Say, we have to stop the whole chain if something went wrong (for instance, input data is invalid).
Complete example.
enum E: Error {
case NilString
case EmptyString
case UnknownError
case VeryUnknownError
}
let futureString: Future<String> = SomeService
.getStringFuture()
.thenThrowingflatMapThrowing { (maybeString: String?) throws -> String in
guard let string = maybeString else {
throw E.NilString
}
guard string.count > 0 else {
throw E.EmptyString
}
return string
}
.thenIfErrorThrowingflatMapErrorThrowing { (error: Error) throws -> String in
if error is E.EmptyString {
return "some empty string"
} else {
throw UnknownError
}
}
.mapIfErrorrecover { (error: Error) -> String in
if error is E.UnknownError {
return "unknown error"
} else {
throw VeryUnknownError
}
}
.map { (string: String) -> String in
return "My string is '\(string)'"
}
Method thenThrowingflatMapThrowing
works the same way as map
, it just can throw an error.
Remark: why is it called thenThrowingflatMapThrowing
, but not mapThrowing
? It doesn't flatten a future into value, is it? Long explanation: https://github.com/apple/swift-nio/issues/663#issuecomment-457155219. Short explanation: treat prefix thenflat
as if it not only flattens a future into value, but also a throwing future into value.
If thenThrowingflatMapThrowing
throws an error, current future becomes errored, and if future chain doesn't have any recoveries (error handlers), the whole chain will acquire errored state.
Error handlers thenIfErrorThrowingflatMapErrorThrowing
and mapIfErrorrecover
are very similar (like map and thenThrowingflatMapThrowing
), except that thenIfErrorThrowingflatMapErrorThrowing
may throw another error, unlike mapIfErrorrecover
. There handlers are used for attempts to recover from errored state and continue chain execution if and error was thrown somewhere earlier in chain. If there is no error handler, or if it thrown an error itself, the rest of chain will be in errored state (with exceptions, see below).
You can set more than one error handler.
There are situations when you have to “wait” for a few futures to be fullfilled, and continue execution after that. It is done with methods and
, fold
or reduce
.
Let's figure these methods out, starting with and
:
let futureFoo: Future<Foo> = FooService.getFooFuture()
let futureFooBar: Future<(Foo, Bar)> = futureFoo
.and(BarService.getBarFuture())
.map { (foo: Foo, bar: Bar) -> (Foo, Bar) in
...
}
As you can see, you can call method and
from any future, which expects another future as argument and returns another future which will be fullfilled only when they both are fullfilled. Resulting value type will be a tuple of both values. Unfortunately, tuples in Swift isn't the most flexible tool, and if we add one more future with and
, resulting value will not be of type Future<(Foo, Bar, Baz)>
, but Future<((Foo, Bar), Baz)>
.
An example with fold
is more complex and niche:
let eventLoop = eventLoopGroup.next()
let future0: Future<Int> = eventLoop.newSucceededFuturemakeSucceededFuture(result: 0)
let futures: [Future<Int>] = (0..<10).map { eventLoop.newSucceededFuturemakeSucceededFuture(result: $0) }
let futureSum: Future<Int> = future0.fold(futures) { (result: Int, next: Int) -> Future<Int> in
return eventLoop.newSucceededFuturemakeSucceededFuture(result: result + next)
}
Resulting value will be 45.
Static method reduce
has two versions, let's see them both:
let eventLoop = eventLoopGroup.next()
var result: [String] = ["first"]
let resultFuture: Future<[String]> = Future<[String]>.reduce(
into: result,
[
eventLoop.newSucceededFuturemakeSucceededFuture(result: "second"),
eventLoop.newSucceededFuturemakeSucceededFuture(result: "last"),
],
on: eventLoop
) { (carry: inout [String], currentString: String) -> Void in
carry.append("\(currentString) item")
}
// Or with immutable accumulator,
// but with increased memory consumption
// for new allocations for resulting array
let resultFuture: Future<[String]> = Future<[String]>.reduce(
[String](["first"]),
[
eventLoop.newSucceededFuturemakeSucceededFuture(result: "second"),
eventLoop.newSucceededFuturemakeSucceededFuture(result: "last"),
],
on: eventLoop
) { (carry: [String], currentString: String) -> [String] in
var newCarry = carry
newCarry.append("\(currentString) item")
return newCarry
}
Both examples result in a following array:
first item
second item
last item
As you might know, arrays and dictionaries in Swift aren't thread-safe, which means that parallel reading from different threads is safe, but parallel write might (and eventually will) end up deplorably. In order to solve this problem (like, for a runtime cache implementation) we have to implement an atomic (sychronized) access to a resource, that is, make it so access to a resource would be done from a certain thread. For that exact purpose there is a method hopTohop
. Usage:
let userCacheEventLoop = UserCacheService.eventLoop
let future: Future<User?> = UserService
.getUserFuture(by: ID)
.hopTohop(to: userCacheEventLoop)
.map { (maybeUser: User?) -> User
if let user = maybeUser {
UserService.store(user)
}
return maybeUser
}
Method UserService.getUserFuture
loads a user on an undetermined event loop, after that we switch (hop) to a certain event loop, and all further futures will be executed on that event loop. In the end of the chain it's desirable to hop to original event loop which started it all, but it's reasonable only for highload systems really.
Let's see another example of async computations using event loops:
let eventLoop = eventLoopGroup.next()
eventLoop.execute {
/// This code will be executed within event loop context.
/// Of course, it must be nonblocking.
/// Method `execute` doesn't imply return value or future chaining
/// it's very much like `DispatchQueue.async { ... }`
}
let future: Future<Int> = eventLoop.submit { () -> Int
/// Unlike `execute` method, this code implies
/// concrete return value, and `submit` method creates and
/// returns a future of respective type.
return 123
}
/// Similar future can be created like that:
let future: Future<Int> = eventLoop.newSucceededFuturemakeSucceededFuture(result: 123)
future.map { int in
return "My integer result is \(int)"
}.map { string in
return "My string result is '\(string)'"
}.whenSuccess { resultString in
print(resultString)
}
In this humble essay I tried my best to most comprehensively describe and explain general concepts and philosophy of async computations on event loops, and give you bare dialectic minimum for working with event loops, promises and futures.
If you have questions, if you've found an error, or if you think I didn't explain some important detail about futures, feel free to leave a comment below. By the way, auth and comments services are implemented on Swift and powered by event loops :)
_____________
Self read material:
-
Documentation on
EventLoopFuture
:https://apple.github.io/swift-nio/docs/current/NIO/Classes/EventLoopFuture.html
https://apple.github.io/swift-nio/docs/1.14.1/NIO/Classes/EventLoopFuture.html
-
Documentation on
EventLoopPromise
:https://apple.github.io/swift-nio/docs/current/NIO/Structs/EventLoopPromise.html
https://apple.github.io/swift-nio/docs/1.14.1/NIO/Structs/EventLoopPromise.html
-
Documentation on
EventLoop
:https://apple.github.io/swift-nio/docs/current/NIO/Protocols/EventLoop.html
https://apple.github.io/swift-nio/docs/1.14.1/NIO/Protocols/EventLoop.html
-
Documentation on
ChannelPipeline
(how data is processed within aChannel
):https://apple.github.io/swift-nio/docs/current/NIO/Classes/ChannelPipeline.html
https://apple.github.io/swift-nio/docs/1.14.1/NIO/Classes/ChannelPipeline.html
Pending comments