
Observables & RxJS


Learning Objectives

  • Know what Observables are.

  • Know what RxJS is and how it relates to Observables.

  • Know what operators are, how to find the list of operators, and how to understand an operator’s function by using marble diagrams.

  • Know how to build a simple application using RxJS.

Observables

Streams so far are just a concept, an idea.

We link streams together using operators. In our previous example the add function is an operation; specifically, it’s an operation which combines two streams to create a third.

An Observable is a new primitive type which acts as a blueprint for how we want to create streams, subscribe to them, react to new values, and combine streams together to build new ones.
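
To make the blueprint idea concrete, here is a minimal sketch of a hand-rolled observable. TinyObservable, Listener and the map method below are hypothetical names made up for illustration. This is not how RxJS is implemented, and it leaves out errors, completion and unsubscription.

```typescript
// Hypothetical, simplified model of an observable (not the RxJS class).
type Listener<T> = (value: T) => void;

class TinyObservable<T> {
  // The producer is the blueprint: it describes how values get pushed
  // once somebody subscribes.
  private readonly producer: (listener: Listener<T>) => void;

  constructor(producer: (listener: Listener<T>) => void) {
    this.producer = producer;
  }

  // Subscribing runs the blueprint and routes each value to the listener.
  subscribe(listener: Listener<T>): void {
    this.producer(listener);
  }

  // Build a new observable from this one by transforming each value.
  map<R>(transform: (value: T) => R): TinyObservable<R> {
    return new TinyObservable<R>(listener =>
      this.subscribe(value => listener(transform(value)))
    );
  }
}

// Create a stream of three numbers.
const numbers = new TinyObservable<number>(listener => {
  listener(1);
  listener(2);
  listener(3);
});

// Build a second stream from the first.
const doubled = numbers.map(n => n * 2);

// React to each value pushed onto the second stream.
const seen: number[] = [];
doubled.subscribe(value => seen.push(value));
console.log(seen); // [2, 4, 6]
```

The sketch covers creating a stream, subscribing to it, reacting to values and deriving a new stream from an existing one.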

Observables have been proposed as an addition to the JavaScript language itself, but the proposal has not been adopted into the standard.

Even if it is eventually adopted, it will likely be years before we can use Observables natively.

Until then we need to use a library that gives us the Observable primitive and that’s where RxJS comes in.

RxJS

RxJS stands for *R*eactive E*x*tensions for *J*ava*S*cript, and it’s a library that gives us an implementation of Observables for JavaScript.

Note

Observables might become a core part of the JavaScript language in the future, so we can think of RxJS as a placeholder for when that arrives.

RxJS is the JavaScript implementation of the ReactiveX API, which is documented on the ReactiveX website.

The API has multiple implementations in different languages, so much of what you learn in RxJS carries over to RxJava, Rx.NET, RxPY, etc.

Using RxJS with ES6

There are a couple of ways that you can add the RxJS library to your project.

With npm you can use the following command:

npm install rxjs

Alternatively, you can import RxJS from a CDN by including it in your HTML document:

<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>

Note

RxJS 6 is a mandatory dependency starting from Angular 6, so you don’t need to install it manually.

The RxJS library also provides a number of Observable creation functions and operators (to build on the observables foundation) that can be added to your application via import statements like so:

import { map, filter, take } from "rxjs/operators";
import { interval, pipe } from "rxjs";

A Simple Example

Let’s understand the use of RxJS by working through a simple example.

Before we start adding any RxJS code, let’s create a main.ts file with a barebones Angular implementation. This will hold our RxJS code, specifically within the ngOnInit function, a lifecycle hook that is called once Angular is done creating the component.

import { NgModule, Component } from "@angular/core";
import { BrowserModule } from "@angular/platform-browser";
import { platformBrowserDynamic } from "@angular/platform-browser-dynamic";

@Component({
  selector: "app",
  template: `
    <div>
      <p>Observables & RxJS Example</p>
      <p>(Please check the console)</p>
    </div>
  `
})
class AppComponent {
  constructor() {}

  ngOnInit() {
  }
}

@NgModule({
  imports: [BrowserModule],
  declarations: [AppComponent],
  bootstrap: [AppComponent]
})
class AppModule {}

platformBrowserDynamic().bootstrapModule(AppModule);

Next, we need to get an instance of an RxJS observable. The RxJS library provides numerous creation functions for this, but for our example we will use interval.

interval

An observable isn’t a stream. An observable is a blueprint which describes a set of streams and how they are connected together with operations.

Let’s assume for our use case that we want our observable to create a single stream and push onto that stream a number every second, incremented by 1.

For this purpose, we can use the interval function, which creates an Observable that emits numbers in sequence at the provided time interval, like so:

import { interval } from 'rxjs';

const numbers = interval(1000);

The interval function takes as its first parameter the number of milliseconds between each push of a number onto the stream.

[Figure: RxJS Interval]

subscribe

Note

In RxJS land no one can hear you stream, unless you subscribe.

This observable is cold, which means it’s not currently pushing out numbers.

The observable will become hot and start pushing numbers onto its first stream when it gets its first subscriber, like so:

import { interval } from 'rxjs';

const numbers = interval(1000);

numbers.subscribe(value => console.log("Subscriber: " + value));

Calling subscribe on an observable does two things:

  1. Turns the observable hot so it starts producing.

  2. Lets us pass in a callback function so we react when anything is pushed onto the final stream in the observable chain.
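
The two effects of subscribing can be sketched without RxJS at all. The lazyNumbers object and the producerRuns counter below are hypothetical stand-ins made up for illustration. A real RxJS observable does considerably more, but the laziness is the same idea: defining the observable runs nothing, and subscribing starts it.

```typescript
// Hypothetical, simplified stand-in for an observable (not the RxJS class).
type Listener<T> = (value: T) => void;

// Counts how many times the producer has been started.
let producerRuns = 0;

// Defining the "observable" only describes the work; nothing runs yet.
const lazyNumbers = {
  subscribe(listener: Listener<number>): void {
    producerRuns += 1; // 1. subscribing starts the producer
    [0, 1, 2].forEach(value => listener(value)); // 2. each value reaches the callback
  }
};

// Nobody has subscribed yet, so the producer has not run.
const runsBeforeSubscribe = producerRuns;

const printed: string[] = [];
lazyNumbers.subscribe(value => printed.push("Subscriber: " + value));

console.log(runsBeforeSubscribe); // 0
console.log(producerRuns); // 1
console.log(printed); // ["Subscriber: 0", "Subscriber: 1", "Subscriber: 2"]
```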

Our application now starts printing out:

Subscriber: 0
Subscriber: 1
Subscriber: 2
Subscriber: 3
Subscriber: 4
Subscriber: 5
Subscriber: 6
Subscriber: 7
Subscriber: 8
Subscriber: 9
Subscriber: 10

take

But it just keeps on printing forever. We only want the first 3 items, so we use another operator called take.

We pass to that operator the number of items we want to take from the first stream. It creates a second stream and only pushes onto it the number of items we’ve requested, like so:

[Figure: RxJS Take]

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const numbers = interval(1000);
const takeThree = numbers.pipe(take(3));

takeThree.subscribe(value => console.log("Subscriber: " + value));

This now prints out the below, and then stops:

Subscriber: 0
Subscriber: 1
Subscriber: 2

Tip

pipe is an Observable method which is used for composing operators. In simpler terms, it allows us to chain operators together.
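
One way to picture what chaining operators means is as plain function composition. The sketch below does not use RxJS. Source, Operator, fromArray, takeFirst, mapEach and chain are all hypothetical names made up for illustration, and the real pipe and take handle more cases (for example, take also stops the source once it has enough values).

```typescript
// Hypothetical, simplified types; these are not the RxJS definitions.
type Listener<T> = (value: T) => void;
type Source<T> = (listener: Listener<T>) => void; // "subscribing" means calling the source
type Operator<T> = (source: Source<T>) => Source<T>;

// Emits each array item in order when subscribed to.
function fromArray<T>(items: T[]): Source<T> {
  return listener => items.forEach(item => listener(item));
}

// Passes along only the first `count` values and ignores the rest.
function takeFirst<T>(count: number): Operator<T> {
  return source => listener => {
    let passed = 0;
    source(value => {
      if (passed < count) {
        passed += 1;
        listener(value);
      }
    });
  };
}

// Transforms every value before passing it along.
function mapEach<T>(transform: (value: T) => T): Operator<T> {
  return source => listener => source(value => listener(transform(value)));
}

// pipe-style composition: the output of each operator feeds the next one.
function chain<T>(...operators: Operator<T>[]): Operator<T> {
  return source =>
    operators.reduce<Source<T>>((current, operator) => operator(current), source);
}

const firstThreeTimesTen = chain<number>(
  takeFirst<number>(3),
  mapEach<number>(n => n * 10)
);

const received: number[] = [];
firstThreeTimesTen(fromArray([1, 2, 3, 4, 5]))(value => received.push(value));
console.log(received); // [10, 20, 30]
```

Each operator takes a stream and returns a new one, which is why they can be lined up in any number inside a single chain.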

map

Finally, I want to add another operator called map. It takes as input the output stream from take, converts each value to the current time in milliseconds, and pushes that onto a third stream, like so:

[Figure: RxJS Map]

import { interval } from 'rxjs';
import { take, map } from 'rxjs/operators';

const numbers = interval(1000);
const takeThree = numbers.pipe(
  take(3),
  map((v) => Date.now())
);

takeThree.subscribe(value => console.log("Subscriber: " + value));

Note

Notice the use of pipe to chain multiple operators together.

This now prints out the time in milliseconds, every second, like so:

Subscriber: 1475506794287
Subscriber: 1475506795286
Subscriber: 1475506796285

Tip

Without declaring separate constants, we can also write the same code in a single statement like so:

interval(1000)
  .pipe(
    take(3),
    map(v => Date.now())
  )
  .subscribe(value => console.log("Subscriber: " + value));

Other operators

The above example showed a very small subset of the total number of operators available to you when using RxJS.

The hardest part of learning RxJS is understanding each of these operators and how to use them.

In that regard, even though you are writing in JavaScript, learning RxJS is closer to learning another language altogether.

You can find a list of the operators in the official RxJS documentation.

The documentation covers each of the functions we used above: interval, take and map.

Marble Diagrams

Trying to understand an operator by just reading some words is pretty difficult.

This is why in this lecture I’ve tried to use animations as much as possible.

The Rx team uses something called a marble diagram to visually describe an operator’s function.

This is the official marble diagram for the map operator:

[Figure: Map Marble]

  • The line at the top represents time and the marbles with numbers 1, 2 and 3 represent the input stream over time.

  • The line at the bottom represents the output stream after each of the marbles has been processed through the operator.

  • The bit in the middle is the operator. In this example the operator is a map function which multiplies each marble in the input stream by 10 and pushes the result to the output stream.

So in the above, the value 1 gets pushed out onto the output stream as 10.
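
The same diagram can be sketched in code by treating each marble as a value placed at a point in time. The Marble type, the mapMarbles function and the times used below are hypothetical, chosen only for illustration. The point of the sketch is that map changes each marble’s value but leaves its position on the timeline alone.

```typescript
// Hypothetical model: a marble is a value placed at a point in time (ms).
interface Marble {
  time: number;
  value: number;
}

// map keeps every marble at its original time and only changes its value.
function mapMarbles(
  marbles: Marble[],
  transform: (value: number) => number
): Marble[] {
  return marbles.map(marble => ({
    time: marble.time,
    value: transform(marble.value)
  }));
}

// The input line of the diagram: marbles 1, 2 and 3 at made-up times.
const inputStream: Marble[] = [
  { time: 0, value: 1 },
  { time: 1000, value: 2 },
  { time: 2000, value: 3 }
];

// The output line: the same times, with each value multiplied by 10.
const outputStream = mapMarbles(inputStream, value => value * 10);
console.log(outputStream.map(marble => marble.value)); // [10, 20, 30]

// Moving the second input marble later moves its output marble too.
const movedInput: Marble[] = [
  { time: 0, value: 1 },
  { time: 1500, value: 2 },
  { time: 2000, value: 3 }
];
const movedOutput = mapMarbles(movedInput, value => value * 10);
console.log(movedOutput[1]); // { time: 1500, value: 20 }
```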

These diagrams are actually interactive.

To understand how an operator works we move the marbles around in the input stream and see how this affects the output stream, like so:

[Figure: Map RxMarble]

Interactive marbles are available for both operators used above, take and map.

Summary

Observables are a blueprint for creating streams and plumbing them together with operators to create observable chains.

RxJS is a library that lets us create and work with observables.

We can subscribe to an observable chain and get a callback every time something is pushed onto the last stream.

By default observables are cold and only become hot when they get their first subscriber.

Learning RxJS involves understanding all the operators that are available and how to use them together.

We use marble diagrams to help explain how an operator works.

In this lecture we used RxJS in isolation; in the next lecture we will look at how to use RxJS in Angular.

Listing

Listing 1. main.ts
import { NgModule, Component } from "@angular/core";
import { BrowserModule } from "@angular/platform-browser";
import { platformBrowserDynamic } from "@angular/platform-browser-dynamic";
import { map, filter, take } from "rxjs/operators";
import { interval, pipe } from "rxjs";

@Component({
  selector: "app",
  template: `
    <div>
      <p>Observables & RxJS Example</p>
      <p>(Please check the console)</p>
    </div>
  `
})
class AppComponent {
  constructor() {}

  ngOnInit() {
    interval(1000)
      .pipe(
        take(3),
        map(v => Date.now())
      )
      .subscribe(value => console.log("Subscriber: " + value));

    /*
    const middleware = pipe(
      take(3),
      map(v => Date.now())
    );

    interval(1000)
      .pipe(middleware)
      .subscribe(value => console.log("Subscriber: " + value));
    */
  }
}

@NgModule({
  imports: [BrowserModule],
  declarations: [AppComponent],
  bootstrap: [AppComponent]
})
class AppModule {}

platformBrowserDynamic().bootstrapModule(AppModule);
