42
loading...
This website collects cookies to deliver better user experience
RxJS is an API for asynchronous programming
with observable streams.
Reactive programming is programming with asynchronous data streams.
A stream is a sequence of ongoing events ordered in time. It can only emit 3 things: a data typed value, an error, or a termination signal.
f
that maps a stream into another stream, i.e. f: S1 → S2
This function we call an operator.switchMap
switchMap
operator is a very popular one that has a handful of practical applications. It is generally used to implement a discard action between the input streams which can save a lot of trouble and computation in practice.Input Observable 1
emits a value, Input Observable 2
emits all of its values unless Input Observable 1
emits a new value before the Input Observable 2
completes. If you look at the output observable you will notice that there are only two 30's. This is because Input Observable 2
could not be complete before Input Observable 1
emitted the value 5. You easily confirm this because the space between 3 and 5 is much less than the size of the axis for Input Observable 2
, suggesting there was only time to emit the first two values. import { Observable } from 'rxjs';
/* Instantiate an observable */
const observable = new Observable(subscriber => {
subscriber.next(1); // pushes a value
subscriber.next(2); // pushes another value synchronously
setTimeout(() => {
subscriber.next(3); // pushes last value after a wait of 1s
subscriber.complete(); // terminates observable stream
}, 1000);
});
/* Subscribing to an observable */
console.log('just before subscribe');
const subscription = observable.subscribe({
// The three possible output captures:
next(x) { console.log('got value ' + x); },
error(err) { console.error('something wrong occurred: ' + err); },
complete() { console.log('done'); }
}); // creates subscription object
console.log('just after subscribe');
/* Unsubscribing to an observable using subscription */
setTimeout(() => {
subscription.unsubscribe();
}, 500);
// Logs:
// just before subscribe
// got value 1
// got value 2
// just after subscribe
setTimeout
.import { Observable } from "rxjs";
// Creating a cold observable
const observable = Observable.create((observer) => {
observer.next(Math.random()); // We explicitly push the value to the stream
});
// Subscription 1
observable.subscribe((data) => {
console.log(data); // 0.24957144215097515 (random number)
});
// Subscription 2
observable.subscribe((data) => {
console.log(data); // 0.004617340049055896 (random number)
});
import { Observable } from "rxjs";
// Coming from an event which is constantly emmit values
const observable = Observable.fromEvent(document, 'click');
// Subscription 1
observable.subscribe((event) => {
console.log(event.clientX); // x position of click
});
// Subscription 2
observable.subscribe((event) => {
console.log(event.clientY); // y position of click
});
import { Observable } from "rxjs";
// Return a basic observable
const simpleObservable = val => Observable.of(val).delay(5000);
// Convert basic observable to promise
const example = sample('First Example')
.toPromise() // Now its a promise
.then(result => {
console.log('From Promise:', result); // After 500ms, output 'First Example'
});
toPromise
method, any observable can be converted to a promise. Note that because it returns a true JS Promise, toPromise
is not a pipable operator, as it does not return an observable.next
, error
, and complete
. The following is an example of a typical Observer object:const observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// To use it, pass it to a subscribe
observable.subscribe(observer);
import { from } from "rxjs";
import { filter } from "rxjs/operators";
from([1, 2, 3, 4, 5]).pipe(
filter((x) => (x % 2) === 0)
).subscribe(console.log); // [2, 4]
import { from } from "rxjs";
import { filter, take, map } from "rxjs/operators";
from([1, 2, 3, 4, 5]).pipe(
filter((x) => (x % 2) === 0),
take(1),
map((firstValue) => "The first even number was " + firstValue)
).subscribe(console.log);
import { Subject, from } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
const observable = from([1, 2, 3]);
observable.subscribe(subject); // You can subscribe providing a Subject
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
.next()
function to push data into the pipeline. Be wary of overusing them since most problems are solvable with only data transformation and Observables.import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
observeOn(asyncScheduler)
);
console.log('just before subscribe');
observable.subscribe({
next(x) {
console.log('got value ' + x)
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
}
});
console.log('just after subscribe');
// Logs
// just before subscribe
// just after subscribe
// got value 1
// got value 2
// got value 3
// done
value...
were delivered after just after subscription. This is because observeOn(asyncScheduler)
introduces a proxy Observer between the new Observable and the final Observer.