From 53ca7700a56354a1d42d8c0b7c3a193a48c67a2a Mon Sep 17 00:00:00 2001 From: jj Date: Sat, 29 Mar 2025 14:12:35 +0000 Subject: [PATCH] web/queue: make completedWorkers into set, require pipelineResults --- .../components/queue/ProcessingQueue.svelte | 2 +- .../queue/ProcessingQueueItem.svelte | 2 +- web/src/components/queue/ProgressBar.svelte | 4 +-- web/src/lib/queen-bee/run-worker.ts | 2 +- web/src/lib/queen-bee/scheduler.ts | 20 ++++++++------- web/src/lib/state/queen-bee/queue.ts | 25 +++++++++++-------- web/src/lib/types/queue.ts | 4 +-- 7 files changed, 33 insertions(+), 26 deletions(-) diff --git a/web/src/components/queue/ProcessingQueue.svelte b/web/src/components/queue/ProcessingQueue.svelte index df848ee9..1aeed81c 100644 --- a/web/src/components/queue/ProcessingQueue.svelte +++ b/web/src/components/queue/ProcessingQueue.svelte @@ -40,7 +40,7 @@ return 100; } else if (item.state === "running") { return totalItemProgress( - item.completedWorkers?.length || 0, + item.completedWorkers.size, $currentTasks[item.runningWorker]?.progress?.percentage || 0, item.pipeline.length || 0 ); diff --git a/web/src/components/queue/ProcessingQueueItem.svelte b/web/src/components/queue/ProcessingQueueItem.svelte index f27d3031..4030ef5a 100644 --- a/web/src/components/queue/ProcessingQueueItem.svelte +++ b/web/src/components/queue/ProcessingQueueItem.svelte @@ -78,7 +78,7 @@ const starting = $t(`queue.state.starting.${runningWorker.type}`); if (info.pipeline.length > 1) { - const currentPipeline = (info.completedWorkers?.length || 0) + 1; + const currentPipeline = info.completedWorkers.size + 1; return `${starting} (${currentPipeline}/${info.pipeline.length})`; } return starting; diff --git a/web/src/components/queue/ProgressBar.svelte b/web/src/components/queue/ProgressBar.svelte index cbb1b877..59dcebc2 100644 --- a/web/src/components/queue/ProgressBar.svelte +++ b/web/src/components/queue/ProgressBar.svelte @@ -4,7 +4,7 @@ export let percentage: number = 0; export let workerId: string; export let runningWorkerId: string | undefined; - export let completedWorkers: string[] = []; + export let completedWorkers: Set;
@@ -13,7 +13,7 @@ class="progress" style="width: {Math.min(100, percentage || 0)}%" >
- {:else if completedWorkers?.includes(workerId)} + {:else if completedWorkers.has(workerId)}
{ const task = queueItems[item]; if (task.state === "running") { - // if the running worker isn't completed and wait to be called again - // (on worker completion) - if (!task.completedWorkers?.includes(task.runningWorker)) { + // if the running worker isn't completed, wait + // to be called again on worker completion + if (!task.completedWorkers.has(task.runningWorker)) { break; } - // if all workers are completed, then return the final file and go to next task - if (task.completedWorkers.length === task.pipeline.length) { - const finalFile = task.pipelineResults?.pop(); + // if all workers are completed, then return the + // the final file and go to the next task + if (task.completedWorkers.size === task.pipeline.length) { + const finalFile = task.pipelineResults.pop(); + if (finalFile) { itemDone(task.id, finalFile); continue; @@ -49,9 +51,9 @@ export const checkTasks = () => { // if current worker is completed, but there are more workers, // then start the next one and wait to be called again - for (let i = 0; i < task.pipeline.length; i++) { - if (!task.completedWorkers.includes(task.pipeline[i].workerId)) { - startPipeline(task.pipeline[i]); + for (const worker of task.pipeline) { + if (!task.completedWorkers.has(worker.workerId)) { + startPipeline(worker); break; } } diff --git a/web/src/lib/state/queen-bee/queue.ts b/web/src/lib/state/queen-bee/queue.ts index b8c52f93..4fb4dcf1 100644 --- a/web/src/lib/state/queen-bee/queue.ts +++ b/web/src/lib/state/queen-bee/queue.ts @@ -5,7 +5,7 @@ import { clearFileStorage, removeFromFileStorage } from "$lib/storage"; import { clearCurrentTasks, removeWorkerFromQueue } from "$lib/state/queen-bee/current-tasks"; import type { CobaltFileReference } from "$lib/types/storage"; -import type { CobaltQueue, CobaltQueueItem } from "$lib/types/queue"; +import type { CobaltQueue, CobaltQueueItem, CobaltQueueItemRunning } from "$lib/types/queue"; const clearPipelineCache = (queueItem: CobaltQueueItem) => { if (queueItem.state === "running" && queueItem.pipelineResults) { @@ -74,10 +74,13 @@ export function itemDone(id: string, file: CobaltFileReference) { export function pipelineTaskDone(id: string, workerId: string, file: CobaltFileReference) { update(queueData => { - if (queueData[id] && queueData[id].state === "running") { - queueData[id].pipelineResults = [...queueData[id].pipelineResults || [], file]; - queueData[id].completedWorkers = [...queueData[id].completedWorkers || [], workerId]; + const item = queueData[id]; + + if (item && item.state === 'running') { + item.pipelineResults.push(file); + item.completedWorkers.add(workerId); } + return queueData; }); @@ -87,13 +90,15 @@ export function pipelineTaskDone(id: string, workerId: string, file: CobaltFileR export function itemRunning(id: string, workerId: string) { update(queueData => { - if (queueData[id]) { - queueData[id] = { - ...queueData[id], - state: "running", - runningWorker: workerId, - } + const data = queueData[id] as CobaltQueueItemRunning; + + if (data) { + data.state = 'running'; + data.runningWorker = workerId; + data.completedWorkers ??= new Set(); + data.pipelineResults ??= []; } + return queueData; }); diff --git a/web/src/lib/types/queue.ts b/web/src/lib/types/queue.ts index 9a49c973..312f3ec7 100644 --- a/web/src/lib/types/queue.ts +++ b/web/src/lib/types/queue.ts @@ -23,8 +23,8 @@ export type CobaltQueueItemWaiting = CobaltQueueBaseItem & { export type CobaltQueueItemRunning = CobaltQueueBaseItem & { state: "running", runningWorker: string, - completedWorkers?: string[], - pipelineResults?: CobaltFileReference[], + completedWorkers: Set, + pipelineResults: CobaltFileReference[], }; export type CobaltQueueItemDone = CobaltQueueBaseItem & {