Порядок операторов в RxJs

6 минут / Опубликован на Habr

TL;DR: Порядок важен. Операторы довольно атомарны и зачастую очень просты, но это не мешает им объединяться в сложные последовательности, в которых легко допустить ошибку. Давайте разберемся.

Будет очень много marble диаграмм, извините. Пара ресурсов для тех, кто не в курсе про marble диаграммы: How to Read an RxJS Marble Diagram и Testing RxJS Code with Marble Diagrams. В некоторых очень спорных местах я добавил подсказки: подскажет, когда конкретно произошло событие; означает, что произошло два синхронных события подряд.

Простая последовательность

Начнем с обычного интервала.

interval(1000).subscribe(console.log);

Его диаграмма будет такой:

// interval(1000)
-----0-----1-----2-----3-----4--->

Интервал выводит числа по возрастанию, давайте прокачаем его и добавим немного случайности с помощью Math.random():

interval(1000)
.pipe(map(() => Math.random()))
.subscribe(console.log);

Для упрощения диаграммы, будем считать, что Math.random() возвращает целые числа.

Обновим диаграмму:

// interval(1000)
-----0-----1-----2-----3-----4--->
// map(() => Math.random())
-----1-----6-----3-----2-----9--->

Ждать первое значение целую секунду — это долго. Хочется сразу после подписки видеть значение в консоли. Есть прекрасный оператор startWith. Добавим его в нашу цепочку операторов:

interval(1000)
.pipe(
map(() => Math.random()),
startWith(-1),
)
.subscribe(console.log);

Можете предположить результат?

Помните, я говорил, что порядок важен. Это как раз хороший пример для данного утверждения. Давайте посмотрим на диаграмму:

// interval(1000)
-----0-----1-----2-----3-----4--->
// map(() => Math.random())
-----1-----6-----3-----2-----9--->
// startWith(-1)
(-1)-1-----6-----3-----2-----9--->
↑ ↑

Упс. Значение добавляется после нашей функции случайности. Название немного сбивает, и, мне кажется, что используя startWith в первый раз, я поставил этот оператор не туда.

Как это работает: под капотом оператор создаёт новый стрим, который передается в следующий оператор. Поэтому получается, что startWith принимает стрим, который пришел из map, и уже в этот стрим записывается первое значение.

Окей, теперь зная это, поправим код так, чтобы все цифры проходили через map, в том числе и результат startWith.

interval(1000)
.pipe(
startWith(-1),
map(() => Math.random()),
)
.subscribe(console.log);

Получим такую диаграмму:

// interval(1000)
-----0-----1-----2-----3-----4--->
// startWith(-1)
(-1)-0-----1-----2-----3-----4--->
↑ ↑
// map(() => Math.random())
2----1-----6-----3-----2-----9--->

Перфекто.

У меня есть к вам вопрос: что будет в консоли (или как будет выглядеть диаграмма) при такой последовательности операторов?

interval(1000)
.pipe(
startWith(-1),
map(() => Math.random()),
startWith('a'),
)
.subscribe(console.log);

Операторы же выполняются по порядку. Ведь так? Ведь так!?

Да, все так. Только надо внимательно следить за происходящим и помнить: каждый оператор создает новый поток. Разберем по шагам:

  1. Создаем поток из interval(1000);
  2. К этому потоку startWith добавляет в самое начало -1;
  3. Выполняем Math.random();
  4. К потоку из предыдущего шага следующий startWith в самое начало добавляет 'a';
  5. Сразу после подписки мы увидим 'a', следом за ним будет результат Math.random(), который выполнился из-за -1. Все это будет происходить синхронно.
  6. Остальные значения будут выводиться асинхронно в консоли раз в секунду.

Диаграмма все упростит (надеюсь):

// interval(1000)
-----0-----1-----2-----3-----4--->
// startWith(-1)
(-1)-0-----1-----2-----3-----4--->
↑ ↑
// map(() => Math.random())
2----1-----6-----3-----2-----9--->
// startWith('a')
a2---1-----6-----3-----2-----9--->

