Как я перестал бояться и полюбил EventLoopFuture
Низкоуровневый фреймворк SwiftNIO, разработанный компанией Эппл, лежит в основе всех крупных серверных фреймворков на языке Свифт — Vapor, Kitura, Smoke.
Однако ни один из этих инструментов не разъясняет в полной мере концепций и особенностей использования самой главной части NIO — EventLoopPromise
и EventLoopFuture
, что вызывает у конечных программистов фундаментальное непонимание всего происходящего, оттуда и вопросы во всех чатах и форумах вроде «А как достать значение из Future?».
Об этом и поговорим.
Статья полиморфная, она может показывать вам примеры кода для SwiftNIO как первой, так и второй версии. Для справки: Vapor 3 использует SwiftNIO 1, а Vapor 4 — SwiftNIO 2. Выбирайте тщательно:
Сперва разберем общую структуру происходящего в серверах на базе SwiftNIO. Любой сервер работает поверх объекта EventLoopGroup
, то есть группы EventLoop
, чаще всего их количество равно количеству ядер в сервере. Если говорить терминами теории массового обслуживания, то эвент луп — это станок, который обрабатывает поток входящих заявок.
Важно: в отличие от классической блокирующей модели обработки заявок, в модели NIO владельцы заявок не стоят у станка, ожидая ответа, а «отходят» от него, а станок сам присылает им уведомление о том, что ответ можно получить. Таким образом станок может собирать очередь заявок произвольного размера, в то время как в блокирующей модели у станка может находиться только один заказчик, что сильно снижает общую пропускную способность всей системы. Кроме того, вся работа в модели NIO разбивается на маленькие задачи, ожидая выполнения которых, можно начать выполнять другую заявку.
Эта формулировка очень важна для понимания всей концепции асинхронных вычислений. Из нее становятся очевидными недостатки классической блокирующей схемы обработки запросов.
Итак, в блокирующей модели мы имеем n
обработчиков запросов (часто, опять же, по количеству ядер системы, это вполне оптимально), которые полностью блокируют свой поток на время выполнения своей заявки, а новые заявки будут ждать снаружи сервера (зависит от конкретных реализаций, nginx в роли реверс-прокси может пулить их у себя). Таким образом этот сервер максимум одновременно может обрабатывать только n
задач.
Сервер на SwiftNIO тоже имеет n
обработчиков по количеству ядер, но все задачи, которые они выполняют, не блокируют свои потоки, а обработчики (те самые эвент лупы) могут складировать у себя задачи в очереди. Такой сервер одновременно может держать невероятное количество задач, которое в общем случае упирается в объем оперативной памяти сервера, который необходим для хранения открытых сокетных соединений клиентов.
Рассмотрим общую схему работы сервера на СвифтНИО с серверным приложением, которое возвращает нам список чего-то из базы:
- (однажды) Сервер стартует и начинает слушать сокет (например, 8080), а часть ядра системы под названием
epoll
(в BSD-системах вроде макоси —kevent
) начинает уведомлять НИО всякий раз, когда на сокете что-то появилось. - (позже) TCP/IP соединение клиента приходит в систему,
epoll
/kevent
уведомляет. - НИО выбирает эвент луп, который будет обрабатывать этот запрос (просто берет следующий из бесконечной карусели) и создает объект
Channel
— канал, который принимает и отправляет I/O данные (в конкретно этом случае — через сеть), а также цепочку обработчиков данных, ходящих по этой цепочке. - Один обработчик расшифровывает голые байты TLS в текст, другой обработчик парсит текст как HTTP, а в самом конце цепочки обработчик фреймворка достает из разобранного HTTP урл и тело запроса, роутит запрос в соответствии с конфигом Вапора/Китуры/Смоука, и запускает конкретный код контроллера приложения.
- Приложение валидирует запрос (а правильный ли аргумент? а не просрочен ли JWT? а родители не пьюще ли?), затем говорит базе «выдай мне список этих вот штук», передает ей
EventLoopPromise
, а сам остается сEventLoopFuture
от этого промиса (мы вернемся к этим непонятным пунктам), где однажды появится список нужных нам штук. - Пока этих штук там нет, эвент луп занимается другими делами, относящимися к другим запросам.
- Однажды база данных возвращает значение,
epoll
/kevent
сообщат об этом NIO, и он вставит результат вEventLoopFuture
. Ровно в этот момент эвент луп возвращается к нашей задаче, формирует ответ и возвращает его. - Полученный выше результат передается по цепочке в обратную сторону: высокоуровневый объект результата сперва пакуется в JSON, затем этот JSON укладывается в тело ответа HTTP, затем HTTP превращается в байты, затем эти байты шифруются в TLS и отправляются этим же каналом (
Channel
) обратно по сети (подчеркнем, что вовсе необязательно, что они будут отправлены именно по сети, это может быть, например, дисковая операция или хранилище в оперативной памяти). - Канал закрывается.
Вернемся к четвертому пункту, который интересует нас больше всего. Даже можем написать условный пример кода контроллера:
app.get("list") { (request: Request) -> EventLoopFuture<[ResponseItem]> in
guard someValidations else {
throw ValidationError
}
return ItemsService
.getMyItemsListFuture(on: request.eventLoop)
.map { (rawList: [MyItem]) -> [ResponseItem] in
var result = [ResponseItem]()
for rawItem in rawList {
result.append(ResponseItem(from: rawItem))
}
return result
}
}
Очень важно понять, что мы написали план выполнения кода, а не код, который сразу выполнится весь целиком.
Обращение к методу ItemsService.getMyItemsListFuture
(который возвращает тип EventLoopFuture<[MyItem]>
) означает, что сервис отправил запрос в базу, но результат будет когда-то, и когда он прийдет, нужно будет произвести с результатом еще какие-то преобразования (map
). Таким образом часть с guard
будет выполнена сразу, как только запрос пришел на сервер, а содержимое map
— когда-нибудь. В этом и смысл асинхронных вычислений по модели СвифтНИО: мы создаем цепочки действий, которые будут выполнены однажды, когда выполнятся предыдущие заложенные обещания действий.
Важно: невозможно получить значениеEventLoopFuture<T>
в том же рантайме, где он был создан, потому что его еще там нет, а единственное, что мы можем сделать — повесить на этот фьючер обработчик события, который выполнится, когда у фьючера появится значениеT
. Результатом навешивания коллбека на фьючер будет еще один фьючер, и так можно делать сколько угодно раз, длина цепочки фьючеров не ограничена.
Вапор ожидает в качестве результата выполнения роута ResponseEncodable
, то есть любой объект или структуру, которая может вернуть EventLoopFuture<Response>
, а когда мы возвращаем обычный объект, то он прозрачно преобразуется в EventLoopFuture
.
Китура работает на более низком уровне, и требует, чтобы пользователь явным образом отправлял результат из роутера в объект response
и вызывал спецфункцию next
. Выглядит это примерно следующим образом:
router.get("list") { request, response, next in
guard someValidations else {
throw ValidationError
}
let future: EventLoopFuture<[ResponseItem]> = ItemsService
.getMyItemsListFuture(on: request.eventLoop)
.map { (rawList: [MyItem]) -> [ResponseItem] in ... }
future.whenSuccess { items in
response.send(items)
next()
}
}
Обращаю внимание на строку future.whenSuccess
. Этот вызов ничего не возвращает, а просто вешает на фьючер обработчик выполнения, и этим отличается от map
или иных методов этого семейства, которые возвращают новый фьючер и позволяют создавать цепочки обработчиков.
Теперь, когда мы разобрались с полным примером, вернемся к самым основам.
Важно:EventLoopPromise
, как иEventLoopFuture
создаются только с помощью методовeventLoop.newPromise
,eventLoop.newSucceededFuture
,eventLoop.newFailedFuture
eventLoop.makePromise
,eventLoop.makeSucceededFuture
,eventLoop.makeFailedFuture
илиeventLoop.submit
, поскольку всякий промис или фьючер работает строго в пределах своего эвент лупа.
EventLoopPromise<T>
— это объект, который отвечает за успешное или неуспешное выполнение асинхронной операции. Это родительская сущность для фьючера. Всякий промис содержит в себе futureResult
(хранящий, разумеется, еще невыполненный EventLoopFuture<T>
, то есть с тем же типом, что и родительский промис). Промису можно сказать .succeed(result: resultValue)
, а можно сказать .fail(error: error)
. Зачем нужны промисы в таком виде? В первую очередь они нужны для мест, которые не поддерживают СвифтНИО напрямую. Например, драйвера для баз данных:
func get(key: String, on eventLoop: EventLoop) -> EventLoopFuture<Bytes?> {
let promise: EventLoopPromise<Bytes?> = eventLoop.newPromisemakePromise()
self.getInternalAsync(key) { maybeBytes in
promise.succeed(result: maybeBytes)
}
return promise.futureResult
}
Метод getInternalAsync
уходит в собственную асинхронность хранилища и тело анонимной функции будет выполнено где-то когда-то, то есть этот метод полностью неблокирующий, но нам большего и не нужно, ведь мы сами оперируем отложенными вычислениями. Создаем промис снаружи асинхронности, передаем его внутрь, где он когда-нибудь будет выполнен, а возвращаем будущий результат.
EventLoopFuture
— хранилище для будущего результата, которое появится внутри однажды. Воспользоваться этим результатом сразу нельзя, потому что его там, скорее всего, еще нет. Есть два способа дождаться этого результата.
Первый описан выше, через map
, функцию, которая принимает в себя конкретное значение фьючера, а возвращает какое-то новое, преобразованное значение. Результатом функции map
является новый фьючер.
Есть и другие методы. Например, в случае, если после одной асинхронной операции нам нужно обратиться к другой (скажем, отправить данные по сети), мы можем воспользоваться методом thenflatMap
.
Важно: разница междуmap
иthenflatMap
заключается в том, чтоmap
в качестве результата ожидает конкретное значение, аthenflatMap
— фьючер на конкретное значение. Таким образом,thenflatMap
, как видно из его названия, перед продолжением дожидается появления значения, или, иными словами, как бы разглаживает значение из фьючера.
Пример такого кода (я специально разбил его на отдельные фьючеры для более простого понимания происходящего, только внимательно следите за названиями констант фьючеров):
/// Для упрощения чтения здесь и далее `Future` будет коротким алиасом
/// для `EventLoopFuture`.
/// Его не следует путать с похожими идентификаторами `Future` из других
/// асинхронных фреймворков.
typealias Future = EventLoopFuture
let futureFoo: Future<Foo> = FooService.getFooFuture()
let futureBar: Future<Bar> = futureFoo.map { (foo: Foo) -> Bar in
return foo.getBar()
}
let futureBaz: Future<Baz> = futureBar.thenflatMap { (bar: Bar) -> Future<Baz> in
return BazService.getBazFuture(from: bar)
}
Разумеется, такой код лучше читается как цепочка, поскольку легко потеряться в объявлениях фьючеров и напортачить с очередностью событий:
let futureBaz: Future<Baz> = FooService
.getFooFuture()
.map { (foo: Foo) -> Bar in
return foo.getBar()
}
.thenflatMap { (bar: Bar) -> Future<Baz> in
return BazService.getBazFuture(from: bar)
}
Конечным этапом ожидания фьючера, как уже было сказано выше, является навешивание обработчиков событий whenSuccess
или , whenFailurewhenFail
или whenComplete
(он принимает в качестве результата монаду Result
, которая может быть как значением, так и ошибкой).
future.whenSuccess { value in
print("Value is \(value)")
}
future.whenFailurewhenFail { error in
dump("Error: \(error)")
}
future.whenComplete { (result: Result<Success, Failure>) in
dump("Result is \(result)")
}
Вторым способом дождаться значения из фьючера является блокирующее ожидание:
let futureBaz: Future<Baz> = ...
let baz: Baz = try futureBaz.wait()
Таким образом весь текущий тред остановится, ожидая окончания выполнения всей цепочки фьючеров.
Важно: никогда нельзя вызывать этот метод внутри контекстаEventLoop
, поскольку он остановит весь поток, на котором работает эвент луп. Хуже всего, если сложится ситуация, когдаwait()
вызван на двух эвент лупах, которые ожидают выполнения задач друг от друга. Эта ситуация называется неразрешимым дедлоком, и выводит из строя оба эвент лупа.
Важно:wait()
можно вызывать только в потоках, которые не имеют отношения к эвент лупам. Например, в главном потоке, перед запуском сервера. Или в контекстеDispatchQueue.async
.
Вы могли заметить в последнем примере с wait()
, что этот вызов требует try
. Это означает, что при выполнении цепочки фьючеров могли произойти ошибки. Разберемся с ними. Допустим, нам требуется прервать выполнение всей цепочки фьючеров, если где-то что-то пошло не так (скажем, не прошли валидацию входные данные).
Разберем полный пример
enum E: Error {
case NilString
case EmptyString
case UnknownError
case VeryUnknownError
}
let futureString: Future<String> = SomeService
.getStringFuture()
.thenThrowingflatMapThrowing { (maybeString: String?) throws -> String in
guard let string = maybeString else {
throw E.NilString
}
guard string.count > 0 else {
throw E.EmptyString
}
return string
}
.thenIfErrorThrowingflatMapErrorThrowing { (error: Error) throws -> String in
if error is E.EmptyString {
return "some empty string"
} else {
throw UnknownError
}
}
.mapIfErrorrecover { (error: Error) -> String in
if error is E.UnknownError {
return "unknown error"
} else {
throw VeryUnknownError
}
}
.map { (string: String) -> String in
return "My string is '\(string)'"
}
Метод thenThrowingflatMapThrowing
работает точно так же, как и map
, с той разницей, что может выкинуть исключение.
Ремарка: почему метод называется thenThrowingflatMapThrowing
, а не mapThrowing
? Он же не разглаживает фьючер в значение? Длинная причина: https://github.com/apple/swift-nio/issues/663#issuecomment-457155219. Короткая причина: считайте, что префикс thenflat
означает не только разглаживание фьючера в значение, но и кидающего фьючера в значение.
Если в thenThrowingflatMapThrowing
будет выкинуто исключение, то текущий фьючер примет значение соответствующей ошибки, и если в цепочке не будет обработчиков ошибок, то конечный результат также войдет в состояние ошибки.
Обработчики ошибок thenIfErrorThrowingflatMapErrorThrowing
и mapIfErrorrecover
очень похожи (как мап и thenThrowingflatMapThrowing
), с той лишь разницей, что thenIfErrorThrowingflatMapErrorThrowing
может снова выкинуть исключение, а mapIfErrorrecover
нет. Эти обработчики используются для попытки восстановить значение и продолжить цепочку фьючеров, если в цепочке перед ними была выкинута ошибка. Если обработчика нет, или если он сам выкинул исключение, то вся последующая цепочка не выполнится (есть исключения, смотри ниже).
В цепочке можно установить более одного обработчика ошибок, который может восстановить результат и продолжить выполнение цепочки фьючеров (до следующей ошибки).
Случаются ситуации, когда требуется дождаться выполнения нескольких фьючеров, и только тогда продолжать работать. Это достигается с помощью методов and
, fold
или reduce
.
Разберем примеры использования каждого из этих методов. Начнем с and
:
let futureFoo: Future<Foo> = FooService.getFooFuture()
let futureFooBar: Future<(Foo, Bar)> = futureFoo
.and(BarService.getBarFuture())
.map { (foo: Foo, bar: Bar) -> (Foo, Bar) in
...
}
Как видно, у любого фьючера можно вызвать метод and
, который принимает еще один фьючер, и создает новый фьючер, который будет выполнен только когда они оба успешно выполнятся. Итоговое значение этого фьючера будет кортежем из двух значений. К сожалению, кортежи в Свифте — не самый гибкий инструмент, и если мы добавим к получившемуся фьючеру еще один and
, то итоговый фьючер будет иметь тип не Future<(Foo, Bar, Baz)>
(как хотелось бы), а Future<((Foo, Bar), Baz)>
.
Пример с fold
более сложный и нишевый:
let eventLoop = eventLoopGroup.next()
let future0: Future<Int> = eventLoop.newSucceededFuturemakeSucceededFuture(result: 0)
let futures: [Future<Int>] = (0..<10).map { eventLoop.newSucceededFuturemakeSucceededFuture(result: $0) }
let futureSum: Future<Int> = future0.fold(futures) { (result: Int, next: Int) -> Future<Int> in
return eventLoop.newSucceededFuturemakeSucceededFuture(result: result + next)
}
Результирующим значением будет 45.
У статического метода reduce
есть две версии, приведем их обе:
let eventLoop = eventLoopGroup.next()
var result: [String] = ["first"]
let resultFuture: Future<[String]> = Future<[String]>.reduce(
into: result,
[
eventLoop.newSucceededFuturemakeSucceededFuture(result: "second"),
eventLoop.newSucceededFuturemakeSucceededFuture(result: "last"),
],
on: eventLoop
) { (carry: inout [String], currentString: String) -> Void in
carry.append("\(currentString) item")
}
// Или с иммутабельным аккумулятором,
// но увеличенным расходом памяти на
// новые аллокации итогового массива
let resultFuture: Future<[String]> = Future<[String]>.reduce(
[String](["first"]),
[
eventLoop.newSucceededFuturemakeSucceededFuture(result: "second"),
eventLoop.newSucceededFuturemakeSucceededFuture(result: "last"),
],
on: eventLoop
) { (carry: [String], currentString: String) -> [String] in
var newCarry = carry
newCarry.append("\(currentString) item")
return newCarry
}
Оба примера на выходе дают массив строк:
first item
second item
last item
Как известно (или было неизвестно, но вот уже нет), массивы и словари в Свифте не являются потокобезопасными (thread-safe), что означает, что параллельное чтение из разных тредов вполне безопасно, а вот одновременная запись может закончиться плачевно. Для решения этой задачи (скажем, при обращении к рантайм-кешу) бывает необходимо осуществить атомарный доступ к ресурсу, то есть создать условия, когда обращение к ресурсу будет выполняться строго из известного потока. Чтобы заставить выполнить какое-то действие фьючера на известном эвентлупе, существует метод hopTohop
. Его использование выглядит примерно так:
let userCacheEventLoop = UserCacheService.eventLoop
let future: Future<User?> = UserService
.getUserFuture(by: ID)
.hopTohop(to: userCacheEventLoop)
.map { (maybeUser: User?) -> User
if let user = maybeUser {
UserService.store(user)
}
return maybeUser
}
Метод UserService.getUserFuture
загружает юзера на неизвестном эвент лупе, после чего мы выполняем переход (прыгаем) на заранее известный эвент луп, и все дальнейшие фьючеры будут выполняться именно на нем. В конце формирования цепочки фьючеров желательно перепрыгнуть обратно на оригинальный эвент луп, с которого мы начали, но это имеет реальный смысл в высоконагруженных системах.
Чтобы лучше понять концепцию отложенных вычислений на эвент лупах, покажу еще один пример работы с эвент лупами:
let eventLoop = eventLoopGroup.next()
eventLoop.execute {
/// Этот код будет выполнен в контексте эвент лупа.
/// Разумеется, он должен быть неблокирующим.
/// Метод `execute` не подразумевает возврат значения или чейнинг
/// фьючеров, его можно сравнить с `DispatchQueue.async { ... }`
}
let future: Future<Int> = eventLoop.submit { () -> Int
/// В отличие от метода `execute`, этот код предполагает конкретное
/// возвращаемое значение, а сам метод `submit` создает и
/// возвращает фьючер соответствующего типа.
return 123
}
/// Аналогичный фьючер можно создать вот так:
let future: Future<Int> = eventLoop.newSucceededFuturemakeSucceededFuture(result: 123)
future.map { int in
return "My integer result is \(int)"
}.map { string in
return "My string result is '\(string)'"
}.whenSuccess { resultString in
print(resultString)
}
В этом скромном эссе я постарался максимально понятно и в меру подробно разъяснить общую философию работы с отложенными вычислениями на эвент лупах и дать необходимый диалектический минимум для работы с эвент лупами, промисами и фьючерами.
Если у вас возникли вопросы, если вы нашли ошибку или опечатку, если вы считаете, что я не рассказал о каком-то важном нюансе работы с фьючерами, смело пишите в комментарии. Кстати, сервисы авторизации и комментов написаны на Свифте и работают строго на эвент лупах :)
_____________
Материалы для самостоятельного чтения:
-
Документация
EventLoopFuture
:https://apple.github.io/swift-nio/docs/current/NIOCore/Classes/EventLoopFuture.html
https://apple.github.io/swift-nio/docs/1.14.1/NIO/Classes/EventLoopFuture.html
-
Документация
EventLoopPromise
:https://apple.github.io/swift-nio/docs/current/NIOCore/Structs/EventLoopPromise.html
https://apple.github.io/swift-nio/docs/1.14.1/NIO/Structs/EventLoopPromise.html
-
Документация
EventLoop
:https://apple.github.io/swift-nio/docs/current/NIOCore/Protocols/EventLoop.html
https://apple.github.io/swift-nio/docs/1.14.1/NIO/Protocols/EventLoop.html
-
Документация
ChannelPipeline
(как обрабатываются данные вChannel
):https://apple.github.io/swift-nio/docs/current/NIOCore/Classes/ChannelPipeline.html
https://apple.github.io/swift-nio/docs/1.14.1/NIO/Classes/ChannelPipeline.html
Комменты на апрув