[Rxjs] Rxjs store for angular (rx-ng-store)

kelly woo
4 min read · Feb 8, 2022


Photo by Pierre Gui on Unsplash

RxJS is powerful. It has a rich set of operators, and BehaviorSubject gives you a cozy place to cache data.

Most big apps use a well-structured store such as NgRx or another Redux-based state management library, but sometimes RxJS alone is enough for your app if you don't need that much systematic structure.

I had a chance to write some snippets with React and Recoil (I will write about that next time) and loved how simply the library deals with APIs and caching. This project is influenced by React Suspense and Recoil.

For anyone who only wants the result, here is the repo and the test page.
(This content is out of date. Just read the article to see how poorly you can start and keep making it better. :)
You can check the current status of this project in the article linked at the end of this one.

Caching

We will cache every API response in a BehaviorSubject. We also need data that describes the status of the request, so the interface we are looking for is the following.

export type RxQueryCacheState<A> = {
  ts: number; // timestamp of the last update
  data: A; // API response
  loading: boolean; // is a fetch in progress?
  error: Error | null; // error from the last fetch, if any
};
// cache$ = new BehaviorSubject<RxQueryCacheState<A>>(initState);
// Yes, initState... this is something we have to receive as well.
// null would be an easy start,
// but then the code has to branch on it and I don't want that.
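The transitions this state goes through can be written as small pure functions. The sketch below is only one way to factor them out: the helper names `initialState`, `toLoading`, `toSuccess`, `toError` and `isStale` are hypothetical and are not part of rx-ng-store, and the timestamps are passed in by hand so the result is predictable.

```typescript
type RxQueryCacheState<A> = {
  ts: number;
  data: A;
  loading: boolean;
  error: Error | null;
};

// Hypothetical helpers for illustration; none of these names come from the library.
function initialState<A>(data: A): RxQueryCacheState<A> {
  return { ts: 0, data, loading: false, error: null };
}

// A fetch has started: keep the previous data visible while loading.
function toLoading<A>(s: RxQueryCacheState<A>): RxQueryCacheState<A> {
  return { ...s, loading: true };
}

// The fetch succeeded: replace the data and clear any earlier error.
function toSuccess<A>(s: RxQueryCacheState<A>, data: A, now: number): RxQueryCacheState<A> {
  return { ...s, data, loading: false, error: null, ts: now };
}

// The fetch failed: keep the last good data and record the error.
function toError<A>(s: RxQueryCacheState<A>, error: Error, now: number): RxQueryCacheState<A> {
  return { ...s, error, loading: false, ts: now };
}

// One possible use of `ts`: decide whether the cached data is older than maxAgeMs.
function isStale(s: RxQueryCacheState<unknown>, maxAgeMs: number, now: number): boolean {
  return now - s.ts > maxAgeMs;
}

const s0 = initialState<string[]>([]);
const s1 = toLoading(s0);
const s2 = toSuccess(s1, ['a'], 1000);
const s3 = toError(toLoading(s2), new Error('network'), 2000);
```

After the failed second fetch, `s3` still carries the data from the first one, which is why starting from a real `initState` instead of `null` keeps consumers free of null checks.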

We also have to receive the parameters for the fetch, so we need a starting point that maps to the API call. For that we make

fetch$ = new Subject<B>();

From fetch$ to cache$ we will implement the query part.

import { BehaviorSubject, EMPTY, Observable, Subject } from 'rxjs';
import { catchError, debounceTime, switchMap, tap } from 'rxjs/operators';

export class RxQueryCache<A, B> {
  private cache$: BehaviorSubject<RxQueryCacheState<A>>;
  private fetch$ = new Subject<B>();

  constructor(
    private query: (s: B) => Observable<A>,
    private options: RxQueryOption<A, B>
  ) {
    // initState is received from the caller (see the note above);
    // how it is built is omitted in this snippet
    this.cache$ = new BehaviorSubject(initState);
    this.fetch$
      .pipe(
        debounceTime(0), // collapse multiple calls made in the same tick
        switchMap((param: B) => {
          const state = this.cache$.getValue();

          // mark as loading while keeping the previous data
          this.cache$.next({ ...state, loading: true });

          // yes, we need an api which returns an Observable..
          return this.query(param).pipe(
            tap((res) => {
              this.cache$.next({
                ...this.cache$.getValue(),
                data: res,
                loading: false,
                ts: Date.now(),
                error: null,
              });
            }),
            // the outer stream should go on after a failure
            catchError((err) => {
              this.cache$.next({
                ...this.cache$.getValue(),
                error: err,
                loading: false,
                ts: Date.now(),
              });

              // The error is already in the cache, so skip this turn:
              // completing the inner stream keeps fetch$ alive for the next call.
              return EMPTY;
            })
          );
        })
      )
      .subscribe();

    if (this.options.prefetch) {
      // fetch right away on registration
      this.fetch(this.options.prefetch.data);
    }
  }
  ....
}
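The reason for `switchMap` in the pipeline above is that only the most recent request should be allowed to write into the cache. As a dependency-free illustration of that "latest wins" rule, the sketch below uses a hypothetical `LatestOnly` class that ignores results belonging to an older request. It is not RxJS and not code from the library, and it does not model the fact that `switchMap` also unsubscribes from the older inner observable.

```typescript
// Hypothetical helper: remembers the most recent request and drops results from older ones.
class LatestOnly<T> {
  private currentId = 0;
  private readonly onResult: (value: T) => void;

  constructor(onResult: (value: T) => void) {
    this.onResult = onResult;
  }

  // Starts a new request and returns the callback that delivers its result.
  begin(): (value: T) => void {
    const id = ++this.currentId;
    return (value: T) => {
      // Deliver only if no newer request has started in the meantime.
      if (id === this.currentId) {
        this.onResult(value);
      }
    };
  }
}

const received: string[] = [];
const latest = new LatestOnly<string>((value) => {
  received.push(value);
});

const finishFirst = latest.begin();
const finishSecond = latest.begin(); // supersedes the first request

finishFirst('stale response'); // ignored: a newer request exists
finishSecond('fresh response'); // delivered
```

Only the second response reaches `received`, which mirrors how a slow earlier call cannot overwrite the cache once a newer fetch has been issued.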

Create Manager or Bridge

Now you can hire a manager to control these individual caches. It is fine to use each cache on its own, but for me a manager always works better.

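import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class RxNgQueryStore<A extends RxNgState> {
  // Partial: the map starts empty and fills up as stores are registered
  private state: Partial<Record<keyof A, RxQueryCache<any, any>>> = {};
  isDev = false; // assigned by the module factory

  registerStore(
    query: (s: any) => Observable<any>,
    options: RxQueryOption<any, any>
  ) {
    const key = options.key as keyof A;
    if (this.state[key]) {
      // String(key) because keyof A may include symbols
      console.warn(
        `${String(key)} store already exists. The existing store is kept. ` +
          'If you want a new store, choose a different key.'
      );
      return;
    }
    this.state[key] = new RxQueryCache(query, options);
  }
}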
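The "register once per key" rule can be tried without Angular or RxJS. The sketch below is an assumption-laden stand-in: `KeyedRegistry` and `CacheLike` are hypothetical names, and unlike `registerStore` above, `register` returns the existing entry together with a `created` flag instead of only warning.

```typescript
// Hypothetical stand-in for a per-key cache; the real RxQueryCache wraps a BehaviorSubject.
type CacheLike = { key: string; createdBy: string };

class KeyedRegistry<T> {
  private readonly entries = new Map<string, T>();

  // Creates the entry on first registration. Later calls with the same key
  // return the existing entry and never invoke `create`.
  register(key: string, create: () => T): { entry: T; created: boolean } {
    const existing = this.entries.get(key);
    if (existing !== undefined) {
      return { entry: existing, created: false };
    }
    const entry = create();
    this.entries.set(key, entry);
    return { entry, created: true };
  }

  get(key: string): T | undefined {
    return this.entries.get(key);
  }
}

const registry = new KeyedRegistry<CacheLike>();
const first = registry.register('history', () => ({ key: 'history', createdBy: 'first call' }));
const second = registry.register('history', () => ({ key: 'history', createdBy: 'second call' }));
```

Here `second.entry` is the same object as `first.entry`, so every part of the app that registers or looks up `'history'` ends up sharing one cache.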

Through this class you can access the stores from everywhere in the application.
Now it is time to inject the query store into the module.

Add to NgModule

As you know, we cannot miss the best part of Angular, DI. It is better to access these caches through dependency injection than directly.

I'd like to offer three ways to create a store.
The first is the registerStore method.
The second is this one, through a module import.

import {
  InjectionToken,
  Injector,
  ModuleWithProviders,
  NgModule,
  Optional,
} from '@angular/core';

const STORE = new RxNgQueryStore<any>();
const StoreInitToken = new InjectionToken('@@@@::ng_query_init');
export const StoreDevToken = new InjectionToken('@@@@::ng_is_dev');

@NgModule({
  providers: [
    {
      provide: RxNgQueryStore,
      useFactory: (init?: any, isDev?: boolean) => {
        STORE.isDev = isDev || false;
        return STORE;
      },
      deps: [[new Optional(), StoreInitToken], [new Optional(), StoreDevToken]],
    },
  ],
})
export class RxNgQueryModule {
  static getStore() {
    return STORE;
  }

  static withInitStore<A>(
    source?: (...args: any[]) => RxNgParam<A>[],
    deps?: any[]
  ): ModuleWithProviders<RxNgQueryModule> {
    return {
      ngModule: RxNgQueryModule,
      providers: [
        {
          provide: StoreInitToken,
          // registers the initial stores when Angular resolves this token
          useFactory(injector: Injector) {
            if (source) {
              const initSource = deps
                ? source(...deps.map((type) => injector.get(type)))
                : source();
              initSource.forEach((sc) => {
                STORE.registerStore(sc.query, sc.options);
              });
            }
            return true;
          },
          deps: [Injector],
        },
        {
          provide: RxNgQueryStore,
          // An optional dep that is missing arrives as null, which a default
          // parameter would not replace, so coerce it explicitly.
          useFactory(init: any, isDev?: boolean) {
            STORE.isDev = isDev || false;
            return STORE;
          },
          deps: [StoreInitToken, [new Optional(), StoreDevToken]],
        },
      ],
    };
  }
}

The store initialization logic lives in the StoreInitToken factory, and listing that token in the deps of RxNgQueryStore is what makes Angular run it before the store is handed out.
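The dependency-resolution step inside that factory, `source(...deps.map((type) => injector.get(type)))`, can be tried outside Angular with a stand-in injector. Everything named here (`TinyInjector`, `ApiClient`, `StoreConfig`, `resolveInitSource`) is hypothetical and only imitates the shape of the real call. Angular's `Injector` does far more than this lookup table.

```typescript
type Ctor<T> = new (...args: any[]) => T;

// Stand-in for Angular's Injector: maps a class to an already-built instance.
class TinyInjector {
  private readonly instances = new Map<Function, unknown>();

  provide<T>(type: Ctor<T>, instance: T): void {
    this.instances.set(type, instance);
  }

  get<T>(type: Ctor<T>): T {
    if (!this.instances.has(type)) {
      throw new Error('No provider registered for the requested type');
    }
    return this.instances.get(type) as T;
  }
}

// Hypothetical service that the store definitions depend on.
class ApiClient {
  baseUrl = '/api';
}

type StoreConfig = { key: string; url: string };

// Same shape as the factory: resolve each dep, then spread the instances into source().
function resolveInitSource(
  injector: TinyInjector,
  source: (...args: any[]) => StoreConfig[],
  deps?: Array<Ctor<unknown>>
): StoreConfig[] {
  return deps ? source(...deps.map((type) => injector.get(type))) : source();
}

const tinyInjector = new TinyInjector();
tinyInjector.provide(ApiClient, new ApiClient());

const configs = resolveInitSource(
  tinyInjector,
  (api: ApiClient) => [{ key: 'history', url: `${api.baseUrl}/history` }],
  [ApiClient]
);
```

The order of `deps` decides the order of the arguments passed to `source`, so the two lists have to be kept in step by hand.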

Now all caches can be shared across pages and services through the NgModule.
The third way to register a store is with a decorator.

@Injectable()
@RxQueryService()
export class HistoryCacheService {
  constructor(private apiService: ApiService) {}

  // prefetch: fetch right away on registration
  @RxNgQuery({ key: 'history', initState: [], prefetch: { data: null } })
  fetchHistory() {
    return this.apiService.fetchHistory();
  }
}
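How `@RxNgQuery` is implemented is not shown here, so the following is only a guess at the general mechanism: a method decorator can record the options and the method name at class-definition time, and something else can later turn each record into a store. The names `Query`, `QueryOptions`, `collected` and `HistoryService` are hypothetical. The decorator is applied by hand rather than with `@` syntax so the sketch does not depend on a particular decorator setting in the compiler.

```typescript
type QueryOptions = { key: string; initState: unknown };
type QueryMeta = QueryOptions & { method: string };

// Hypothetical list that a decorator could fill while classes are being defined.
const collected: QueryMeta[] = [];

// Legacy-style method decorator factory: returns (target, propertyKey) => void.
function Query(options: QueryOptions) {
  return (_target: object, propertyKey: string): void => {
    // Record which method should back the store with this key.
    collected.push({ ...options, method: propertyKey });
  };
}

class HistoryService {
  fetchHistory(): string[] {
    return ['page-1'];
  }
}

// Roughly what `@Query({ ... })` above fetchHistory would trigger with
// experimentalDecorators enabled; called by hand here.
Query({ key: 'history', initState: [] })(HistoryService.prototype, 'fetchHistory');
```

The decorated method itself is left untouched in this sketch. Only the metadata in `collected` changes, which is enough for a later step to look up `fetchHistory` on an instance and register it under `'history'`.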

It still needs a lot of polish, and there are surely errors that are undiscovered or poorly patched. It also resets all previous data, but some data may need to stay cached, so it has to distinguish what should be kept from what should go.

I know there are many state management libraries, and this one may be thrown away sooner or later, but if you think this project can be useful, help me make it better.

:) Thank you.
