Understanding Async Javascript & Observables - Part 2

Async Javascript Mar 31, 2020

This is Part 2 of a series of blog posts on Async Javascript & Observables. In Part 1, I discussed Observables from first principles, covered their origins, and gave a sneak peek of their async programming style. In Part 2, I will go through the following:

  • A deep dive into how events are converted into Observables
  • The basic pattern or design behind all RxJS operators
  • Walk-through of map + flatten operators

Converting Events into Observables

The fromEvent RxJS operator converts any DOM event into an Observable, and the Observable is of course the most basic building block of RxJS. For the purpose of understanding, I'm using this stripped-down version of the Observable class. (Here's its actual implementation)

export class Observable {
  constructor(subscribe) {
    if (subscribe) {
      this._subscribe = subscribe;
    }
  }

  /**
   * @param {Observer|Function} observerOrNext an Observer object, or the next handler (optional)
   * @param {Function} error (optional)
   * @param {Function} complete (optional)
   * @returns {Object} whatever the subscribe callback returns (the subscription)
   */
  subscribe(observerOrNext, error, complete) {
    if (typeof observerOrNext === 'function') {
      return this._subscribe({
        next: observerOrNext,
        error: error,
        complete: complete
      });
    } else {
      return this._subscribe(observerOrNext);
    }
  }
}
Observable class

Basically, the only thing needed to construct an Observable is our subscribe function, i.e. the callback implementation that is called when the Observable is actually subscribed to. The subscribe method takes three parameters; the first can be either an Observer object, which contains next, error and complete methods, or just the next handler function.
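
To make the two calling styles concrete, here is a minimal sketch that repeats the stripped-down Observable class so it can run on its own. The numbers$ Observable and the log array are made up for illustration; they are not part of RxJS.

```javascript
// Stripped-down Observable from this post (not the real RxJS class).
class Observable {
  constructor(subscribe) {
    if (subscribe) {
      this._subscribe = subscribe;
    }
  }

  subscribe(observerOrNext, error, complete) {
    if (typeof observerOrNext === 'function') {
      return this._subscribe({ next: observerOrNext, error, complete });
    }
    return this._subscribe(observerOrNext);
  }
}

// Hypothetical Observable: emits three numbers synchronously, then completes.
const numbers$ = new Observable(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  // Nothing to clean up here, so unsubscribe is a no-op.
  return { unsubscribe: () => {} };
});

const log = [];

// Style 1: pass a full Observer object.
numbers$.subscribe({
  next: value => log.push(`observer: ${value}`),
  error: err => log.push(`observer error: ${err}`),
  complete: () => log.push('observer: done')
});

// Style 2: pass the handlers as separate functions.
numbers$.subscribe(
  value => log.push(`fn: ${value}`),
  err => log.push(`fn error: ${err}`),
  () => log.push('fn: done')
);

console.log(log);
```

Note that nothing runs until subscribe is called, and that each call to subscribe runs the callback again from the start.
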
Now let us go through how we can take an event-based API and convert it into an Observable.

function fromEvent(dom, eventName) {
  return new Observable(function subscribe(observer) {
    var handler = $event => observer.next($event);

    // Add event listener to the dom
    dom.addEventListener(eventName, handler);

    // Subscription
    return {
      unsubscribe: () => {
        dom.removeEventListener(eventName, handler);
      }
    };
  });
}
Stripped down implementation of fromEvent RxJS operator

Let's understand what's happening here.

Firstly, we create a new instance of Observable and return it. When our Observable is actually subscribed to, the subscribe callback passed in by fromEvent is executed.

Now let's understand what's happening in the callback implementation:
1. We add an event listener to the DOM element.
2. When the event fires, the handler is called, and it passes the event object to the observer.
3. Lastly, it returns a subscription object, so that whoever subscribed to our Observable can unsubscribe as well.
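
The three steps above can be tried out with a small sketch. It assumes a runtime where EventTarget and Event are available globally (browsers and recent Node.js versions); the button object is only a stand-in for a real DOM element, and the seen array is made up for illustration.

```javascript
// Stripped-down Observable from this post (not the real RxJS class).
class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observerOrNext, error, complete) {
    if (typeof observerOrNext === 'function') {
      return this._subscribe({ next: observerOrNext, error, complete });
    }
    return this._subscribe(observerOrNext);
  }
}

// Same shape as the stripped-down fromEvent above.
function fromEvent(target, eventName) {
  return new Observable(function subscribe(observer) {
    const handler = event => observer.next(event);
    target.addEventListener(eventName, handler);
    return {
      unsubscribe: () => target.removeEventListener(eventName, handler)
    };
  });
}

// Stand-in for a DOM element (assumption: EventTarget is a global).
const button = new EventTarget();
const seen = [];

const clicks$ = fromEvent(button, 'click');
button.dispatchEvent(new Event('click')); // nobody has subscribed yet: no listener exists

const subscription = clicks$.subscribe(event => seen.push(event.type));
button.dispatchEvent(new Event('click')); // recorded
button.dispatchEvent(new Event('click')); // recorded

subscription.unsubscribe();
button.dispatchEvent(new Event('click')); // listener removed: not recorded

console.log(seen); // [ 'click', 'click' ]
```

The first dispatch is not recorded because the event listener is only added when subscribe is called, not when fromEvent is called.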

Transforming Observables - map operator

We saw how we can convert events into Observables using fromEvent. Now, we will see how we can perform a transform operation like map. Many of us are familiar with mapping over a Javascript array. It is not much harder to write map over an Observable, as we will see below:

function map(projectionFunction) {
  return function mapOperation(sourceObs) {
    // new mapped observable
    return new Observable(function subscribe(observer) {
      const sourceSubscription = sourceObs.subscribe(
        nextValue => {
          const transformedValue = projectionFunction(nextValue);
          observer.next(transformedValue);
        },
        error => {
          observer.error(error);
        },
        () => {
          observer.complete();
        }
      );
      return sourceSubscription;
    });
  };
}
Stripped down implementation of map RxJS operator

Here, the map function first returns a function (which takes the source Observable as a parameter) that, when called, returns a new Observable (we can call it the mapped Observable). When this mapped Observable is subscribed to, we subscribe to the actual, or source, Observable.
Now, whenever we receive a value from the source Observable, we hand it over to the observer of the mapped Observable with one change: we transform the value by applying the projectionFunction. Similarly, error and complete events are handed over to the mapped observer unchanged.

We can implement other operators like filter, take and others in the same way; the pattern is the same.
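
As an illustration of that shared pattern, here is a minimal sketch of a filter operator written the same way as map. It is a simplification for understanding, not the real RxJS implementation; numbers$, evens and completed are made-up names.

```javascript
// Stripped-down Observable from this post (not the real RxJS class).
class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observerOrNext, error, complete) {
    if (typeof observerOrNext === 'function') {
      return this._subscribe({ next: observerOrNext, error, complete });
    }
    return this._subscribe(observerOrNext);
  }
}

// Same skeleton as map: take the source, return a new Observable,
// and subscribe to the source only when the new Observable is subscribed to.
function filter(predicate) {
  return function filterOperation(sourceObs) {
    return new Observable(function subscribe(observer) {
      return sourceObs.subscribe(
        value => {
          // The only operator-specific part: forward a value only if it passes the test.
          if (predicate(value)) {
            observer.next(value);
          }
        },
        error => observer.error(error),
        () => observer.complete()
      );
    });
  };
}

// Hypothetical source: emits 1..6 synchronously, then completes.
const numbers$ = new Observable(observer => {
  [1, 2, 3, 4, 5, 6].forEach(n => observer.next(n));
  observer.complete();
  return { unsubscribe: () => {} };
});

const evens = [];
let completed = false;

filter(n => n % 2 === 0)(numbers$).subscribe(
  n => evens.push(n),
  err => { throw err; },
  () => { completed = true; }
);

console.log(evens, completed); // [ 2, 4, 6 ] true
```

Only the body of the next handler differs from map; the error and complete handlers and the returned subscription are identical.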

Mapping + Flattening RxJS Operators

Mapping is straightforward as long as the mapping function doesn't return an Observable.

this.doctorService.getDoctorBySlug(slug).pipe(
  map((doctor: Doctor) => {
    return `${doctor.first_name} ${doctor.last_name}`
  })
).subscribe((fullName: string) => {
  console.log(fullName);
});

Here it prints the doctor's first name and last name. But things get a bit tricky when the map function returns an Observable, as shown in the example below:

this.getDoctorSlug().pipe(
  map(() => {
    return this.doctorDiscoveryService.getDoctorBySlug(this.doctorSlug);
  })
).subscribe((doctor: Doctor) => {
  console.log(doctor.first_name + doctor.last_name);
});

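Here it would not work as intended: the value passed to the subscribe handler is actually an Observable, not a Doctor, so TypeScript reports a type error and the name properties would be undefined at runtime.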

Flattening to the rescue

Flattening is basically subscribing inside a subscribe.

The only way to extract values out of the returned observable will be to subscribe to it.

this.getDoctorSlug().pipe(
  map(() => {
    return this.doctorDiscoveryService.getDoctorBySlug(this.doctorSlug);
  })
).subscribe((resultObs: Observable<Doctor>) => {
  resultObs.subscribe((doctor: Doctor) => {
    console.log(`${doctor.first_name} ${doctor.last_name}`);
  });
});

So in the above example, what we have done is apply a merge strategy: we subscribe to every inner Observable as soon as it arrives. Whenever we flatten an Observable, we need a flattening strategy.

There are four flattening strategies, and for each one RxJS has an operator that deals with it:

1. Merge Strategy ( mergeMap ) - here we just keep subscribing to every new observable that we return from the map.

this.getDoctorSlug().pipe(
  mergeMap(() => {
    return this.doctorDiscoveryService.getDoctorBySlug(this.doctorSlug);
  })
).subscribe((doctor: Doctor) => {
  console.log(`${doctor.first_name} ${doctor.last_name}`);
});

Marble diagram explanation:

  • each value of the source Observable is still being mapped into an inner Observable
  • that inner Observable is subscribed to by mergeMap
  • as the inner Observables emit new values, they are immediately reflected in the output Observable
  • but unlike concatMap, in the case of mergeMap we don't have to wait for the previous inner Observable to complete before triggering the next inner Observable
  • we can have multiple inner Observables overlapping over time, emitting values in parallel like we see highlighted in red in the picture
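
As a rough illustration of the merge strategy, here is a minimal sketch of mergeMap built on the stripped-down Observable class from this post. The createSource helper is hypothetical (it is not part of RxJS) and only exists so that values can be pushed by hand. Completion handling is left out to keep the sketch short, so this is not how the real mergeMap is implemented.

```javascript
// Stripped-down Observable from this post (not the real RxJS class).
class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observerOrNext, error, complete) {
    if (typeof observerOrNext === 'function') {
      return this._subscribe({ next: observerOrNext, error, complete });
    }
    return this._subscribe(observerOrNext);
  }
}

// Hypothetical helper: an Observable we can push values into by hand.
function createSource() {
  const observers = new Set();
  return {
    observable: new Observable(observer => {
      observers.add(observer);
      return { unsubscribe: () => observers.delete(observer) };
    }),
    emit: value => observers.forEach(o => o.next(value))
  };
}

function mergeMap(project) {
  return function mergeMapOperation(sourceObs) {
    return new Observable(function subscribe(observer) {
      const innerSubscriptions = [];
      const sourceSubscription = sourceObs.subscribe(
        value => {
          // Subscribe to every inner Observable right away; nothing is cancelled or queued.
          innerSubscriptions.push(
            project(value).subscribe(
              innerValue => observer.next(innerValue),
              error => observer.error(error)
            )
          );
        },
        error => observer.error(error)
      );
      return {
        unsubscribe: () => {
          sourceSubscription.unsubscribe();
          innerSubscriptions.forEach(s => s.unsubscribe());
        }
      };
    });
  };
}

const source = createSource();
const inners = { A: createSource(), B: createSource() };
const output = [];

mergeMap(key => inners[key].observable)(source.observable)
  .subscribe(value => output.push(value));

source.emit('A');      // inner A is subscribed to
inners.A.emit('a1');
source.emit('B');      // inner B is subscribed to while A is still active
inners.B.emit('b1');
inners.A.emit('a2');   // A is still live, so its values interleave with B's
inners.B.emit('b2');

console.log(output); // [ 'a1', 'b1', 'a2', 'b2' ]
```

The interleaved output shows both inner Observables being active at the same time.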


2. Switch Strategy ( switchMap ) - unsubscribing from the last mapped observable when a new one arrives.

  startPollingConsultation(consultationId: string) {
    const pollingInterval = 5000;
    return interval(pollingInterval).pipe(
      startWith(0),
      switchMap(() => {
        return this.getConsultationById(consultationId);
      }),
      catchError(err => {
        return throwError(err);
      })
    );
  }

Marble diagram explanation:

  • the source observable emits values 1, 3 and 5
  • these values are then turned into Observables by applying a mapping function.
  • the mapped inner Observables get subscribed to by switchMap
  • when the inner Observables emit a value, the value gets immediately reflected in the output
  • but if a new value like 5 gets emitted before the previous Observable got a chance to complete, the previous inner Observable (30-30-30) will be unsubscribed from, and its values will no longer be reflected in the output.
  • notice the 30-30-30 inner Observable in red in the diagram above: the last 30 value was not emitted because the 30-30-30 inner Observable got unsubscribed from
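
The unsubscribe-on-new-value behaviour can be sketched on top of the stripped-down Observable class from this post. As before, this is a simplification rather than the real RxJS implementation: completion handling is omitted, and createSource is a hypothetical helper for pushing values by hand.

```javascript
// Stripped-down Observable from this post (not the real RxJS class).
class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observerOrNext, error, complete) {
    if (typeof observerOrNext === 'function') {
      return this._subscribe({ next: observerOrNext, error, complete });
    }
    return this._subscribe(observerOrNext);
  }
}

// Hypothetical helper: an Observable we can push values into by hand.
function createSource() {
  const observers = new Set();
  return {
    observable: new Observable(observer => {
      observers.add(observer);
      return { unsubscribe: () => observers.delete(observer) };
    }),
    emit: value => observers.forEach(o => o.next(value))
  };
}

function switchMap(project) {
  return function switchMapOperation(sourceObs) {
    return new Observable(function subscribe(observer) {
      let innerSubscription = null;
      const sourceSubscription = sourceObs.subscribe(
        value => {
          // Drop the previous inner Observable before switching to the new one.
          if (innerSubscription) {
            innerSubscription.unsubscribe();
          }
          innerSubscription = project(value).subscribe(
            innerValue => observer.next(innerValue),
            error => observer.error(error)
          );
        },
        error => observer.error(error)
      );
      return {
        unsubscribe: () => {
          sourceSubscription.unsubscribe();
          if (innerSubscription) {
            innerSubscription.unsubscribe();
          }
        }
      };
    });
  };
}

const source = createSource();
const inners = { A: createSource(), B: createSource() };
const output = [];

switchMap(key => inners[key].observable)(source.observable)
  .subscribe(value => output.push(value));

source.emit('A');      // inner A is subscribed to
inners.A.emit('a1');
source.emit('B');      // A is unsubscribed from, B is subscribed to
inners.A.emit('a2');   // ignored: nobody listens to A any more
inners.B.emit('b1');

console.log(output); // [ 'a1', 'b1' ]
```

In the polling example above, this is what prevents a slow response from an earlier poll from arriving after a newer poll has started.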


3. Concat Strategy ( concatMap ) - queuing up every new Observable, and subscribing to a new observable only when the last observable has completed.

this.paymentService
  .getPaymentById(orderId)
  .pipe(
    concatMap((payment: Payment) => {
      return this.orderService.getOrderById(payment.service_reference_id);
    })
  )
  .subscribe((order: Order) => { this.order = order; });

Marble diagram explanation:

  • each value of the source Observable is still being mapped into an inner Observable
  • that inner Observable is subscribed to by concatMap
  • as the inner Observables emit new values, they are immediately reflected in the output Observable
  • but we have to wait for the previous inner Observable to complete before triggering the next inner Observable
  • that is why you don't see 50 being emitted before the last 30: concatMap waits until the inner Observable completes and only then proceeds
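
The queuing behaviour can be sketched on top of the stripped-down Observable class from this post. This is a simplified illustration, not the real RxJS implementation: completion of the output is not handled, and createSource is a hypothetical helper for pushing values by hand. Because these hand-pushed sources do not replay values, anything emitted before subscription is simply missed, which the example uses to show that the second inner Observable is not subscribed to yet.

```javascript
// Stripped-down Observable from this post (not the real RxJS class).
class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observerOrNext, error, complete) {
    if (typeof observerOrNext === 'function') {
      return this._subscribe({ next: observerOrNext, error, complete });
    }
    return this._subscribe(observerOrNext);
  }
}

// Hypothetical helper: an Observable we can push values into and complete by hand.
function createSource() {
  const observers = new Set();
  return {
    observable: new Observable(observer => {
      observers.add(observer);
      return { unsubscribe: () => observers.delete(observer) };
    }),
    emit: value => observers.forEach(o => o.next(value)),
    complete: () => observers.forEach(o => o.complete && o.complete())
  };
}

function concatMap(project) {
  return function concatMapOperation(sourceObs) {
    return new Observable(function subscribe(observer) {
      const queue = [];
      let active = false;
      let innerSubscription = null;

      function subscribeNext() {
        if (queue.length === 0) {
          active = false;
          return;
        }
        active = true;
        innerSubscription = project(queue.shift()).subscribe(
          innerValue => observer.next(innerValue),
          error => observer.error(error),
          // The current inner Observable completed: move on to the next queued value.
          () => subscribeNext()
        );
      }

      const sourceSubscription = sourceObs.subscribe(
        value => {
          queue.push(value);
          if (!active) {
            subscribeNext();
          }
        },
        error => observer.error(error)
      );
      return {
        unsubscribe: () => {
          sourceSubscription.unsubscribe();
          if (innerSubscription) {
            innerSubscription.unsubscribe();
          }
        }
      };
    });
  };
}

const source = createSource();
const inners = { A: createSource(), B: createSource() };
const output = [];

concatMap(key => inners[key].observable)(source.observable)
  .subscribe(value => output.push(value));

source.emit('A');            // inner A is subscribed to
inners.A.emit('a1');
source.emit('B');            // B is queued, not subscribed to yet
inners.B.emit('too early');  // missed: B has no subscriber while A is running
inners.A.emit('a2');
inners.A.complete();         // only now is B subscribed to
inners.B.emit('b1');

console.log(output); // [ 'a1', 'a2', 'b1' ]
```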


4. Exhaust Strategy ( exhaustMap ) - ignores (and never subscribes to) any new mapped Observable while the current Observable is still emitting values. This can be used in scenarios like login.
Let's say you have a login screen with a login button, where you map each click to a login Ajax request. If the user clicks more than once on the login button, it will cause multiple calls to the server. exhaustMap can be used here to avoid making multiple calls.

fromEvent(loginBtnElement, 'click').pipe(
  exhaustMap((event: any) => {
    return this.loginService.login();
  })
).subscribe((loginResponse: any) => {
  // on successful login
});

Marble diagram explanation:

  • the value 1 gets emitted, and an inner Observable 10-10-10 is created
  • the Observable 10-10-10 emits all values and completes before the value 3 gets emitted in the source Observable, so all 10-10-10 values were emitted in the output
  • a new value 3 gets emitted in the input, that triggers a new 30-30-30 inner Observable
  • but now, while 30-30-30 is still running, we get a new value 5 emitted in the source Observable
  • this value 5 is discarded by the exhaust strategy, meaning that a 50-50-50 Observable was never created, and so the 50-50-50 values never showed up in the output
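
The ignore-while-busy behaviour can be sketched on top of the stripped-down Observable class from this post, using the login scenario. This is a simplified illustration, not the real RxJS implementation. The createSource helper, the login function and the loginCalls counter are all hypothetical stand-ins; no real request is made.

```javascript
// Stripped-down Observable from this post (not the real RxJS class).
class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observerOrNext, error, complete) {
    if (typeof observerOrNext === 'function') {
      return this._subscribe({ next: observerOrNext, error, complete });
    }
    return this._subscribe(observerOrNext);
  }
}

// Hypothetical helper: an Observable we can push values into and complete by hand.
function createSource() {
  const observers = new Set();
  return {
    observable: new Observable(observer => {
      observers.add(observer);
      return { unsubscribe: () => observers.delete(observer) };
    }),
    emit: value => observers.forEach(o => o.next(value)),
    complete: () => observers.forEach(o => o.complete && o.complete())
  };
}

function exhaustMap(project) {
  return function exhaustMapOperation(sourceObs) {
    return new Observable(function subscribe(observer) {
      let active = false;
      let innerSubscription = null;
      const sourceSubscription = sourceObs.subscribe(
        value => {
          // Ignore new source values while the current inner Observable is still running.
          if (active) {
            return;
          }
          active = true;
          innerSubscription = project(value).subscribe(
            innerValue => observer.next(innerValue),
            error => observer.error(error),
            () => { active = false; }
          );
        },
        error => observer.error(error)
      );
      return {
        unsubscribe: () => {
          sourceSubscription.unsubscribe();
          if (innerSubscription) {
            innerSubscription.unsubscribe();
          }
        }
      };
    });
  };
}

// Hypothetical stand-in for a login request that we resolve by hand.
let loginCalls = 0;
let currentRequest = null;
function login() {
  loginCalls += 1;
  currentRequest = createSource();
  return currentRequest.observable;
}

const clicks = createSource();
const responses = [];

exhaustMap(() => login())(clicks.observable)
  .subscribe(response => responses.push(response));

clicks.emit('click 1');            // starts login request 1
clicks.emit('click 2');            // ignored: request 1 is still running
clicks.emit('click 3');            // ignored
currentRequest.emit('logged in');
currentRequest.complete();         // request 1 is done
clicks.emit('click 4');            // starts login request 2

console.log(loginCalls, responses); // 2 [ 'logged in' ]
```

Four clicks result in only two login calls, because the two clicks made while the first request was in flight never reach the mapping function.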

Conclusion

So, whether it is animations, async requests or events, the pattern is the same: we map over a stream and use the right flattening strategy as shown above (which is the hard part). A flattening strategy is all about coordinating concurrency, or things happening at the same time. Essentially, all the challenges involved in async programming can be summed up in one question: which flattening strategy to use?

Most problems are really about zooming out and finding the right level of abstraction to tackle them. And from this two-part series of blog posts, we see that treating events as streams turns out to be the right level of abstraction to tackle the challenges involved in async programming.
