RxJS Better Practice

kelly woo
Jan 24, 2021
Photo by Olav Ahrens Røtne on Unsplash

RxJS is a real lifesaver for apps with complex data structures: it makes building solid data pipelines easy and elegant.
Like any other tool, this convenience and simplicity comes with the chance of messing things up if you don’t know the proper use case. Avoiding that requires knowing what the better practice is (I am still figuring out what the best one is).

This comes from a knowledge-sharing session on my team and is based on mistakes we have seen. Some are silly and some are critical to the app.
This is not an official Better Practice; it is collected from real cases. It might not apply to your app, but it is good to know these bad and better cases.

So let’s share it.

| Split logic with operators

Simple and easy first.
Do not put every operation in the observer function (the next handler passed to subscribe).
Use operators to take that burden off it, so the stream is easy to follow and the function stays light.

import { filter, map } from 'rxjs/operators';

// can be better
obs$.subscribe((state) => {
  if (!state) return;
  const newState = state.map(...);
  this.setState(newState);
});

// is better
obs$.pipe(
  filter(Boolean),                // drop empty states
  map((state) => state.map(...)), // transform inside the pipeline
).subscribe((newState) => {
  this.setState(newState);
});

| Flatten subscribe function

We all know callback hell, and it applies to RxJS observer functions too.
Do not call subscribe inside subscribe. There are plenty of *Map operators (switchMap, mergeMap, concatMap, exhaustMap) and join operators that flatten the stream.


import { map, switchMap } from 'rxjs/operators';

// can be better
obs$.subscribe((state) => {
  const obs2$ = ... || ...;
  obs2$.subscribe((state2) => {
    const { a } = state;
    const { b } = state2;
    this.setValue(a * b);
  });
});

// is better
obs$.pipe(
  switchMap((state) => {
    const obs2$ = ... || ...;
    return obs2$.pipe(
      map((state2) => state.a * state2.b),
    );
  }),
).subscribe((multiplied) => {
  this.setValue(multiplied);
});
// a catchError step should be added too
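
As a rough sketch of the catchError note above: the example below assumes a hypothetical `loadFactor` function in place of the inner observable, and the fallback value 0 is only an illustration. Catching the error inside the inner pipe keeps the outer stream alive, so one failed inner call does not end the whole subscription.

```typescript
import { Observable, from, of, throwError } from 'rxjs';
import { catchError, map, switchMap } from 'rxjs/operators';

interface State { a: number; }
interface State2 { b: number; }

// Hypothetical inner source: fails for a === 2, otherwise emits { b: 10 }.
function loadFactor(state: State): Observable<State2> {
  return state.a === 2
    ? throwError(() => new Error('load failed'))
    : of({ b: 10 });
}

const states: State[] = [{ a: 1 }, { a: 2 }, { a: 3 }];
const results: number[] = [];

from(states).pipe(
  switchMap((state) =>
    loadFactor(state).pipe(
      map((state2) => state.a * state2.b),
      // Catch inside the inner pipe so the outer stream keeps running.
      catchError(() => of(0)),
    ),
  ),
).subscribe((multiplied) => results.push(multiplied));

console.log(results); // [10, 0, 30]
```

If catchError sat in the outer pipe instead, the first error would still be handled, but the outer subscription would end and the state with a = 3 would never be processed.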

This practice also prevents dangerous code like this:

setArchive(taskId) {
  this.getTask(taskId).subscribe(({ archive }) => {
    if (archive) { // case *1
      this.isArchive = true;
    } else { // case *2
      this.getTaskList().subscribe((list) => {
        this.isArchive = list.length < 1;
      });
    }
  });
}

This code is problematic when it is called several times in a row: a second call that hits case *1 can finish before the first call that hits case *2, and the stale result of the first call then overrides the value of isArchive. Use switchMap or concatMap instead.
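
One possible shape of the switchMap version, as a sketch only: `getTask`, `getTaskList`, `slowList$` and the `view` object are hypothetical stand-ins for the component members above, chosen so the race can be reproduced without a real backend. Each call pushes its taskId into a Subject, and switchMap drops whatever is still pending for the previous call.

```typescript
import { Observable, Subject, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

interface Task { archive: boolean; }

// Hypothetical stand-ins for this.getTask() and this.getTaskList().
// Task 2 is archived; the task list answers only when slowList$ emits.
const slowList$ = new Subject<string[]>();
const getTask = (taskId: number): Observable<Task> => of({ archive: taskId === 2 });
const getTaskList = (): Observable<string[]> => slowList$;

const view = { isArchive: false };
const taskId$ = new Subject<number>();

taskId$.pipe(
  // Each new taskId unsubscribes from everything still pending for the old one.
  switchMap((taskId) =>
    getTask(taskId).pipe(
      switchMap(({ archive }) =>
        archive
          ? of(true) // case *1
          : getTaskList().pipe(map((list) => list.length < 1)), // case *2
      ),
    ),
  ),
).subscribe((isArchive) => {
  view.isArchive = isArchive;
});

function setArchive(taskId: number): void {
  taskId$.next(taskId);
}

setArchive(1);                 // case *2: waits for the slow task list
setArchive(2);                 // case *1: answers at once, cancels the first call
slowList$.next(['some task']); // the late answer of the first call is dropped
console.log(view.isArchive);   // true
```

With the nested subscribe version, the late task list would have set isArchive back to false. Swapping the outer switchMap for concatMap would instead queue the calls and apply their results in call order.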

| Subscription management

It is important to unsubscribe from streams that are no longer needed. If certain observables share a lifecycle, it is good to group their subscriptions or to use takeUntil to unsubscribe them as a group.

import { Subject, Subscription } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// can be better: many variable names lower readability
const subscription1 = obs1$.subscribe(...);
const subscription2 = obs2$.subscribe(...);
const subscription3 = obs3$.subscribe(...);
// unsubscribe
subscription1.unsubscribe();
subscription2.unsubscribe();
subscription3.unsubscribe();

// better: group them in one Subscription
// (an empty parent is used so that obs1$ completing early cannot close the group)
const subscriptions = new Subscription();
subscriptions.add(obs1$.subscribe(...));
subscriptions.add(obs2$.subscribe(...));
subscriptions.add(obs3$.subscribe(...));
// unsubscribe
subscriptions.unsubscribe();

// or
const unsubscribeTrigger$ = new Subject<void>();
obs1$.pipe(takeUntil(unsubscribeTrigger$)).subscribe(...);
obs2$.pipe(takeUntil(unsubscribeTrigger$)).subscribe(...);
obs3$.pipe(takeUntil(unsubscribeTrigger$)).subscribe(...);
// unsubscribe
unsubscribeTrigger$.next();
unsubscribeTrigger$.complete();

Or add a SubscriptionManager:

import { Subscription } from 'rxjs';

export class SubscriptionManager {
  static G_ID = 1;
  idx = 0;
  // null once destroy() has been called
  subscriptions: Record<string, Subscription> | null = {};
  instanceId = `SUB_${SubscriptionManager.G_ID++}`;

  // Unsubscribe everything and make the manager unusable.
  destroy(): void {
    if (!this.subscriptions) return;
    this.unsubscribeAll();
    this.subscriptions = null;
  }

  // Unsubscribe everything but keep the manager reusable.
  unsubscribeAll(): void {
    if (!this.subscriptions) return;
    Object.values(this.subscriptions).forEach((sub) => {
      if (!sub.closed) {
        sub.unsubscribe();
      }
    });
    this.subscriptions = {};
  }

  unsubscribe(key: string): void {
    if (this.subscriptions && this.subscriptions[key]) {
      const sub = this.subscriptions[key];
      if (!sub.closed) {
        sub.unsubscribe();
      }
      delete this.subscriptions[key];
    }
  }

  add(sub: Subscription, keyData?: string): string {
    const key = keyData || `${this.instanceId}_${++this.idx}`;
    if (!this.subscriptions) {
      // Already destroyed: close the subscription instead of throwing.
      sub.unsubscribe();
      return key;
    }
    this.subscriptions[key] = sub;
    return key;
  }
}

| Unicast and Multicast

Unicast means each observer gets its own data source. Multicast, on the other hand, means one data source is shared by multiple observers. With a unicast observable, 2 observers (subscribers) therefore run 2 separate data streams.

For instance, timer and interval are unicast, which means a new data source is created every time the observable is subscribed to. If you would rather keep one data source for all observers, you need to multicast the observable.

import { interval } from 'rxjs';
const a = interval(1000);
a.subscribe((v) => console.log('subscription1:', v));
setTimeout(() => a.subscribe((v) => console.log('subscription2:', v)), 3000);

Subscription1 and subscription2 log different values at the same moment (subscription2 starts again from 0), which means they have different data sources.

RxJS makes this easy with the share operator. If you’d like to make the observable multicast (a single data source), just add share().

import { interval } from 'rxjs';
import { share } from 'rxjs/operators';
const a = interval(1000).pipe(share());
a.subscribe((v) => console.log('subscription1:', v));
setTimeout(() => a.subscribe((v) => console.log('subscription2:', v)), 3000);

Do you see why this is important?
It rarely happens, but what if you put 2 subscriptions on one HttpClient.post() observable in Angular?
Yes, you send 2 identical POST requests!
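
The effect can be reproduced without Angular. The sketch below assumes a hypothetical `fakePost` function in place of HttpClient.post() and simply counts how often the request logic runs.

```typescript
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

let requestCount = 0;

// Hypothetical stand-in for HttpClient.post(): a cold observable that
// starts one "request" for every subscription.
function fakePost(): Observable<string> {
  return new Observable<string>((subscriber) => {
    requestCount++;
    const timer = setTimeout(() => {
      subscriber.next('saved');
      subscriber.complete();
    }, 10);
    // Teardown: cancel the pending "request" on unsubscribe.
    return () => clearTimeout(timer);
  });
}

// Unicast: two subscriptions start two requests.
const plain$ = fakePost();
plain$.subscribe();
plain$.subscribe();
const unicastRequests = requestCount; // 2

// Multicast: share() lets both subscribers reuse the one running request.
requestCount = 0;
const shared$ = fakePost().pipe(share());
shared$.subscribe();
shared$.subscribe();
const multicastRequests = requestCount; // 1

console.log(unicastRequests, multicastRequests); // 2 1
```

Note that share() only shares a request while it is still running. A subscriber that arrives after the request has completed starts a new one, so if late subscribers should get the cached response, shareReplay(1) may be the better fit.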

So these are some better cases that I fixed with my team. There may be practices better than these, and more better practices for other cases.
I hope this is helpful to you. :)
