Learn RxJS
  • Introduction
  • Learn RxJS
    • Operators
      • Combination
        • combineAll
        • combineLatest
        • concat
        • concatAll
        • endWith
        • forkJoin
        • merge
        • mergeAll
        • pairwise
        • race
        • startWith
        • withLatestFrom
        • zip
      • Conditional
        • defaultIfEmpty
        • every
        • iif
        • sequenceEqual
      • Creation
        • ajax
        • create
        • defer
        • empty
        • from
        • fromEvent
        • generate
        • interval
        • of
        • range
        • throw
        • timer
      • Error Handling
        • catch / catchError
        • retry
        • retryWhen
      • Multicasting
        • publish
        • multicast
        • share
        • shareReplay
      • Filtering
        • audit
        • auditTime
        • debounce
        • debounceTime
        • distinct
        • distinctUntilChanged
        • distinctUntilKeyChanged
        • filter
        • find
        • first
        • ignoreElements
        • last
        • sample
        • single
        • skip
        • skipUntil
        • skipWhile
        • take
        • takeLast
        • takeUntil
        • takeWhile
        • throttle
        • throttleTime
      • Transformation
        • buffer
        • bufferCount
        • bufferTime
        • bufferToggle
        • bufferWhen
        • concatMap
        • concatMapTo
        • exhaustMap
        • expand
        • groupBy
        • map
        • mapTo
        • mergeMap / flatMap
        • mergeScan
        • partition
        • pluck
        • reduce
        • scan
        • switchMap
        • switchMapTo
        • toArray
        • window
        • windowCount
        • windowTime
        • windowToggle
        • windowWhen
      • Utility
        • tap / do
        • delay
        • delayWhen
        • dematerialize
        • finalize / finally
        • let
        • repeat
        • timeInterval
        • timeout
        • timeoutWith
        • toPromise
      • Full Listing
    • Subjects
      • AsyncSubject
      • BehaviorSubject
      • ReplaySubject
      • Subject
    • Recipes
      • Alphabet Invasion Game
      • Battleship Game
      • Breakout Game
      • Car Racing Game
      • Catch The Dot Game
      • Click Ninja Game
      • Flappy Bird Game
      • Game Loop
      • Horizontal Scroll Indicator
      • Http Polling
      • Lockscreen
      • Matrix Digital Rain
      • Memory Game
      • Mine Sweeper Game
      • Platform Jumper Game
      • Progress Bar
      • Save Indicator
      • Smart Counter
      • Space Invaders Game
      • Stop Watch
      • Swipe To Refresh
      • Tank Battle Game
      • Tetris Game
      • Type Ahead
      • Uncover Image Game
    • Concepts
      • RxJS Primer
      • Get started transforming streams with map, pluck, and mapTo
      • Time based operators comparison
      • RxJS v5 -> v6 Upgrade
Powered by GitBook
On this page
  • When all observables complete, emit the last emitted value from each.
  • Why use forkJoin?
  • Examples
  • Additional Resources
  1. Learn RxJS
  2. Operators
  3. Combination

forkJoin

PreviousendWithNextmerge

Last updated 1 year ago

signature: forkJoin(...args, selector : function): Observable

When all observables complete, emit the last emitted value from each.


💡 If you want corresponding emissions from multiple observables as they occur, try !

⚠ If an inner observable does not complete forkJoin will never emit a value!


Why use forkJoin?

This operator is best used when you have a group of observables and only care about the final emitted value of each. One common use case for this is if you wish to issue multiple requests on page load (or some other event) and only want to take action when a response has been received for all. In this way it is similar to how you might use .

Be aware that if any of the inner observables supplied to forkJoin error you will lose the value of any other observables that would or have already completed if you do not the . If you are only concerned with all inner observables completing successfully you can .

It's also worth noting that if you have an observable that emits more than one item, and you are concerned with the previous emissions forkJoin is not the correct choice. In these cases you may be better off with an operator like or .

Examples

Example 1: Using a dictionary of sources to make AJAX request

( )

// RxJS v6.5+
import { ajax } from 'rxjs/ajax';
import { forkJoin } from 'rxjs';

/*
  when all observables complete, provide the last
  emitted value from each as dictionary
*/
forkJoin(
  // as of RxJS 6.5+ we can use a dictionary of sources
  {
    google: ajax.getJSON('https://api.github.com/users/google'),
    microsoft: ajax.getJSON('https://api.github.com/users/microsoft'),
    users: ajax.getJSON('https://api.github.com/users')
  }
)
  // { google: object, microsoft: object, users: array }
  .subscribe(console.log);

Example 2: Observables completing after different durations

