// Символ для обозначения конца потока данных для Pipe
export const PipeEnd = Symbol.for('Pipe/End');

/**
 * "Трубопровод данных": в один конец трубы кладём данные, из другого читаем
 * Является каналом обмена данными:
 * - Сам - итерируемый объект, который можно использовать в асинхронном цикле
 *         для реализации реакций на поступающие данные
 * - Для добавления данных в трубу используется метод {@link add}
 * - Чтобы закрыть поток, можно выполнить один из 2-х способов:
 *    - Вызвать метод {@link end}, передав туда последнее значение
 *    - Вызвать метод {@link add} со значением {@link Pipe.End}
 *
 * Пример использования (счётчик)?
 * ```typescript
 * const timerPipe = new Pipe<number>(0);
 *
 * // Используя timerPipe можно подписаться и получать акруальные данные.
 * // Есть 2 варината подключения:
 * // 1. В fow-await-of:
 * (async () => {
 *   for await (const tick of timerPipe) {
 *     console.log(tick);
 *   }
 * })();
 * // 2. Через callback:
 * timerPipe.listen((tick) => {
 *   console.log(tick);
 * });
 * // Каждый вызов получает актуальные данные, т.е. тут дважды будет выведено "0", затем "1" и т.д.
 *
 *
 * // Запускаем таймер
 * let tick = 0;
 * timerPipe.add(tick);
 * setInterval(() => {
 *   timerPipe.add(++tick);
 * }, 1000);
 *
 * // Если подписаться позже, будут приходить только новые значения:
 * setTimeout(() => {
 *   timerPipe.listen((tick) => {
 *     console.log(tick); // Этот слушатель начнёт получать значения начиная только с "6"
 *   });
 * }, 5500);
 * ```
 */
export class Pipe<Value> implements AsyncIterable<Value> {
  // Изменяется в Symbol.asyncIterator
  // @ts-ignore
  private value: Value;
  // Изменяется в Symbol.asyncIterator
  // @ts-ignore
  private resolver: (data: Value | Symbol) => void;
  private stepPromise!: Promise<Value | Symbol>;

  constructor(init: Value) {
    this.value = init;
    this.resolver = () => {};
    this.createPromise();
  }

  add(nextValue: Value) {
    this.resolver(nextValue);
    this.createPromise();
  }

  end(lastValue: Value) {
    this.resolver(lastValue);
    this.resolver(PipeEnd);
  }

  listen(listener: (progress: Value) => void) {
    let isRun = true;
    const stop = () => (isRun = false);

    (async () => {
      for await (const progress of this) {
        if (!isRun) {
          break;
        }

        listener(progress);
      }
    })();

    return stop;
  }

  get current(): Value {
    return this.value;
  }

  [Symbol.asyncIterator](): AsyncIterator<Value> {
    return this.NextValueGenerator();
  }

  private createPromise() {
    this.stepPromise = new Promise<Value | Symbol>((resolver) => (this.resolver = resolver));
  }

  private async *NextValueGenerator() {
    while (true) {
      const result: Symbol | Value = await this.stepPromise;

      if (result === PipeEnd) {
        yield this.value;
        break;
      }

      this.value = result as Value;
      yield this.value;
    }
  }

  static get End() {
    return PipeEnd;
  }
}
