Understanding RxJS through the lens of the Observer Pattern

Dennis Gijzen - Mar 21, 2023

When being a front-end developer, a very popular library for working with asynchronous code is the 'RxJS library'. The library is an extension of the 'Observer pattern' which is a software design pattern used to react on triggers from instances of objects under surveillance. Such as for example clicks of the mouse or network responses.

Code is asynchronous when its time of execution does not follow the linear flow of the surrounding/enclosing code block. To get around the problem of running dependent pieces of code on time, when not knowing when this time exactly is, we can let triggers be situational thresholds on which we react to run other pieces of code (or not) the moment these triggers happen. It is a common software problem that is easily solved. Although RxJS might be in the stack of many developers we sometimes get so used to calling the high-end methods of the libraries we forget what it is all about. This article aims to give an overview to better the understanding of the underlying structure of RxJS which would hopefully lead to a better coding experience. We will be looking into the observer pattern but before we go there let's look at a more stripped-down version of the observer pattern which is using asynchronous callbacks. Callbacks make for a good start to begin explaining the workings of the observer pattern and from this work our way up to the basic structure of RxJS.

What is a callback?

A callback is a reference to a method that is passed as an argument to an instance of an object. This method can be invoked as a reaction of any observed event inside the object. It is nothing special really. An example could be something like this.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
class KeyboardEvent { private callback?: () => void; public setCallbackOnKeyboardEvent(callback: () => void): void { this.callback = callback; } public keyboardTriggerEvent(): void { this.callback?.(); } } const keyboardEvent = new KeyboardEvent(); keyboardEvent.setCallbackOnKeyboardEvent(() => console.log('Bonjour!')); keyBoardEvent.keyboardTriggerEvent(); // Mocking an event here.

Output:

1
"Bonjour!"

We have a class that listens to keyboard events, each moment a keyboard event is triggered we call the pre-set callback. Not that difficult. The callback in turn could cascade some other methods depending on your requirements (or creativity). We can somewhat rudely say that we are now observing the keyboard event with another piece of code. But it is not necessarily perfect, currently we can only set one callback method but what if we would like to have multiple pieces of code acting on this, multiple callbacks independently being triggered. To account for this we could go back to the code and change it a bit and have the option to assign multiple callbacks to the same event.

1 2 3 4 5 6 7 8 9 10 11
class KeyboardEvent { private listOfCallbacks: (() => void)[] = []; public setCallbackOnKeyboardEvent(callback: () => void): void { this.listOfCallbacks.push(callback); } private keyboardTriggerEvent(): void { this.listOfCallbacks.forEach((callback) => callback()); } }

This is becoming better. Still, we can only append the list and it would be nice to stop listening too when we don't want certain pieces of code to trigger anymore. We again could go back and extend it to make sure we can also unsubscribe to the list.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
class KeyboardEvent { private listOfCallbacks: (() => void)[] = []; public setCallbackOnKeyboardEvent(callback: () => void): void { this.listOfCallbacks.push(callback); } public removeCallbackOnKeyboardEvent(callbackToBeRemoved: () => void): void { this.listOfCallbacks = this.listOfCallbacks.filter((callback) => callbackToBeRemoved !== callback); } private keyboardTriggerEvent(): void { this.listOfCallbacks.forEach((callback) => callback()); } }

Now we just need to make sure we always unsubscribe if we don't care anymore about the code triggering. Doesn't it already start to sound somewhat familiar to the process of RxJS? We have come close now to implementing an observer pattern, that pattern on which RxJS is broadly based upon.

Observer Pattern

Going from the above piece of code, with just merely callbacks, to the observer pattern would be by using Subjects and Observer interfaces. These Subjects would be objects that would react to an event and trigger the callbacks from the other objects that implemented the Observer interface. We could translate to something like this:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
class Observer { private _name: string; constructor(public name: string) { this._name = name; } public act(str: string): void { console.log(`${this._name} acts on update: ${str}.`); } } class Subject { private _state!: string; private readonly observers: Observer[] = [] public set state(newState: string) { this._state = newState; this.notifyAllObservers(); } public attach(observer: Observer): void { this.observers.push(observer); } public detach(observerToRemove: Observer): void { this.observers = this.observers.filter((observer) => observer !== observerToRemove); } private notifyAllObservers(): void { this.observers.forEach((observer) => observer.act(this._state)); } }

Let's try this out and evaluate the results:

1 2 3 4 5 6 7 8 9 10
const subject = new Subject(); const observerA = new Observer('observerA'); const observerB = new Observer('observerB'); subject.attach(observerA); subject.attach(observerB); subject.state = 'Completed 2 cycles of attaching'; subject.detach(observerA); subject.state = 'Completed 1 cycle of detaching';

