ylioo

Worker Pool

import * as Comlink from "comlink";

import debug from "./debug.ts";

type PWorker = {
  runner: Comlink.Remote<{
    run: (input: string) => Promise<string>;
  }>;
  status: "ready" | "runing";
};

export default class Pool {
  private pendingQ: ((worker: PWorker) => Promise<string>)[] = [];
  private filename: string;
  private maxWorkers: number;
  private workers: PWorker[];

  constructor(options: { filename: string; maxWorkers: number }) {
    this.filename = options.filename;
    this.maxWorkers = options.maxWorkers;
    this.workers = [];
  }

  async run(input: string) {
    let onResolve: (value: unknown) => void;
    const pendingSignal = new Promise((res) => (onResolve = res));
    let ret: string;
    let error: unknown = null;

    this.pendingQ.push((worker) =>
      worker.runner
        .run(input)
        .then(
          (res) => (ret = res),
          (err) => (error = err)
        )
        .catch((err) => (error = err))
        .finally(() => {
          onResolve(null);
        })
    );

    this.runNext();

    await pendingSignal;

    if (error !== null) {
      return Promise.reject(error);
    }

    return ret!;
  }

  private getWorker() {
    const readyWorker = this.workers.find(
      (worker) => worker.status === "ready"
    );
    if (readyWorker) {
      return readyWorker;
    }

    const worker: PWorker = {
      runner: Comlink.wrap(new Worker(this.filename)),
      status: "ready",
    };
    this.workers.push(worker);
    return worker;
  }

  private runNext() {
    if (
      this.workers.filter((w) => w.status === "runing").length <
        this.maxWorkers &&
      this.pendingQ.length > 0
    ) {
      const worker = this.getWorker();
      worker.status = "runing";
      const nextRun = this.pendingQ.shift()!;
      nextRun(worker).finally(() => {
        worker.status = "ready";
        this.runNext();
      });
      debug("WorkerPool runing: ", this.pendingQ.length, this.workers.length);
    }
  }
}