import { Observable, OperatorFunction, timer } from 'rxjs';
import { tap } from 'rxjs/operators';

/**
 * Runs a side effect when the source observable stays silent for longer than
 * `delay` milliseconds.
 *
 * A timer is armed each time a consumer subscribes and is re-armed after every
 * emitted value, so `fn` fires when either the first value or the gap between
 * two consecutive values takes longer than `delay`. The timer is cancelled when
 * the source errors or completes, and when the consumer unsubscribes.
 *
 * Each subscription owns its own timer; nothing is scheduled until somebody
 * actually subscribes to the resulting observable.
 *
 * @param delay - in milliseconds, how long to wait before triggering the side effect
 * @param fn - the function to trigger if the delay is met
 * @param thisArg - optional value used as `this` when invoking `fn`
 * @returns an operator that mirrors the source unchanged
 *
 * @example
 * ```
 * somePostRequest.pipe(
 *   executeDelayed(10000, ()=>{myPopup.open("Wow, this is taking a while")})
 * )
 * ```
 */
export function executeDelayed<T>(
  delay: number,
  fn: () => void,
  thisArg?: any
): OperatorFunction<T, T> {
  return function executeDelayedOperation(
    source: Observable<T>
  ): Observable<T> {
    // Wrap in a fresh Observable so all state below is created per subscription
    // (and only once a subscription actually happens).
    return new Observable<T>((subscriber) => {
      const startTimer = () => timer(delay).subscribe(() => fn.call(thisArg));
      let timerSub = startTimer();
      // Reads `timerSub` lazily so it always cancels the most recent timer.
      const stopTimer = (): void => timerSub.unsubscribe();

      const sourceSub = source
        .pipe(
          tap({
            next: () => {
              // A value arrived in time: restart the countdown for the next one.
              stopTimer();
              timerSub = startTimer();
            },
            error: stopTimer,
            complete: stopTimer,
          })
        )
        .subscribe(subscriber);

      // Teardown: also covers the consumer unsubscribing before the source
      // terminates, which would otherwise leave the timer running.
      return () => {
        stopTimer();
        sourceSub.unsubscribe();
      };
    });
  };
}