// RxJS v6+
import { interval, forkJoin, of } from 'rxjs';
import { delay, take } from 'rxjs/operators';

const myPromise = val =>
  new Promise(resolve =>
    setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
  );

/*
  when all observables complete, give the last
  emitted value from each as an array
*/
const example = forkJoin({
  //emit 'Hello' immediately
  sourceOne: of('Hello'),
  //emit 'World' after 1 second
  sourceTwo: of('World').pipe(delay(1000)),
  //emit 0 after 1 second
  sourceThree: interval(1000).pipe(take(1)),
  //emit 0...1 in 1 second interval
  sourceFour: interval(1000).pipe(take(2)),
  //promise that resolves to 'Promise Resolved' after 5 seconds
  sourceFive: myPromise('RESULT')
});
/*
 * Output:
 * { 
 *   sourceOne: "Hello", 
 *   sourceTwo: "World", 
 *   sourceThree: 0,
 *   sourceFour: 1,
 *   sourceFive: "Promise Resolved: RESULT"
 * }
 */
const subscribe = example.subscribe(val => console.log(val));

Example 3: Making a variable number of requests (uses deprecated API)

// RxJS v6+
import { mergeMap } from 'rxjs/operators';
import { forkJoin, of } from 'rxjs';

const myPromise = val =>
  new Promise(resolve =>
    setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
  );

const source = of([1, 2, 3, 4, 5]);
//emit array of all 5 results
const example = source.pipe(mergeMap(q => forkJoin(...q.map(myPromise))));
/*
  output:
  [
   "Promise Resolved: 1",
   "Promise Resolved: 2",
   "Promise Resolved: 3",
   "Promise Resolved: 4",
   "Promise Resolved: 5"
  ]
*/
const subscribe = example.subscribe(val => console.log(val));

Example 4: Handling errors on outside

// RxJS v6+
import { delay, catchError } from 'rxjs/operators';
import { forkJoin, of, throwError } from 'rxjs';

/*
  If any inner observables error, the error result
  will be emitted by catchError.
*/
const example = forkJoin({
  // emit 'Hello' immediately
  sourceOne: of('Hello'),
  // emit 'World' after 1 second
  sourceTwo: of('World').pipe(delay(1000)),
  // throw error
  sourceThree: throwError('This will error')
}).pipe(catchError(error => of(error)));

// output: 'This will Error'
const subscribe = example.subscribe(val => console.log(val));

Example 5: Getting successful results when one inner observable errors

// RxJS v6+
import { delay, catchError } from 'rxjs/operators';
import { forkJoin, of, throwError } from 'rxjs';

/*
  Emit values from successfully completed
  inner observables.
*/
const example = forkJoin({
  // emit 'Hello' immediately
  sourceOne: of('Hello'),
  // emit 'World' after 1 second
  sourceTwo: of('World').pipe(delay(1000)),
  // throw error
  sourceThree: throwError('This will error').pipe(catchError(error => of(error)))
});

/*
 * Output:
 * {
 *   sourceOne: "Hello",
 *   sourceTwo: "World",
 *   sourceThree: "This will error"
 * }
 */
const subscribe = example.subscribe(val => console.log(val));

Example 6: forkJoin in Angular

@Injectable()
export class MyService {
  makeRequest(value: string, delayDuration: number) {
    // simulate http request
    return of(`Complete: ${value}`).pipe(
      delay(delayDuration)
    );
  }
}

@Component({
  selector: 'my-app',
  template: `
    <div>
      <h2>forkJoin Example</h2>
      <ul>
        <li> {{propOne}} </li>
        <li> {{propTwo}} </li>
        <li> {{propThree}} </li>
      </ul>
    </div>
  `,
})
export class App {
  public propOne: string;
  public propTwo: string;
  public propThree: string;
  constructor(private _myService: MyService) {}

  ngOnInit() {
    // simulate 3 requests with different delays
    forkJoin({
      requestOne: this._myService.makeRequest('Request One', 2000),
      requestTwo: this._myService.makeRequest('Request Two', 1000),
      requestThree: this._myService.makeRequest('Request Three', 3000)
    })
    .subscribe(({requestOne, requestTwo, requestThree}) => {
      this.propOne = requestOne;
      this.propTwo = requestTwo;
      this.propThree = requestThree;
    });
  }
}

Additional Resources


( )

( )

( )

( )

( )

📰 - Official docs

- In Depth Dev Reference

📁 Source Code:

StackBlitz
StackBlitz
StackBlitz
StackBlitz
plunker
forkJoin
forkJoin
https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/forkJoin.ts
zip
Promise.all
catch
error correctly on the inner observable
catch the error on the outside
combineLatest
zip
StackBlitz