tl;dr:параметр concurrentmergeMap может ограничивать количество одновременно активных внутренних подписок и, таким образом, делает его довольно приличным мьютексом/семафором.

Предположим, вы столкнулись с общей проблемой взаимного исключения: существует общий ресурс, к которому одновременно могут получить доступ только n задачи, но нужно выполнить m > n задач.

Теперь вы можете добавить такую ​​зависимость, как async-mutex, красивую и лаконичную библиотеку для обработки подобных ситуаций. Но если ваша проблема так же проста, как и рассматриваемая, и это отдельный случай в вашем приложении, вы можете вместо этого захотеть использовать RxJS, ваш швейцарский армейский нож асинхронного программирования.

Сразу скажу, что я не хочу рекламировать асинхронный мьютекс. Как раз наоборот: я выбрал эту библиотеку в качестве примера из-за ее простого синтаксиса, который упрощает объяснение;)

Случай N = 1

Ситуация, в которой к вашему общему ресурсу может обращаться только одна задача за раз, может выглядеть так в async-mutex:

Если подумать, task()либо выполняет doStuffWithSharedResource() немедленно, либо ему приходится сначала ждать в очереди.
Подсказка — «Очередь». Мы можем реализовать такое же поведение, используя оператор concatMap RxJS:

task() больше не обрабатывает свои входные данные самостоятельно. Вместо этого он просто ставит их в Subject — очередь, можно сказать. Оттуда значения сопоставляются с doStuffWithSharedResource() с помощью concatMap, что гарантирует, что следующая задача может начаться только после завершения предыдущей. Поскольку в этом примере doStuffWithSharedResource возвращает обещание, мы должны преобразовать его в Observable, используя from().

Дело N › 1

Если вы хотите разрешить несколько одновременных запусков doStuffWithSharedResource(), вы должны использовать семафор в асинхронном мьютексе. Он работает аналогично мьютексу:

Но RxJS тоже прикроет вашу спину. Берем предыдущий пример и заменяем concapMap на mergeMap. mergeMap имеет параметр concurrent, который слишком хорош, чтобы его так тщательно скрывать. Он делает то, что вы ожидаете: он ограничивает количество одновременно активных внутренних подписок.

Вот и все. Базовый семафор, реализованный всего в паре строк с использованием ванильного RxJS.

Обсуждение

Хорошо, решение RxJS может избавить вас от зависимости, но, честно говоря, у него есть свои оговорки:

  • Возможно, это менее читабельно.
  • task() не может вернуть результат расчета без дальнейших наворотов.
  • Он рассматривает только один тип задачи, для которой требуется ресурс.

Это то, что я имею в виду, говоря «использовать его только для решения основных задач».

До следующего раза 🍻