diff --git a/web/src/components/queue/ProcessingQueueItem.svelte b/web/src/components/queue/ProcessingQueueItem.svelte index 1535d7c5..d9972012 100644 --- a/web/src/components/queue/ProcessingQueueItem.svelte +++ b/web/src/components/queue/ProcessingQueueItem.svelte @@ -96,7 +96,8 @@ // if only fetch workers are running, then we should // show the sum of all running & completed fetch workers if (running.size === 1 && running.has("fetch")) { - totalSize += info.pipelineResults.reduce((s, p) => s + (p?.size ?? 0), 0); + totalSize += Object.values(info.pipelineResults) + .reduce((s, p) => s + (p?.size ?? 0), 0); } const runningText = [...running].map(task => $t(`queue.state.running.${task}`)).join(", "); @@ -107,7 +108,7 @@ } const firstUnstarted = info.pipeline.find(w => { - if (info.completedWorkers.has(w.workerId)) + if (info.pipelineResults[w.workerId]) return false; const task = currentTasks[w.workerId]; @@ -134,7 +135,7 @@ }; const getWorkerProgress = (item: CobaltQueueItem, workerId: UUID): number | undefined => { - if (item.state === 'running' && item.completedWorkers.has(workerId)) { + if (item.state === 'running' && item.pipelineResults[workerId]) { return 100; } @@ -187,7 +188,7 @@ {/each} diff --git a/web/src/components/queue/ProgressBar.svelte b/web/src/components/queue/ProgressBar.svelte index 138f1fe7..f5db2dc5 100644 --- a/web/src/components/queue/ProgressBar.svelte +++ b/web/src/components/queue/ProgressBar.svelte @@ -1,14 +1,14 @@
@@ -17,7 +17,7 @@ class="progress" style="width: {Math.min(100, percentage)}%" >
- {:else if completedWorkers.has(workerId)} + {:else if pipelineResults[workerId]}
{ if (queueItem.state === "running") { - let item: File | undefined; - while ((item = queueItem.pipelineResults.pop())) { - removeFromFileStorage(item.name); + for (const [ workerId, item ] of Object.entries(queueItem.pipelineResults)) { + if (item.name !== DUMMY_FILE.name) { + removeFromFileStorage(item.name); + queueItem.pipelineResults[workerId] = DUMMY_FILE; + } } } else if (queueItem.state === "done") { removeFromFileStorage(queueItem.resultFile.name); @@ -75,8 +79,7 @@ export function pipelineTaskDone(id: UUID, workerId: UUID, file: File) { const item = queueData[id]; if (item && item.state === 'running') { - item.pipelineResults.push(file); - item.completedWorkers.add(workerId); + item.pipelineResults[workerId] = file; } return queueData; @@ -92,8 +95,7 @@ export function itemRunning(id: UUID) { if (data) { data.state = 'running'; - data.completedWorkers ??= new Set(); - data.pipelineResults ??= []; + data.pipelineResults ??= {}; } return queueData; diff --git a/web/src/lib/task-manager/queue.ts b/web/src/lib/task-manager/queue.ts index d4638f6f..85852082 100644 --- a/web/src/lib/task-manager/queue.ts +++ b/web/src/lib/task-manager/queue.ts @@ -217,7 +217,7 @@ export const getProgress = (item: CobaltQueueItem, currentTasks: CobaltCurrentTa let sum = 0; for (const worker of item.pipeline) { - if (item.completedWorkers.has(worker.workerId)) { + if (item.pipelineResults[worker.workerId]) { sum += 1; } else { const task = currentTasks[worker.workerId]; diff --git a/web/src/lib/task-manager/run-worker.ts b/web/src/lib/task-manager/run-worker.ts index bb732cf3..4e68f3bb 100644 --- a/web/src/lib/task-manager/run-worker.ts +++ b/web/src/lib/task-manager/run-worker.ts @@ -12,7 +12,7 @@ export const killWorker = (worker: Worker, unsubscribe: () => void, interval?: N if (interval) clearInterval(interval); } -export const startWorker = async ({ worker, workerId, parentId, workerArgs }: CobaltPipelineItem) => { +export const startWorker = async ({ worker, workerId, dependsOn, parentId, workerArgs }: CobaltPipelineItem) => { let files: File[] = []; switch (worker) { @@ -22,10 +22,15 @@ export const startWorker = async ({ worker, workerId, parentId, workerArgs }: Co files = workerArgs.files; } - if (files.length === 0) { - const parent = get(queue)[parentId]; - if (parent.state === "running" && parent.pipelineResults.length) { - files = parent.pipelineResults; + const parent = get(queue)[parentId]; + if (parent?.state === "running" && dependsOn) { + for (const workerId of dependsOn) { + const file = parent.pipelineResults[workerId]; + if (!file) { + return itemError(parentId, workerId, "queue.ffmpeg.no_args"); + } + + files.push(file); } } diff --git a/web/src/lib/task-manager/scheduler.ts b/web/src/lib/task-manager/scheduler.ts index b4aa3e28..ec8070ac 100644 --- a/web/src/lib/task-manager/scheduler.ts +++ b/web/src/lib/task-manager/scheduler.ts @@ -1,7 +1,7 @@ import { get } from "svelte/store"; import { startWorker } from "$lib/task-manager/run-worker"; import { addWorkerToQueue, currentTasks } from "$lib/state/task-manager/current-tasks"; -import { itemDone, itemError, itemRunning, queue } from "$lib/state/task-manager/queue"; +import { DUMMY_FILE, itemDone, itemError, itemRunning, queue } from "$lib/state/task-manager/queue"; import type { CobaltPipelineItem } from "$lib/types/workers"; @@ -27,8 +27,11 @@ export const schedule = () => { // if all workers are completed, then return the // the final file and go to the next task - if (task.completedWorkers.size === task.pipeline.length) { - const finalFile = task.pipelineResults.pop(); + if (Object.keys(task.pipelineResults).length === task.pipeline.length) { + // swap final file for a dummy, so that it doesn't get + // deleted when we clean up the intermediate files + const finalFile = task.pipelineResults[finalWorker.workerId]; + task.pipelineResults[finalWorker.workerId] = DUMMY_FILE; if (finalFile) { itemDone(task.id, finalFile); @@ -42,11 +45,11 @@ export const schedule = () => { // if current worker is completed, but there are more workers, // then start the next one and wait to be called again for (const worker of task.pipeline) { - if (task.completedWorkers.has(worker.workerId) || ongoingTasks[worker.workerId]) { + if (task.pipelineResults[worker.workerId] || ongoingTasks[worker.workerId]) { continue; } - const needsToWait = worker.dependsOn?.some(id => !task.completedWorkers.has(id)); + const needsToWait = worker.dependsOn?.some(id => !task.pipelineResults[id]); if (needsToWait) { break; } diff --git a/web/src/lib/types/queue.ts b/web/src/lib/types/queue.ts index 242d75e4..e542808b 100644 --- a/web/src/lib/types/queue.ts +++ b/web/src/lib/types/queue.ts @@ -19,8 +19,7 @@ type CobaltQueueItemWaiting = CobaltQueueBaseItem & { export type CobaltQueueItemRunning = CobaltQueueBaseItem & { state: "running", - completedWorkers: Set, - pipelineResults: File[], + pipelineResults: Record, }; type CobaltQueueItemDone = CobaltQueueBaseItem & {