How I Learned to Stop Worrying and Love the EventLoopFuture

SwiftNIO, a low-level framework developed by Apple, is the foundation of all major server-side Swift frameworks: Vapor, Kitura, and Smoke.

However, none of these frameworks comprehensively explains the main concepts and usage principles of the most important part of NIO, EventLoopPromise and EventLoopFuture. This leads to fundamental misunderstanding and confusion, which is why questions like “How do I get the value out of a Future?” keep coming up in all kinds of forums and chats.

This is what we're gonna talk about today in this small tutorial.

The code examples below use the SwiftNIO 2 API. If you're unsure which version you have: Vapor 3 uses SwiftNIO 1, and Vapor 4 uses SwiftNIO 2.

First, let's deal with the basic principles of SwiftNIO servers. Any server works on top of an EventLoopGroup, that is, a group of EventLoops. Most commonly, the number of event loops equals the number of CPU cores. In queueing theory terms, an event loop is an abstract handler (machine) that processes a queue of incoming tasks.
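As a rough sketch of the paragraph above, assuming the SwiftNIO 2 `NIO` module is available as a package dependency, creating a group with one event loop per core and queueing a task on one of its loops might look like this (the variable names are illustrative):

```swift
import NIO

// One event loop per CPU core is the usual default.
let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)

// `next()` hands out the group's event loops in round-robin order.
let loop = group.next()

// The closure is queued on the loop and runs on that loop's thread.
// Blocking with `wait()` is acceptable here only because this code
// runs on the main thread and not on an event loop.
let answer = try loop.submit { 40 + 2 }.wait()
print(answer) // prints 42

try group.syncShutdownGracefully()
```

In a real server the framework normally creates and owns the group, so application code rarely does this by hand.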

Important: unlike the classic blocking model of task processing, in the NIO model task owners don't “stand” next to the handler waiting for the result. Instead they “step out”, and the handler sends a notification when the result is ready. The handler can therefore accumulate a very large task queue, whereas in the blocking model only one client can “stand” near the handler, which heavily reduces overall system throughput. Besides, in NIO all work is split into smaller tasks, and while one task waits on an external async service, the handler can work on another request.

This definition is very important for understanding the concept of asynchronous computation. It makes it quite obvious why the classic blocking request handling model is so inefficient.

So in a blocking system we have n machines (nodes), again quite often one per CPU core, which is reasonable. Each of them completely blocks its own thread while processing a request, and new tasks wait outside the server (this depends on the stack: nginx serving as a reverse proxy can queue requests itself). Such a service can therefore handle only n simultaneous requests.

A SwiftNIO server also has n processing nodes, one per CPU core by default (configurable, of course), but requests don't block their threads, and the machines (those very event loops) can put tasks in queues. Such a server can handle an enormous number of simultaneous requests; the cap is generally the physical RAM required to hold client socket connections.

Let's study a general flow of a generic SwiftNIO based web application which returns a list of something from DB:

  1. (once upon a time) The server starts and listens on a socket (say, port 8080), and a part of the OS kernel called epoll (kevent on BSD systems like macOS) starts notifying NIO every time something happens on the socket.
  2. (later) A client TCP/IP connection hits the system, and epoll/kevent sends a notification.
  3. NIO chooses an event loop to handle this request (it just picks the next one from an endless carousel) and creates a Channel object which reads and writes I/O data (in this particular case, over the network), as well as a pipeline of handlers for the data travelling back and forth through it.
  4. One handler decodes raw TLS bytes into plain text (let's not overcomplicate things and pretend that we're dealing with HTTP 1.1), another handler parses the text as HTTP, and at the very end of the pipeline a high-level framework handler picks the URL and request out of the parsed HTTP, dispatches the request and routes it into the application according to the Vapor/Kitura/Smoke config, thereby finally running the application code.
  5. The application validates the request (are all arguments correct? is the JWT expired? whyuwannakillme?), then tells the DB “pls gimme a list of these thingies”, passes it an EventLoopPromise, and is left with a pending EventLoopFuture from this promise (we will get back to it, don't worry), which eventually should hold a list of our thingies.
  6. While it doesn't have them thingies yet, our event loop can do other things (like, related to other requests).
  7. Once the DB returns the result, epoll/kevent notifies NIO about it, and NIO populates our EventLoopFuture with this result. At that exact moment our event loop returns to our request, forms the response, and returns it.
  8. The result is passed back through the pipeline: the high-level response object is first packed into JSON, the JSON is packed into an HTTP response body, the HTTP response is reduced to bytes, and the bytes are encrypted with TLS and sent through that very channel (the Channel object, remember?) back over the network (let's emphasize, though, that these bytes don't have to go over a network at all, because the other end might actually be a disk or RAM storage).
  9. The channel is closed, roll credits.

Let's return to the fifth point, which is obviously the point of interest for us. We can even write a controller in pseudocode:

app.get("list") { (request: Request) -> EventLoopFuture<[ResponseItem]> in
    guard someValidations else {
        throw ValidationError
    }

    return ItemsService
        .getMyItemsListFuture(on: request.eventLoop)
        .map { (rawList: [MyItem]) -> [ResponseItem] in
            var result = [ResponseItem]()
    
            for rawItem in rawList {
                result.append(ResponseItem(from: rawItem))
            }
    
            return result
        }
}

It's very important to understand that we have written only an execution plan, not code that executes all at once, top to bottom (each part of it will still execute only once).

Calling ItemsService.getMyItemsListFuture (which returns EventLoopFuture<[MyItem]>) means that the service has sent a request to the DB, but the result is expected at some point in the future, and once it arrives we want to transform it (map). So the guard part executes immediately, as soon as the request enters the controller method, while the body of map executes at some later time. This is the point of asynchronous computation in the NIO model: we create chains of actions which are eventually executed once the promises of the preceding actions are fulfilled. Building the chain itself happens immediately (so fast, in fact, that it's totally negligible, something like 0.00001 seconds).

Important: it is impossible to get the resulting value out of an EventLoopFuture<T> at the point where it was created, because there is no result yet. The only thing we can do is register an event handler on this future, which will execute when the future is fulfilled and populated with a T value. Adding an event handler produces a new future, and you can stack as many handlers as you like. In the very end you are left with a future which basically holds the whole execution chain inside.
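To make the “plan now, run later” idea concrete, here is a minimal sketch, assuming SwiftNIO 2. The promise is created by hand only so that the moment of fulfillment is visible; the names are illustrative:

```swift
import NIO

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let loop = group.next()

// A promise we control, so the moment of fulfillment is explicit.
let promise = loop.makePromise(of: Int.self)

// Registering a handler returns a new future; the closure does not run yet.
let doubled: EventLoopFuture<Int> = promise.futureResult.map { (value: Int) -> Int in
    print("handler ran with \(value)")
    return value * 2
}
print("chain built, nothing has run yet")

// Only now does the value appear, and the handler fires on the event loop.
promise.succeed(21)

// Blocking is used purely to observe the result from the main thread.
let result = try doubled.wait()
print(result) // prints 42

try group.syncShutdownGracefully()
```

The “chain built” line is always printed before the handler's line, because the handler cannot run until the promise has been succeeded.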

Vapor expects a ResponseEncodable as the controller action's return type, that is, an instance of a class or a structure which can produce an EventLoopFuture<Response>. When we return a plain non-future object, it's transparently converted into an EventLoopFuture.

Kitura works on a lower level and requires the programmer to explicitly send the result to the response object and call the next closure afterwards. It looks something like this:

router.get("list") { request, response, next in
    guard someValidations else {
        throw ValidationError
    }

    let future: EventLoopFuture<[ResponseItem]> = ItemsService
        .getMyItemsListFuture(on: request.eventLoop)
        .map { (rawList: [MyItem]) -> [ResponseItem] in ... }

    future.whenSuccess { items in
        response.send(items)
        next()
    }
}

Please pay attention to the line with future.whenSuccess. This call doesn't return anything; it just registers an event handler on the future. That is what distinguishes it from map and other methods of that family, which return a new future and let you chain handlers. You shouldn't use map for anything except transforming the result. If you need to send your result to an external service or something like that, always use whenSuccess.

Now, since we've dealt with the full example, let's get back to the basics.

Important: an EventLoopPromise, like an EventLoopFuture, is created only through the eventLoop.makePromise, eventLoop.makeSucceededFuture, eventLoop.makeFailedFuture or eventLoop.submit methods, because each promise or future must work within an event loop.
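A short sketch of these four factory methods side by side, assuming SwiftNIO 2 (`DemoError` and the variable names are made up for illustration):

```swift
import NIO

// Hypothetical error type, used only for this illustration.
struct DemoError: Error {}

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let loop = group.next()

// Futures that are already complete at creation time.
let ready: EventLoopFuture<String> = loop.makeSucceededFuture("ready")
let failed: EventLoopFuture<String> = loop.makeFailedFuture(DemoError())

// A future for work queued on the loop.
let computed: EventLoopFuture<Int> = loop.submit { 2 + 2 }

// A promise whose future is completed by hand.
let promise: EventLoopPromise<Int> = loop.makePromise()
promise.succeed(7)

// Every future is bound to the loop that created it.
let boundToLoop = ready.eventLoop === loop

// Blocking is fine here only because this is the main thread.
let readyValue = try ready.wait()
let computedValue = try computed.wait()
let promisedValue = try promise.futureResult.wait()
let failedValue = try? failed.wait() // nil, because the future carries an error

try group.syncShutdownGracefully()
```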

EventLoopPromise<T> is the object in charge of signalling the success or failure of an asynchronous operation. It is the parent entity of a future. Every promise holds a futureResult (a not yet fulfilled EventLoopFuture<T> with the same type as the parent promise). A promise can be succeeded with .succeed(resultValue) or failed with .fail(error). Why would you need promises in this form? They are necessary for code which doesn't support SwiftNIO directly, for instance, database drivers:

func get(key: String, on eventLoop: EventLoop) -> EventLoopFuture<Bytes?> {
    let promise: EventLoopPromise<Bytes?> = eventLoop.makePromise()
    
    self.getInternalAsync(key) { maybeBytes in
        promise.succeed(maybeBytes)
    }
    
    return promise.futureResult
}

The getInternalAsync method goes off to its own async storage, and the body of the anonymous function will be executed somewhere, sometime; in other words, this method is nonblocking. That's fine, and we don't need anything else, because we control our asynchronous operations ourselves. We create a promise outside of the async operation and pass it inside, where it will be fulfilled exactly once, and we return its future result to the caller.

EventLoopFuture is a storage (holder, box, you name it) for a result which will eventually appear inside it. You can't use that result immediately because it might not be there yet. There are two ways of waiting for that result.
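The same bridging pattern can be sketched in runnable form, assuming SwiftNIO 2. Here `fetchAsync` is a hypothetical callback-based API that stands in for a driver without NIO support, and `fetch` wraps it in a promise, this time also forwarding failures:

```swift
import Dispatch
import NIO

// Hypothetical callback-based API standing in for a non-NIO driver.
func fetchAsync(_ key: String, completion: @escaping (Result<String, Error>) -> Void) {
    DispatchQueue.global().async {
        completion(.success("value for \(key)"))
    }
}

// Wraps the callback API: the promise is created up front, completed
// exactly once inside the callback, and only its future is exposed.
func fetch(_ key: String, on eventLoop: EventLoop) -> EventLoopFuture<String> {
    let promise = eventLoop.makePromise(of: String.self)

    fetchAsync(key) { result in
        switch result {
        case .success(let value):
            promise.succeed(value)
        case .failure(let error):
            promise.fail(error)
        }
    }

    return promise.futureResult
}

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)

// Blocking is used only to show the result from the main thread.
let value = try fetch("answer", on: group.next()).wait()
print(value) // prints "value for answer"

try group.syncShutdownGracefully()
```

Every code path in the callback has to complete the promise, otherwise the future stays pending forever.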

The first one is described above: map, a function which accepts the concrete future value (when it's ready) and returns something new, a transformed value. The map method always returns another future.

There are other methods. For example, when one async operation has to be followed by another async operation (say, sending some data over the network or loading more data from the DB), we can use the flatMap method.

Important: the difference between map and flatMap is that the closure passed to map returns a concrete value, whereas the closure passed to flatMap returns a future. Thus flatMap, as its name suggests, “waits” for that inner future's concrete value before continuing; in other words, it flattens what would otherwise be a future of a future into a plain future.

Here is an example of such code (I broke it up into separate futures on purpose, for better understanding, just pay attention to the names of the constants):

/// For better readability here and below `Future` will be a short alias
/// for `EventLoopFuture`.
/// Not to be confused with similar `Future` named things
/// from other asynchronous frameworks (like Combine).
typealias Future = EventLoopFuture

let futureFoo: Future<Foo> = FooService.getFooFuture()
let futureBar: Future<Bar> = futureFoo.map { (foo: Foo) -> Bar in
    return foo.getBar()
}
let futureBaz: Future<Baz> = futureBar.flatMap { (bar: Bar) -> Future<Baz> in
    return BazService.getBazFuture(from: bar)
}

Of course, such code is much more readable as a chain, because it's quite easy to get lost in future declarations and mess up event order:

let futureBaz: Future<Baz> = FooService
    .getFooFuture()
    .map { (foo: Foo) -> Bar in
        return foo.getBar()
    }
    .flatMap { (bar: Bar) -> Future<Baz> in
        return BazService.getBazFuture(from: bar)
    }

The final stage of “waiting” on a future, like I said, is always event handling with whenSuccess, whenFailure or whenComplete (the latter accepts a monadic Result type, which holds either a value or an error).

future.whenSuccess { value in
    print("Value is \(value)")
}

future.whenFailure { error in
    dump("Error: \(error)")
}

/// `Value` stands for the value type of the future
future.whenComplete { (result: Result<Value, Error>) in
    dump("Result is \(result)")
}

The other way of waiting for a result is blocking waiting:

let futureBaz: Future<Baz> = ...
let baz: Baz = try futureBaz.wait()

This stops the whole thread until the entire future chain is complete.

Important: never call this method within an EventLoop context, because it stops the whole event loop thread. The worst-case scenario is wait() being called on two event loops, each waiting for a result from the other. This situation is a deadlock that cannot resolve itself, and it basically disables both event loops for good.

Important: wait() can be called on threads which are unrelated to event loops. For example, on the main thread, before the server starts. Or in a DispatchQueue.async context.
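A small sketch of both safe situations, assuming SwiftNIO 2 and Dispatch (names are illustrative). `inEventLoop` reports whether the current thread belongs to the given loop:

```swift
import Dispatch
import NIO

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let loop = group.next()

// The main thread is not one of the group's threads, so blocking is allowed.
let mainIsEventLoop = loop.inEventLoop // false
let onMain = try loop.submit { "computed on the event loop" }.wait()

// The same holds for a Dispatch queue, which is unrelated to event loops.
let done = DispatchSemaphore(value: 0)
DispatchQueue.global().async {
    do {
        let sum = try loop.submit { 1 + 1 }.wait()
        print("sum from a Dispatch queue: \(sum)")
    } catch {
        print("failed: \(error)")
    }
    done.signal()
}
done.wait()

try group.syncShutdownGracefully()
```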

You might've noticed in the previous example with wait() that this call demands a try. It means that errors might occur during future chain execution. Let's deal with that. Say we have to stop the whole chain if something goes wrong (for instance, the input data is invalid).

A complete example:

enum E: Error {
    case NilString
    case EmptyString
    case UnknownError
}

let futureString: Future<String> = SomeService
    .getStringFuture()
    .flatMapThrowing { (maybeString: String?) throws -> String in
        guard let string = maybeString else {
            throw E.NilString
        }

        guard !string.isEmpty else {
            throw E.EmptyString
        }

        return string
    }
    .flatMapErrorThrowing { (error: Error) throws -> String in
        /// Enum cases are matched with a pattern, `is` works only with types
        if case E.EmptyString = error {
            return "some empty string"
        } else {
            throw E.UnknownError
        }
    }
    .recover { (error: Error) -> String in
        /// `recover` cannot throw, so every branch must return a value
        if case E.UnknownError = error {
            return "unknown error"
        } else {
            return "very unknown error"
        }
    }
    .map { (string: String) -> String in
        return "My string is '\(string)'"
    }

The flatMapThrowing method works the same way as map, except that it can throw an error.

Remark: why is it called flatMapThrowing and not mapThrowing? It doesn't flatten a future into a value, does it? Long explanation: https://github.com/apple/swift-nio/issues/663#issuecomment-457155219. Short explanation: treat the flat prefix as flattening not only a future into a value, but also a throwing future into a value.

If flatMapThrowing throws an error, the current future becomes errored, and if the future chain doesn't have any recoveries (error handlers), the whole chain ends up in the errored state.

The error handlers flatMapErrorThrowing and recover are very similar (like map and flatMapThrowing), except that flatMapErrorThrowing may throw another error, while recover may not. These handlers are used to try to recover from an errored state and continue chain execution if an error was thrown somewhere earlier in the chain. If there is no error handler, or if the handler throws an error itself, the rest of the chain will be in the errored state (with exceptions, see below).

You can set more than one error handler.
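Here is a self-contained sketch of how an error travels along a chain, assuming SwiftNIO 2. `DemoError` and `parse` are hypothetical names invented for this illustration:

```swift
import NIO

// Hypothetical error type for the illustration.
enum DemoError: Error {
    case invalidInput
}

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let loop = group.next()

func parse(_ text: String) -> EventLoopFuture<String> {
    return loop.makeSucceededFuture(text)
        .flatMapThrowing { (raw: String) throws -> Int in
            guard let number = Int(raw) else {
                throw DemoError.invalidInput
            }
            return number
        }
        // Skipped entirely when the step above has thrown.
        .map { (number: Int) -> Int in number * 2 }
        // Turns the errored state back into a value.
        .recover { (_: Error) -> Int in -1 }
        // Runs in both cases, because the chain is healthy again.
        .map { (number: Int) -> String in "result: \(number)" }
}

let good = try parse("21").wait()  // "result: 42"
let bad = try parse("oops").wait() // "result: -1"

try group.syncShutdownGracefully()
```

Note that the doubling step never sees the failed input: handlers registered with map are bypassed until an error handler restores a value.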

There are situations when you have to “wait” for several futures to be fulfilled and continue execution only after that. This is done with the methods and, fold or reduce.

Let's figure these methods out, starting with and:

let futureFoo: Future<Foo> = FooService.getFooFuture()
let futureFooBar: Future<(Foo, Bar)> = futureFoo
    .and(BarService.getBarFuture())
    .map { (foo: Foo, bar: Bar) -> (Foo, Bar) in
        ...
    }

As you can see, the and method can be called on any future; it expects another future as its argument and returns a new future which is fulfilled only when both of them are fulfilled. The resulting value is a tuple of both values. Unfortunately, tuples in Swift aren't the most flexible tool, and if we add one more future with and, the resulting type will not be Future<(Foo, Bar, Baz)>, but Future<((Foo, Bar), Baz)>.
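One possible way to deal with the nested tuple is to flatten it by hand in a final map. A minimal sketch, assuming SwiftNIO 2, with simple stand-in values in place of Foo, Bar and Baz:

```swift
import NIO

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let loop = group.next()

let first: EventLoopFuture<Int> = loop.makeSucceededFuture(1)
let second: EventLoopFuture<String> = loop.makeSucceededFuture("two")
let third: EventLoopFuture<Double> = loop.makeSucceededFuture(3.0)

// Two chained `and` calls produce a nested tuple...
let nested: EventLoopFuture<((Int, String), Double)> = first.and(second).and(third)

// ...which can be flattened manually.
let flat: EventLoopFuture<(Int, String, Double)> = nested.map {
    (value: ((Int, String), Double)) -> (Int, String, Double) in
    return (value.0.0, value.0.1, value.1)
}

let combined = try flat.wait()
print(combined) // prints (1, "two", 3.0)

try group.syncShutdownGracefully()
```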

An example with fold is more complex and niche:

let eventLoop = eventLoopGroup.next()
let future0: Future<Int> = eventLoop.makeSucceededFuture(0)
let futures: [Future<Int>] = (0..<10).map { eventLoop.makeSucceededFuture($0) }
let futureSum: Future<Int> = future0.fold(futures) { (result: Int, next: Int) -> Future<Int> in
    return eventLoop.makeSucceededFuture(result + next)
}

Resulting value will be 45.

Static method reduce has two versions, let's see them both:

let eventLoop = eventLoopGroup.next()

var result: [String] = ["first"]
let resultFuture: Future<[String]> = Future<[String]>.reduce(
    into: result,
    [
        eventLoop.makeSucceededFuture("second"),
        eventLoop.makeSucceededFuture("last"),
    ],
    on: eventLoop
) { (carry: inout [String], currentString: String) -> Void in
    carry.append("\(currentString) item")
}

// Or with immutable accumulator,
// but with increased memory consumption
// for new allocations for resulting array

let resultFuture: Future<[String]> = Future<[String]>.reduce(
    [String](["first"]),
    [
        eventLoop.makeSucceededFuture("second"),
        eventLoop.makeSucceededFuture("last"),
    ],
    on: eventLoop
) { (carry: [String], currentString: String) -> [String] in
    var newCarry = carry
    newCarry.append("\(currentString) item")
    return newCarry
}

Both examples result in the following array:

first
second item
last item

As you might know, arrays and dictionaries in Swift aren't thread-safe: parallel reads from different threads are safe, but parallel writes might (and eventually will) end badly. To solve this problem (say, for a runtime cache implementation) we have to implement atomic (synchronized) access to the resource, that is, make sure the resource is always accessed from one particular thread. The hop method exists for exactly that purpose. Usage:

let userCacheEventLoop = UserCacheService.eventLoop

let future: Future<User?> = UserService
    .getUserFuture(by: ID)
    .hop(to: userCacheEventLoop)
    .map { (maybeUser: User?) -> User? in
        if let user = maybeUser {
            UserService.store(user)
        }

        return maybeUser
    }

The UserService.getUserFuture method loads a user on an undetermined event loop; after that we switch (hop) to a particular event loop, and all further futures in the chain are executed on that event loop. At the end of the chain it's desirable to hop back to the original event loop which started it all, but really that only matters for high-load systems.
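The effect of hop can be observed directly with `inEventLoop`. The following is a minimal sketch, assuming SwiftNIO 2; `workLoop` and `cacheLoop` are illustrative names, and no real cache is involved:

```swift
import NIO

let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
let workLoop = group.next()
let cacheLoop = group.next() // round-robin hands out the second loop here

let checks: (Bool, Bool) = try workLoop
    // This closure runs on workLoop.
    .submit { () -> Bool in workLoop.inEventLoop }
    .hop(to: cacheLoop)
    // After the hop, handlers run on cacheLoop.
    .map { (ranOnWorkLoop: Bool) -> (Bool, Bool) in
        return (ranOnWorkLoop, cacheLoop.inEventLoop)
    }
    .wait()

print(checks) // prints (true, true)

try group.syncShutdownGracefully()
```

Any state that is touched only inside handlers registered after the hop is therefore accessed from a single thread.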

Let's see another example of async computations using event loops:

let eventLoop = eventLoopGroup.next()

eventLoop.execute {
    /// This code will be executed within event loop context.
    /// Of course, it must be nonblocking.
    /// Method `execute` doesn't imply return value or future chaining
    /// it's very much like `DispatchQueue.async { ... }`
}

let future: Future<Int> = eventLoop.submit { () -> Int in
    /// Unlike `execute` method, this code implies
    /// concrete return value, and `submit` method creates and
    /// returns a future of respective type.
    return 123
}
/// A similar future can be created like this:
let similarFuture: Future<Int> = eventLoop.makeSucceededFuture(123)

future.map { int in
    return "My integer result is \(int)"
}.map { string in
    return "My string result is '\(string)'"
}.whenSuccess { resultString in
    print(resultString)
}

In this humble essay I tried my best to describe and explain, as comprehensively as I could, the general concepts and philosophy of async computation on event loops, and to give you the bare minimum needed for working with event loops, promises and futures.

If you have questions, if you've found an error, or if you think I didn't explain some important detail about futures, feel free to leave a comment below. By the way, auth and comments services are implemented in Swift and powered by event loops :)

_____________

Tags: swift, swift-nio, vapor, future, promise, EventLoopFuture, EventLoopPromise