import { Observable } from 'rxjs';
import { exhaustMap, groupBy, mergeMap } from 'rxjs/operators';

/**
 * Creates an operator that turns a stream of ids into a stream of fetched
 * resources.
 *
 * Incoming ids are partitioned by value, so a request for one id runs
 * concurrently with requests for any other id. Within a single id, an id
 * that arrives while a request for that same id is still in flight is
 * ignored; once that request completes, the next occurrence of the id
 * starts a new request.
 *
 * @param request$ Factory producing the request observable for a given id.
 * @returns An operator function mapping an id stream to the merged stream of
 *   values emitted by the per-id requests.
 */
export function fetchById<O>(request$: (id: string) => Observable<O>): (source$: Observable<string>) => Observable<O> {
  return (source$: Observable<string>) => {
    // One inner stream per distinct id.
    const requestsById$ = source$.pipe(groupBy(id => id));

    // All id streams are subscribed concurrently; inside each one, exhaustMap
    // drops ids that arrive while the previous request is still active.
    return requestsById$.pipe(mergeMap(idRequests$ => idRequests$.pipe(exhaustMap(request$))));
  };
}
