import {
	buffer,
	combineLatest,
	defer,
	exhaustAll,
	exhaustMap,
	finalize,
	first,
	from,
	merge,
	mergeAll,
	MonoTypeOperatorFunction,
	Observable,
	ObservableInput,
	OperatorFunction,
	pipe,
	skipUntil,
	Subject,
	Subscriber,
	throttle,
} from 'rxjs';
import { filter, map, switchMap, tap, withLatestFrom } from 'rxjs/operators';
import { Dictionary } from '@ngrx/entity';
import { EvasysLoadingStrategiesEnum } from '@evasys/globals/shared/enums/general/evasys-loadingStrategies.enum';
import { Counter } from '../general/counter';

/**
 * Builds an operator that gates a state stream behind a loading strategy and, depending on that strategy,
 * invokes `loadData` to trigger a load.
 *
 * - `STATEONLY`: the source is passed through unchanged; `loadingState` is never called.
 * - every other strategy: the source is combined with `loadingState()` and values are only forwarded while the
 *   loading flag is `false`; the strategy-specific operator then decides when `loadData` is invoked.
 * - a strategy that matches none of the handled cases subscribes to nothing, so the result never emits.
 *
 * NOTE: a single `Counter` is created per `loadData(...)` call, so it is shared by every subscription to the
 * returned operator's output.
 *
 * @param loadData Callback that triggers the load (invoked by the strategy operators).
 * @param loadingState Factory for the observable loading flag; called once per subscription (except `STATEONLY`).
 * @param loadingStrategy Strategy that selects which of the internal operators is applied.
 * @param many Only used by `STATEFALLBACKAPI`: whether the emitted value is a collection (array/dictionary).
 * @param itemCount Only used by `STATEFALLBACKAPI`: expected number of items, `-1` for "any non-empty collection".
 * @returns An operator emitting the same value type as its source.
 *
 * @deprecated use rxjs-function load
 */
export function loadData<T>(
	loadData: () => void,
	loadingState: () => Observable<boolean>,
	loadingStrategy: EvasysLoadingStrategiesEnum,
	many: boolean = true,
	itemCount: number = -1
): MonoTypeOperatorFunction<T[] | Dictionary<T> | T> {
	const counter = new Counter();
	return (source$: Observable<T[] | Dictionary<T> | T>): Observable<T[] | Dictionary<T> | T> =>
		new Observable((subscriber) => {
			// Only forward state values while nothing is loading; STATEONLY ignores the loading flag entirely.
			const newSource$ =
				loadingStrategy === EvasysLoadingStrategiesEnum.STATEONLY
					? source$
					: combineLatest([loadingState(), source$]).pipe(
							filter(([loading]) => !loading),
							map(([, source]) => source)
					  );

			let strategy$: Observable<T[] | Dictionary<T> | T> | undefined;
			switch (loadingStrategy) {
				case EvasysLoadingStrategiesEnum.STATEONLY:
					strategy$ = newSource$.pipe(loadFromStateOnly<T>());
					break;
				case EvasysLoadingStrategiesEnum.APIONLY:
					strategy$ = newSource$.pipe(loadFromAPIOnly<T>(loadData, counter));
					break;
				case EvasysLoadingStrategiesEnum.STATETHENAPI:
					strategy$ = newSource$.pipe(loadFromStateThenAPI<T>(loadData, counter));
					break;
				case EvasysLoadingStrategiesEnum.STATEFALLBACKAPI:
					strategy$ = newSource$.pipe(loadFromStateFallbackAPI<T>(loadData, counter, many, itemCount));
					break;
			}

			// Returning the inner subscription as teardown makes sure that unsubscribing from the outer observable
			// also unsubscribes from the state and loading streams instead of leaving them running.
			return strategy$?.subscribe({
				next: (data: T | T[] | Dictionary<T>) => subscriber.next(data),
				error: (error: any) => subscriber.error(error),
				complete: () => subscriber.complete(),
			});
		});
}

