Get started with RxJS in Angular

Unlocking the Power of Observables and Operators for Effective Data Handling and Event Management in Angular

Patric
15 min readApr 13, 2023

--

Introduction to Reactive Programming and its Role in Modern Web Development

Reactive Programming is a programming paradigm that allows developers to build more responsive and efficient applications by using streams of data and events to trigger actions. It’s particularly useful in modern web development, where applications often deal with large amounts of data, complex user interactions, and asynchronous operations.

In a reactive application, data flows through the application as a stream of events, and the application reacts to these events by triggering appropriate actions. This allows developers to build applications that are more scalable, resilient, and maintainable, as they can more easily handle changing user requirements and unexpected events.

Reactive Programming has become increasingly popular in recent years, and many modern web frameworks and libraries, such as Angular, Vue.js, and React, provide support for reactive programming using libraries such as RxJS, ReactiveX, or Redux. In this article, we’ll explore how to use RxJS, one of the most popular reactive programming libraries in Angular development, to build more responsive and efficient applications.

Understanding RxJS: A Powerful Library for Reactive Programming in Angular Development

RxJS is a library for reactive programming using observables that makes it easier to compose asynchronous or event-based programs by providing a set of powerful operators. It’s a popular library for reactive programming in Angular development because Angular makes use of it extensively for dealing with events, streams of data, and async operations.

At its core, RxJS is built on top of the concept of observables, which are a sequence of values that can be emitted over time, and observers, which can subscribe to these observables to receive notifications of new values. With RxJS, developers can create observables from a wide range of sources, such as user events, timers, HTTP requests, or web sockets, and use operators to transform, filter, or combine them to create more complex data flows.

RxJS is particularly popular in Angular development because Angular makes heavy use of observables to manage states, handle user events, and communicate with backend services. For example, the Angular HttpClient module returns observables when making HTTP requests, and the Angular Router uses observables to handle navigation events. By using RxJS, developers can create more concise, readable, and reusable code for handling these types of scenarios.

In addition to its use in Angular development, RxJS is also a popular choice for reactive programming in other JavaScript frameworks and libraries, such as React, Vue.js, and Node.js. Its popularity is due to its simplicity, power, and flexibility, which make it a useful tool for building reactive applications that are scalable, responsive, and easy to maintain.

Understanding Observables: The Foundation of Reactive Programming in Angular with RxJS

Observables are the core building blocks of RxJS, and they are essential for building reactive applications in Angular. At their core, observables are a way of representing a stream of data or events that can be observed over time.

An observable can be thought of as a function that takes an observer as its argument and returns a subscription. An observer is an object that defines a set of callbacks that the observable will use to notify the observer of new values, errors, and completion events.

When an observer subscribes to an observable, the observable begins emitting values, and the observer’s callbacks are called with these values. The observer can also unsubscribe from the observable to stop receiving values.

Observables can be created from a wide variety of sources, including events, timers, user input, and data from a backend API. Once an observable is created, it can be transformed and combined with other observables using operators to create more complex data flows.

One of the main benefits of using observables is their ability to handle asynchronous operations and complex data flows. Observables can emit values at any time, and subscribers can react to these values in real time. This makes them ideal for handling scenarios such as handling user input, making HTTP requests, or managing the state in an Angular application.

Overall, observables are a powerful tool for building reactive applications in Angular, and RxJS provides a rich set of operators for working with observables flexibly and efficiently. By mastering observables and their associated operators, developers can create more responsive and efficient applications that are easier to maintain and scale.

Exploring RxJS Operators: Essential Tools for Building Reactive Angular Applications

Operators are a key feature of RxJS, and they are used to transform and manipulate observables to create more complex data flows. Operators can be used to filter, map, combine, and transform observables, among other things, and they are essential for building reactive applications in Angular.

RxJS provides a wide range of operators, and some of the most commonly used operators include:

  1. Map: This operator is used to transform the values emitted by an observable by applying a function to each value.
  2. Filter: This operator is used to filter the values emitted by an observable based on a condition.
  3. Merge: This operator is used to combine multiple observables into a single observable that emits all the values from each observable.
  4. DebounceTime: This operator is used to throttle the values emitted by an observable by waiting for a specified time before emitting the latest value.
  5. SwitchMap: This operator is used to transform an observable into another observable, and it cancels the previous observable when a new value is emitted.
  6. CombineLatest: This operator is used to combine multiple observables into a single observable that emits an array of the latest values from each observable.
  7. Retry: This operator is used to resubscribe to an observable when it encounters an error, and it can be used to handle retry logic when making HTTP requests.

