/**
 * A composable event filter written in continuation-passing style.
 *
 * A filter is given the downstream callback (`onComplete`) and returns the
 * upstream handler that accepts incoming events. The handler decides if,
 * when and in what shape events are forwarded to `onComplete`.
 *
 * @typeParam T - Type of the events accepted by the returned handler.
 * @typeParam S - Type of the events passed on to `onComplete`.
 */
export type MessageFilterFunction<T, S> = (
  onComplete: (event: S) => void
) => (event: T) => void;

/**
 * The simplest possible filter: it does nothing except pass every event it
 * receives further down the pipe, unchanged and immediately.
 *
 * @returns A filter whose handler calls `onComplete` once per incoming event.
 */
export function simpleFilter<T>(): MessageFilterFunction<T, T> {
  return function passThrough(onComplete) {
    return function forward(event) {
      onComplete(event);
    };
  };
}

/**
 * Creates a filter that batches incoming events by time.
 *
 * The first event that arrives while the internal queue is empty starts a
 * one-shot timer of `_timeout` milliseconds. Every event received before the
 * timer fires is appended to the same batch. When the timer fires, the whole
 * batch is handed to `onComplete` as an array and a new, empty batch begins.
 *
 * The global `setTimeout` is used (rather than `window.setTimeout`) so the
 * filter also works where `window` does not exist, e.g. workers or Node.
 *
 * @param _timeout - Delay in milliseconds between the first event of a batch
 *   and the delivery of that batch.
 * @returns A filter turning a stream of `T` events into `T[]` batches.
 */
export function aggregateTimeout<T>(
  _timeout: number
): MessageFilterFunction<T, T[]> {
  let queue: T[] = [];
  return (onComplete) => (event) => {
    // A timer is pending exactly while the queue is non-empty, so an empty
    // queue means there is nothing to cancel and a fresh timer is needed.
    if (queue.length === 0) {
      setTimeout(() => {
        // Detach the batch BEFORE delivering it. If `onComplete` throws, or
        // synchronously feeds a new event back into this filter, the queue is
        // already empty, so the next event re-arms the timer instead of being
        // appended to (and lost with) the batch that is being delivered.
        const batch = queue;
        queue = [];
        onComplete(batch);
      }, _timeout);
    }
    queue.push(event);
  };
}

/**
 * Creates a filter that batches incoming events by count.
 *
 * Events are collected in an internal queue and pushed forward to the
 * external processor only once the queue has collected enough of them; the
 * required amount is given by `_limit`. The batch is delivered synchronously
 * from within the call that supplied its last event, after which a new,
 * empty batch begins.
 *
 * The batch is detached from the filter before it is delivered, so a
 * consumer that throws does not receive the same events again.
 *
 * @param _limit - Number of events that triggers delivery of a batch.
 * @returns A filter turning a stream of `T` events into `T[]` batches.
 */
export function aggregateLimit<T>(
  _limit: number
): MessageFilterFunction<T, T[]> {
  let queue: T[] = [];
  return (onComplete) => (event) => {
    queue.push(event);

    if (queue.length >= _limit) {
      // Detach the batch BEFORE delivering it: a consumer that synchronously
      // feeds an event back into this filter must start a new batch rather
      // than append to (and re-deliver) the one currently being handed over.
      const batch = queue;
      queue = [];
      onComplete(batch);
    }
  };
}

/**
 * Creates a filter that reduces a batch of events to its most recent one.
 *
 * For every non-empty batch the last element is forwarded to `onComplete`;
 * empty batches are dropped silently. The incoming array is only read, never
 * modified, so it stays intact for any other code holding a reference to it.
 *
 * @returns A filter turning `T[]` batches into single `T` events.
 */
export function getLastFromBatch<T>(): MessageFilterFunction<T[], T> {
  return (onComplete) => (events) => {
    if (events.length > 0) {
      // Index access instead of `pop()`: `pop()` would remove the element
      // from the caller's array as a side effect.
      onComplete(events[events.length - 1]);
    }
  };
}

/**
 * Composes two filters into a single one.
 *
 * Events enter `filterA`; whatever `filterA` forwards becomes the input of
 * `filterB`; whatever `filterB` forwards reaches the final `onComplete`.
 *
 * @param filterA - Upstream filter, the first to see incoming events.
 * @param filterB - Downstream filter, fed with the output of `filterA`.
 * @returns A filter equivalent to running `filterA` and then `filterB`.
 */
export const pipeFilters = <T, S, R>(
  filterA: MessageFilterFunction<T, S>,
  filterB: MessageFilterFunction<S, R>
): MessageFilterFunction<T, R> => {
  return function piped(onComplete) {
    // Wire the downstream stage first; its handler is the upstream callback.
    const downstream = filterB(onComplete);
    return filterA(downstream);
  };
};
