useSWR-esque data fetching with RxJS

Updated: 27 October 2023

Coming from React

In the react-world something I’ve really enjoyed using is the SWR library for fetching data which allows consolidation of data into a hook that can be used to track error and loading states as well as keep the data accessible at all times for consuming components

Implementing this with RxJS

In RxJS we work with data as a stream, when fetching data we often have a resulting stream that will emit a value once the data has been fetched. The problem with this methodology is that we only have the resultant data but errors and loading states are not easily surfaced to consumers of this data unless they hook into the entire lifecycle for the momment the stream is created

Below is an implementation of a class that enables the definition and management of multiple streams that consumers can access for resolving the respective states for a given request. Furthermore, requests are triggered by params observable such that each emission of the observable will trigger the fetcher. Hence, this can be used for loading data but also for working with forms where each submission is a .next to the params stream

1
import { BehaviorSubject, Observable, throwError } from "rxjs";
2
import { catchError, debounceTime, filter, map } from "rxjs/operators";
3
4
/**
5
* Creates a data fetcher class that can maintain state for managing loading and error states
6
*/
7
export class Fetcher<TParams, TData> {
8
public readonly state$ = new BehaviorSubject<LoaderState<TData>>(initial);
9
10
/**
11
* Data will remember previously resolved values while loading and in the event of an error
12
*/
13
public readonly data$ = this.state$.pipe(
14
filter(isDataState),
15
map((state) => state.data)
16
);
17
18
public readonly error$ = this.state$.pipe(
19
map((state) => state.state === "error")
20
);
21
22
public readonly loading$ = this.state$.pipe(
23
map((state) => state.state === "loading")
24
);
25
26
/**
27
* @param dependencies depednencies that will trigger the data to be loaded
28
* @param fetcher function for loading data that should return a stream that
29
* @param debounce time used to debounce requests from the dependency observable
30
*/
31
constructor(
32
private readonly params: Observable<TParams>,
33
private readonly fetcher: (params: TParams) => Observable<TData>,
34
debounce = 0
35
) {
36
this.dependencies.pipe(debounceTime(debounce)).subscribe(this.fetchData);
37
}
38
39
private fetchData = (deps: TParams): void => {
40
this.state$.next(loading);
41
42
this.fetcher(deps)
43
.pipe(
44
catchError(() => {
45
this.state$.next(error);
46
return throwError(() => new Error("Error loading data"));
47
})
48
)
49
.subscribe((data) =>
50
this.state$.next({
51
state: "data",
52
data,
53
})
54
);
55
};
56
}

Additionally, we can define some utilities for the above class to use:

1
// Implementation of a simple state machine for the different staes available in the data fetcher
2
type Status<TState extends string, TData = undefined> = {
3
state: TState;
4
data: TData;
5
};
6
7
type LoadingState = Status<"loading">;
8
type ErrorState = Status<"error">;
9
type InitialState = Status<"initial">;
10
11
type DataState<TData> = Status<"data", TData>;
12
13
type LoaderState<TData> =
14
| DataState<TData>
15
| InitialState
16
| LoadingState
17
| ErrorState;
18
19
// some constatns for working with the states more easily
20
const loading: LoadingState = {
21
state: "loading",
22
data: undefined,
23
};
24
25
const error: ErrorState = {
26
state: "error",
27
data: undefined,
28
};
29
30
const initial: InitialState = {
31
state: "initial",
32
data: undefined,
33
};
34
35
const isDataState = <TData>(
36
state: LoaderState<TData>
37
): state is DataState<TData> => state.state === "data";
38
39
const getData = <TData>(state: LoaderState<TData>): TData | undefined =>
40
isDataState(state) ? state.data : undefined;

Using the Class

We can use the class we defined by creating an instance of it

1
import { from } from "rxjs";
2
3
const search$ = new BehaviourSubject<string>("");
4
5
const usersFetcher = new Fetcher(
6
search$,
7
(search: string) => from(fetch(`https://api.com?search=${search}`)),
8
200
9
);
10
11
const users$ = usersFetcher.data$;
12
const isLoading$ = usersFetcher.loading$;
13
const hasError$ = usersFetcher.error$;

The users$, isLoading$ and hasError$ values above are streams that we can use with our normal RxJS code

Interacting with the data is also pretty neat now, for example we can trigger the data to be refreshed whenever someone updates the search in our UI by doing something like:

1
onSearchChange(event) {
2
this.search$.next(event.target.value || '')
3
}

Which will then trigger the data to be refreshed and all the loading and error states to be updated accordingly