import { ObservableInput, OperatorFunction, Observable, Subject, defer, from } from 'rxjs';
import { exhaustMap, finalize, throttle } from 'rxjs/operators';

/**
 * Variant of `exhaustMap()` that does not lose the latest value received while an inner Observable is running.
 *
 * Like `exhaustMap()`, a source value that arrives while an inner Observable is still running is not projected
 * immediately. Unlike `exhaustMap()`, the most recent such value is remembered and projected as soon as the
 * running inner Observable completes (a "trailing" run). Intermediate values are discarded.
 * See https://github.com/ReactiveX/rxjs/issues/5004 for the feature request on GitHub.
 *
 * Semantics:
 * - A value received while no inner Observable is running is projected right away.
 * - Values received while an inner Observable is running overwrite each other; only the last one is kept.
 * - Inner Observables that complete synchronously are supported: the operator is idle again as soon as they
 *   complete, so the next source value is projected normally.
 * - When the source completes, the output completes once the running inner Observable and, if there is one,
 *   the pending trailing run have completed.
 * - An error from the source, from `project`, or from an inner Observable is forwarded to the output.
 *
 * @example
 * observable$.pipe(
 *  exhaustMapWithTrailing(() => fetchSomething())
 * );
 *
 * In this example, `fetchSomething()` could take time (e.g. 3 seconds).
 * If the input observable sends 3 consecutive values, `fetchSomething()` is called for the first value right away,
 * and the operator remembers that a second and then a third value were received in the meantime.
 * When the first call has completed, `fetchSomething()` is called again for the third value; the second one is dropped.
 *
 * @param project Maps a source value to an inner ObservableInput. `index` counts the values that were actually
 * projected (starting at 0), not the values emitted by the source.
 * @returns An operator that emits the values of the inner Observables, running at most one at a time.
 */
export function exhaustMapWithTrailing<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R> {
    return (source): Observable<R> =>
        // All state lives inside the subscribe callback, so every subscription gets its own independent copy.
        // The busy/trailing state is tracked explicitly rather than with `throttle` + a release Subject:
        // a release notification sent by an inner Observable that completes synchronously would be emitted
        // before `throttle` starts listening for it, leaving the operator throttled forever.
        new Observable<R>((subscriber) => {
            let index = 0;
            let innerActive = false;
            let sourceCompleted = false;
            // Boxed so that `undefined` remains a legitimate trailing value of type T.
            let trailing: { value: T } | undefined;

            const run = (value: T): void => {
                // The consumer may already be gone (e.g. unsubscribed or errored during a synchronous emission).
                if (subscriber.closed) {
                    return;
                }
                // Marked busy before calling `project`, so a re-entrant source emission is queued as trailing.
                innerActive = true;
                let inner: Observable<R>;
                try {
                    // `from` normalizes any ObservableInput (Promise, array, iterable...) into an Observable.
                    inner = from(project(value, index++)) as Observable<R>;
                } catch (err) {
                    subscriber.error(err);
                    return;
                }
                // Registered on the subscriber so the inner subscription is torn down with the outer one.
                subscriber.add(
                    inner.subscribe({
                        next: (result) => subscriber.next(result),
                        error: (err: unknown) => subscriber.error(err),
                        complete: () => {
                            innerActive = false;
                            if (trailing) {
                                const { value: pendingValue } = trailing;
                                trailing = undefined;
                                run(pendingValue);
                            } else if (sourceCompleted) {
                                subscriber.complete();
                            }
                        },
                    })
                );
            };

            subscriber.add(
                source.subscribe({
                    next: (value) => {
                        if (innerActive) {
                            // Only the most recent value is kept for the trailing run.
                            trailing = { value };
                        } else {
                            run(value);
                        }
                    },
                    error: (err: unknown) => subscriber.error(err),
                    complete: () => {
                        sourceCompleted = true;
                        // If an inner Observable is still running, its completion handler finishes the output.
                        if (!innerActive) {
                            subscriber.complete();
                        }
                    },
                })
            );
        });
}