/**
 * Strategy operator that never triggers a load: the source observable is handed back untouched.
 */
function loadFromStateOnly<T>(): MonoTypeOperatorFunction<T[] | Dictionary<T> | T> {
	return (state$) => state$;
}

/**
 * Strategy operator that calls `loadData` on the emission seen while `counter.Count` is 0 and suppresses every
 * emission until `counter.Count` is greater than 2 (the counter is incremented by `onIndex` before the check).
 * Each emission that passes switches to a fresh subscription of the source.
 *
 * @param loadData Callback that triggers the load.
 * @param counter Emission counter shared with `onIndex`.
 */
function loadFromAPIOnly<T>(loadData: () => void, counter: Counter): MonoTypeOperatorFunction<T[] | Dictionary<T> | T> {
	return (source$) => {
		const pastInitialEmissions = () => counter.Count > 2;
		const resubscribe = () => source$;

		return source$.pipe(onIndex(0, loadData, counter), filter(pastInitialEmissions), switchMap(resubscribe));
	};
}

/**
 * Strategy operator that forwards the state right away and additionally calls `loadData` on the emission seen
 * while `counter.Count` is 0. Every emission switches to a fresh subscription of the source.
 *
 * @param loadData Callback that triggers the load.
 * @param counter Emission counter shared with `onIndex`.
 */
function loadFromStateThenAPI<T>(
	loadData: () => void,
	counter: Counter
): MonoTypeOperatorFunction<T[] | Dictionary<T> | T> {
	return (source$) => {
		const resubscribe = () => source$;

		return source$.pipe(onIndex(0, loadData, counter), switchMap(resubscribe));
	};
}

/**
 * Strategy operator that uses the state when it already holds the expected data and otherwise falls back to
 * calling `loadData`.
 *
 * On the emission seen while `counter.Count` is 0 the state is inspected and `loadData` is called when
 * - `many` is `false` and the value is falsy, or
 * - `many` is `true` and the collection is missing, empty (`itemCount === -1`) or does not contain exactly
 *   `itemCount` items (`itemCount !== -1`).
 *
 * Emissions are only let through when the state is satisfactory by the same criteria; in `many` mode every
 * emission after the second one (filter index > 1) is let through regardless. Each emission that passes switches
 * to a fresh subscription of the source.
 *
 * @param loadData Callback that triggers the load.
 * @param counter Emission counter shared with `onIndex`.
 * @param many Whether the emitted value is a collection (array or dictionary) rather than a single item.
 * @param itemCount Expected number of items in `many` mode; `-1` accepts any non-empty collection.
 */
function loadFromStateFallbackAPI<T>(
	loadData: () => void,
	counter: Counter,
	many: boolean = true,
	itemCount: number = -1
): MonoTypeOperatorFunction<T[] | Dictionary<T> | T> {
	// Number of items in an emitted array/dictionary, or `undefined` when the emitted value is missing.
	// Guarding here matters: `Object.values(null | undefined)` throws a TypeError.
	const countItems = (data: T[] | Dictionary<T> | T): number | undefined =>
		data ? (Array.isArray(data) ? data : Object.values(data)).length : undefined;

	// Whether a collection of the given size can be served from the state without loading.
	const isSatisfied = (length: number | undefined): boolean =>
		itemCount === -1 ? length !== undefined && length > 0 : length === itemCount;

	return (source$) => {
		return source$.pipe(
			onIndex(
				0,
				(data) => {
					if (!many) {
						if (!data) {
							loadData();
						}
						return;
					}
					if (!isSatisfied(countItems(data))) {
						loadData();
					}
				},
				counter
			),
			filter((data, index) => {
				if (!many) {
					return Boolean(data);
				}

				const length = countItems(data);
				if (isSatisfied(length) || index > 1) {
					return true;
				}

				// Reaching this point with a concrete expectation means the item count did not match it.
				if (itemCount !== -1) {
					console.error(
						'Can not load from state with the fallback to the api (in filter): The number of items is ' +
							length +
							' although ' +
							itemCount +
							' were expected'
					);
				}
				return false;
			}),
			switchMap(() => source$)
		);
	};
}

