web/queue: make completedWorkers into set, require pipelineResults

This commit is contained in:
jj 2025-03-29 14:12:35 +00:00
parent d78ae8124f
commit 53ca7700a5
No known key found for this signature in database
7 changed files with 33 additions and 26 deletions

View File

@ -40,7 +40,7 @@
return 100;
} else if (item.state === "running") {
return totalItemProgress(
item.completedWorkers?.length || 0,
item.completedWorkers.size,
$currentTasks[item.runningWorker]?.progress?.percentage || 0,
item.pipeline.length || 0
);

View File

@ -78,7 +78,7 @@
const starting = $t(`queue.state.starting.${runningWorker.type}`);
if (info.pipeline.length > 1) {
const currentPipeline = (info.completedWorkers?.length || 0) + 1;
const currentPipeline = info.completedWorkers.size + 1;
return `${starting} (${currentPipeline}/${info.pipeline.length})`;
}
return starting;

View File

@ -4,7 +4,7 @@
export let percentage: number = 0;
export let workerId: string;
export let runningWorkerId: string | undefined;
export let completedWorkers: string[] = [];
export let completedWorkers: Set<string>;
</script>
<div class="file-progress">
@ -13,7 +13,7 @@
class="progress"
style="width: {Math.min(100, percentage || 0)}%"
></div>
{:else if completedWorkers?.includes(workerId)}
{:else if completedWorkers.has(workerId)}
<div
class="progress"
style="width: 100%"

View File

@ -24,7 +24,7 @@ export const startWorker = async ({ worker, workerId, parentId, workerArgs }: Co
if (files?.length === 0) {
const parent = get(queue)[parentId];
if (parent.state === "running" && parent.pipelineResults) {
if (parent.state === "running" && parent.pipelineResults.length) {
files = parent.pipelineResults;
}
}

View File

@ -29,15 +29,17 @@ export const checkTasks = () => {
const task = queueItems[item];
if (task.state === "running") {
// if the running worker isn't completed and wait to be called again
// (on worker completion)
if (!task.completedWorkers?.includes(task.runningWorker)) {
// if the running worker isn't completed, wait
// to be called again on worker completion
if (!task.completedWorkers.has(task.runningWorker)) {
break;
}
// if all workers are completed, then return the final file and go to next task
if (task.completedWorkers.length === task.pipeline.length) {
const finalFile = task.pipelineResults?.pop();
// if all workers are completed, then return the
// the final file and go to the next task
if (task.completedWorkers.size === task.pipeline.length) {
const finalFile = task.pipelineResults.pop();
if (finalFile) {
itemDone(task.id, finalFile);
continue;
@ -49,9 +51,9 @@ export const checkTasks = () => {
// if current worker is completed, but there are more workers,
// then start the next one and wait to be called again
for (let i = 0; i < task.pipeline.length; i++) {
if (!task.completedWorkers.includes(task.pipeline[i].workerId)) {
startPipeline(task.pipeline[i]);
for (const worker of task.pipeline) {
if (!task.completedWorkers.has(worker.workerId)) {
startPipeline(worker);
break;
}
}

View File

@ -5,7 +5,7 @@ import { clearFileStorage, removeFromFileStorage } from "$lib/storage";
import { clearCurrentTasks, removeWorkerFromQueue } from "$lib/state/queen-bee/current-tasks";
import type { CobaltFileReference } from "$lib/types/storage";
import type { CobaltQueue, CobaltQueueItem } from "$lib/types/queue";
import type { CobaltQueue, CobaltQueueItem, CobaltQueueItemRunning } from "$lib/types/queue";
const clearPipelineCache = (queueItem: CobaltQueueItem) => {
if (queueItem.state === "running" && queueItem.pipelineResults) {
@ -74,10 +74,13 @@ export function itemDone(id: string, file: CobaltFileReference) {
export function pipelineTaskDone(id: string, workerId: string, file: CobaltFileReference) {
update(queueData => {
if (queueData[id] && queueData[id].state === "running") {
queueData[id].pipelineResults = [...queueData[id].pipelineResults || [], file];
queueData[id].completedWorkers = [...queueData[id].completedWorkers || [], workerId];
const item = queueData[id];
if (item && item.state === 'running') {
item.pipelineResults.push(file);
item.completedWorkers.add(workerId);
}
return queueData;
});
@ -87,13 +90,15 @@ export function pipelineTaskDone(id: string, workerId: string, file: CobaltFileR
export function itemRunning(id: string, workerId: string) {
update(queueData => {
if (queueData[id]) {
queueData[id] = {
...queueData[id],
state: "running",
runningWorker: workerId,
}
const data = queueData[id] as CobaltQueueItemRunning;
if (data) {
data.state = 'running';
data.runningWorker = workerId;
data.completedWorkers ??= new Set();
data.pipelineResults ??= [];
}
return queueData;
});

View File

@ -23,8 +23,8 @@ export type CobaltQueueItemWaiting = CobaltQueueBaseItem & {
export type CobaltQueueItemRunning = CobaltQueueBaseItem & {
state: "running",
runningWorker: string,
completedWorkers?: string[],
pipelineResults?: CobaltFileReference[],
completedWorkers: Set<string>,
pipelineResults: CobaltFileReference[],
};
export type CobaltQueueItemDone = CobaltQueueBaseItem & {