import {
    of,
    Subscription,
    BehaviorSubject,
    MonoTypeOperatorFunction,
    Observable,
    Subject,
    debounceTime,
    finalize,
    merge,
    skip,
    switchMap,
    take,
    tap,
    mergeWith,
} from 'rxjs';

/**
 * RxJS operator that remembers the latest value and replays it to subscribers for `ttl`
 * seconds instead of asking the source again.
 *
 * All state (cached item, expiration flag, trigger subject, subscription flags) is created
 * once per `cache(...)` call, so it is shared by every Observable the returned operator
 * function is applied to.
 *
 * @param {number} ttl in seconds. When it is not finite no expiration timer is set up, so a
 * value that has been cached is never marked as expired again.
 * @param additionalChanges - operator can take changes from this Observable but will never
 * force changes from it.
 * @returns operator function whose result emits the cached item when it is fresh, and
 * otherwise triggers an update from the source and emits once the new value arrives.
 */
export function cache<E>(ttl: number, additionalChanges: Observable<E> = new Subject):
    MonoTypeOperatorFunction<E> {
    let item: E; // last value received from source or additionalChanges
    let expired = true; // nothing is cached initially, so the first subscriber forces an update
    // Trigger: every `next` makes subscribers re-evaluate the cache and restarts the ttl timer.
    const subject = new BehaviorSubject<null>(null);
    let subscribed = false; // should subscribe to source only one time
    let pendingRequestExists = false; // should not subscribe to source if already subscribed
    if (isFinite(ttl)) {
        // This subscription is never unsubscribed in this function.
        subject.pipe(
            skip(1), // should not start expiring on init (subject is BehaviorSubject)
            debounceTime(ttl * 1000), // our tests can not mock Date.now(), but can mock setTimeout
            // so cache expiration implemented this way
        ).subscribe(() => expired = true);
    }

    return function cacheOperatorFunction(source: Observable<E>): Observable<E> {
        // Source merged with additionalChanges; every value from either one is stored as the
        // cached item and announced through `subject`.
        const sourceForUpdate = source.pipe(
            finalize(() => { // we should mark ourselves as unsubscribed if main source completed
                // in case of using merge before finalize we will never be notified that main
                // source was completed during force subscription
                // It matters for cases when source completes without emitting anything.
                pendingRequestExists = false;
                subscribed = false;
            }),
            mergeWith(additionalChanges),
            tap(newItem => {
                item = newItem;
                expired = false;
                subject.next(null); // trigger emission of current item and start expiration time
                pendingRequestExists = false;
            }),
        );
        // Called when a subscriber finds the cache expired. Makes sure an update is in
        // progress and returns an Observable that never emits by itself: the fresh value
        // reaches subscribers through `subject.next` in the `tap` above.
        const updateItem = (): Observable<E> => {
            let justSubscribed = false; // prevents calling source multiple times in case of
            // synchronous source
            if (!subscribed) {
                pendingRequestExists = true;
                justSubscribed = true;
                subscribed = true;
                // The returned Subscription is not stored, so it is not unsubscribed here.
                // NOTE(review): every call also subscribes to additionalChanges; looks like
                // repeated re-subscriptions could accumulate listeners on it - confirm.
                sourceForUpdate.subscribe();
            }
            // should not subscribe to source if already subscribed
            if (!pendingRequestExists && !justSubscribed) {
                // Already subscribed, but no request is pending: subscribe once more and
                // wait for a single value. Its result is swallowed by the never-emitting
                // Subject because the value is delivered through `subject` instead.
                pendingRequestExists = true;
                return sourceForUpdate.pipe(
                    take(1),
                    switchMap(() => new Subject<E>()),
                );
            } else {
                return new Subject<E>(); // new Subject because we need Observable that will never
                // emit value, but will be able to propagate completion
            }
        };
        return subject.pipe(
            // if cache expired return Observable that will never emit. Then, when new value
            // arrived, of Observable will be emitted and subscriber will unsubscribe from our
            // fake observable
            switchMap(() => expired ? updateItem() : of(item)),
        );
    };
}

