The mergeMap
operator in RxJS is one of the most versatile tools for working with asynchronous streams. It maps each value emitted by a source Observable to an inner Observable and then merges the outputs of these inner Observables into a single Observable. Unlike concatMap
, which processes one inner Observable at a time, mergeMap
allows all inner Observables to run concurrently, making it ideal for handling parallel operations.
In this article, we’ll explore what mergeMap
is, how it works, its syntax, and practical use cases, along with examples to demonstrate its functionality.
What is mergeMap
?
mergeMap
is a transformation operator that takes a source Observable and maps each emitted value to an inner Observable. It subscribes to all inner Observables simultaneously and merges their emissions into a single Observable.
Key Features
- Concurrent Execution: Allows multiple inner Observables to run at the same time.
- Dynamic Mapping: Maps values from the source Observable to new inner Observables dynamically.
- Flattening: Combines the emissions of all active inner Observables into one stream.
Syntax
typescriptCopy codemergeMap(project: (value: T, index: number) => ObservableInput, resultSelector?: (outerValue, innerValue, outerIndex, innerIndex) => any): OperatorFunction
Parameters
project
: A function that maps each emitted value from the source Observable to an inner Observable.- Takes the emitted value and an optional index as arguments.
resultSelector
(optional): A function to combine the outer and inner Observable values.
Returns
- An Observable that emits values from the inner Observables as they arrive.
Importing mergeMap
To use mergeMap
, import it from the rxjs/operators
package:
typescriptCopy codeimport { mergeMap } from 'rxjs/operators';
How mergeMap
Works
- The source Observable emits a value.
mergeMap
maps this value to an inner Observable using theproject
function.- It subscribes to the inner Observable and emits its values as they arrive.
- Unlike
concatMap
, it doesn’t wait for the previous inner Observable to complete before subscribing to the next one.
Examples
Example 1: Basic Usage
Mapping values to inner Observables and merging their outputs.
typescriptCopy codeimport { of } from 'rxjs';
import { mergeMap, delay } from 'rxjs/operators';
const source$ = of(1, 2, 3);
const result$ = source$.pipe(
mergeMap((value) => of(`Mapped value: ${value}`).pipe(delay(1000 * value)))
);
result$.subscribe((result) => console.log(result));
// Output (order may vary due to concurrency):
// Mapped value: 1 (after 1 second)
// Mapped value: 2 (after 2 seconds)
// Mapped value: 3 (after 3 seconds)
Explanation
- Each value from the source Observable is mapped to an inner Observable.
- Inner Observables are emitted as they complete, potentially in parallel.
Example 2: Fetching Data from Multiple APIs
Use mergeMap
to fetch data from multiple APIs concurrently.
typescriptCopy codeimport { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
import { ajax } from 'rxjs/ajax';
const apiEndpoints = of('/api/data1', '/api/data2', '/api/data3');
apiEndpoints
.pipe(
mergeMap((url) => ajax.getJSON(url))
)
.subscribe((response) => console.log('Response:', response));
// Output (order may vary due to network delays):
// Response: { ...data1 }
// Response: { ...data2 }
// Response: { ...data3 }
Explanation
- Each API endpoint is mapped to an Observable representing an HTTP GET request.
- All requests are executed concurrently, and their responses are emitted as they arrive.
Example 3: Concurrent File Uploads
Upload multiple files concurrently and log their statuses.
typescriptCopy codeimport { of } from 'rxjs';
import { mergeMap, delay } from 'rxjs/operators';
const files$ = of('file1.txt', 'file2.txt', 'file3.txt');
const uploadFile = (fileName: string) =>
of(`Uploaded ${fileName}`).pipe(delay(1500));
files$
.pipe(
mergeMap((file) => uploadFile(file))
)
.subscribe((status) => console.log(status));
// Output (order may vary):
// Uploaded file1.txt
// Uploaded file2.txt
// Uploaded file3.txt
Explanation
- Each file is mapped to an Observable representing an upload operation.
- All uploads are initiated concurrently, and their statuses are logged as they complete.
Comparison with Other Operators
mergeMap
vs concatMap
mergeMap
: Executes all inner Observables concurrently.concatMap
: Processes one inner Observable at a time, maintaining order.
mergeMap
vs switchMap
mergeMap
: Subscribes to all inner Observables, regardless of the source emissions.switchMap
: Cancels the current inner Observable if a new value is emitted by the source Observable.
Common Use Cases
- Concurrent API Requests: Fetch data from multiple endpoints simultaneously.
- Parallel Data Processing: Process multiple tasks or files in parallel.
- Dynamic Event Handling: Handle overlapping user events like clicks or keypresses.
Best Practices
- Limit Concurrency: Use operators like
mergeMap
with a concurrency limit (viamergeMap
in combination with a custom implementation) for scenarios involving resource constraints. - Error Handling: Use operators like
catchError
to handle errors from inner Observables. - Use When Order Doesn’t Matter: Prefer
mergeMap
when the order of emissions is not critical.
Conclusion
The mergeMap
operator is a powerful tool for handling concurrent operations in RxJS. By mapping values to inner Observables and merging their outputs into a single stream, mergeMap
enables efficient parallel processing of asynchronous tasks. Whether you’re working with API calls, file uploads, or dynamic event handling, mastering mergeMap
is essential for building responsive and scalable reactive applications.
Start incorporating mergeMap
into your RxJS workflows to unlock its full potential!