AngularRxJS

How to Create Your Own Observable and How Observables Work Under the Hood

observables

Observables are a cornerstone of RxJS and reactive programming, allowing developers to work with asynchronous data streams effectively. In this article, we’ll explore how Observables work under the hood and guide you through the process of creating your own Observable.


How Observables Work Under the Hood

An Observable is a representation of a stream of data that can be observed over time. Unlike promises, which handle a single value, Observables can emit multiple values or events. Here’s how they work internally:

1. Core Components of an Observable

  • Producer: The source of data or events.
  • Observer: The entity that listens to or reacts to data emitted by the Observable.
  • Subscription: The link between the Observable and the Observer. It allows you to start, pause, or stop observing.

2. Lifecycle of an Observable

Observables follow a simple lifecycle:

  • Creation: An Observable is instantiated.
  • Execution: The Observable emits data when subscribed to.
  • Notification: The Observer reacts to emitted data, errors, or completion events.
  • Cleanup: Resources are released when the Observable completes or is unsubscribed.

3. Internals

  • Observers: An Observable pushes values to its observers using three types of notifications:
    • Next: Emits a value to the Observer.
    • Error: Notifies the Observer of an error, stopping the stream.
    • Complete: Signals that the Observable has finished emitting values.
  • Unicast Nature: Each subscription to an Observable creates a unique execution path. This ensures that each observer gets its own copy of emitted data.

How to Create Your Own Observable

Creating a custom Observable is straightforward. RxJS provides the Observable class, allowing developers to define their custom behavior.

1. Basic Observable

Here’s how to create a basic Observable that emits values:

typescriptCopy codeimport { Observable } from 'rxjs';
const myObservable = new Observable((subscriber) => {
  subscriber.next('Hello');
  subscriber.next('World');
  subscriber.complete();
});
// Subscribing to the Observable
myObservable.subscribe({
  next: (value) => console.log(`Received: ${value}`),
  complete: () => console.log('Observable completed'),
});
// Output:
// Received: Hello
// Received: World
// Observable completed

Explanation

  • The Observable constructor takes a function with a subscriber parameter.
  • subscriber.next(value) sends a value to the Observer.
  • subscriber.complete() notifies the Observer that the Observable has finished emitting values.
  • If an error occurs, use subscriber.error(error).

2. Custom Observable with Error Handling

Let’s create an Observable that emits values and handles errors:

typescriptCopy codeconst myObservableWithError = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  // Simulate an error
  if (Math.random() > 0.5) {
    subscriber.error('An error occurred!');
  } else {
    subscriber.next(3);
    subscriber.complete();
  }
});
myObservableWithError.subscribe({
  next: (value) => console.log(`Value: ${value}`),
  error: (err) => console.error(`Error: ${err}`),
  complete: () => console.log('Completed successfully'),
});

Explanation

  • Errors are sent to the error callback in the Observer and terminate the Observable.
  • Emission stops once subscriber.error or subscriber.complete is called.

3. Observable with Delayed Emissions

You can create an Observable that emits values over time:

typescriptCopy codeconst delayedObservable = new Observable((subscriber) => {
  let count = 0;
  const intervalId = setInterval(() => {
    subscriber.next(count++);
    if (count > 5) {
      subscriber.complete();
      clearInterval(intervalId);
    }
  }, 1000);
  // Cleanup logic
  return () => {
    clearInterval(intervalId);
    console.log('Observable unsubscribed');
  };
});
const subscription = delayedObservable.subscribe({
  next: (value) => console.log(`Value: ${value}`),
  complete: () => console.log('Observable completed'),
});
// Unsubscribe after 3 seconds
setTimeout(() => {
  subscription.unsubscribe();
}, 3000);
// Output (varies due to timing):
// Value: 0
// Value: 1
// Value: 2
// Observable unsubscribed

Explanation

  • Observables can manage time-based data using setInterval or similar constructs.
  • The return value of the function passed to the Observable constructor is the cleanup logic, executed when the subscription is unsubscribed.

4. Advanced Example: User Interaction Observable

Create an Observable that listens to user interactions (e.g., mouse clicks):

typescriptCopy codeimport { fromEvent } from 'rxjs';
const clickObservable = fromEvent(document, 'click');
clickObservable.subscribe((event) => {
  console.log(`Mouse clicked at: (${event.clientX}, ${event.clientY})`);
});

Explanation

  • fromEvent is a utility function that simplifies creating Observables for DOM events.
  • The Observable emits an event object for every user click.

Operators and Transformations

Observables in RxJS gain much of their power from operators. Operators like map, filter, and merge allow you to transform and combine streams of data.

Example with map:

typescriptCopy codeimport { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
const numbersObservable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});
numbersObservable.pipe(
  map((value) => value * 2)
).subscribe((value) => console.log(`Transformed value: ${value}`));
// Output:
// Transformed value: 2
// Transformed value: 4
// Transformed value: 6

Key Takeaways

  1. Observables are lazy and only execute when subscribed to.
  2. They emit three types of notifications: next, error, and complete.
  3. Custom Observables can be created using the Observable class.
  4. Cleanup logic is essential for avoiding memory leaks, especially in long-running or infinite streams.
  5. Operators enhance Observables by providing a declarative way to transform and combine streams.

By mastering Observables and their internals, you can fully leverage the power of RxJS to handle asynchronous and event-based data streams efficiently.

Leave a Reply

Your email address will not be published. Required fields are marked *