/**
 * Side-effect operator: for every emission it calls `method` with the emitted value if `counter.Count` currently
 * equals `index`, and afterwards calls `counter.countUp()`. Values are forwarded unchanged.
 *
 * @param index Counter value at which `method` is invoked.
 * @param method Callback receiving the emitted value.
 * @param counter Counter that is read before and advanced after each emission.
 */
export function onIndex<T>(
	index: number,
	method: (data: T[] | Dictionary<T> | T) => void,
	counter: Counter
): MonoTypeOperatorFunction<T[] | Dictionary<T> | T> {
	return (source$) =>
		source$.pipe(
			tap((emitted) => {
				const isTargetEmission = counter.Count === index;
				if (isTargetEmission) {
					method(emitted);
				}
				counter.countUp();
			})
		);
}

/**
 * Filters the source by an asynchronous boolean condition.
 *
 * - Observable condition: each source value is paired with the latest condition value and forwarded only when
 *   that value is truthy (source values arriving before the first condition value are dropped by
 *   `withLatestFrom`).
 * - Promise condition: each source value waits for the promise; it is forwarded only when the promise resolved
 *   to a truthy value. A rejection is logged and treated as "condition not met".
 *
 * @param asyncCondition Observable or promise yielding the filter condition.
 */
function asyncFilter<T>(
	asyncCondition: Observable<boolean> | Promise<boolean>
): MonoTypeOperatorFunction<T[] | Dictionary<T> | T> {
	return (source$) => {
		if (!(asyncCondition instanceof Promise)) {
			return source$.pipe(
				withLatestFrom(asyncCondition),
				filter(([, isAllowed]) => isAllowed),
				map(([value]) => value)
			);
		}

		// Stays false until the promise has resolved; a rejected promise never flips it.
		let conditionResult = false;
		const waitForCondition = async (value: T[] | Dictionary<T> | T) => {
			try {
				conditionResult = await asyncCondition;
			} catch (err) {
				console.error('Condition in promise of async filter rejected: ', err);
			}
			return value;
		};

		return source$.pipe(
			switchMap(waitForCondition),
			filter(() => conditionResult)
		);
	};
}

/**
 * Maps each input value to an observable through an accumulator and ignores all further input values for as long
 * as the observable returned last has not completed.
 * As with the `scan` operator, the accumulator receives the most recent output of the previously returned
 * observable as its first argument.
 * `exhaustScan` relates to `scan` the way rxjs' `exhaustMap` relates to `map`.
 *
 * Marble diagram sketch with `inp` as the input, `out` as the output using an `accumulator` that maps according to
 * accumulator: `(acc: number, value: number) => of(acc+value).pipe(delay(5))`
 * i.e. the accumulator function outputs the sum of acc and value after 5 time steps and completes afterward.
 * ```
 * out = inp.pipe(exhaustScan(accumulator, 0));
 * inp: --1--2--------3-4-5-----6---7|
 * out: -------1-----------4---------10|
 * acc:   -----1|     -----4|   -----10|
 *        ^(0,1,0)    ^(1,3,1)  ^(4,6,2)
 * ```
 * Here the accumulator is invoked three times:
 * 1. with `acc=0` (the `seed`), `value=1`, `index=0` => emits 1 after 5 steps
 * 2. with `acc=1` (the previous result), `value=3`, `index=1` => emits 4 after 5 steps
 * 3. with `acc=4` (the previous result), `value=6`, `index=2` => emits 10 after 5 steps
 *
 * The input value 2 is ignored because the observable returned by the first accumulator call is still running.
 * For the same reason the values 4, 5, and 7 are ignored as well.
 *
 * The accumulated state is created per subscription, so every subscriber starts from `seed`.
 *
 * @param accumulator
 *   Maps the latest output (or the initial seed), the current input value and the call index to an observable.
 *   It is only called again once the previously returned observable has completed.
 * @param seed
 *   The value passed as `acc` on the first accumulator call.
 */