Порядок операторов важен. Но иногда бывает не так очевидно, что придет в subscribe, в какой последовательности и почему.

shareReplay

Есть операторы (или группы операторов), которые требуют к себе особого внимания: их неправильное использование может привести к утечкам памяти.

Начнем с группы операторов шаринга состояния. Все примеры будут с оператором shareReplay, но это применимо и к другим операторам и их комбинациям из «sharing группы».

Мой общий совет звучит так: shareReplay должен быть последним оператором в .pipe(). Если вы располагаете его в другом месте, либо вы знаете, зачем он там нужен, либо совершаете ошибку. Разберемся почему.

Взглянем на shareReplay, точнее, на его отсутствие:

const randomTimer = interval(1000).pipe(map(() => Math.random()));

randomTimer.subscribe(console.log);
randomTimer.subscribe(console.log);

Каждый subscribe создает свой поток интервалов, который потом преобразуется с помощью Math.random(). Получается, что каждую секунду мы будем видеть 2 цифры в консоли, но они будут разные.

// subscribe
-----3-----7-----1-----8-----9--->
// subscribe
-----5-----2-----1-----7-----0--->

Если мы сделаем подписку на один из потоков асинхронной:

const randomTimer = interval(1000).pipe(map(() => Math.random()));

randomTimer.subscribe(console.log);
setTimeout(() => {
randomTimer.subscribe(console.log);
}, 500);

то каждая подписка будет писать в консоль число независимо от предыдущей. Будет такая диаграмма результата:

// subscribe
-----3-----7-----1-----8-----9--->
// setTimeout
----5-----2-----1-----7-----0->

Обратите внимание, что вторая подписка начинается не сразу.

Если мы хотим во всех подписках использовать один и тот же интервал, а не создавать его каждый раз заново, то без shareReplay не обойтись:

const randomTimer = interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 }),
map(() => Math.random()),
);

randomTimer.subscribe(console.log);
setTimeout(() => {
randomTimer.subscribe(console.log);
}, 500);

Угадаете, какой будет диаграмма?

// subscribe
-----3-----7-----1-----8-----9--->
// setTimeout
--5-----2-----1-----7-----0--->

Да, мы подписываемся с задержкой в 500мс. Но так как мы шарим интервал, задержка не важна: значения будут отображаться в консоли одновременно. Давайте поменяем интервал на 1500мс:

const randomTimer = interval(1000).pipe(
shareReplay({ refCount: true, bufferSize: 1 }),
map(() => Math.random()),
);

randomTimer.subscribe(console.log);
setTimeout(() => {
randomTimer.subscribe(console.log);
}, 1500);

Диаграмма:

// subscribe
-----3-----7-----1-----8-----9--->
// setTimeout
5--2-----1-----7-----0--->

shareReplay работает таким образом, что запоминает последнее значение, и, если оно было, мы получаем его мгновенно, без задержек. А все последующие значения будут отображаться во время срабатывания таймера.

Идем дальше. А что если нам надо шарить результат и Math.random() в том числе? Надо поместить shareReplay чуть-чуть подальше:

const randomTimer = interval(1000).pipe(
map(() => Math.random()),
shareReplay({ refCount: true, bufferSize: 1 }),
);

randomTimer.subscribe(console.log);
setTimeout(() => {
randomTimer.subscribe(console.log);
}, 1500);

Думаю, что диаграмма окажется вполне очевидной:

// subscribe
-----3-----7-----1-----8-----9--->
// setTimeout
3--7-----1-----8-----9--->

В наших примерах, когда shareReplay находился перед map, это был баг, а не фича. Как я упоминал ранее, операторы, подобные shareReplay, в большинстве случаев надо использовать в конце пайпа. Я обычно добавляю этот оператор в конце каждого пайпа.

takeUntil и автоматическая отписка