Which would result in:

1 2 3
"observerA acts on update: Completed 2 cycles of attaching." "observerB acts on update: Completed 2 cycles of attaching." "observerB acts on update: Completed 1 cycle of detaching."

Voila! Seems good! We've gotten ourselves an observer pattern. Now let's go the next step and translate this into RxJS.

RxJS

Although RxJS is an extension of the observer pattern there are notably some differences between these two.

The biggest difference to the observer pattern would arguably be that we can stop with moving the reference to the origin of data everywhere and instead only move references to the data stream. In the previous example of the observer pattern, it was not highlighted directly, but we would always need to drag around a reference to the point of origin through our code. Say for example we have an HTMLButton which we listen to for clicks, in the observer pattern we wouldn't only need to have the HTMLButton everywhere were we would like to start listening to its actions, but we would also need the HTMLButton the moment we would like to stop reacting on it and detach. RxJS removes this situation by encapsulating the HTMLButton and giving us an abstraction of its data stream that we instead can move around and subscribe to. Subscribing to it would give us a reference to our own subscription that we can then use to unsubscribe (/detach). Other differences would become apparent if we would look at the main actors inside the RxJS library. While the observer pattern has two main actors, which are the Subjects and the Observers, in RxJS we see three main actors. These are the Subjects, the Observers, and the Observables.

RxJS Observer

Observers in RxJS are those instances that subscribe to RxJS-Subjects and RxJS-Observables. We can compare them with the Observers in the observer pattern. They are quite like their original brother except that they would have two extra callbacks, one for an error and another for a complete method. Which looks like:

1 2 3 4 5
const observer = { next: x => console.log('Observer received a new value: ', x), error: err => console.error('Error, something is wrong', err), complete: () => console.log('Let us round up'), };

But essentially the RxJS-Observer does (after subscribing) what it's brother does: reacting.

Mind that when you would subscribe in RxJS you are most likely used to passing in an anonymous Observer instance which seems like passing in an anonymous method. Like this:

1 2 3
const rxJsSubject = new Subject<string>(); rxJsSubject.subscribe((str) => console.log('Observer received a new string value:', str));

However, it is possible to do this:

1 2 3 4 5 6 7 8
const rxJsSubject = new Subject<string>(); const rxJsObserver: Observer<string> = { next: (str) => console.log('Observer received a new string value: ', str), error: (err) => console.error('Error, something is wrong', err), complete: () => console.log('Let us round up.'), } rxJsSubject.subscribe(rxJsObserver);

The reason we can go with something like a method while subscribing is because the RxJS's Subject.subscribe() takes in a Partial<Observer<T>> and one of the overloads of this subscribe is the subscribe(next: (value: T) => void): Subscription.

1 2
subscribe(observer?: Partial<Observer<T>>): Subscription; subscribe(next: (value: T) => void): Subscription;

Additionally, as done in the anonymous example, you could also specify error & complete in the subscribe directly instead of using only next. This method however has been deprecated and instead you should pass in an Observer instance as argument in such cases.

Observables

The RxJS-Observable. Although the name might catch you a bit of guard if you quickly scheme over it (Observer, Observable -> po-tay-to, po-tah-to?), Observables have a lot more in common with the Subjects than the Observers of the classic observer pattern. In fact, the RxJS-Subjects are a special form of the RxJS-Observables and RxJS-Observers subscribe to Observables. According to the official website RxJS-Observerables are:

“… lazy push collections of multiple values” RxJS - Observable

This description might be a bit too short to grasp it clearly when seeing it for the first time. To explain it more elaborate: In RxJs the Observables are the main class that implements the Subscribable interface which an observer would use to start observing a stream of events or data, may it be synchronous or asynchronous. An observer would use the subscribe method implemented by the Observable to start listening to the stream of values. An Observable would only start emitting these values when it has been subscribed to (lazily). It then pushes the values to the observer. Observables are always unicast and independent to the observer instance, which then always receives its own collection of values. In layman terms you could say an Observable holds a routine list of values it will independently provide to each Observer. Whenever a new Observer subscribes to it the Observable starts to 'fetch' (or 'create') these items from the list and gives it to the Observer one by one. Hence each observer gets all the values completely independently of any other subscriber. As an overview in contrast to the observer pattern's Subject. RxJS-Observable Observer Pattern's Subject Can be subscribed too. Can be subscribed too. Creates a new instance every time of subscribing. Stays the same instance for every new subscription. Pushes values the moment of subscription (lazy) Pushes values independent of subscription (active) Pushes the same values (repeats the process) to each new subscriber Add new subscribers to a list and passes all new values to each subscriber.

