RxJS Fundamentals

A Very Short Introduction to Reactive Programming with RxJS and TypeScript

George Marklow

RxJS will make you learn to love reactive programming in JavaScript, especially if you’re working with a complex application that has lots of things that need to happen asynchronously.

It implements the Observer Pattern, a behavioural pattern in software engineering in which objects subscribe to a stream of events coming from publishers. Think of an Instagram influencer posting media to their channel: subscribers are notified that new photos and videos are ready to watch.
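
To make the pattern concrete before bringing in the library, here is a minimal hand-rolled sketch in plain TypeScript. The Influencer class and its follow and post methods are hypothetical names invented for this illustration. This is not how RxJS is implemented, only the bare idea of a publisher notifying its subscribers.

```typescript
// A follower is just a callback that receives each new post.
type Follower = (post: string) => void;

// Hypothetical publisher: keeps a list of followers and notifies them all.
class Influencer {
  private followers: Follower[] = [];

  // Register a follower and return a function that removes it again.
  follow(follower: Follower): () => void {
    this.followers.push(follower);
    return () => {
      this.followers = this.followers.filter(f => f !== follower);
    };
  }

  // Publish a post to every current follower.
  post(media: string): void {
    this.followers.forEach(follower => follower(media));
  }
}

const influencer = new Influencer();
const seenByAlice: string[] = [];
const seenByBob: string[] = [];

const unfollowAlice = influencer.follow(post => seenByAlice.push(post));
influencer.follow(post => seenByBob.push(post));

influencer.post('photo-1'); // both followers are notified
unfollowAlice();            // Alice stops listening
influencer.post('video-2'); // only Bob is notified

console.log(seenByAlice); // [ 'photo-1' ]
console.log(seenByBob);   // [ 'photo-1', 'video-2' ]
```

RxJS wraps the same idea in a richer API, adding error and completion signals on top of plain notifications.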

In RxJS:

  • An observable is like an Influencer — it’s a producer function that publishes a stream (or series) of values.
  • An observer is an object whose callback functions run in response to the values, errors and completion signals that the observable emits. Think of it as a follower watching the stream.
  • A subscription connects an observer to an observable.
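
As a rough preview of how these three roles look in code, here is a small sketch that assumes the rxjs package is installed. The numbers observable and the received array are illustrative names only, not part of the tutorial project built later.

```typescript
import { Observable, Observer, Subscription } from 'rxjs';

// Observable: a producer that emits three numbers and then completes.
const numbers = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

const received: number[] = [];

// Observer: an object whose callbacks react to what the observable emits.
const observer: Observer<number> = {
  next: value => received.push(value),
  error: err => console.error(err),
  complete: () => console.log('done'),
};

// Subscription: the link created when the observer is attached.
const subscription: Subscription = numbers.subscribe(observer);

console.log(received);            // [ 1, 2, 3 ]
console.log(subscription.closed); // true, because the stream has completed
```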

In this article, we’ll briefly cover the basics of creating observables, observers and subscriptions.

Getting Started

Before you continue, please install Node and VS Code if you haven’t got these already.

  • Open the Terminal (on macOS, press ⌘ + spacebar to open Spotlight, then search for Terminal)
  • Navigate to where you keep code projects
  • Create a new folder called rxjs-tutorial and move into it
  • Run the npm init command, pressing ENTER to skip through all the questions. This will create a package.json file, which holds project metadata that manages dependencies, versions of those dependencies and script definitions:
npm init

Note: npm stands for Node Package Manager and comes installed with Node.js. It installs packages for Node projects.

  • Next, install RxJS, TypeScript and ts-node using the following commands, installing the packages for development purposes only:
npm i --save-dev rxjs
npm i --save-dev typescript
npm i --save-dev ts-node

Note: TypeScript and ts-node are not essential for getting started with RxJS; however, I like the strong typing offered by TypeScript, and the ease of executing one-off TypeScript files with ts-node.

  • Create a single TypeScript file called test.ts, which will serve as a playground for working with RxJS:
touch test.ts
  • Finally, open the project in VS Code (on macOS, the code launcher must first be installed by running “Shell Command: Install 'code' command in PATH” from the VS Code Command Palette):
code .

To confirm everything’s set up correctly, add a console log to the test.ts file:

console.log('ok');

Then open the Terminal in VS Code (⌘ + J) and run the following command, checking that the terminal prints “ok”:

npx ts-node test.ts

Note: npx stands for Node Package Execute, which is a package runner that executes packages from the command line.

Creating an Observable

Let’s create an observable, subscribe to that observable, and print the output to the console.

Create an Order class and, for simplicity, create a single property, Id:

  • First, install the npm package uuid, which will generate random order ids:
npm i uuid --save-dev
  • Then, import Observable from the RxJS library and the required UUID generator function:
import { Observable } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
  • Next, create an Order class that generates a unique id for each order:
class Order {
  Id: string;
  constructor() {
    this.Id = uuidv4();
  }
}
  • Create an orders observable, which publishes order messages. Use the generic Observable<T> to specify that only Order objects are emitted from this observable. The function passed to the constructor receives a subscriber (named order here); call its next function to send two new Order objects to observers immediately, then a third after a 1-second delay:
const orders = new Observable<Order>(order => {
  order.next(new Order());
  order.next(new Order());
  setTimeout(() => {
    order.next(new Order());
  }, 1000);
});
  • Finally, subscribe to this observable to print the order ids that arrive:
orders.subscribe(order => console.log(order.Id));
  • Run the code and notice that two orders come through immediately, followed by a third soon after as expected:
efb16e3e-2433-4dd1-93e4-51432abca000
050158d0-d1c4-42b7-9d60-1d56d582c56a
a3795947-279e-4adb-8016-2d40c73d0ada
  • Stop the observable from publishing any more orders by calling the subscriber’s complete method. Once it has been called, any further next calls are ignored. To demonstrate this, change the orders observable to the following and notice that the third order never reaches the observer, because complete runs before the timeout fires:
const orders = new Observable<Order>(order => {
  order.next(new Order());
  order.next(new Order());
  setTimeout(() => {
    order.next(new Order());
  }, 1000);
  order.complete();
});
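
The same behaviour can be checked without any timers. The sketch below, which assumes the rxjs package is installed and uses the illustrative names source and log, records the order of events to show two things: values sent from inside the observable are delivered synchronously during the subscribe call, and a next call made after complete is silently dropped.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

const source = new Observable<string>(subscriber => {
  subscriber.next('first');
  subscriber.complete();
  subscriber.next('second'); // ignored: the stream has already completed
});

log.push('before subscribe');
source.subscribe({
  next: value => log.push(`next: ${value}`),
  complete: () => log.push('complete'),
});
log.push('after subscribe');

console.log(log.join(' -> '));
// before subscribe -> next: first -> complete -> after subscribe
```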

Handling Errors and Completion

Here’s how to recognise when exceptions come from the observable, as well as how to acknowledge when the stream of data has finished.

  • Update the subscribe call to pass an observer with next, error and complete callbacks, which handle each of these cases:
orders.subscribe({
  next(order) {
    console.log(order.Id);
  },
  error(err) {
    console.error(`Error: ${err}`);
  },
  complete() {
    console.log('All orders processed');
  }
});
  • Update the orders observable to return three orders, then stop the stream using complete:
const orders = new Observable<Order>(order => {
  order.next(new Order());
  order.next(new Order());
  order.next(new Order());
  order.complete();
});
  • Run the npx command again — I got the following:
67ee824c-fb15-460f-852e-9752c763cd54
16a49693-4210-480d-b82f-33078825cbdc
9c8ee331-7e2f-450e-b72d-fe794cda2935
All orders processed
  • Now replace the third order with an error message:
const orders = new Observable<Order>(order => {
  order.next(new Order());
  order.next(new Order());
  order.error('bad order');
  order.complete();
});

Notice this time that the “All orders processed” message never appears: an error ends the stream, so the complete call that follows it is ignored.

bd1e62e2-fb36-4386-8e21-3b9945dc2432
3feedab8-6986-4e90-bbe5-2218be04e0f3
Error: bad order
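
Errors do not have to be sent explicitly with the error function. As a hedged sketch, assuming the default behaviour of RxJS 7 and using the illustrative names risky and events, an exception thrown inside the observable’s producer function is caught by the library and handed to the observer’s error callback in the same way:

```typescript
import { Observable } from 'rxjs';

const events: string[] = [];

const risky = new Observable<number>(subscriber => {
  subscriber.next(1);
  // A thrown exception is caught by RxJS and passed to the error callback.
  throw new Error('boom');
});

risky.subscribe({
  next: value => events.push(`next: ${value}`),
  error: (err: Error) => events.push(`error: ${err.message}`),
  complete: () => events.push('complete'),
});

console.log(events.join(' -> '));
// next: 1 -> error: boom
```

As with an explicit error call, the complete callback never runs.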

Unsubscribing

Tired of waiting around? Here’s how to unsubscribe from an observable.

  • Assign the subscription to a variable, which gives us a reference to unsubscribe from the observable at a later stage. Also, simplify the subscription method to handle only next cases:
const ordersSubscription = orders.subscribe({
  next(order) {
    console.log(order.Id);
  }
});
  • Update the observable to return two orders immediately, followed by another order 2 seconds later:
const orders = new Observable<Order>(order => {
  order.next(new Order());
  order.next(new Order());

  setTimeout(() => {
    order.next(new Order());
  }, 2000);
});
  • Finally, at the bottom of the file, add a timeout condition that disconnects the orders subscription after 1 second, remembering to use a truthy condition to safely unsubscribe:
setTimeout(() => {
  if (ordersSubscription) {
    ordersSubscription.unsubscribe();
  }
}, 1000);

This is what I got:

432ff5db-8e82-4b82-9f45-c8f3ad28d1b1
1ce05ae5-6f43-4374-a05b-8f29576249af

Notice that the third order is never picked up because it is published after our subscription is disconnected. To prove this, change the timeout so that it unsubscribes after 3 seconds and, optionally, add a console log that prints “unsubscribed”.

You’ll see a third order appear after 2 seconds, followed by “unsubscribed”:

cce3f1c7-01fb-4407-8e19-293e8cdc3be3
f112413b-8ecf-4a39-9cd5-23b96f0b40c3
c0a6a590-b64e-4c77-a38f-1cadd0a758d5
unsubscribed
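
Unsubscribing stops values from reaching the observer, but a timer started inside the observable keeps running in the background. One possible refinement, which goes slightly beyond the steps above, is to return a teardown function from the observable; RxJS calls it when the subscription ends. The sketch below assumes the rxjs package is installed, and the names delayedOrders and timerCleared are made up for illustration.

```typescript
import { Observable } from 'rxjs';

let timerCleared = false;

const delayedOrders = new Observable<string>(subscriber => {
  subscriber.next('order-1');
  const timer = setTimeout(() => subscriber.next('order-2'), 2000);

  // Teardown: runs on unsubscribe (and also on complete or error).
  return () => {
    clearTimeout(timer);
    timerCleared = true;
  };
});

const received: string[] = [];
const subscription = delayedOrders.subscribe(id => received.push(id));

// Unsubscribe straight away, before the delayed order is due.
subscription.unsubscribe();

console.log(received);     // [ 'order-1' ]
console.log(timerCleared); // true
```

Because the pending timer is cancelled, the script can exit immediately instead of waiting for a value that nobody is listening for.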

George Marklow

George is a software engineer, author, blogger, and abstract artist who believes in helping others to make us happier and healthier.