tl;dr:параметр concurrent
mergeMap
может ограничивать количество одновременно активных внутренних подписок и, таким образом, делает его довольно приличным мьютексом/семафором.
Предположим, вы столкнулись с общей проблемой взаимного исключения: существует общий ресурс, к которому одновременно могут получить доступ только n
задачи, но нужно выполнить m > n
задач.
Теперь вы можете добавить такую зависимость, как async-mutex, красивую и лаконичную библиотеку для обработки подобных ситуаций. Но если ваша проблема так же проста, как и рассматриваемая, и это отдельный случай в вашем приложении, вы можете вместо этого захотеть использовать RxJS, ваш швейцарский армейский нож асинхронного программирования.
Сразу скажу, что я не хочу рекламировать асинхронный мьютекс. Как раз наоборот: я выбрал эту библиотеку в качестве примера из-за ее простого синтаксиса, который упрощает объяснение;)
Случай N = 1
Ситуация, в которой к вашему общему ресурсу может обращаться только одна задача за раз, может выглядеть так в async-mutex:
Если подумать, task()
либо выполняет doStuffWithSharedResource()
немедленно, либо ему приходится сначала ждать в очереди.
Подсказка — «Очередь». Мы можем реализовать такое же поведение, используя оператор concatMap
RxJS:
task()
больше не обрабатывает свои входные данные самостоятельно. Вместо этого он просто ставит их в Subject
— очередь, можно сказать. Оттуда значения сопоставляются с doStuffWithSharedResource()
с помощью concatMap
, что гарантирует, что следующая задача может начаться только после завершения предыдущей. Поскольку в этом примере doStuffWithSharedResource
возвращает обещание, мы должны преобразовать его в Observable, используя from()
.
Дело N › 1
Если вы хотите разрешить несколько одновременных запусков doStuffWithSharedResource()
, вы должны использовать семафор в асинхронном мьютексе. Он работает аналогично мьютексу:
Но RxJS тоже прикроет вашу спину. Берем предыдущий пример и заменяем concapMap
на mergeMap
. mergeMap
имеет параметр concurrent
, который слишком хорош, чтобы его так тщательно скрывать. Он делает то, что вы ожидаете: он ограничивает количество одновременно активных внутренних подписок.
Вот и все. Базовый семафор, реализованный всего в паре строк с использованием ванильного RxJS.
Обсуждение
Хорошо, решение RxJS может избавить вас от зависимости, но, честно говоря, у него есть свои оговорки:
- Возможно, это менее читабельно.
task()
не может вернуть результат расчета без дальнейших наворотов.- Он рассматривает только один тип задачи, для которой требуется ресурс.
Это то, что я имею в виду, говоря «использовать его только для решения основных задач».
До следующего раза 🍻