These are just a few examples of the many operators available in RxJS. By using these operators, developers can create more efficient, flexible, and scalable data flows that can handle complex scenarios in an Angular application.

Overall, operators are a powerful tool for building reactive applications with RxJS and mastering their use is essential for developing robust and efficient Angular applications.

Here are code examples for the mentioned operators:

Map Operator:

import { Component } from '@angular/core';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

@Component({
selector: 'app-example',
template: `
<div>{{ data$ | async }}</div>
`,
})
export class ExampleComponent {
data$: Observable<number>;

constructor() {
this.data$ = Observable.of(1, 2, 3, 4, 5).pipe(
map((value: number) => value * 2)
);
}
}

In this example, we import the Observable and map operators from the rxjs library. In the ExampleComponent class, we declare an Observable called data$, which will emit the numbers 1 to 5. We then apply the map operator to the observable, which multiplies each emitted value by 2.

Finally, in the template for the component, we use the async pipe to subscribe to the data$ observable and display its emitted values. The output will be:

2
4
6
8
10

Filter operator:

import { Observable } from 'rxjs';

const myObservable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.next(4);
observer.next(5);
observer.complete();
});

Now let’s use the filter operator to only emit values that are greater than 3:

import { filter } from 'rxjs/operators';

myObservable.pipe(
filter((value) => value > 3)
).subscribe((value) => {
console.log(value); // Output: 4, 5
});

In this example, the filter operator is used to only emit values that are greater than 3. The subscribe method is called with a function that logs the emitted values to the console. When the observable emits the values 1, 2, 3, 4, and 5, only the values 4 and 5 are logged to the console because they are the only values that pass the filter condition.

Merge operator:

import { Component } from '@angular/core';
import { Observable, merge } from 'rxjs';

@Component({
selector: 'app-example',
template: `
<ul>
<li *ngFor="let value of mergedValues">{{ value }}</li>
</ul>
`
})
export class ExampleComponent {
mergedValues: any[] = [];

constructor() {
const observable1$ = new Observable(observer => {
setTimeout(() => {
observer.next('Value from Observable 1');
}, 1000);
});

const observable2$ = new Observable(observer => {
setTimeout(() => {
observer.next('Value from Observable 2');
}, 2000);
});

const mergedObservable$ = merge(observable1$, observable2$);

mergedObservable$.subscribe(value => {
this.mergedValues.push(value);
});
}
}

In this example, we import the merge operator from the rxjs library. We create two observables, each of which emits a value after a delay of one second and two seconds, respectively.

We then use the merge operator to combine these two observables into a single observable. We subscribe to this merged observable and push each emitted value to an array called mergedValues.

Finally, we display the values in this array using the ngFor directive in the template. When you run this example, you should see two list items displaying "Value from Observable 1" and "Value from Observable 2" after one second and two seconds, respectively.

DebounceTime operator:

import { Component, OnInit } from '@angular/core';
import { Observable, Subject } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

@Component({
selector: 'app-example',
template: `
<input type="text" (input)="onSearch($event.target.value)" />
<ul>
<li *ngFor="let result of searchResults">{{ result }}</li>
</ul>
`,
})
export class ExampleComponent implements OnInit {
private searchTerms = new Subject<string>();
searchResults: string[] = [];

ngOnInit(): void {
this.searchTerms.pipe(debounceTime(300)).subscribe((term: string) => {
// perform search using term
// update searchResults array
this.searchResults = ['Result 1', 'Result 2', 'Result 3'];
});
}

onSearch(term: string): void {
this.searchTerms.next(term);
}
}

In this example, we have an input field for the user to enter search terms. Each time the input value changes, we call the onSearch method, which pushes the new value onto a Subject called searchTerms.

In the ngOnInit method, we subscribe to the searchTerms observable and apply the debounceTime operator with a delay of 300 milliseconds. This means that the observable will wait for 300 milliseconds after the user stops typing before emitting the latest search term.

