import {
  BehaviorSubject,
  debounceTime,
  lastValueFrom,
  merge,
  Observable,
  ObservableInput,
  of,
  pairwise,
  Subject
} from "rxjs";
import {Injector} from "@angular/core";
import {IMonitor, IMonitorService, ITrigger} from "@nexnox-web/libs/core-shared/src";
import {isEqual, union} from "lodash";
import {catchError, filter, map, mergeMap, take, tap} from "rxjs/operators";

export class MonitorSet<T> {

  private monitors: IMonitor<T>[] = [];
  private keys: string[] = [];
  private changedKey: string;
  private bypassMaster = false;
  private valueChanges$: Observable<T>;
  private triggerSubject: Subject<ITrigger> = new Subject<ITrigger>();

  constructor(
    private injector: Injector,
    private model$: Observable<T>,
    private loadingSubject: BehaviorSubject<boolean>,
    private bypassSubject: BehaviorSubject<boolean>
  ) {
    this._initialize();
  }

  public get modifiedModel$(): Observable<T> {
    return this.valueChanges$;
  }

  public add(monitors: IMonitor<T>[]): void {
    for (let i = 0; i < monitors.length; i++) {
      this.keys = union(this.keys, monitors[i].keys);
      this.monitors.push(monitors[i]);
    }
  }

  public bypass(bypass: boolean): void {
    this.bypassMaster = bypass;
  }

  public trigger(name?: string, key?: string): Promise<void> {
    this.changedKey = key;
    return new Promise<void>((resolve) => this.triggerSubject.next({ name, promise: resolve }));
  }

  private _initialize(): void {
    this.valueChanges$ = merge(this._createMonitorStream(), this._createTriggerStream());
  }

  private _createMonitorStream(): Observable<T> {
    return this.model$.pipe(
      // Smooth stream for bypass detection
      debounceTime(1),
      // Start with pairwise
      pairwise(),
      // Filter by bypass
      filter(() => !this.bypassSubject.getValue() && !this.bypassMaster),
      // Filter distinct value by key
      filter(([previous, current]) => this._hasChanges(previous, current)),
      map(([_, current]) => current),
      // Start loading
      tap(() => this.loadingSubject.next(true)),
      // Execute modification
      mergeMap(async (model) => await this._applyModifications(model)),
      // End loading
      tap(() => this.loadingSubject.next(false))
    ) as Observable<T>;
  }

  private _createTriggerStream(): Observable<T> {
    return this.triggerSubject.asObservable().pipe(
      mergeMap(({ name, promise }) => this.model$.pipe(
        take(1),
        // Start loading
        tap(() => this.loadingSubject.next(true)),
        // Execute modification
        mergeMap(async (model) => await this._applyModifications(model, name)),
        // End loading
        tap(() => this.loadingSubject.next(false)),
        // Resolve trigger promise
        tap(() => promise())
      ))) as Observable<T>;
  }

  private _hasChanges(previous: T, current: T): boolean {
    const keys = this.keys.filter(key => !isEqual(previous[key], current[key]));
    if (keys.length > 0) this.changedKey = keys[0];
    return keys.length > 0;
  }

  private async _applyModifications(model: T, triggerName?: string): Promise<ObservableInput<T>> {

    let monitorsToTrigger: IMonitor<T>[] = this.monitors;
    let modified: T = model;

    if (Boolean(triggerName)) {
      monitorsToTrigger = this.monitors.filter(monitor => monitor.name === triggerName);
    } else if (Boolean(this.changedKey)) {
      monitorsToTrigger = this.monitors.filter(monitor => monitor.keys.includes(this.changedKey));
    }

    // Loop previews
    for await(const monitor of monitorsToTrigger) {
      const service: IMonitorService<T> = this.injector.get(monitor.service)
      modified = await lastValueFrom(service.modifyModel(monitor.payload, this.changedKey, modified).pipe(
        take(1),
        catchError(error => of(model))
      ));
    }

    this.changedKey = undefined;
    return modified;
  }
}
