RxJS subscription management: when to unsubscribe (and when not)

Daan Scheerens - Mar 5, 2020



For many people RxJS subscriptions are a lot like household chores: you don't really like them but there is always this nagging voice in your head telling you not to ignore them. Unlike household chores, however, it is quite easy to pretend subscriptions don't exist and get away with it.

For a while things will work just fine. But every now and then your application exhibits some minor glitch. And as your application grows, the glitches increase in both frequency and severity. So, don't be an ostrich, deal with your demons and accept the fact that subscriptions are part of your life as a developer!

Subscriptions are often undervalued, misunderstood and dealt with incorrectly. Continue reading to learn more about their value and also discover that they are actually not that hard to tame.

What is a subscription?

RxJS is all about observables: data streams that can emit events (some carrying data) over time. An observable by itself is not very useful unless you use it:

  • as input to create new observables (via operators or combination functions)

  • to process the emitted events (via the subscribe function)

When you subscribe to an observable, RxJS notifies you of every event by invoking the callback function(s) that you passed as argument to the subscribe function. You will continue to receive these events until the observable terminates with a complete or error event. Often, however, your application reaches a state in which you no longer wish to receive events, even though the observable has not yet terminated.

One way to deal with that is by maintaining a boolean variable which flips to true once the application should no longer react to any of the events emitted by your observable. This probably sounds awfully familiar to anyone who has worked with a Promise before, since a Promise cannot be cancelled.

    // Let's listen for messages and process them.
    let stopListening = false;

    message$.subscribe((message) => {
      if (!stopListening) {
        processMessage(message);
      }
    });

    // And at some point we don't care about the messages any more, so we flip the switch.
    stopListening = true;

Fortunately, with RxJS you are offered a more convenient method: subscriptions. Whenever you subscribe to an observable, you'll get back a Subscription object. Once you no longer wish to receive events from the observable you can simply use this object to unsubscribe:

    // Let's listen for messages and process them.
    const messageSubscription = message$.subscribe((message) => processMessage(message));

    // And at some point we don't care about the messages any more, so we unsubscribe.
    messageSubscription.unsubscribe();

Much cleaner.
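To make the effect of unsubscribe concrete, here is a minimal sketch. It assumes a Subject as a stand-in for message$ (the article does not say where that stream comes from) and records what the handlers receive. The closed property of the Subscription reports whether it has been closed.

```typescript
import { Subject } from 'rxjs';

// Stand-in for the message stream used above (an assumption for illustration).
const message$ = new Subject<string>();
const received: string[] = [];

const messageSubscription = message$.subscribe({
  next: (message) => received.push(message),
  complete: () => received.push('<complete>'),
});

message$.next('first');            // delivered to the next handler
messageSubscription.unsubscribe();
message$.next('second');           // ignored: the subscription is closed
message$.complete();               // ignored as well, for the same reason

console.log(received);                   // ['first']
console.log(messageSubscription.closed); // true
```

Note that neither the second message nor the complete event reaches the handlers: after unsubscribing, the observer is fully detached from the stream.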

Why should you care about subscriptions?

Given the fact that RxJS always returns you a Subscription object whenever you subscribe to an observable, do you really need to do anything with it? The short answer is yes, the long answer is it depends. For now, let's stick to the simple answer and assume you should always unsubscribe. This is why:

  1. It prevents unwanted actions in parts of your application that should no longer be active.

  2. It prevents memory leaks.

  3. Resources can be freed up.

Preventing unwanted actions

The first reason to unsubscribe is already illustrated by the code example in the previous section. Suppose processMessage is part of a UI component that displays the message in a toast notification. Even after the component is destroyed the message$ stream will continue emitting messages. If we don't unsubscribe, bad things might happen. What exactly happens once the component has been disposed of depends on the state of the reference to the DOM element to which the toast notifications are added:

  • The reference still exists and the DOM element is still attached to the document. Toast notifications are still displayed, probably to the surprise of the user, who no longer expects them to appear.

  • There is still a reference to the DOM element to which the toast notifications are added, however that element is no longer attached to the document. Notifications are still created, but the user won't see them.

  • The DOM element reference no longer exists (it was set to undefined or null during destruction). At this point a Cannot read property 'appendChild' of undefined error is thrown and (parts of) the application might crash.

Preventing memory leaks

Another bad thing that can happen if you forget to unsubscribe is that it can cause memory leaks. Although they are quite common, their symptoms are less often observed. In many cases the leaks are just not severe enough to be noticeable.

If they do become a problem, then you'll notice your application gets slower the more you use it. Eventually the whole application might become completely unresponsive and crash. Tracking down the source of the problem can be a real pain. A great tool for finding the leaks is the heap snapshot feature of the Chrome DevTools.

How do these memory leaks occur?

Most often because the handler functions passed along with the subscribe call keep a reference to some object that has been destroyed. Since it has been destroyed you expect the garbage collector to do its job and purge the object from the heap memory. However, because there is still a reference to that object (as part of the event handler function), it is not cleaned up by the garbage collector.

Returning to our earlier example of the component that displays incoming messages as toast notifications, this leak can be as simple as a reference to the component itself:

    class MessageComponent {

      initialize(): void {
        const message$ = getMessageStream();

        message$.subscribe((message) => {
          this.processMessage(message); // closure contains `this`: a reference to the component instance
        });
      }

      processMessage(message): void { /* ... */ }
    }

In the code snippet above, the next event handler function passed to subscribe contains a reference to this. As a result, this becomes part of the closure. Because this refers to the component instance, the component is not garbage collected for as long as the subscription stays active, even after it has been destroyed!

Releasing resources

Observables always have a source: the thing that produces the events. An HTMLButtonElement for example can be used as the source to generate a stream of click events. For some observables, specific resources are allocated every time they get subscribed to. The most common example of this in browser environments is when making an HTTP request:

    import { ajax } from 'rxjs/ajax';

    ajax.get('https://www.example.com/').subscribe(console.log);

In this example we're using the RxJS ajax API to perform an HTTP GET request. The result is an observable. So, to print out the response we need to subscribe. When you do, RxJS creates a new XMLHttpRequest under the hood.

As you know, it takes a while for the response to be received. During that time, our application might already have decided that it is no longer interested in the response. In that case the right thing to do is: unsubscribe!

Unsubscribing opens up an opportunity: it is possible to abort the request. This will free up one of the available connections! So that is exactly what RxJS does in the unsubscribe cleanup process for an AjaxObservable.

An example of this behavior is shown in the animation below.

Since browsers limit the number of parallel connections (usually 6 per host over HTTP/1.1), making sure to release them as soon as possible can have a big impact on the perceived performance. If you don't unsubscribe, other requests might be queued while waiting for an available connection.

Note that when using HTTP/2 this is less of an issue, because it can multiplex several requests over the same connection. Nonetheless, it is still better to clean up the subscriptions for the other reasons described earlier.

Making an HTTP request is just one example where unsubscribing will help to free up allocated resources. In general, this can be anything. Due to the RxJS architecture an observable source can implement any cleanup logic whenever unsubscribe is called. For example, when RxJS is used in the context of a Node.js application you can model file reading using observable streams. Unsubscribing in this scenario would mean releasing the file handle.
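As a rough sketch of how such cleanup logic can be attached to an observable: the function passed to the Observable constructor may return a teardown function, which RxJS invokes when the subscription is closed. The handleOpen flag and the timer below are hypothetical stand-ins for a real resource such as a file handle or a connection.

```typescript
import { Observable } from 'rxjs';

let handleOpen = false;

// Hypothetical resource-backed stream: "opens" a resource on subscribe
// and releases it in the teardown function that is returned to RxJS.
const ticks$ = new Observable<number>((subscriber) => {
  handleOpen = true;
  let count = 0;
  const timerId = setInterval(() => subscriber.next(count++), 1000);

  // Teardown logic: runs on unsubscribe, and also after complete or error.
  return () => {
    clearInterval(timerId);
    handleOpen = false;
  };
});

const subscription = ticks$.subscribe((tick) => console.log(tick));
const openWhileSubscribed = handleOpen;
console.log(openWhileSubscribed); // true: the resource is held while subscribed

subscription.unsubscribe();
console.log(handleOpen);          // false: the teardown function released it
```

Without the call to unsubscribe, the interval would keep running and the resource would stay allocated indefinitely.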

When unsubscribing is not required

So given the reasons mentioned in the previous sections, does that mean it is better to always unsubscribe? In general: yes, you should. However, there are certain scenarios in which it is okay not to explicitly unsubscribe. There are even some cases in which unsubscribing can actually be wrong, but we'll explore those scenarios later. First let's see an example of a case where unsubscribing is not required.

    import { BehaviorSubject } from 'rxjs';
    import { first } from 'rxjs/operators';

    const message = new BehaviorSubject('Hello RxJS');

    message
      .pipe(first())
      .subscribe(console.log);

In this (contrived) example we are subscribing to an observable that is a BehaviorSubject. Due to the nature of this subject we know that it will always emit a value directly after subscribing to it. The first operator will terminate the resulting observable stream after the first next event. Combining these two pieces of information leads us to the logical conclusion that immediately after subscribing to message.pipe(first()) the following happens:

  • The message is printed.

  • The subscription is closed.

So in this example there is clearly no need to unsubscribe.

Synchronous termination

When an observable terminates, all subscriptions for that stream are automatically closed and you do not have to unsubscribe. Some observables terminate directly after subscribing (synchronous termination). With this we can define the following general rule:

There is no need to unsubscribe when the source observable synchronously terminates.

This raises the question: how do you know if an observable synchronously terminates? Unfortunately, there is no easy answer. You can only tell by inspecting the context: how was it defined, what does the observable represent and how does it behave. One of the first things you might want to find out is if it keeps a history and whether it synchronously emits that history upon subscription. Signs to look for are: ReplaySubject, BehaviorSubject, of, publishReplay and shareReplay. If you find such a cue and you are only interested in the first emitted value, then applying the first operator will give you an observable that terminates synchronously, provided there actually is a value to replay at the moment you subscribe (a ReplaySubject that has not received a value yet, for example, has nothing to emit).
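One way to check this for a concrete stream is to inspect the closed property of the subscription right after subscribing. The sketch below (the variable names are made up for illustration) contrasts a BehaviorSubject, which always has a value, with a ReplaySubject that has not received anything yet:

```typescript
import { BehaviorSubject, ReplaySubject } from 'rxjs';
import { first } from 'rxjs/operators';

// A BehaviorSubject always holds a current value, so first() completes right away.
const current$ = new BehaviorSubject('Hello RxJS');
const syncSubscription = current$.pipe(first()).subscribe(console.log);
const closedImmediately = syncSubscription.closed;
console.log(closedImmediately); // true: terminated synchronously

// A ReplaySubject without any recorded value has nothing to replay.
const empty$ = new ReplaySubject<string>(1);
const pendingSubscription = empty$.pipe(first()).subscribe(console.log);
const closedWhileEmpty = pendingSubscription.closed;
console.log(closedWhileEmpty); // false: still waiting for the first value

empty$.next('finally');
console.log(pendingSubscription.closed); // true: closed after the first value
```

The second case is a reminder that a replaying source is only a hint: whether first() terminates synchronously depends on a value being available at subscription time.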

Asynchronous termination

A common misconception about making HTTP requests is that it is not necessary to unsubscribe. Such observables will terminate at some point, either when the response has been received or if there was a connection error. Since the stream terminates we know that the subscription will be closed anyway. No need to unsubscribe, right?

Wrong!

The key here is that it terminates asynchronously, meaning that you still run the risk of having unwanted effects when you're no longer interested in the result. Furthermore, you lose the ability to abort the connections, which can have a severe impact on loading times of other HTTP requests that are queued up.
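The following sketch illustrates the window in which things can go wrong. It uses timer as a stand-in for an HTTP request (an assumption made so that the example does not need a network); with the RxJS ajax API, unsubscribing would additionally abort the underlying request as described earlier.

```typescript
import { timer } from 'rxjs';
import { map } from 'rxjs/operators';

// Stand-in for an HTTP request: emits one "response" after 500 ms, then completes.
const response$ = timer(500).pipe(map(() => 'response body'));

let handled = 0;
const subscription = response$.subscribe(() => handled++);

const closedBeforeResponse = subscription.closed;
console.log(closedBeforeResponse); // false: the stream has not terminated yet

// The "component" is destroyed before the response arrives.
subscription.unsubscribe();

console.log(subscription.closed); // true
console.log(handled);             // 0: the handler has not run, and the pending timer is cancelled
```

Had the subscription been ignored, the handler would still have fired 500 ms later, inside a component that no longer exists.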

Does that mean it is always necessary to keep track of a subscription and unsubscribe when an observable terminates asynchronously?

In the vast majority of the cases the answer is yes. However, there is an exception: if (based on the definition of the observable) you are 100% sure that it will terminate before the object that made the subscription is destroyed, then it is okay to ignore the subscription.

Keep in mind though that such cases are rare and it is safer not to depend on this behavior. Your application might be changed later by someone who is not aware of this. That person could change the definition in such a way that termination is no longer guaranteed to occur before the object's destruction.

When NOT to unsubscribe

As mentioned in the previous section, there are situations in which unsubscribing is actually a bad thing. This is the case when the observable represents an action that performs state modification outside of your application. Examples are writing to a file or performing an HTTP PUT request to update an entity in the backend. Unsubscribing from such observables may lead to the action being canceled. Sometimes that may be the desired result, but often this is not the intended effect.

Imagine your application shows a dialog with a form to edit and save some data. The component representing that dialog performs the action of saving the data using an HTTP PUT request. Assuming that the request is modeled as an observable, you'll need to subscribe to actually make the request. Now, if we follow our earlier rules for cleaning up subscriptions, closing the dialog while the HTTP request is pending will result in the request being canceled. As a result, the changes might not have been saved. Probably not what the user expected!

Unsubscribing in such cases is therefore not desirable. But if you don't unsubscribe this still could lead to unwanted side effects and memory leaks! So, what is the correct course of action?

Make sure all outbound state modifications represented as observables are encapsulated in a service or repository class.

This ensures that the service can take care of making sure the action is always performed, regardless of whether a consumer of the service actually subscribes to the observable. In addition, the service can ensure that the action can be shared with multiple subscribers and won't be canceled when one of the consumers unsubscribes.

As this is very common, it may be useful to have a tool available to easily apply this approach to your services. A nice solution is to create a custom RxJS operator:

    import { ConnectableObservable, MonoTypeOperatorFunction, Observable } from 'rxjs';
    import { publishReplay } from 'rxjs/operators';

    export function commit<T>(): MonoTypeOperatorFunction<T> {
      return (source$: Observable<T>) => {
        const output$ = source$.pipe(publishReplay(1));

        (output$ as ConnectableObservable<T>).connect();

        return output$;
      };
    }

Basically, the commit operator turns a cold observable into a hot observable and stores the last emitted value. It performs the following three functions:

  1. It makes sure that the upstream observable (source$) only gets subscribed to once (publishReplay).

  2. The observable is subscribed to directly (connect).

  3. The result is stored for late subscribers (publishReplay(1)).
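The three functions listed above can be observed with a small experiment. The sketch below repeats the commit operator so that it is self-contained, and uses a hypothetical save$ observable that counts how often it is executed; it stands in for a real outbound action such as an HTTP PUT request.

```typescript
import { ConnectableObservable, MonoTypeOperatorFunction, Observable } from 'rxjs';
import { publishReplay } from 'rxjs/operators';

// Same operator as defined above, repeated to keep this sketch self-contained.
function commit<T>(): MonoTypeOperatorFunction<T> {
  return (source$: Observable<T>) => {
    const output$ = source$.pipe(publishReplay(1));
    (output$ as ConnectableObservable<T>).connect();
    return output$;
  };
}

let executions = 0;

// Hypothetical "save" action that counts how often it is actually executed.
const save$ = new Observable<string>((subscriber) => {
  executions++;
  subscriber.next('saved');
  subscriber.complete();
});

const committed$ = save$.pipe(commit());
const executionsBeforeSubscribe = executions;
console.log(executionsBeforeSubscribe); // 1: executed although nobody subscribed to committed$

const results: string[] = [];
committed$.subscribe((result) => results.push(result));
committed$.subscribe((result) => results.push(result));

console.log(executions); // still 1: subscribers share the single execution
console.log(results);    // ['saved', 'saved']: the stored result is replayed
```

One consequence of this design is that the action starts as soon as the pipeline is built, so commit() should only be applied at the point where the action is really meant to be issued.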

Returning to our earlier example of a dialog to edit and save an entity, a simplified implementation could look like the following:

    import { Observable } from 'rxjs';
    import { ajax } from 'rxjs/ajax';
    import { map } from 'rxjs/operators';
    import { commit } from './custom-rxjs-operators';

    export class MyEntityRepository {

      public update(entity: Entity): Observable<Entity> {
        const url = `https://my-app.com/api/1.0/my-entities/${entity.id}`;
        const body = this.serialize(entity);
        const headers = { 'Content-Type': 'application/json' };

        return ajax.put(url, body, headers).pipe(
          map((response) => this.deserialize(response)),
          // Makes sure the request is made directly and prevents canceling when unsubscribing.
          commit()
        );
      }

      // The serialize() and deserialize() helpers are omitted for brevity.
    }

    export class MyEntityDialog {

      constructor(
        private readonly myEntityRepository: MyEntityRepository
      ) { }

      // Function is called when the save button in the dialog is pressed.
      // (The entity property and the close() method are omitted for brevity.)
      public onSave(): void {
        // Use repository to save the entity. Returns an observable, but no need to subscribe/unsubscribe.
        this.myEntityRepository.update(this.entity);
        this.close();
      }
    }

In this example calling update on the MyEntityRepository instance results in an Observable. Since the repository uses the commit operator there is no need to explicitly subscribe to trigger the actual HTTP PUT request. This would only be necessary if the dialog needs to act on the response. If this was the case, then it would also be safe to unsubscribe: the commit operator makes sure that the request won't be canceled.

Managing subscriptions

Keeping track of subscriptions to clean them up at the right time can be a tedious task. Fortunately, there are a few tricks to make this easier.

Your best option is not to subscribe at all!

Ultimately, you'll need to subscribe at some point. However, quite often it is possible to postpone the actual subscription. Instead try to get rid of imperative thinking and try to continue following the declarative/reactive approach: model the desired output as an observable instead. If you find yourself pushing data to a Subject when subscribing to an observable, this usually means you can get rid of the subject. Redefine it as an observable, based on the source observable you were originally subscribing to.
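As a small illustration of this refactoring, consider a stream of prices that has to be shown as formatted labels. The names priceInCents$, formatPrice and label$ are made up for this sketch; the imperative variant is shown as a comment for comparison.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical source: a stream of prices in cents.
const priceInCents$: Observable<number> = of(1999, 250);

const formatPrice = (cents: number): string => `$${(cents / 100).toFixed(2)}`;

// Imperative version (introduces a subscription that has to be managed):
//   const label$ = new Subject<string>();
//   priceInCents$.subscribe((cents) => label$.next(formatPrice(cents)));

// Declarative version: the output is defined in terms of the source, no subscribe needed here.
const label$: Observable<string> = priceInCents$.pipe(map(formatPrice));

// Only the final consumer subscribes.
const labels: string[] = [];
label$.subscribe((label) => labels.push(label));
console.log(labels); // ['$19.99', '$2.50']
```

In the declarative version there is no intermediate subscription to clean up: when the final consumer unsubscribes from label$, the subscription to priceInCents$ is closed with it.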

There is a limit to how far you can go (or are willing to go) in postponing observable subscriptions. You'll need to subscribe to have your application produce any output. So once you actually do subscribe, there are two methods to easily keep track of subscriptions and unsubscribe at the right time: an imperative approach and a declarative approach.

Let's start with the imperative approach as it is the simplest. If you take a look at the Subscription class, you can see that it has an add method. This method allows you to add child subscriptions. Calling unsubscribe on the parent will automatically unsubscribe all child subscriptions. You can use this to your advantage by creating one root subscription and adding all subscriptions to it that are bound to the same lifecycle:

    import { Subscription, interval } from 'rxjs';

    export class SomethingReactive {
      private readonly subscriptions = new Subscription();

      public init(): void {
        this.subscriptions.add(
          interval(60000).subscribe(() => alert('A minute has passed!'))
        );

        // More subscriptions...
        this.subscriptions.add(/* ... */);
        this.subscriptions.add(/* ... */);
        this.subscriptions.add(/* ... */);
      }

      public destroyed(): void {
        this.subscriptions.unsubscribe();
      }
    }

If you prefer a more declarative approach, then the takeUntil operator is your friend. This operator creates a new observable from its source which terminates when another specified observable emits a next event. Termination of an observable also closes subscriptions, which is what we're after.

By using the takeUntil operator in conjunction with a stream that represents the lifecycle end event of your class's instance, you can make sure subscriptions are closed at the right time. The lifecycle end can be captured as a stream using a Subject:

    import { Subject, interval } from 'rxjs';
    import { takeUntil } from 'rxjs/operators';

    export class SomethingReactive {
      private readonly destroy$ = new Subject<void>();

      public init(): void {
        interval(60000)
          .pipe(takeUntil(this.destroy$))
          .subscribe(() => alert('A minute has passed!'));

        // More subscriptions...
      }

      public destroy(): void {
        this.destroy$.next();
      }
    }

A word of caution: when using the declarative approach for terminating observable streams always make sure the takeUntil operator is the last one in the operator chain.

If this isn't the last operator of the chain, then the stream might continue anyway. catchError and concat are examples of operators that can prevent the stream from completing if they follow the takeUntil operator.
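The effect can be reproduced with any operator that subscribes to another observable. The sketch below uses switchMap for this (a choice made for this illustration; it is not one of the operators named above) and compares a chain where takeUntil comes first with one where it comes last.

```typescript
import { Subject } from 'rxjs';
import { switchMap, takeUntil } from 'rxjs/operators';

const source$ = new Subject<number>();
const inner$ = new Subject<string>();
const destroy$ = new Subject<void>();

const leaked: string[] = [];
const safe: string[] = [];

// takeUntil is NOT last: it only completes the outer stream, the inner subscription survives.
const leakySubscription = source$
  .pipe(takeUntil(destroy$), switchMap(() => inner$))
  .subscribe((value) => leaked.push(value));

// takeUntil is last: everything upstream is torn down when destroy$ emits.
const safeSubscription = source$
  .pipe(switchMap(() => inner$), takeUntil(destroy$))
  .subscribe((value) => safe.push(value));

source$.next(1);     // both pipelines switch to inner$
destroy$.next();     // lifecycle end
inner$.next('late'); // emitted after the lifecycle has ended

console.log(leaked, leakySubscription.closed); // ['late'] false
console.log(safe, safeSubscription.closed);    // [] true
```

In the first chain the late value still reaches the handler and the subscription stays open, which is exactly the kind of leak takeUntil was supposed to prevent.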

As an alternative to manually capturing the lifecycle end event using a Subject, you might want to look for framework-specific extensions that can provide those streams for you. For Angular applications the @ngneat/until-destroy library is a good choice. This library provides a custom RxJS operator that makes sure the resulting observable terminates when the ngOnDestroy lifecycle event is fired.

Summary

If there is one thing to take away from this article, it is the following:

Whenever you write .subscribe(, pause for a second and think about what you need to do with the subscription!

Don't just toss the subscription aside, unless you like to experience application glitches and performance issues! In general, you want to keep a reference to subscriptions, so that you can unsubscribe at the right time. This not only prevents unwanted side effects and memory leaks, but can also help release limited resources such as network connections and file handles.

You can follow these rules to determine if you need to unsubscribe or not:

Always unsubscribe, except if one of the following cases applies:

  • The observable is guaranteed to terminate synchronously upon subscription.

  • The stream terminates before or at the same time as the lifecycle end of the object that made the subscription.

  • Unsubscribing would result in the cancellation of an outbound state modification.

If the latter applies to your situation, you probably want to make sure the observable is multicasted. In doing so you'll prevent the action from being executed multiple times by mistake (once for each subscriber). You will likely also want to make sure it is immediately subscribed to when the modification action is issued. This makes sure that the action is performed directly, regardless of whether any consumers will subscribe to the resulting observable. Since such actions are quite common for many applications you can make your life easier by introducing a custom RxJS operator that takes care of this behavior. A reference implementation of this operator, called commit, can be found in this article.