When a new search term is emitted, we perform the search using the term and update the searchResults array. The *ngFor directive in the template is used to display the results as a list.

Note that this is just a simple example and there are many variations and use cases for the debounceTime operator.

SwitchMap operator:

import { Component } from '@angular/core';
import { Observable } from 'rxjs';
import { switchMap } from 'rxjs/operators';

@Component({
selector: 'my-component',
template: `
<div>
<h2>Posts</h2>
<ul>
<li *ngFor="let post of posts$ | async">{{ post.title }}</li>
</ul>
</div>
`
})
export class MyComponent {
// Declare a property for the current post ID
currentPostId: number;

// Declare a property for the posts observable
posts$: Observable<any>;

constructor(private http: HttpClient) {}

// This method is called when the user selects a new post
selectPost(id: number) {
this.currentPostId = id;

// Use switchMap to cancel the previous HTTP request and start a new one
this.posts$ = this.http.get(`https://jsonplaceholder.typicode.com/posts?userId=${id}`).pipe(
switchMap(posts => {
return this.http.get(`https://jsonplaceholder.typicode.com/comments?postId=${posts[0].id}`);
})
);
}
}

In this example, we have a component with a list of posts, and when the user selects a post, we load the comments for that post using an HTTP request. We use switchMap to cancel any previous HTTP requests that are still in progress and start a new one for the currently selected post.

Note that we also use the async pipe in the template to subscribe to the posts$ observable and automatically update the view when new data is emitted.

CombineLatest operator:

import { Component } from '@angular/core';
import { combineLatest, Observable } from 'rxjs';
import { map } from 'rxjs/operators';

@Component({
selector: 'my-component',
template: `
<h1>Combined Observable Values:</h1>
<ul>
<li *ngFor="let value of combinedValues">{{ value }}</li>
</ul>
`
})
export class MyComponent {
firstObservable$: Observable<string> = // some observable emitting strings
secondObservable$: Observable<number> = // some observable emitting numbers

combinedValues: Array<string | number>;

ngOnInit() {
combineLatest([this.firstObservable$, this.secondObservable$]).pipe(
map(([firstValue, secondValue]) => [firstValue, secondValue])
).subscribe((values) => {
this.combinedValues = values;
});
}
}

In this example, we have two observables, firstObservable$ and secondObservable$, and we want to combine their latest values into a single observable that emits an array of the latest values from each observable.

We use the combineLatest operator to achieve this. We pass an array containing the two observables as an argument to combineLatest, and it returns a new observable that emits an array of the latest values from each observable whenever either of the observables emits a new value.

We then use the map operator to transform the emitted array into a new array that contains the same values. This is just a demonstration of how you can manipulate the emitted values if needed.

Finally, we subscribe to the combined observable and update the combinedValues property on our component with the emitted array of values. We can then display these values in our template using the *ngFor directive.

Retry operator:

import { Component } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
import { retry } from 'rxjs/operators';

@Component({
selector: 'app-example',
template: `
<button (click)="getData()">Get Data</button>
<div *ngIf="data">
{{ data }}
</div>
`
})
export class ExampleComponent {
data: any;

constructor(private http: HttpClient) {}

getData(): void {
this.http.get<any>('https://example.com/api/data')
.pipe(
retry(3) // retry the request up to 3 times on error
)
.subscribe(
data => {
this.data = data;
},
error => {
console.log('An error occurred:', error);
}
);
}
}

In this example, we have an Angular component with a button that triggers an HTTP request to retrieve data from an API. The retry operator is used to specifying that the request should be retried up to 3 times if it encounters an error. If the request still fails after 3 retries, the error will be logged to the console.

Note that retry can also take an optional argument to specify the delay between retries, like this: retry(3, 1000) to retry the request 3 times with a 1-second delay between each retry.

Understanding RxJS Subjects: Building Flexible and Scalable Data Flows in Angular

Subjects are observable in RxJS that allow for multiple subscribers to receive the same values. Unlike regular observables, which are unicast (each subscriber receives a separate execution of the observable), subjects are multicast (each subscriber receives the same execution of the observable).

Subjects act as both an observer and an observable. This means that they can be used to both publish and subscribe to values in an application.