export function exhaustScan<V, A>(
	accumulator: (acc: A, value: V, index: number) => ObservableInput<A>,
	seed: A
): OperatorFunction<V, A>;
export function exhaustScan<V, A, S>(
	accumulator: (acc: A | S, value: V, index: number) => ObservableInput<A>,
	seed: S
): OperatorFunction<V, A>;
export function exhaustScan<V, A, S>(
	accumulator: (acc: A | S, value: V, index: number) => ObservableInput<A>,
	seed: S
) {
	return (observable: Observable<V>) => {
		const scanFromSeed = () => {
			// Lives inside the deferred factory so that each subscription gets its own state.
			let latest: A | S = seed;

			const project = (value: V, index: number) => accumulator(latest, value, index);
			const remember = (result: A) => {
				latest = result;
			};

			return observable.pipe(exhaustMap(project), tap(remember));
		};

		return defer(scanFromSeed);
	};
}

/**
 * Similar to rxjs' audit operator, except that the latest input value is emitted on every emission of the
 * observable returned by `durationSelector`, not only on its first one. A new input value switches to a new
 * duration observable.
 *
 * @param durationSelector Maps an input value to the observable whose emissions trigger re-emitting that value.
 */
export function auditAlways<V>(durationSelector: (value: V) => ObservableInput<any>): MonoTypeOperatorFunction<V> {
	return (source$) =>
		source$.pipe(
			switchMap((value) => {
				const duration$ = from(durationSelector(value));
				return duration$.pipe(map(() => value));
			})
		);
}

/**
 * Like rxjs' exhaustAll but, instead of ignoring _all_ inner observables that arrive while the currently selected
 * one is active, it always retains the most recent one and switches over to it once the currently selected one
 * has finalized.
 */
export function exhaustAllWithTrailing<T>(): OperatorFunction<Observable<T>, T> {
	return (source) => {
		const build = () => {
			// Emits whenever an inner observable finalizes; this ends the throttle window so that the retained
			// trailing inner observable (if any) is passed on to exhaustAll.
			const release = new Subject<void>();
			const signalOnFinalize = (inner: Observable<T>) => inner.pipe(finalize(() => release.next()));

			return source.pipe(
				throttle(() => release, { leading: true, trailing: true }),
				map(signalOnFinalize),
				exhaustAll()
			);
		};

		return defer(build);
	};
}

/**
 * Like rxjs' withLatestFrom but ensures that all values from the source observable get emitted in the output, waiting
 * for an initial value of the input observable if necessary.
 *
 * Source values that arrive before the first `input$` value are buffered and released, in order, as soon as that
 * first value arrives; later source values are paired with the latest `input$` value directly.
 *
 * NOTE(review): `source$` is subscribed twice and `input$` three times, so this presumably expects multicast
 * (hot/shared) observables — confirm against callers.
 *
 * @param input$ Observable whose latest value is attached to every source value.
 * @returns An operator emitting `[sourceValue, latestInputValue]` tuples.
 */
export function bufferWithLatestFrom<S, T>(input$: Observable<T>): OperatorFunction<S, [S, T]> {
	// TODO: Tests
	return (source$: Observable<S>) => {
		const firstInput$ = input$.pipe(first());

		// Only the first buffer is of interest. Without `first()` the buffer operator would keep collecting every
		// later source value and emit them once more when the source completes, duplicating the values that the
		// `skipUntil` branch has already delivered.
		const beforeFirstInput$ = source$.pipe(buffer(firstInput$), first(), mergeAll());
		const afterFirstInput$ = source$.pipe(skipUntil(firstInput$));

		return merge(beforeFirstInput$, afterFirstInput$).pipe(withLatestFrom(input$));
	};
}
