RxJS Basics

Updated: 27 October 2023

Overview

  • RxJS is the JavaScript implementation of ReactiveX
  • ReactiveX is an API for asynchronous programming with observable streams

“ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming” ~ ReactiveX

Installation

  1. Create a new TypeScript static site project:

npx @nabeelvalley/project-init

     And select ts-static, then change into the project directory:

cd rxjs

  2. Add rxjs:

yarn add rxjs

  3. Start the site:

yarn start

Importing RxJS

To import something from RxJS in the index.ts file, you can use the following:

import { Observable } from 'rxjs'

console.log(Observable)

This should log the Observable class from RxJS

Observables

A stream is a series of events that are emitted over time. An observable provides a means to emit and respond to stream events

Create an Observable

We can create an observable that emits string values with the following:

const observable = new Observable<string>((observer) => {
  observer.next('hello world') // emit a value
})

Subscribe to an Observable

However, we won't see any events unless the observable has an observer. To receive values we define an observer with the subscribe method. Because of how the observable was defined, the value will be of type string:

const observer = observable.subscribe((value) => {
  console.log(value)
})

Observable Events

The events that we can respond to on an observable are next, error, and complete

We can also handle error and completion events by passing additional callbacks to subscribe:

const observable = new Observable<string>((observer) => {
  observer.next('hello world')
  observer.next('what is the up')
  // error terminates the stream, so nothing after this line reaches the subscriber
  observer.error('i am error')
  // complete also terminates a stream, but this call is ignored because of the error above
  observer.complete()
  // any methods called on the observer after error or complete will not fire
  observer.next('will not send')
})

observable.subscribe(
  (value) => console.log(value), // handle next
  (error) => console.error(error), // handle error
  () => console.log('complete') // handle complete
)
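
As a small sketch of the termination rules described above, the snippet below passes a single observer object to subscribe (RxJS also accepts this form, with next, error, and complete keys) and records what is actually delivered. The log array and the message strings are made up for illustration.

```typescript
import { Observable } from 'rxjs'

// collects everything the observer receives, in order
const log: string[] = []

const observable = new Observable<string>((subscriber) => {
  subscriber.next('first')
  subscriber.error('boom') // terminates the stream
  subscriber.complete() // ignored: the stream has already errored
  subscriber.next('late') // ignored as well
})

observable.subscribe({
  next: (value) => log.push('next: ' + value),
  error: (error) => log.push('error: ' + error),
  complete: () => log.push('complete'),
})

console.log(log) // ['next: first', 'error: boom']
```

Only one of error or complete is ever delivered, so the complete handler never runs here.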

Unsubscribe

The subscribe method returns a subscription (named observer in these examples). To unsubscribe, call its unsubscribe method:

const observable = new Observable<string>((observer) => {
  observer.next('hello world')
  let count = 0
  setInterval(() => {
    count++
    observer.next('tick ' + count)
  }, 1000)
})

const observer = observable.subscribe(
  (value) => console.log(value),
  (error) => console.error(error),
  () => console.log('complete')
)

setTimeout(() => {
  console.log('unsub')
  observer.unsubscribe()
}, 5000)
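
One thing the example above leaves open is that the setInterval timer keeps running after unsubscribing, even though its values are no longer delivered. A possible way to handle this, sketched below, is to return a teardown function from the function passed to the Observable constructor; RxJS calls it when the subscription is unsubscribed. The names ticker and events are hypothetical.

```typescript
import { Observable } from 'rxjs'

// records what happens, in order
const events: string[] = []

const ticker = new Observable<string>((subscriber) => {
  subscriber.next('hello world')
  const timer = setInterval(() => subscriber.next('tick'), 1000)

  // teardown logic: runs when the subscription is unsubscribed
  return () => {
    clearInterval(timer)
    events.push('teardown ran')
  }
})

const subscription = ticker.subscribe((value) => events.push(value))

subscription.unsubscribe()

console.log(events) // ['hello world', 'teardown ran']
console.log(subscription.closed) // true
```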

Multiple Observers

We can create multiple observers by calling the observable.subscribe method more than once:

const observable = new Observable<string>((observer) => {
  observer.next('hello world')
  let count = 0
  setInterval(() => {
    count++
    observer.next('tick ' + count)
  }, 1000)
})

const observer1 = observable.subscribe(
  (value) => console.log('observer1', value),
  (error) => console.error('observer1', error),
  () => console.log('observer1 complete')
)

const observer2 = observable.subscribe((value) =>
  console.log('observer2', value)
)

setTimeout(() => {
  console.log('unsub')
  observer1.unsubscribe()
}, 5000)

Child Subscriptions

If we want to create linked/child observers we can use the observer.add method to attach a subscription created with observable.subscribe to a parent, and the observer.remove method to detach it again. A child that is attached will be unsubscribed when the parent is:

const observer = observable.subscribe(
  (value) => console.log('observer', value),
  (error) => console.error('observer', error),
  () => console.log('observer complete')
)

const childObserver = observable.subscribe((value) =>
  console.log('child', value)
)

// add a child observer
observer.add(childObserver)

In the example below, the child will not be unsubscribed as it has been removed from the parent:

setTimeout(() => {
  // remove a child observer, this will not unsubscribe the child
  observer.remove(childObserver)
}, 3000)

setTimeout(() => {
  console.log('unsub')
  observer.unsubscribe()
}, 5000)

However, if we don't remove it, it will be unsubscribed automatically along with the parent
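
The add and remove behaviour can be checked without timers. The sketch below uses a Subject as the source purely for convenience, and the names parent, child, and detached are hypothetical. It reads the closed property of each subscription after the parent has been unsubscribed.

```typescript
import { Subject } from 'rxjs'

const source = new Subject<string>()

const parent = source.subscribe((value) => console.log('parent', value))
const child = source.subscribe((value) => console.log('child', value))
const detached = source.subscribe((value) => console.log('detached', value))

parent.add(child)
parent.add(detached)
parent.remove(detached) // detached is no longer tied to the parent

parent.unsubscribe()

// the attached child was closed together with the parent
const childClosed = child.closed
// the removed subscription is still active
const detachedClosed = detached.closed

console.log(childClosed, detachedClosed) // true false
```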

Hot or Cold

A cold observable is an observable whose producer is only activated once a subscription has been created

For example, we can create a second observer after a second. This observer will receive all the messages from the start, but they will arrive a second later than they do for the first observer:

setTimeout(() => {
  console.log('subscribe late')
  const observerLate = observable.subscribe((value) =>
    console.log('observerLate', value)
  )
}, 1000)

However, if we want our observer to get only the latest updates and not receive older messages, we can create a warm/hot observable by piping the share operator when we define our observable. This ensures that the observer created above will receive only the messages emitted after it subscribes:

import { Observable, share } from 'rxjs'

const observable = new Observable<string>((observer) => {
  observer.next('hello world')
  let count = 0
  setInterval(() => {
    count++
    observer.next('tick ' + count)
  }, 1000)
}).pipe(share())

Now observerLate will only print from tick 2 onwards instead of all the ticks
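
The difference can also be seen without timers by counting how often the producer function runs. This is a minimal sketch, assuming RxJS 7.2 or later so that share can be imported from 'rxjs'; the names cold, hot, coldRuns, hotRuns, and lateValues are made up for illustration.

```typescript
import { Observable, share } from 'rxjs'

const coldRuns: string[] = []

const cold = new Observable<string>((subscriber) => {
  coldRuns.push('cold producer started')
  subscriber.next('hello world')
})

cold.subscribe()
cold.subscribe() // runs the producer a second time

const hotRuns: string[] = []
const lateValues: string[] = []

const hot = new Observable<string>((subscriber) => {
  hotRuns.push('hot producer started')
  subscriber.next('hello world')
}).pipe(share())

hot.subscribe() // starts the single shared producer
hot.subscribe((value) => lateValues.push(value)) // joins it, missing 'hello world'

console.log(coldRuns.length, hotRuns.length, lateValues.length) // 2 1 0
```

Note that share only keeps the producer running while at least one subscriber is active.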

From Event

You can create an observable from DOM events by using the fromEvent function:

import { fromEvent } from 'rxjs'

const observable = fromEvent(document, 'mousemove')
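
To try this outside a browser, the sketch below uses a plain EventTarget as a stand-in for document and a made-up 'ping' event instead of 'mousemove'. It assumes a runtime where EventTarget and Event are available globally, and it also shows that unsubscribing removes the event listener.

```typescript
import { fromEvent } from 'rxjs'

// EventTarget stands in for a DOM node such as document
const target = new EventTarget()
const seen: string[] = []

const pings = fromEvent<Event>(target, 'ping')
const subscription = pings.subscribe((event) => seen.push(event.type))

target.dispatchEvent(new Event('ping')) // delivered to the subscriber
subscription.unsubscribe() // removes the event listener
target.dispatchEvent(new Event('ping')) // no longer delivered

console.log(seen) // ['ping']
```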

Subjects

A subject is simultaneously an observer and an observable, which means we can send events using the subject and also subscribe to it:

import { Subject } from 'rxjs'

const subject = new Subject<string>()

const observer1 = subject.subscribe((value) => console.log('observer1', value))

subject.next('first message')

So now we can add observers and send new messages like this:

const observer2 = subject.subscribe((value) => console.log('observer2', value))

subject.next('second message')

observer2.unsubscribe()

subject.next('third message')

In the above, observer2 will only react to the second message

There are four types of subjects:

Normal Subject

A normal subject only allows an observer to receive events emitted after that observer subscribed. This is what was used above as well:

import { Subject } from 'rxjs'

const subject = new Subject<string>()

const observer1 = subject.subscribe((value) => console.log('observer1', value))

subject.next('first message')

const observer2 = subject.subscribe((value) => console.log('observer2', value))

subject.next('second message')

observer2.unsubscribe()

subject.next('third message')

Behavior Subject

A Behavior subject will allow observers to receive all events emitted after they subscribe, as well as:

  • The initial value, for observers that subscribe before any other event is emitted
  • The last event fired before a new observer subscribes

import { BehaviorSubject } from 'rxjs'

const subject = new BehaviorSubject<string>('welcome observer1')

const observer1 = subject.subscribe((value) => console.log('observer1', value))

subject.next('first message')
subject.next('second message')

const observer2 = subject.subscribe((value) => console.log('observer2', value))

subject.next('third message')

observer2.unsubscribe()

subject.next('fourth message')

In the above, observer1 gets the welcome event as well as all four messages, and observer2 gets the second and third messages only, even though the second message was fired before observer2 subscribed
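
Put differently, a BehaviorSubject always holds a current value and hands it to each new subscriber. The short sketch below collects what an early and a late subscriber receive, and reads the current value with getValue. The names state, early, and late are hypothetical.

```typescript
import { BehaviorSubject } from 'rxjs'

const state = new BehaviorSubject<string>('initial')
const early: string[] = []
const late: string[] = []

state.subscribe((value) => early.push(value)) // receives 'initial' immediately
state.next('updated')
state.subscribe((value) => late.push(value)) // receives 'updated' immediately

console.log(early) // ['initial', 'updated']
console.log(late) // ['updated']
console.log(state.getValue()) // 'updated'
```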

Replay Subject

A Replay subject allows you to specify a buffer of events that should be replayed to new subscribers

import { ReplaySubject } from 'rxjs'

const subject = new ReplaySubject<string>(2)

const observer1 = subject.subscribe((value) => console.log('observer1', value))

subject.next('first message')
subject.next('second message')
subject.next('third message')

const observer2 = subject.subscribe((value) => console.log('observer2', value))

subject.next('fourth message')

observer2.unsubscribe()

subject.next('fifth message')

So in the above, observer2 will receive the second, third, and fourth messages only

Additionally, a Replay subject accepts an optional second constructor argument: a window of time, in milliseconds, during which events remain eligible for replay. Any events older than this window will not be replayed:

import { ReplaySubject } from 'rxjs'

const subject = new ReplaySubject<string>(20, 200)

let count = 0

setInterval(() => {
  count++
  subject.next('tick ' + count)
}, 100)

const observer1 = subject.subscribe((value) => console.log('observer1', value))

setTimeout(() => {
  const observer2 = subject.subscribe((value) =>
    console.log('observer2', value)
  )
}, 500)

In the above, observer2 will receive ticks 4 and 5 even though it only starts listening at tick 6. The observer receives a time-based buffer of messages (200ms in this case) rather than a count-based one. The 20 provided to the ReplaySubject constructor is the maximum number of messages that should be buffered

Async Subject

The async subject only emits the last value and will only do so once the complete method has been called on the subject

import { AsyncSubject } from 'rxjs'

const subject = new AsyncSubject<string>()

const observer = subject.subscribe((value) => console.log('observer', value))

// these will not be read by the observer until the last value is sent when "completed"
subject.next('intermediate result')
subject.next('final result')

subject.complete()

In the above example, the observer will only receive the final result once the .complete method is called on the subject, kind of like a promise where there is only one actual result
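
The promise comparison extends to late subscribers: an observer that subscribes after completion still receives the final value. A minimal sketch, with hypothetical names result, early, and late:

```typescript
import { AsyncSubject } from 'rxjs'

const result = new AsyncSubject<string>()
const early: string[] = []
const late: string[] = []

result.subscribe((value) => early.push(value))

result.next('intermediate result')
result.next('final result')

// nothing has been delivered yet because the subject has not completed
const deliveredBeforeComplete = early.length

result.complete()

// subscribing after completion still yields the final value
result.subscribe((value) => late.push(value))

console.log(deliveredBeforeComplete) // 0
console.log(early) // ['final result']
console.log(late) // ['final result']
```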

Operators

RxJS provides a large number of operators. Their behaviour can be understood using Marble Diagrams

Operators are pure functions that do not modify the observable but will create a new observable

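There are two types of operators:

  • Static operators - Used to create observables (the current RxJS documentation calls these creation operators)
  • Instance operators - Applied to Observable instances (in current RxJS versions these are pipeable operators passed to the pipe method)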
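
As a rough illustration of both kinds, the sketch below creates an observable with of and then derives a second observable from it with filter and map through pipe. It assumes RxJS 7.2 or later, where these operators can be imported from 'rxjs'; the variable names are made up. Subscribing to both shows that the source observable is left unchanged.

```typescript
import { of, filter, map } from 'rxjs'

// creation operator: builds an observable from a list of values
const numbers = of(1, 2, 3, 4)

// pipeable operators: return a new observable, the source is not modified
const evenSquares = numbers.pipe(
  filter((n) => n % 2 === 0),
  map((n) => n * n)
)

const original: number[] = []
const transformed: number[] = []

numbers.subscribe((n) => original.push(n))
evenSquares.subscribe((n) => transformed.push(n))

console.log(original) // [1, 2, 3, 4]
console.log(transformed) // [4, 16]
```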

Marble Diagrams

Marble diagrams show us how an operator applies to the observable stream over time

  • X represents an error
  • | represents an observable completion
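
As an illustration, the comments in the sketch below draw a marble diagram for the take operator, with dashes standing for the passage of time. The spacing is only indicative, since of emits its values synchronously. The code then records the emitted values followed by a '|' marker on completion. It assumes RxJS 7.2 or later for importing take from 'rxjs'.

```typescript
import { of, take } from 'rxjs'

// source:  --1--2--3--|
//            take(2)
// result:  --1--2|
const emitted: Array<number | string> = []

of(1, 2, 3)
  .pipe(take(2))
  .subscribe({
    next: (value) => emitted.push(value),
    complete: () => emitted.push('|'), // mark completion like the diagram does
  })

console.log(emitted) // [1, 2, '|']
```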

Using Operators

Merge Operator

Merges the results from two (or more) observables into a single observable:

import { Observable, merge } from 'rxjs'

const observable1 = new Observable<string>((subscriber) => {
  subscriber.next('observable1')
})

const observable2 = new Observable<string>((subscriber) => {
  subscriber.next('observable2')
})

const mergedObservable = merge(observable1, observable2)

// will log the data from each of the observables
mergedObservable.subscribe(console.log)

The mergedObservable will log the data that is emitted from both the observables
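
When the sources emit at different times, the merged observable passes values through in the order they arrive. The sketch below uses two subjects (hypothetically named clicks and keys) so the emission order can be controlled by hand.

```typescript
import { Subject, merge } from 'rxjs'

const clicks = new Subject<string>()
const keys = new Subject<string>()
const order: string[] = []

merge(clicks, keys).subscribe((value) => order.push(value))

clicks.next('click 1')
keys.next('key 1')
clicks.next('click 2')

console.log(order) // ['click 1', 'key 1', 'click 2']
```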

Map Operator

The Map Operator can be used to transform the data from the observable

import { Observable, map } from 'rxjs'

const mapToLength = map<string, number>((value) => value.length)

const observable = new Observable<string>((subscriber) => {
  subscriber.next('observable')
  subscriber.next('observable longer')
}).pipe(mapToLength)

observable.subscribe(console.log)

In the above, we use map to create an observable operator that can be used to get the length of the message passed. In this example we will just log out the lengths of each message, 10 and 17 respectively

SkipUntil

Allows us to skip the events from one observable until a second observable starts emitting events:

import { Subject, interval, skipUntil } from 'rxjs'

// will emit a value every 1000ms
const subject1 = interval(1000)

const subject2 = new Subject<string>()

// sent before anything has subscribed, so it is never seen
subject2.next('message 1')

const skippedObservable = subject2.pipe(skipUntil(subject1))

const observer = skippedObservable.subscribe(console.log)

// skipped, because subject1 has not emitted yet
subject2.next('message 2')

// only these messages will be seen by the observer
setInterval(() => subject2.next('message after time'), 2000)
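
The same behaviour can be observed without timers by using a second subject as the notifier. This is a minimal sketch, assuming RxJS 7.2 or later for importing skipUntil from 'rxjs'; the names source, notifier, and seen are hypothetical.

```typescript
import { Subject, skipUntil } from 'rxjs'

const source = new Subject<string>()
const notifier = new Subject<void>()
const seen: string[] = []

source.pipe(skipUntil(notifier)).subscribe((value) => seen.push(value))

source.next('skipped') // the notifier has not emitted yet
notifier.next() // from here on, source values pass through
source.next('seen 1')
source.next('seen 2')

console.log(seen) // ['seen 1', 'seen 2']
```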