There are four types of subjects in RxJS:

  1. BehaviorSubject: This subject stores the latest value emitted and emits that value immediately to new subscribers.
  2. ReplaySubject: This subject stores a buffer of previous values emitted and emits those values immediately to new subscribers.
  3. AsyncSubject: This subject only emits the last value when the complete() method is called, making it useful for scenarios where only the final value is needed.
  4. Subject: This is a basic subject that does not have any special behavior.

Subjects can be used in a wide variety of scenarios, including:

  1. Communication between components: Subjects can be used to communicate between components in an Angular application, allowing multiple components to subscribe to the same values.
  2. Caching data: Subjects can be used to cache data that is frequently used in an application, improving performance and reducing the number of HTTP requests.
  3. State management: Subjects can be used to manage the state of an application, allowing multiple components to subscribe to changes in the application’s state.

Overall, subjects are a powerful tool for building reactive applications with RxJS. By using subjects, developers can create more flexible and scalable data flows that can handle complex scenarios in an Angular application.

Here is a usage example of a BehaviorSubject:

import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs';

@Injectable({
providedIn: 'root'
})
export class ModalService {
private isOpen = new BehaviorSubject<boolean>(false);

get isOpen$() {
return this.isOpen.asObservable();
}

open() {
this.isOpen.next(true);
}

close() {
this.isOpen.next(false);
}
}

In this example, we have created a service called ModalService that uses BehaviorSubject to store the state of whether the modal window is open or closed.

The isOpen property is a private BehaviorSubject instance that holds a boolean value. It is initialized with the default value of false.

The isOpen$ getter returns an observable of the isOpen subject. This allows us to subscribe to changes in the isOpen value.

The open() and close() methods are used to update the isOpen subject with the new value of true or false, respectively.

In your component, you can inject ModalService and use it like this:

import { Component } from '@angular/core';
import { ModalService } from './modal.service';

@Component({
selector: 'app-modal',
template: `
<div class="modal" *ngIf="isOpen$ | async">
<div class="modal-content">
<p>This is the modal window</p>
<button (click)="close()">Close</button>
</div>
</div>
`,
})
export class ModalComponent {
isOpen$ = this.modalService.isOpen$;

constructor(private modalService: ModalService) {}

close() {
this.modalService.close();
}
}

In this component, we have injected ModalService and used the isOpen$ observable to determine whether to display the modal window. We use the async pipe to subscribe to the observable and update the view when the isOpen value changes.

When the user clicks the “Close” button, the close() method of the ModalService is called, which updates the isOpen subject and hides the modal window.

Here is a usage example of a ReplaySubject:

import { Injectable } from '@angular/core';
import { ReplaySubject } from 'rxjs';

@Injectable({
providedIn: 'root'
})
export class AuthService {
private isAuthenticated = new ReplaySubject<boolean>(1);

constructor() {}

// Call this method when the user logs in
login() {
// Your authentication logic here
this.isAuthenticated.next(true);
}

// Call this method when the user logs out
logout() {
// Your logout logic here
this.isAuthenticated.next(false);
}

// Use this method to get the authentication state
getAuthenticationState() {
return this.isAuthenticated.asObservable();
}
}

In the above example, AuthService is a service that manages the user's authentication state. It creates a ReplaySubject called isAuthenticated with a buffer size of 1, which means that it will replay the latest authentication state to new subscribers.

The login() and logout() methods update the authentication state by calling next() on the isAuthenticated subject with a boolean value indicating whether the user is authenticated or not.

The getAuthenticationState() method returns an observable that can be subscribed to by other components or services to receive updates on the authentication state.

To use this service in a component or service, you can inject it and subscribe to the getAuthenticationState() method:

import { Component, OnInit } from '@angular/core';
import { AuthService } from './auth.service';

@Component({
selector: 'app-my-component',
template: `
<div *ngIf="isAuthenticated">User is authenticated</div>
<div *ngIf="!isAuthenticated">User is not authenticated</div>
`
})
export class MyComponent implements OnInit {
isAuthenticated = false;

constructor(private authService: AuthService) {}

ngOnInit() {
this.authService.getAuthenticationState().subscribe(isAuthenticated => {
this.isAuthenticated = isAuthenticated;
});
}
}

