import { DestroyRef, inject, Injectable, OnDestroy } from '@angular/core';
import { Observable, Subject } from 'rxjs';
import { fetchEventSource } from '@microsoft/fetch-event-source';
import { AuthService } from '../auth/auth.service';
import { ENV } from '@dm-workspace/types';
import { filter } from 'rxjs/operators';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

/**
 * Registry shape for open SSE streams: a string-keyed dictionary in which each
 * entry pairs the subject carrying a stream's messages with the controller
 * used to abort the underlying request.
 */
type EventSource = Record<
  string,
  {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    subject: Subject<any>;
    abortController: AbortController;
  }
>;

/**
 * Root-scoped registry of Server-Sent Event streams, keyed by request URL.
 *
 * Each URL is backed by one `fetchEventSource` request and one RxJS subject, so
 * repeated `connect()` calls for the same URL share a single stream. All
 * streams are closed when `loggedIn$` emits a falsy value and when the service
 * is destroyed.
 */
@Injectable({
  providedIn: 'root',
})
export class SseService implements OnDestroy {
  private readonly env = inject(ENV);
  private readonly destroyRef = inject(DestroyRef);
  private eventSubjects: EventSource = {};

  constructor(private authService: AuthService) {
    // Tear every stream down as soon as the user is reported as logged out.
    this.authService.loggedIn$
      .pipe(
        takeUntilDestroyed(this.destroyRef),
        filter((isLoggedIn) => !isLoggedIn)
      )
      .subscribe(() => this.closeAll());
  }

  public ngOnDestroy() {
    this.closeAll();
  }

  /**
   * Looks up the subject registered for `url`.
   *
   * @returns the registered subject, or `null` when there is none or it has
   *   been unsubscribed (`closed`).
   */
  private getEventSubject<T>(url: string): Subject<T> | null {
    const subject = this.eventSubjects[url]?.subject;
    return subject && !subject.closed ? subject : null;
  }

  /**
   * Returns the shared stream for `url`, opening it first if none is registered.
   *
   * @param url request path; the service prepends `env.apiUrl` when fetching.
   * @param openWhenHidden forwarded unchanged to fetchEventSource's
   *   `openWhenHidden` option.
   * @returns the stream of parsed message payloads for `url`.
   */
  public connect<T>(url: string, openWhenHidden?: boolean): Observable<T> {
    return this.getEventSubject<T>(url) ?? this.createEventSource<T>(url, openWhenHidden);
  }

  /**
   * Registers a fresh subject/abort-controller pair for `url` and starts the
   * underlying `fetchEventSource` request.
   *
   * Failures and server-side closes are retried up to `MAX_RETRIES` consecutive
   * times, `RETRY_TIMEOUT` milliseconds apart; any received message resets the
   * counter. Once the limit is hit the stream is unregistered and the subject
   * errors, so a later `connect(url)` opens a new stream.
   */
  private createEventSource<T>(url: string, openWhenHidden?: boolean): Observable<T> {
    const MAX_RETRIES = 20;
    const RETRY_TIMEOUT = 3000;
    let retries = 0;

    // An entry can still be registered here if its subject was unsubscribed by a
    // consumer; abort its request so it is not leaked when the entry is replaced.
    this.eventSubjects[url]?.abortController.abort();

    const eventSubject = new Subject<T>();
    const abortController = new AbortController();
    this.eventSubjects[url] = { subject: eventSubject, abortController };

    const authorizedFetch = (input: RequestInfo | URL, init?: RequestInit): Promise<Response> => {
      // Custom fetch is used to get authorization header from AuthService
      // on every request not just when the connect method is called.
      const { headers, ...rest } = init ?? {};
      const authorizedHeaders = {
        ...headers,
        Authorization: `Bearer ${this.authService.token}`,
      };

      return fetch(`${this.env.apiUrl}${input}`, { ...rest, headers: authorizedHeaders });
    };

    fetchEventSource(url, {
      signal: abortController.signal,
      onmessage: (ev) => {
        retries = 0;
        // A message without a data payload is forwarded as-is instead of being parsed.
        const data = ev.data && JSON.parse(ev.data);
        eventSubject.next(data);
      },
      onerror: (error) => {
        if (retries < MAX_RETRIES) {
          retries++;
          return RETRY_TIMEOUT;
        }

        // Unregister before notifying: RxJS sets `closed` only on `unsubscribe()`,
        // so an errored subject left in the registry would be handed out by every
        // later `connect(url)` and the stream could never be re-established.
        if (this.eventSubjects[url]?.subject === eventSubject) {
          delete this.eventSubjects[url];
        }
        eventSubject.error(error);

        // Throwing an error here will terminate the connection and stop retrying.
        // Rethrow the original so its message and stack are preserved.
        throw error instanceof Error ? error : new Error(String(error));
      },
      onclose: () => {
        // Throwing an error will cause a retry using onerror
        throw new Error('SSE connection closed');
      },
      fetch: authorizedFetch,
      openWhenHidden,
    }).catch(() => {
      // The terminal failure was already delivered through eventSubject.error();
      // this handler only keeps the rejection thrown from onerror from being
      // reported as an unhandled promise rejection.
    });

    return eventSubject;
  }

  /**
   * Completes the stream registered for `url`, aborts its request and removes
   * it from the registry. Does nothing when no stream is registered for `url`.
   */
  public close(url: string): void {
    const entry = this.eventSubjects[url];
    if (!entry) {
      return;
    }

    // Unregister first so that a `connect(url)` issued from a completion
    // handler opens a new stream instead of receiving the finished one.
    delete this.eventSubjects[url];

    if (!entry.subject.closed) {
      entry.subject.complete();
    }
    entry.abortController.abort();
  }

  /** Closes every registered stream. */
  public closeAll(): void {
    // Object.keys() takes a snapshot, so close() may delete entries while we iterate.
    for (const url of Object.keys(this.eventSubjects)) {
      this.close(url);
    }
  }
}
