import {BehaviorSubject, Observable, Observer, of, retry, Subscription} from 'rxjs';
import {delay, share, tap} from 'rxjs/operators';
import {QueueingSubject} from './queuing-subject';
import {RxWebSocketChannels} from './rx-web-socket-channels';

const defaultOptions = {
  channelJoinAction: 'JOIN',
  channelLeaveAction: 'LEAVE',
  filterJoinAction: 'ADD',
  filterLeaveAction: 'REMOVE',
  notifyAction: 'NOTIFY'
};

const connectionStatusOptions = {
  closed: 'Closed',
  connectionRetry: 'ConnectionRetry',
  open: 'Open',
  openedAfterRetry: 'openedAfterRetry'
};

interface IRxWsOptions {
  url: string;
  invalidUrl?: (e: Event) => Promise<string>;
  channelsMatch?: (source: any, target: any) => boolean;
  filtersMatch?: (source: any, target: any) => boolean;
  transformResponse?: (message: any) => any;
  handleResponseMessage?: (message: any, observer: Observer<any>) => any;
  transformRequest?: (message: any) => any;
  channelJoinAction?: string;
  channelLeaveAction?: string;
  filterJoinAction?: string;
  filterLeaveAction?: string;
  notifyAction?: string;
}

export class RxWebSocket {
  public connectionStatus = new BehaviorSubject<string>(connectionStatusOptions.closed);
  public connectionStatusOptions = connectionStatusOptions;
  public options: IRxWsOptions;
  public channels: RxWebSocketChannels;
  public observable: Observable<any>;
  public subject = new QueueingSubject();
  private retries = false;
  private openedOnce = false;

  constructor(options: IRxWsOptions) {
    if (!options.url) {
      throw new Error('WebSocket url not provided!');
    }

    this.options = Object.assign({}, defaultOptions, options);
    this.observable = this.connect();
    this.channels = new RxWebSocketChannels(this);
  }

  private connect(): Observable<any> {
    return new Observable(observer => this.socketObservable(observer)).pipe(
      retry({
        delay: errors =>
          of(errors).pipe(
            tap({
              next: () => {
                this.retries = true;
                this.connectionStatus.next(connectionStatusOptions.connectionRetry);
              }
            }),
            delay(5 * 1000)
          )
      }),
      share()
    );
  }

  private socketObservable(observer: Observer<any>) {
    const wsUri = (location.protocol === 'https:' ? 'wss:' : 'ws:') + '//' + this.options.url;
    const socket = new WebSocket(wsUri);
    let inputSubscription: Subscription;
    socket.onopen = () => {
      this.connectionStatus.next(this.openedOnce && this.retries ? connectionStatusOptions.openedAfterRetry : connectionStatusOptions.open);
      this.retries = false;
      this.openedOnce = true;
      inputSubscription = this.subject.subscribe(data => {
        let message = data;
        if (this.options.transformRequest) {
          message = this.options.transformRequest(JSON.parse(JSON.stringify(message)));
        }
        socket.send(JSON.stringify(message));
      });
    };
    socket.onmessage = response => {
      let message = JSON.parse(response.data);
      if (this.options.transformResponse) {
        message = this.options.transformResponse(message);
      }
      if (this.options.handleResponseMessage) {
        this.options.handleResponseMessage(message, observer);
      } else {
        observer.next(message);
      }
    };
    socket.onerror = error => {
      console.warn(error);
    };
    socket.onclose = event => {
      this.connectionStatus.next(connectionStatusOptions.closed);
      if (event.code < 1006) {
        observer.complete();
      } else if (this.options.invalidUrl) {
        // Invalid Url
        const promise = this.options.invalidUrl(event);
        promise
          .then(
            (url: string) => {
              this.options.url = url;
              observer.error(new Error(event.code.toString()));
            },
            () => {
              observer.error(new Error(event.code.toString()));
              observer.complete();
            }
          )
          .catch(console.error);
      } else {
        observer.error(new Error(event.code.toString()));
      }
    };
    return () => {
      setTimeout(() => {
        if (inputSubscription) {
          inputSubscription.unsubscribe();
        }
        socket.close();
      });
    };
  }
}
