import { MessageFilterFunction } from '@air/lib/server-notifications/MessageFilters';

/**
 * Consumer of server-side events with a single, replaceable listener.
 *
 * `T` is the type of event handed to `onEventReceived`; `S` is the type of
 * event delivered to the subscribed listener. Usually both are the same
 * (hence the `S = T` default). In case of aggregated events though, the
 * listener receives a collection of events instead of a single one,
 * i.e. `S` is `T[]`.
 */
export class MessageConsumer<T, S = T> {
  /** Currently subscribed listener, or `null` when nobody is subscribed. */
  protected _listener: (event: S) => void = null;
  /** Entry point of the processing pipeline; (re)built by `setupProcessing`. */
  _process: (event: T) => void = null;
  /** Optional filter wrapped around the delivery step. */
  _filter?: MessageFilterFunction<T, S>;

  /**
   * @param filter Optional filter; when given, it is called with the delivery
   *   function and must return the function that receives incoming events.
   */
  constructor(filter?: MessageFilterFunction<T, S>) {
    this._filter = filter;
    this.setupProcessing();
  }

  /**
   * Builds `_process`: the filter's output when a filter is configured,
   * otherwise the plain delivery function.
   */
  setupProcessing() {
    const deliver = this._runProcessing.bind(this);
    this._process = this._filter ? this._filter(deliver) : deliver;
  }

  /** Feeds an incoming event into the processing pipeline. */
  onEventReceived(event: T) {
    this._process(event);
  }

  /**
   * Registers `cb` as the only listener, replacing any previous one, and
   * rebuilds the processing pipeline (the filter, if any, is invoked again).
   *
   * @param cb Callback invoked with every event that reaches the listener.
   * @returns A handle that removes this subscription. The handle does nothing
   *   if another listener has replaced `cb` in the meantime, so a stale handle
   *   cannot drop a newer subscriber.
   */
  subscribe = (cb: (event: S) => void) => {
    this._listener = cb;
    this.setupProcessing();
    return () => {
      // Only tear down the subscription this handle was created for.
      if (this._listener === cb) {
        this.unsubscribe();
      }
    };
  };

  /** Removes the current listener unconditionally; later events are dropped. */
  unsubscribe = () => {
    this._listener = null;
  };

  /** Delivers `event` to the current listener, if there is one. */
  private _runProcessing(event: S): void {
    if (this._listener) {
      this._listener(event);
    }
  }
}

/**
 * Registry that lazily creates and caches one `MessageConsumer` per numeric id.
 *
 * Every newly created consumer receives its own filter, obtained by calling
 * the factory passed to the constructor.
 */
export class MessageConsumerPool {
  /** Consumers created so far, keyed by id. */
  _processors: Map<number, MessageConsumer<any, any>> = new Map();

  /**
   * @param _filter Factory invoked once per new consumer to produce its filter.
   */
  constructor(private _filter: () => MessageFilterFunction<any, any>) {}

  /** Builds a fresh consumer wired to a newly produced filter. */
  private _createSearchConsumer() {
    const filter = this._filter();
    return new MessageConsumer<any, any>(filter);
  }

  /**
   * Returns the consumer registered under `id`, creating and caching it on
   * first request.
   *
   * @param id Key identifying the consumer.
   * @returns The cached or newly created consumer for `id`.
   */
  getConsumerById(id: number): MessageConsumer<any, any> {
    const existing = this._processors.get(id);
    if (existing) {
      return existing;
    }

    const created = this._createSearchConsumer();
    this._processors.set(id, created);
    return created;
  }
}