In the above example, MyComponent subscribes to the getAuthenticationState() method of AuthService and updates its isAuthenticated property based on the latest authentication state received from the observable. This allows the component to reactively update its UI based on the authentication state.

Here is a usage example of an AsyncSubject:

import { Injectable } from '@angular/core';
import { AsyncSubject } from 'rxjs';

@Injectable({
providedIn: 'root'
})
export class CalculationService {
private resultSubject = new AsyncSubject<number>();

performCalculation(): AsyncSubject<number> {
// Perform long-running operation
let result = 0;
let i = 0;

const calculate = () => {
for (; i < 1000000000; i++) {
result += i;
this.resultSubject.next(result);
}

// Emit final result and complete subject
this.resultSubject.complete();
};

// Run calculation in separate thread using setImmediate
setImmediate(calculate);

return this.resultSubject;
}
}

In this example, the performCalculation() method runs the long-running calculation in a separate thread using setImmediate, which allows the event loop to continue processing other tasks and prevents blocking the main thread. Intermediate values of the calculation result are emitted using next() inside the loop, demonstrating the nature of AsyncSubject emit values only when it completes. Finally, the complete() method is called to indicate that the calculation is finished and no more results will be emitted.

Other parts of your application can then subscribe to the performCalculation() method to get the final result:

import { Component } from '@angular/core';
import { CalculationService } from './calculation.service';

@Component({
selector: 'app-root',
template: '<div>Result: {{ result }}</div>'
})
export class AppComponent {
result: number;

constructor(private calculationService: CalculationService) {}

ngOnInit() {
this.calculationService.performCalculation().subscribe((result) => {
this.result = result;
});
}
}

In this example, the AppComponent subscribes to the performCalculation() method of the CalculationService to get the final result. The subscribe() method is called with a callback function that sets the result property of the component when the result is emitted.

The Advantages of RxJS in Angular Development: Better Performance, Simpler Code, and More Flexible Data Flows

RxJS offers several advantages for developers when used in Angular development. Some of these advantages include:

  1. Better Performance: RxJS provides efficient ways to handle asynchronous data and events, which can result in better performance for an Angular application. By using RxJS, developers can reduce the number of HTTP requests made by the application, eliminate unnecessary polling, and improve the overall responsiveness of the application.
  2. Simpler Code: RxJS allows developers to write more concise and readable code. By using operators, developers can transform and manipulate data flows in a declarative way, making the code more expressive and easier to understand.
  3. Easier Testing: Because RxJS makes it easier to handle asynchronous data and events, it also makes testing Angular applications simpler. By using RxJS, developers can write tests that are more predictable, easier to understand, and faster to execute.
  4. More Flexible Data Flows: RxJS provides a flexible and powerful way to handle data flows in an Angular application. By using observables, operators, and subjects, developers can create complex data flows that can handle a wide variety of scenarios, from simple data transformations to real-time data streams.
  5. Better Scalability: RxJS makes it easier to build scalable applications by providing ways to handle complex data flows without adding complexity to the code. By using RxJS, developers can create applications that can handle large amounts of data and scale to meet the needs of growing user bases.

Overall, RxJS is a powerful tool for building reactive applications in Angular. By using RxJS, developers can create more performant, simpler, and more scalable applications that are easier to test and maintain.

Conclusion: Exploring the Benefits of RxJS in Angular Development: Simplified Code, Improved Performance, and More

In conclusion, RxJS is a powerful tool for building reactive applications in Angular. It provides a flexible and efficient way to handle asynchronous data and events, allowing developers to create more performant, scalable, and maintainable applications. Observables, operators, and subjects are the building blocks of RxJS, and they offer a wide range of features and functionality that can be used to create complex data flows.

By using RxJS, developers can simplify their code, improve the performance of their applications, and make testing easier. It is a valuable skill for any developer working with Angular, and we encourage readers to learn more about RxJS and its many capabilities.

If you’re interested in learning more about RxJS, there are many resources available, including official documentation, online tutorials, and courses. We hope this article has been helpful in introducing you to RxJS and its benefits, and we encourage you to continue exploring this powerful tool for building reactive Angular applications.

--

--

Patric

Loving web development and learning something new. Always curious about new tools and ideas.