* Сейчас я буду говорить про один из классических для Angular подходов автоматической отписки. Если вы используете RxJs в хуках React'а и/или используете другой подход автоматической отписки, то это правило именно в такой формулировке к вам не применимо. Об использовании takeUntil по другому поговорим чуть позже. На этом подходе можно хорошо показать важность правильного расположения операторов друг между другом.

Перейдем ко второй группе: операторы завершения потока. Примеры будут с takeUntil, но это применимо также и к другим операторам аналогичной группы.

К takeUntil в рамках Angular я тоже применяю одно общее правило использования: takeUntil должен быть последним оператором перед subscribe и должен быть всегда.

Почему так? Разберем на примере одного из типовых применений takeUntil в Angular-компонентах, но с неправильным порядком операторов.

class MyComponent {
private destroy$ = new ReplaySubject(1);

ngOnInit() {
interval(1000)
.pipe(
takeUntil(this.destroy$),
switchMap(() => this.apiService.ping()),
)
.subscribe();
}

ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}

Что тут произойдет? Если в момент того, как срабатывает destroy$, запрос находится ещё в процессе, то мы его не отменим. И, кстати, мы не можем гарантировать, что поток возвращаемый методом ping(), когда-нибудь завершится. А это уже выглядит как утечка памяти.

Надо сделать правильный порядок вещей:

ngOnInit() {
interval(1000)
.pipe(
switchMap(() => this.apiService.ping()),
takeUntil(this.destroy$),
)
.subscribe();
}

Теперь никаких утечек😃.

Я упомянул, что takeUntil для автоматической отписки надо писать прямо перед subscribe, но почему не в конце пайпа, как мы делали с shareReplay? Например, вот так:

ngOnInit() {
const randomTimer = interval(1000).pipe(
map(() => Math.random()),
shareReplay({ refCount: true, bufferSize: 1 }),
takeUntil(this.destroy$),
);
}

Если мы к нашему randomTimer перед подпиской добавим что-нибудь еще, например, знакомый нам switchMap:

ngOnInit() {
const randomTimer = interval(1000).pipe(
map(() => Math.random()),
shareReplay({ refCount: true, bufferSize: 1 }),
takeUntil(this.destroy$),
);

randomTimer.pipe(switchMap(() => this.apiService.ping())).subscribe();
}

Мы сможем снова сказать «привет 👋» утечке памяти. Мы не можем контролировать то, как будет использоваться наш поток в дальнейшем, поэтому механизм автоматической подписки надо реализовывать не во всех пайпах, а только перед вызовом subscribe. Давайте исправим утечку:

ngOnInit() {
const randomTimer = interval(1000).pipe(
map(() => Math.random()),
shareReplay({ refCount: true, bufferSize: 1 }),
);

randomTimer.pipe(
switchMap(() => this.apiService.ping()),
takeUntil(this.destroy$),
).subscribe();
}

Не стоит забывать отписываться от подписок. Даже если вы подписываетесь на один из методов HttpClient. Даже если вы это делаете в сервисе, который providedIn: 'root'. Отписывайтесь. Всегда.

Больше takeUntil

takeUntil можно использовать не только для автоматических отписок. Расширим наш компонент, который делал пинг:

Пропустим стадию гугления различных операторов, код я уже написал:

ngOnInit() {
interval(1000).pipe(
takeUntil(this.mouseIn$),
repeatWhen(() => this.mouseOut$),
switchMap(() => this.apiService.ping()),
takeUntil(this.destroy$),
).subscribe();
}

Пффф... Осталось понять, что тут происходит. Начнем с диаграммы:

// interval
-----0-----1--| -----0---|
// takeUntil(this.mouseIn$)
--------------t-----------|
// repeatWhen
-----------------r--------|
// switchMap
-----+-----+----------+---|
\ \ \
--s| --s| --|
// takeUntil(this.destroy$)
-------------------------d|
// subscribe
---------e-----e----------|