// More explicit version of `cache` above.
// TODO: if you are going to make changes in this code - cover it with tests first,
//       similar to the `cache` above.
/**
 * RxJS operator that shares a single subscription to the source between all subscribers
 * and replays the last received value to late subscribers while it is cached.
 *
 * Unlike `cache`, all state lives inside the returned operator function, so every
 * application of the operator to a source gets its own independent cache.
 *
 * Only `next` values are forwarded to subscribers; errors and completion of the source are
 * not propagated through the shared emitter. Subscriptions to `push`, `refresh` and `reset`
 * are created lazily on the first subscription and are not torn down by this function.
 *
 * @returns operator function producing an Observable that emits the cached value (if any)
 * followed by every later value coming from the source or from `push`.
 */
export function cache2<E>(
    // General behavior is similar - cache the last emitted value for `ttl` seconds.
    ttl: number,
    // Emit & cache the new value on `push`, reset the `ttl` timer too.
    push: Observable<any> = new Subject,
    // Force-resubscribe to the source (e.g. make a new request) when `refresh` emits.
    refresh: Observable<any> = new Subject,
    // Forget cached value on `reset`.
    reset: Observable<any> = new Subject,
): MonoTypeOperatorFunction<E> {
    return function cacheOperatorFunction(request: Observable<E>): Observable<E> {
        // Unique marker, so that `undefined`/`null` stay valid cached values.
        const emptyCache = Symbol('Empty cache');
        let cachedValue: E|typeof emptyCache = emptyCache;
        let requestSubscription: Subscription|undefined = undefined;
        let refreshSubscription: Subscription|undefined = undefined;
        let resetSubscription: Subscription|undefined = undefined;
        let pushSubscription: Subscription|undefined = undefined;
        // Every value (from the source or from `push`) goes to all subscribers through this.
        const sharedEmitter = new Subject<E>();

        // Subscribes to the source unless a request is already in flight.
        const makeRequest = (): void => {
            if (requestSubscription && !requestSubscription.closed) {
                return; // request is in progress, its result will be shared
            }
            const subscription = request.pipe(
                // Delays emissions of a synchronous source. Note that debounceTime flushes
                // the pending value when the source completes, so a source that emits and
                // completes synchronously is still delivered synchronously.
                debounceTime(0),
                finalize(() => requestSubscription = undefined),
                tap((value) => {
                    cachedValue = value;
                    sharedEmitter.next(value);
                }),
            ).subscribe();
            // When the source completes synchronously `finalize` has already run before we
            // get here; storing the closed subscription would block all future requests.
            requestSubscription = subscription.closed ? undefined : subscription;
        };

        return new Observable<E>((subscriber) => {
            if (!resetSubscription) {
                // The cache is dropped on `reset` and, for a finite ttl, after `ttl` seconds
                // without new values (every emission restarts the debounce timer).
                const mergedObservables = isFinite(ttl)
                    ? merge(reset, sharedEmitter.pipe(debounceTime(ttl * 1000)))
                    : reset;

                resetSubscription =  mergedObservables
                    .pipe(tap(() => (cachedValue = emptyCache))).subscribe();
            }
            if (!refreshSubscription) {
                refreshSubscription = refresh.pipe(
                    tap(() => makeRequest()),
                ).subscribe();
            }
            if (!pushSubscription) {
                pushSubscription = push.pipe(
                    tap((value) => {
                        cachedValue = value;
                        sharedEmitter.next(value);
                    }),
                ).subscribe();
            }

            if (cachedValue !== emptyCache) {
                return merge(of(cachedValue), sharedEmitter).subscribe(subscriber);
            }
            // Attach the subscriber before starting the request, otherwise a value that is
            // delivered synchronously would be emitted before anyone listens and get lost.
            const subscription = sharedEmitter.subscribe(subscriber);
            makeRequest();
            return subscription;
        });
    };
}