So, although extending from the observer pattern RxJS has its own way of doing things. Let's go over an example to try and make this clearer.

1 2 3 4 5 6
const observableGetIngredients = new Observable(observer => { observer.next({kind: 'Potatoes', amount: 1}); observer.next({kind: 'Tomatoes', amount: 4}); observer.next({kind: 'Onions', amount: 2}); observer.complete(); });

We load an Observable with a list of three ingredients that it should 'fetch' or 'create' and give to each new subscriber. If we now would do this:

1 2
observableGetIngredients.subscribe(observerA); observableGetIngredients.subscribe(observerB);

Each observer would get the ingredients independently, the Observable makes each order and fetches a total of 2 potatoes, 8 tomatoes, and 2 onions. Basically. If we would make it a bit more interesting:

1 2 3 4 5 6 7 8 9
const observableGetIngredients = new Observable(observer => { observer.next({kind: 'Potatoes', amount: 1}); observer.next({kind: randomizeSomeRottenTomatoes(4), amount: 4}); observer.next({kind: 'Onions', amount: 2}); observer.complete(); }); observableGetIngredients.subscribe(observerA); observableGetIngredients.subscribe(observerB);

Now it might be that either A or B now ends up with a rotten tomato while the other would not have this problem. Also notice the observer.complete() at the end of the list of the RxJS-Observable? This would do two things: it would trigger the complete method in any subscribed RxJS-Observer and it would trigger the RxJS-Observer to unsubscribe from the RxJS-Observable automagically. Now mind that when dealing with streams this observer.complete() not necessarily does get called, this would depend on the implementation. Detaching any RxJS-Observer from the RxJS-Observable can be done by unsubscribing to the created stream.

1 2 3
const subscriptionToIngredients$ = observableGetIngredients.subscribe(observerA); subscriptionToIngredients$.unsubscribe();

This idea is different from the observer pattern as we do not need to unsubscribe at the instance as we subscribed to. Instead, the moment of subscribing we get a subscription and can take this anywhere and forget about the source before subscribing.

RxJS Subject

The RxJS-Subjects is a special form of the RxJS-Observable and also implements the same Subscribable interface, which includes the error() and complete(), just like the RxJS-Observable. Where it differs is that it does not have a predefined list of values it would fetch or create and then lazily push. Another contrast to an RxJS-Observable is that it has the capacity to multicast and thereby its definition comes closer to the Subject from the observer pattern as the RxJS-Observable. Though it would remain from being a solid copy. First, as the RxJS-Observable, the RxJS-Subject is an abstraction of the origin of data and creates a stream of values. Secondly, compared to the observer pattern, the RxJS-Subjects pushes new values of data only when externally the method next(…) of the RxJS-Subject is called. This is in contrast to the Subject of the observer pattern which holds the origin of data and pushes actively any new value. This behaviour of calling next(…) makes it also different to the RxJS-Observable, which doesn't need to an external activation to get and emit values besides the subscribe. Unsubscribing to a RxJS-Subject is similar as the RxJS-Observable to. As an example:

1 2 3 4 5 6 7 8 9 10 11 12
const numberSubject = new Subject<number>(); const subscriptionToNumberSubjectA$ = numberSubject.subscribe((number) => console.log(`A: Number ${number} received.`)); console.log('Going to emit a number.'); numberSubject.next(1); const subscriptionToNumberSubjectB$ = numberSubject.subscribe((number) => console.log(`B: Number ${number} received.`)); numberSubject.next(2); subscriptionToNumberSubjectA$.unsubscribe(); numberSubject.next(3);

Which would result in:

1 2 3 4 5
"Going to emit a number." "A: Number 1 received." "A: Number 2 received." "B: Number 2 received." "B: Number 3 received."

We can see that it only starts pushing it values when we call next(…). Both subscribers receive the same value. Subscribing does not start the stream anew. And unsubscribing does not stop the stream for other additional subscribers.

Rounding it up

There are a few more differences, but this would lead us away from the general grasping of what is going on. For example, RxJS has other variants of the Subjects such as the BehaviorSubject which you can always request for the latest value and the ReplaySubject which passes along previous values to any new subscriber (read Observer).

In a nutshell: the RxJS extends on the classic observer pattern but differs in a healthy way. The subscribing happens on an abstraction of the source of data creating thereby a stream of data. Observables in the RxJS library are unicast, like a recipe for plate of food, and only start becoming active, cooked, when subscribed to. Subjects are special type of observable that instead can multicast values (same plate of food is shared) but do not fetch anything themselves. While Observables are probably the most used in our code and special operators could turn these Observables into multicast, under the hood of the operators such as shareReplay() Observables are translated into a variant of a Subject. See Multicasting in RxJS for more information.