Ох... Думаю, что легче не стало. Немного текстового пояснения. Сначала мы подписываемся на interval. Каждый раз когда interval эммитит значение, наш switchMap делает пинг. Как только пинг эммитит значение, мы получаем значение в subscribe. Пока все просто. Как только this.mouseIn$ заэммитит событие, мы отписываемся только от interval. Обратите внимание, что switchMap не отменил работу пинга, мы получили его результат после отписки от интервала. После этого как только this.mouseOut$ получает событие, мы заново подписываемся на interval. И начинаем всю нашу цепочку заново. Стоит только this.destroy$ получить событие, как мы сразу отписываемся от всех предшествующих потоков, и, получается, что мы отписываемся от всего.

С помощью простых комбинаций операторов можно реализовывать довольно сложную логику.

Можно ли сломать порядок?

А что на счет операторов, которые срабатывают в определенный момент? Например finalize.

const randomTimer = interval(1000).pipe(
map(() => Math.random()),
finalize(() => console.log('finished')),
);

Мы увидим в консоли 'finished', как только от подписки отпишутся. Звучит просто. А если вот так?

const randomTimer = interval(1000).pipe(
map(() => Math.random()),
finalize(() => console.log('finished 1')),
shareReplay({ refCount: true, bufferSize: 1 }),
finalize(() => console.log('finished 2')),
);

const a = randomTimer.subscribe();
const b = randomTimer.subscribe();

a.unsubscribe();

Какие предположения? Увидим ли мы 'finished 1'? А что насчет 'finished 2'?

Результатом выполнения этого примера в консоли будет только 'finished 2'.

Чтобы увидеть 'finished 1', надо сделать b.unsubscribe(). Когда мы это сделаем, сначала появится 'finished 1', а затем — 'finished 2'. Ведь ... да, операторы выполняются последовательно.

Почему так происходит? Помните, как работают операторы? Каждый оператор создает новый поток. А еще shareReplay, который помогает не создавать каждый раз кучу новых потоков, а переиспользовать один текущий. shareReplay({refCount: true, bufferSize: 1}) живет, пока существует хотя бы одна подписка, поэтому первый finalize срабатывает не сразу. Но действие shareReplay не распространяется на операторы, которые идут после него. Поэтому каждый unsubscribe будет триггерить второй finalize.

Вот еще несколько операторов, которые выполняются «в определенный момент»: defaultIfEmpty, throwIfEmpty, retryWhen, catchError, repeatWhen.

Эти операторы не выполняются в общем потоке обычных операторов, но когда приходит их время, они будут следовать заданной последовательности.

Вместо вывода: валидируем порядок

За порядком операторов следить не просто. shareReplay расположили не в том месте — получили повторяющиеся вычисления. Забыли takeUntil— получили утечку памяти. В NgRx эффекте поставили first после switchMap, а не внутри, — сломали повторное выполнение экшенов. В экшене на удаление использовали switchMap, а не mergeMap, — сломали последовательное удаление множества сущностей.

В голове держать все эти нюансы не просто. К счастью, часть этой ментальной нагрузки можно переложить на процессор. Главное правильно настроить линтер. И в этом нам помогут пакеты eslint-plugin-rxjs и eslint-plugin-rxjs-angular.

В eslint-plugin-rxjs советую включить как минимум рекомендуемые настройки plugin:rxjs/recommended.

Если вы используете какой-нибудь стор, который построен на RxJs, типа NgRx, то обратите внимание на эти правила: rxjs/no-cyclic-action, rxjs/no-unsafe-catch, rxjs/no-unsafe-first, rxjs/no-unsafe-switchmap. Это поможет не допускать простых ошибок в эффектах.

А для Angular надо включить то правило, которое соответствует вашей идеологии по автоматическим отпискам. Я предпочитаю rxjs-angular/prefer-takeuntil.

Если вы строите свое приложение с помощью NgRx, вам поможет еще один плагин eslint-plugin-rxjs-ngrx. В нем уже есть готовые конфигурации, советую включить plugin:ngrx/recommended или plugin:ngrx/strict, это поможет избегать типовых ошибок при использовании NgRx.