import { Observable, Subject, timer, BehaviorSubject } from 'rxjs';
import { filter, tap, map, take, switchMap, retryWhen, repeat, distinctUntilChanged, skip, } from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';

// Delay, in milliseconds, between a connection error and the next reconnection attempt.
const reconnectionDelay = 1000;

/**
 * Self-reconnecting wrapper around an RxJS `webSocket` subject.
 *
 * - Incoming frames are forwarded to {@link WebSocket.messages$}. Only `next`
 *   notifications are forwarded, so the stream stays usable across reconnects.
 * - {@link WebSocket.connectionStatus$} reports `true` while the socket is open
 *   and `false` otherwise.
 * - A connection that was open and then went down is replaced immediately by a
 *   fresh socket; a connection attempt that errors is retried every
 *   `reconnectionDelay` milliseconds.
 */
export class WebSocket {
  /** Current connection state; starts as `false` (not connected). */
  private status$: Subject<boolean> = new BehaviorSubject<boolean>(false);
  /** Number of reconnection attempts since the socket was last open (used for logging only). */
  private attemptNr: number = 0;
  /** The currently active socket subject; replaced on every `create()`. */
  private ws?: ReturnType<typeof webSocket>;
  /** Every subscription that belongs to the current socket, disposed as one unit. */
  private connection?: { unsubscribe(): void };
  private address: string;

  /** Stream of messages received from the server. It is never completed by this class. */
  public messages$: Subject<unknown> = new Subject<unknown>();

  /**
   * Opens the first connection and installs the watcher that replaces the
   * socket whenever the status drops from connected to disconnected.
   *
   * @param address URL handed to RxJS `webSocket` as `url`.
   */
  constructor(address: string) {
    this.address = address;
    this.create();
    this.connectionStatus$.pipe(
      // The BehaviorSubject replays its current value on subscribe; skip that one.
      skip(1),
      filter(status => !status),
      tap(() => this.create()),
    ).subscribe();
  }

  /**
   * Connection status with consecutive duplicates removed. Emits the current
   * status on subscription, then every change.
   */
  public get connectionStatus$(): Observable<boolean> {
    return this.status$.pipe(distinctUntilChanged());
  }

  /**
   * Disposes the previous connection (if any) and opens a new one.
   *
   * May be entered re-entrantly from the status watcher while the previous
   * connection is still delivering its close/error notification.
   */
  private create(): void {
    // Drop everything tied to the previous socket first: its retry pipeline
    // (otherwise its pending timer would re-subscribe to a subject that has
    // been unsubscribed) and its open/close forwarders (otherwise a late close
    // event of the old socket could flip the status of the new one).
    if (this.connection) {
      this.connection.unsubscribe();
    }
    if (this.ws) {
      this.ws.unsubscribe();
    }

    const retryConnection = switchMap(() => {
      this.status$.next(false);
      this.attemptNr = this.attemptNr + 1;
      console.log(`Connection down (${this.address}), will attempt ${this.attemptNr} reconnection in ${reconnectionDelay}ms`);

      return timer(reconnectionDelay);
    });

    const openObserver = new Subject<Event>();
    const closeObserver = new Subject<CloseEvent>();
    const ws = webSocket<unknown>({
      url: this.address,
      openObserver,
      closeObserver,
    });
    this.ws = ws;

    // A successful open ends the current outage, so the attempt counter restarts.
    const connection = openObserver.pipe(
      tap(() => { this.attemptNr = 0; }),
      map(() => true),
    ).subscribe(status => this.status$.next(status));
    // Publish the handle before subscribing to the socket so that a re-entrant
    // create() triggered during that subscribe can still dispose this connection.
    this.connection = connection;

    connection.add(
      closeObserver.pipe(map(() => false)).subscribe(status => this.status$.next(status)),
    );

    connection.add(
      ws.pipe(
        // Each emission of the notifier (one per timer tick) re-subscribes to the
        // socket subject, which opens a new underlying socket.
        retryWhen(errs => errs.pipe(retryConnection, repeat())),
      ).subscribe(
        // Forward values only: passing `messages$` itself as the observer would
        // also forward `complete` (emitted on a clean close) and permanently
        // stop the public stream, losing every message after a reconnect.
        received => this.messages$.next(received),
      ),
    );
  }

  /**
   * Sends `message` through the socket. If the socket is not connected yet,
   * the message is held until the status next becomes `true` and is then sent once.
   *
   * @param message Payload passed to the socket subject's `next`.
   */
  message(message: any): void {
    this.connectionStatus$.pipe(
      filter(status => status),
      tap(() => {
        // Read the field at send time so the message goes to the current socket.
        if (this.ws) {
          this.ws.next(message);
        }
      }),
      take(1)
    ).subscribe();
  }
}