Порядок операторов в RxJs
• 6 минут / Опубликован на Habr
TL;DR: Порядок важен. Операторы довольно атомарны и зачастую очень просты, но это не мешает им объединяться в сложные последовательности, в которых легко допустить ошибку. Давайте разберемся.
Будет очень много marble диаграмм, извините. Пара ресурсов для тех, кто не в курсе про marble диаграммы: How to Read an RxJS Marble Diagram и Testing RxJS Code with Marble Diagrams. В некоторых очень спорных местах я добавил подсказки: ↑
подскажет, когда конкретно произошло событие; ⇈
означает, что произошло два синхронных события подряд.
Простая последовательность
Начнем с обычного интервала.
interval(1000).subscribe(console.log);
Его диаграмма будет такой:
// interval(1000)
-----0-----1-----2-----3-----4--->
Интервал выводит числа по возрастанию, давайте прокачаем его и добавим немного случайности с помощью Math.random()
:
interval(1000)
.pipe(map(() => Math.random()))
.subscribe(console.log);
Для упрощения диаграммы, будем считать, что Math.random()
возвращает целые числа.
Обновим диаграмму:
// interval(1000)
-----0-----1-----2-----3-----4--->
// map(() => Math.random())
-----1-----6-----3-----2-----9--->
Ждать первое значение целую секунду — это долго. Хочется сразу после подписки видеть значение в консоли. Есть прекрасный оператор startWith
. Добавим его в нашу цепочку операторов:
interval(1000)
.pipe(
map(() => Math.random()),
startWith(-1),
)
.subscribe(console.log);
Можете предположить результат?
Помните, я говорил, что порядок важен. Это как раз хороший пример для данного утверждения. Давайте посмотрим на диаграмму:
// interval(1000)
-----0-----1-----2-----3-----4--->
// map(() => Math.random())
-----1-----6-----3-----2-----9--->
// startWith(-1)
(-1)-1-----6-----3-----2-----9--->
↑ ↑
Упс. Значение добавляется после нашей функции случайности. Название немного сбивает, и, мне кажется, что используя startWith
в первый раз, я поставил этот оператор не туда.
Как это работает: под капотом оператор создаёт новый стрим, который передается в следующий оператор. Поэтому получается, что startWith
принимает стрим, который пришел из map
, и уже в этот стрим записывается первое значение.
Окей, теперь зная это, поправим код так, чтобы все цифры проходили через map
, в том числе и результат startWith
.
interval(1000)
.pipe(
startWith(-1),
map(() => Math.random()),
)
.subscribe(console.log);
Получим такую диаграмму:
// interval(1000)
-----0-----1-----2-----3-----4--->
// startWith(-1)
(-1)-0-----1-----2-----3-----4--->
↑ ↑
// map(() => Math.random())
2----1-----6-----3-----2-----9--->
Перфекто.
У меня есть к вам вопрос: что будет в консоли (или как будет выглядеть диаграмма) при такой последовательности операторов?
interval(1000)
.pipe(
startWith(-1),
map(() => Math.random()),
startWith('a'),
)
.subscribe(console.log);
Операторы же выполняются по порядку. Ведь так? Ведь так!?
Да, все так. Только надо внимательно следить за происходящим и помнить: каждый оператор создает новый поток. Разберем по шагам:
- Создаем поток из
interval(1000)
; - К этому потоку
startWith
добавляет в самое начало-1
; - Выполняем
Math.random()
; - К потоку из предыдущего шага следующий
startWith
в самое начало добавляет'a'
; - Сразу после подписки мы увидим
'a'
, следом за ним будет результатMath.random()
, который выполнился из-за-1
. Все это будет происходить синхронно. - Остальные значения будут выводиться асинхронно в консоли раз в секунду.
Диаграмма все упростит (надеюсь):
// interval(1000)
-----0-----1-----2-----3-----4--->
// startWith(-1)
(-1)-0-----1-----2-----3-----4--->
↑ ↑
// map(() => Math.random())
2----1-----6-----3-----2-----9--->
// startWith('a')
a2---1-----6-----3-----2-----9--->
⇈
Порядок операторов важен. Но иногда бывает не так очевидно, что придет в subscribe
, в какой последовательности и почему.
shareReplay
Есть операторы (или группы операторов), которые требуют к себе особого внимания: их неправильное использование может привести к утечкам памяти.
Начнем с группы операторов шаринга состояния. Все примеры будут с оператором shareReplay
, но это применимо и к другим операторам и их комбинациям из «sharing группы».
Мой общий совет звучит так: shareReplay
должен быть последним оператором в .pipe()
. Если вы располагаете его в другом месте, либо вы знаете, зачем он там нужен, либо совершаете ошибку. Разберемся почему.
Взглянем на shareReplay
, точнее, на его отсутствие:
const randomTimer = interval(1000).pipe(map(() => Math.random()));
randomTimer.subscribe(console.log);
randomTimer.subscribe(console.log);
Каждый subscribe
создает свой поток интервалов, который потом преобразуется с помощью Math.random()
. Получается, что каждую секунду мы будем видеть 2 цифры в консоли, но они будут разные.
// subscribe
-----3-----7-----1-----8-----9--->
// subscribe
-----5-----2-----1-----7-----0--->
Если мы сделаем подписку на один из потоков асинхронной:
const randomTimer = interval(1000).pipe(map(() => Math.random()));
randomTimer.subscribe(console.log);
setTimeout(() => {
randomTimer.subscribe(console.log);
}, 500);
то каждая подписка будет писать в консоль число независимо от предыдущей. Будет такая диаграмма результата:
// subscribe
-----3-----7-----1-----8-----9--->
// setTimeout
----5-----2-----1-----7-----0->
Обратите внимание, что вторая подписка начинается не сразу.
Если мы хотим во всех подписках использовать один и тот же интервал, а не создавать его каждый раз заново, то без shareReplay
не обойтись:
const randomTimer = interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 }),
map(() => Math.random()),
);
randomTimer.subscribe(console.log);
setTimeout(() => {
randomTimer.subscribe(console.log);
}, 500);
Угадаете, какой будет диаграмма?
// subscribe
-----3-----7-----1-----8-----9--->
// setTimeout
--5-----2-----1-----7-----0--->
Да, мы подписываемся с задержкой в 500мс. Но так как мы шарим интервал, задержка не важна: значения будут отображаться в консоли одновременно. Давайте поменяем интервал на 1500мс:
const randomTimer = interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 }),
map(() => Math.random()),
);
randomTimer.subscribe(console.log);
setTimeout(() => {
randomTimer.subscribe(console.log);
}, 1500);
Диаграмма:
// subscribe
-----3-----7-----1-----8-----9--->
// setTimeout
5--2-----1-----7-----0--->
shareReplay
работает таким образом, что запоминает последнее значение, и, если оно было, мы получаем его мгновенно, без задержек. А все последующие значения будут отображаться во время срабатывания таймера.
Идем дальше. А что если нам надо шарить результат и Math.random()
в том числе? Надо поместить shareReplay
чуть-чуть подальше:
const randomTimer = interval(1000).pipe(
map(() => Math.random()),
shareReplay({ refCount: true, bufferSize: 1 }),
);
randomTimer.subscribe(console.log);
setTimeout(() => {
randomTimer.subscribe(console.log);
}, 1500);
Думаю, что диаграмма окажется вполне очевидной:
// subscribe
-----3-----7-----1-----8-----9--->
// setTimeout
3--7-----1-----8-----9--->
В наших примерах, когда shareReplay
находился перед map
, это был баг, а не фича. Как я упоминал ранее, операторы, подобные shareReplay
, в большинстве случаев надо использовать в конце пайпа. Я обычно добавляю этот оператор в конце каждого пайпа.
takeUntil и автоматическая отписка
* Сейчас я буду говорить про один из классических для Angular подходов автоматической отписки. Если вы используете RxJs в хуках React'а и/или используете другой подход автоматической отписки, то это правило именно в такой формулировке к вам не применимо. Об использовании takeUntil
по другому поговорим чуть позже. На этом подходе можно хорошо показать важность правильного расположения операторов друг между другом.
Перейдем ко второй группе: операторы завершения потока. Примеры будут с takeUntil
, но это применимо также и к другим операторам аналогичной группы.
К takeUntil
в рамках Angular я тоже применяю одно общее правило использования: takeUntil
должен быть последним оператором перед subscribe
и должен быть всегда.
Почему так? Разберем на примере одного из типовых применений takeUntil
в Angular-компонентах, но с неправильным порядком операторов.
class MyComponent {
private destroy$ = new ReplaySubject(1);
ngOnInit() {
interval(1000)
.pipe(
takeUntil(this.destroy$),
switchMap(() => this.apiService.ping()),
)
.subscribe();
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
Что тут произойдет? Если в момент того, как срабатывает destroy$
, запрос находится ещё в процессе, то мы его не отменим. И, кстати, мы не можем гарантировать, что поток возвращаемый методом ping()
, когда-нибудь завершится. А это уже выглядит как утечка памяти.
Надо сделать правильный порядок вещей:
ngOnInit() {
interval(1000)
.pipe(
switchMap(() => this.apiService.ping()),
takeUntil(this.destroy$),
)
.subscribe();
}
Теперь никаких утечек😃.
Я упомянул, что takeUntil
для автоматической отписки надо писать прямо перед subscribe
, но почему не в конце пайпа, как мы делали с shareReplay
? Например, вот так:
ngOnInit() {
const randomTimer = interval(1000).pipe(
map(() => Math.random()),
shareReplay({ refCount: true, bufferSize: 1 }),
takeUntil(this.destroy$),
);
}
Если мы к нашему randomTimer
перед подпиской добавим что-нибудь еще, например, знакомый нам switchMap
:
ngOnInit() {
const randomTimer = interval(1000).pipe(
map(() => Math.random()),
shareReplay({ refCount: true, bufferSize: 1 }),
takeUntil(this.destroy$),
);
randomTimer.pipe(switchMap(() => this.apiService.ping())).subscribe();
}
Мы сможем снова сказать «привет 👋» утечке памяти. Мы не можем контролировать то, как будет использоваться наш поток в дальнейшем, поэтому механизм автоматической подписки надо реализовывать не во всех пайпах, а только перед вызовом subscribe
. Давайте исправим утечку:
ngOnInit() {
const randomTimer = interval(1000).pipe(
map(() => Math.random()),
shareReplay({ refCount: true, bufferSize: 1 }),
);
randomTimer.pipe(
switchMap(() => this.apiService.ping()),
takeUntil(this.destroy$),
).subscribe();
}
Не стоит забывать отписываться от подписок. Даже если вы подписываетесь на один из методов HttpClient
. Даже если вы это делаете в сервисе, который providedIn: 'root'
. Отписывайтесь. Всегда.
Больше takeUntil
takeUntil
можно использовать не только для автоматических отписок. Расширим наш компонент, который делал пинг:
- Пинг происходит только тогда, когда курсор находится вне компонента.
- Как только курсор перемещается на компонент, мы перестаем делать пинг.
- Также не стоит забывать про автоматическую отписку, мы же не хотим утечек памяти.
Пропустим стадию гугления различных операторов, код я уже написал:
ngOnInit() {
interval(1000).pipe(
takeUntil(this.mouseIn$),
repeatWhen(() => this.mouseOut$),
switchMap(() => this.apiService.ping()),
takeUntil(this.destroy$),
).subscribe();
}
Пффф... Осталось понять, что тут происходит. Начнем с диаграммы:
// interval
-----0-----1--| -----0---|
// takeUntil(this.mouseIn$)
--------------t-----------|
// repeatWhen
-----------------r--------|
// switchMap
-----+-----+----------+---|
\ \ \
--s| --s| --|
// takeUntil(this.destroy$)
-------------------------d|
// subscribe
---------e-----e----------|
Ох... Думаю, что легче не стало. Немного текстового пояснения. Сначала мы подписываемся на interval
. Каждый раз когда interval
эммитит значение, наш switchMap
делает пинг. Как только пинг эммитит значение, мы получаем значение в subscribe
. Пока все просто. Как только this.mouseIn$
заэммитит событие, мы отписываемся только от interval
. Обратите внимание, что switchMap
не отменил работу пинга, мы получили его результат после отписки от интервала. После этого как только this.mouseOut$
получает событие, мы заново подписываемся на interval
. И начинаем всю нашу цепочку заново. Стоит только this.destroy$
получить событие, как мы сразу отписываемся от всех предшествующих потоков, и, получается, что мы отписываемся от всего.
С помощью простых комбинаций операторов можно реализовывать довольно сложную логику.
Можно ли сломать порядок?
А что на счет операторов, которые срабатывают в определенный момент? Например finalize
.
const randomTimer = interval(1000).pipe(
map(() => Math.random()),
finalize(() => console.log('finished')),
);
Мы увидим в консоли 'finished'
, как только от подписки отпишутся. Звучит просто. А если вот так?
const randomTimer = interval(1000).pipe(
map(() => Math.random()),
finalize(() => console.log('finished 1')),
shareReplay({ refCount: true, bufferSize: 1 }),
finalize(() => console.log('finished 2')),
);
const a = randomTimer.subscribe();
const b = randomTimer.subscribe();
a.unsubscribe();
Какие предположения? Увидим ли мы 'finished 1'
? А что насчет 'finished 2'
?
Результатом выполнения этого примера в консоли будет только 'finished 2'
.
Чтобы увидеть 'finished 1'
, надо сделать b.unsubscribe()
. Когда мы это сделаем, сначала появится 'finished 1'
, а затем — 'finished 2'
. Ведь ... да, операторы выполняются последовательно.
Почему так происходит? Помните, как работают операторы? Каждый оператор создает новый поток. А еще shareReplay
, который помогает не создавать каждый раз кучу новых потоков, а переиспользовать один текущий. shareReplay({refCount: true, bufferSize: 1})
живет, пока существует хотя бы одна подписка, поэтому первый finalize
срабатывает не сразу. Но действие shareReplay
не распространяется на операторы, которые идут после него. Поэтому каждый unsubscribe
будет триггерить второй finalize
.
Вот еще несколько операторов, которые выполняются «в определенный момент»: defaultIfEmpty
, throwIfEmpty
, retryWhen
, catchError
, repeatWhen
.
Эти операторы не выполняются в общем потоке обычных операторов, но когда приходит их время, они будут следовать заданной последовательности.
Вместо вывода: валидируем порядок
За порядком операторов следить не просто. shareReplay
расположили не в том месте — получили повторяющиеся вычисления. Забыли takeUntil
— получили утечку памяти. В NgRx эффекте поставили first
после switchMap
, а не внутри, — сломали повторное выполнение экшенов. В экшене на удаление использовали switchMap
, а не mergeMap
, — сломали последовательное удаление множества сущностей.
В голове держать все эти нюансы не просто. К счастью, часть этой ментальной нагрузки можно переложить на процессор. Главное правильно настроить линтер. И в этом нам помогут пакеты eslint-plugin-rxjs и eslint-plugin-rxjs-angular.
В eslint-plugin-rxjs
советую включить как минимум рекомендуемые настройки plugin:rxjs/recommended
.
Если вы используете какой-нибудь стор, который построен на RxJs, типа NgRx, то обратите внимание на эти правила: rxjs/no-cyclic-action
, rxjs/no-unsafe-catch
, rxjs/no-unsafe-first
, rxjs/no-unsafe-switchmap
. Это поможет не допускать простых ошибок в эффектах.
А для Angular надо включить то правило, которое соответствует вашей идеологии по автоматическим отпискам. Я предпочитаю rxjs-angular/prefer-takeuntil
.
Если вы строите свое приложение с помощью NgRx, вам поможет еще один плагин eslint-plugin-rxjs-ngrx. В нем уже есть готовые конфигурации, советую включить plugin:ngrx/recommended
или plugin:ngrx/strict
, это поможет избегать типовых ошибок при использовании NgRx.