/* eslint-disable @typescript-eslint/unbound-method */

/* eslint-disable @typescript-eslint/no-dynamic-delete */
import { ServerStreamingCall } from '@protobuf-ts/runtime-rpc';
import hash from 'object-hash';
import { Observable, Subscription } from 'observable-fns';
import { QueuedTask } from 'threads';
import { ObservablePromise } from 'threads/dist/observable-promise';
import { v4 as uuid } from 'uuid';

import { Listeners } from './types/stream';

export class ServerStream<StreamInput, StreamOutput> {
  taskFactory: (
    input: StreamInput,
  ) => QueuedTask<
    never,
    { observer: ObservablePromise<{ value: StreamOutput; input: StreamInput }> }
  >;
  observers: Record<
    string,
    ObservablePromise<{ value: StreamOutput; input: StreamInput }>
  >;
  listeners: Record<string, Listeners<StreamInput, StreamOutput>['onMessage']>;
  subscriptions: Record<
    string,
    Record<
      string,
      Subscription<{
        value: StreamOutput;
        input: StreamInput;
      }>
    >
  >;

  constructor(
    taskFactory: (input: StreamInput) => QueuedTask<
      never,
      {
        observer: ObservablePromise<{
          value: StreamOutput;
          input: StreamInput;
        }>;
      }
    >,
  ) {
    this.taskFactory = taskFactory;
    this.observers = {};
    this.listeners = {};
    this.subscriptions = {};
  }

  public async listen(
    listeners: Listeners<StreamInput, StreamOutput>,
    input: StreamInput,
  ) {
    const inputHash = hash(input != null ? input : {});
    let listenerKey = this.getKeyByValue(this.listeners, listeners.onMessage);
    if (listenerKey == null) {
      listenerKey = this.assignNewListenerKey(listeners.onMessage);
    }
    if (!(inputHash in this.observers)) {
      const { observer } = await this.createObserver(input);
      this.observers[inputHash] = observer;
    }
    if (!(inputHash in this.subscriptions)) {
      this.subscriptions[inputHash] = {};
    }
    if (!(listenerKey in this.subscriptions[inputHash])) {
      this.subscriptions[inputHash][listenerKey] = this.attachListeners(
        listeners,
        this.observers[inputHash],
      );
    }
  }

  public cancel(
    listener: Listeners<StreamInput, StreamOutput>['onMessage'],
    input: StreamInput,
  ) {
    const inputHash = hash(input != null ? input : {});
    const listenerKey = this.getKeyByValue(this.listeners, listener);
    if (listenerKey != null) {
      if (listenerKey in this.subscriptions[inputHash]) {
        const subscription = this.subscriptions[inputHash][listenerKey];
        subscription.unsubscribe();
        delete this.subscriptions[inputHash][listenerKey];
      }

      let isListenerUsed = false;
      for (const inputHashKey in this.subscriptions) {
        for (const subscriptionListenerKey in this.subscriptions[
          inputHashKey
        ]) {
          if (subscriptionListenerKey == listenerKey) {
            isListenerUsed = true;
            break;
          }
        }
        if (isListenerUsed) {
          break;
        }
      }

      if (!isListenerUsed) {
        if (listenerKey in this.listeners) {
          delete this.listeners[listenerKey];
        }
      }

      if (
        inputHash in this.subscriptions &&
        Object.values(this.subscriptions[inputHash]).length === 0
      ) {
        delete this.subscriptions[inputHash];
        if (inputHash in this.observers) {
          delete this.observers[inputHash];
        }
      }
    }
  }

  private attachListeners(
    listeners: Listeners<StreamInput, StreamOutput>,
    observer: ObservablePromise<{ value: StreamOutput; input: StreamInput }>,
  ) {
    return observer.subscribe(
      listeners.onMessage,
      listeners.onDisconnect,
      listeners.onComplete,
    );
  }

  private async createObserver(input: StreamInput) {
    const result = await this.taskFactory(input);
    return result;
  }

  private getKeyByValue(
    object: Record<string, unknown>,
    value: unknown,
  ): string | undefined {
    return Object.keys(object).find((key) => object[key] === value);
  }

  private assignNewListenerKey(
    listener: Listeners<StreamInput, StreamOutput>['onMessage'],
  ) {
    const newListenerKey = uuid();
    this.listeners[newListenerKey] = listener;
    return newListenerKey;
  }
}

export function createWorkerStreamObservable<
  StreamOutput extends object,
  StreamInput extends object,
  Input,
  Output,
>(
  inputConverter: (input: Input) => StreamInput,
  streamFactory: (
    streamInput: StreamInput,
    abortController: AbortController,
  ) => ServerStreamingCall<StreamInput, StreamOutput>,
  input: Input,
  outputConverter: (streamOutput: StreamOutput, input: Input) => Output,
): Observable<Output> {
  let subscriptionCounter = 0;
  let abortController: AbortController | null = null;
  let cancelStreamSubscription: (() => void) | null = null;

  const observer = new Observable<Output>((observer) => {
    abortController = new AbortController();
    const stream = streamFactory(inputConverter(input), abortController);
    cancelStreamSubscription = stream.responses.onNext(
      (message, error, complete) => {
        if (message != null) {
          observer.next(outputConverter(message, input));
        }
        if (error != null) {
          observer.error(error);
        }
        if (complete) {
          observer.complete();
        }
      },
    );
  });

  let observerSubscribe = observer.subscribe;
  observerSubscribe = observerSubscribe.bind(observer);

  observer.subscribe = (
    onNext,
    onError?: (error: unknown) => void,
    onComplete?: () => void,
  ) => {
    subscriptionCounter++;
    let subscription: Subscription<unknown> | null = null;
    if (typeof onNext == 'function') {
      subscription = observerSubscribe(onNext, onError, onComplete);
    } else {
      subscription = observerSubscribe(onNext);
    }
    let unsubscribe = subscription.unsubscribe;
    unsubscribe = unsubscribe.bind(subscription);
    subscription.unsubscribe = () => {
      subscriptionCounter--;

      if (subscriptionCounter <= 0) {
        if (cancelStreamSubscription != null) {
          cancelStreamSubscription();
        }
        abortController?.abort();
      }
      unsubscribe();
    };
    return subscription;
  };
  return observer;
}
