import {Injectable} from '@angular/core';
import {forkJoin, mergeMap, Observable, ReplaySubject, Subject, Subscription, takeUntil, timer} from 'rxjs';
import {buffer, debounceTime, filter, map, share} from 'rxjs/operators';
import {LiveValueSubscriptionGQL} from './live-value.generated';

/**
 * Key identifying one live value stream: a data point together with one of
 * its properties. Passed to `LiveValueService.getObservable`.
 */
export type LiveValueFilterTo = {
  /** Compared against the `dataPointId` of each incoming subscription entry. */
  dataPointId: number;
  /** Compared against the `value.id` of each incoming subscription entry. */
  propertyId: number;
};

/**
 * A single live value as emitted by the observables returned from
 * `LiveValueService.getObservable`.
 */
export type LiveValue = {
  /** Matched against the requested `propertyId` when routing incoming values. */
  id: number;
  /** NOTE(review): presumably names the data type carried in `value` - confirm against the backend schema. */
  dtype: string;
  /** NOTE(review): unit and epoch are not visible in this file - confirm against the backend schema. */
  timestamp: number;
  value: any; // string | number | boolean | null;
};

/**
 * Hands out shared observables of live values, one per
 * (dataPointId, propertyId) pair, backed by a GraphQL subscription.
 *
 * Requests issued in quick succession are collected and sent as a single
 * backend subscription; every per-pair observable then picks its own values
 * out of the shared message stream.
 */
@Injectable({providedIn: 'root'})
export class LiveValueService {
  /** Cache of shared observables, keyed by data point id and then by property id. */
  private readonly liveValues$ = new Map<number, Map<number, Observable<LiveValue>>>();
  /** Teardown notifiers of requests that have not yet been attached to a backend subscription. */
  private takeUntilObs: Observable<void>[] = [];
  /** Requests waiting to be batched into the next backend subscription. */
  private readonly buffer = new Subject<LiveValueFilterTo>();
  /**
   * Shared stream of all backend messages.
   *
   * Requests are collected until 10 ms pass without a new one, then a single
   * subscription is opened for the whole batch. That subscription is kept
   * until every teardown notifier drained for the batch has emitted and
   * completed, i.e. until every requester of the batch has been torn down.
   */
  private readonly buffer$ = this.buffer.pipe(
    buffer(this.buffer.pipe(debounceTime(10))),
    mergeMap((dataPointIdAndPropertyId: LiveValueFilterTo[]) => {
      // The subscription takes a flat list: [dataPointId, propertyId, dataPointId, propertyId, ...].
      const flatIds: number[] = [];
      dataPointIdAndPropertyId.forEach(pair => flatIds.push(pair.dataPointId, pair.propertyId));
      return this.liveValueSubscriptionGQL.subscribe({filter: flatIds}).pipe(takeUntil(forkJoin(this.getTakeUntilObs())));
    }),
    map(message => message.data.liveValueSubscription),
    share()
  );

  constructor(private liveValueSubscriptionGQL: LiveValueSubscriptionGQL) {}

  /**
   * Returns the shared live value observable for the given pair, creating and
   * caching it on first request.
   *
   * The observable replays its latest value to late subscribers and stays
   * connected for 5 seconds after its last subscriber leaves. Once it is
   * finally torn down it removes itself from the cache.
   *
   * @param dataPointIdAndPropertyId Pair identifying the value to observe. The
   *   ids are read once, when this method is called.
   * @returns An observable emitting the live values of that pair.
   */
  getObservable(dataPointIdAndPropertyId: LiveValueFilterTo): Observable<LiveValue> {
    const {dataPointId, propertyId} = dataPointIdAndPropertyId;

    const cached = this.liveValues$.get(dataPointId)?.get(propertyId);
    if (cached) {
      return cached;
    }

    // Annotated explicitly because the teardown below refers back to `obs`.
    const obs: Observable<LiveValue> = new Observable<LiveValue>(subscriber => {
      let sub: Subscription | undefined;
      let teardownNotifier: Subject<void> | undefined;
      try {
        // Subscribe to the shared stream first so that no message of the
        // batch requested below can be missed.
        sub = this.buffer$
          .pipe(
            map(liveValues => liveValues.find(lv => lv.dataPointId === dataPointId && lv.value.id === propertyId)),
            filter(lv => !!lv),
            map(lv => lv.value)
          )
          .subscribe(subscriber);
        teardownNotifier = new Subject<void>();
        // The notifier must be registered before the request is queued, so it
        // is drained together with the batch that contains this request.
        this.takeUntilObs.push(teardownNotifier);
        this.buffer.next({dataPointId, propertyId});
      } catch (e) {
        subscriber.error(e);
      }
      return () => {
        subscriber.complete();
        teardownNotifier?.next();
        teardownNotifier?.complete();
        sub?.unsubscribe();
        // Evict only our own cache entry. A caller may still hold this
        // observable after it was evicted and resubscribe later; its teardown
        // must not remove a newer observable cached under the same key.
        const current = this.liveValues$.get(dataPointId);
        if (current && current.get(propertyId) === obs) {
          current.delete(propertyId);
          if (current.size === 0) {
            this.liveValues$.delete(dataPointId);
          }
        }
      };
    }).pipe(share({connector: () => new ReplaySubject(1), resetOnRefCountZero: () => timer(5000)}));

    let dpMap = this.liveValues$.get(dataPointId);
    if (!dpMap) {
      dpMap = new Map<number, Observable<LiveValue>>();
      this.liveValues$.set(dataPointId, dpMap);
    }
    dpMap.set(propertyId, obs);

    return obs;
  }

  /**
   * Hands over all pending teardown notifiers and resets the pending list, so
   * each notifier is attached to exactly one backend subscription.
   *
   * @returns The notifiers collected since the previous call.
   */
  private getTakeUntilObs(): Observable<void>[] {
    const obs = this.takeUntilObs;
    this.takeUntilObs = [];
    return obs;
  }
}
