diff --git a/web/src/components/queue/ProcessingQueue.svelte b/web/src/components/queue/ProcessingQueue.svelte index e4c80b1c..62ecdcbb 100644 --- a/web/src/components/queue/ProcessingQueue.svelte +++ b/web/src/components/queue/ProcessingQueue.svelte @@ -8,6 +8,7 @@ import { queueVisible } from "$lib/state/queue-visibility"; import { currentTasks } from "$lib/state/task-manager/current-tasks"; import { clearQueue, queue as readableQueue } from "$lib/state/task-manager/queue"; + import { getProgress } from "$lib/task-manager/queue"; import SectionHeading from "$components/misc/SectionHeading.svelte"; import PopoverContainer from "$components/misc/PopoverContainer.svelte"; @@ -21,23 +22,10 @@ $queueVisible = !$queueVisible; }; - const totalItemProgress = (completed: number, current: number, total: number) => { - return (completed * 100 + current) / total - } - $: queue = Object.entries($readableQueue); $: totalProgress = queue.length ? queue.map(([, item]) => { - if (item.state === "done" || item.state === "error") { - return 100; - } else if (item.state === "running") { - return totalItemProgress( - item.completedWorkers.size, - $currentTasks[item.runningWorker]?.progress?.percentage || 0, - item.pipeline.length || 0 - ); - } - return 0; + return getProgress(item) * 100; }).reduce((a, b) => a + b) / (100 * queue.length) : 0; $: indeterminate = queue.length > 0 && totalProgress === 0; @@ -93,16 +81,7 @@
{#each queue as [id, item]} - + {/each} {#if queue.length === 0} diff --git a/web/src/components/queue/ProcessingQueueItem.svelte b/web/src/components/queue/ProcessingQueueItem.svelte index bbbfd402..bcbc48b4 100644 --- a/web/src/components/queue/ProcessingQueueItem.svelte +++ b/web/src/components/queue/ProcessingQueueItem.svelte @@ -4,10 +4,11 @@ import { downloadFile } from "$lib/download"; import { removeItem } from "$lib/state/task-manager/queue"; import { savingHandler } from "$lib/api/saving-handler"; + import { getProgress } from "$lib/task-manager/queue"; + import { currentTasks } from "$lib/state/task-manager/current-tasks"; import type { CobaltQueueItem } from "$lib/types/queue"; - import type { CobaltWorkerProgress } from "$lib/types/workers"; - import type { CobaltCurrentTaskItem } from "$lib/types/task-manager"; + import type { CobaltCurrentTasks } from "$lib/types/task-manager"; import ProgressBar from "$components/queue/ProgressBar.svelte"; @@ -30,8 +31,6 @@ export let id: string; export let info: CobaltQueueItem; - export let runningWorker: CobaltCurrentTaskItem | undefined; - export let runningWorkerId: string | undefined; let retrying = false; @@ -52,41 +51,51 @@ }), }); - $: progress = runningWorker?.progress; - type StatusText = { info: CobaltQueueItem; - runningWorker: CobaltCurrentTaskItem | undefined; - progress: CobaltWorkerProgress | undefined; + currentTasks: CobaltCurrentTasks; retrying: boolean; }; - const generateStatusText = ({ info, runningWorker, progress, retrying }: StatusText) => { + const generateStatusText = ({ info, retrying, currentTasks }: StatusText) => { switch (info.state) { case "running": - if (runningWorker) { - const running = $t(`queue.state.running.${runningWorker.type}`); - const formattedSize = formatFileSize(progress?.size); + const progress = getProgress(info); - if (progress && progress.percentage) { - return `${running}: ${Math.floor(progress.percentage)}%, ${formattedSize}`; - } - else if (runningWorker && progress) { - if (progress.size > 0) { - return `${running}: ${formattedSize}`; - } - return running; - } - else if (runningWorker?.type) { - const starting = $t(`queue.state.starting.${runningWorker.type}`); + const runningWorkers = info.pipeline.filter(w => w.workerId in currentTasks); + const running = [...new Set(runningWorkers.map(task => $t(`queue.state.running.${task.worker}`)))].join(', '); + const progresses = runningWorkers.map(w => currentTasks[w.workerId]) + .map(t => t.progress) + .filter(p => p); - if (info.pipeline.length > 1) { - const currentPipeline = info.completedWorkers.size + 1; - return `${starting} (${currentPipeline}/${info.pipeline.length})`; - } - return starting; - } + const totalSize = progresses.reduce((s, p) => s + (p?.size ?? 0), 0); + + if (runningWorkers.length && totalSize > 0) { + const formattedSize = formatFileSize(totalSize); + return `${running}: ${Math.floor(progress * 100)}%, ${formattedSize}`; } + + const firstUnstarted = info.pipeline.find(w => { + if (info.completedWorkers.has(w.workerId)) + return false; + + const task = currentTasks[w.workerId]; + if (!task || !task.progress?.percentage) { + return true; + } + }); + + if (firstUnstarted) { + const starting = $t(`queue.state.starting.${firstUnstarted.worker}`); + + if (info.pipeline.length > 1) { + const currentPipeline = info.completedWorkers.size + 1; + return `${starting} (${currentPipeline}/${info.pipeline.length})`; + } + + return starting; + } + return $t("queue.state.starting"); case "done": @@ -100,6 +109,23 @@ } }; + const getWorkerProgress = (item: CobaltQueueItem, workerId: string): number | undefined => { + if (item.state === 'running' && item.completedWorkers.has(workerId)) { + return 100; + } + + const workerIndex = item.pipeline.findIndex(w => w.workerId === workerId); + if (workerIndex === -1) { + return; + } + + const worker = item.pipeline[workerIndex]; + const task = $currentTasks[worker.workerId]; + if (task?.progress) { + return task.progress.percentage; + } + } + /* params are passed here because svelte will re-run the function every time either of them is changed, @@ -107,9 +133,8 @@ */ $: statusText = generateStatusText({ info, - runningWorker, - progress, retrying, + currentTasks: $currentTasks }); @@ -126,11 +151,10 @@ {#if info.state === "running"}
- {#each info.pipeline as pipeline} + {#each info.pipeline as task} {/each} diff --git a/web/src/components/queue/ProgressBar.svelte b/web/src/components/queue/ProgressBar.svelte index 59dcebc2..e6531387 100644 --- a/web/src/components/queue/ProgressBar.svelte +++ b/web/src/components/queue/ProgressBar.svelte @@ -3,22 +3,21 @@ export let percentage: number = 0; export let workerId: string; - export let runningWorkerId: string | undefined; export let completedWorkers: Set;
- {#if percentage && workerId === runningWorkerId} + {#if percentage}
{:else if completedWorkers.has(workerId)}
- {:else if workerId === runningWorkerId} + {:else} { const data = queueData[id] as CobaltQueueItemRunning; if (data) { data.state = 'running'; - data.runningWorker = workerId; data.completedWorkers ??= new Set(); data.pipelineResults ??= []; } diff --git a/web/src/lib/task-manager/queue.ts b/web/src/lib/task-manager/queue.ts index b1be95b7..a1b93df2 100644 --- a/web/src/lib/task-manager/queue.ts +++ b/web/src/lib/task-manager/queue.ts @@ -4,9 +4,11 @@ import { ffmpegMetadataArgs } from "$lib/util"; import { createDialog } from "$lib/state/dialogs"; import { addItem } from "$lib/state/task-manager/queue"; import { openQueuePopover } from "$lib/state/queue-visibility"; +import { currentTasks } from "$lib/state/task-manager/current-tasks"; import type { CobaltPipelineItem, CobaltPipelineResultFileType } from "$lib/types/workers"; import type { CobaltLocalProcessingResponse, CobaltSaveRequestBody } from "$lib/types/api"; +import type { CobaltQueueItem } from "$lib/types/queue"; export const getMediaType = (type: string) => { const kind = type.split('/')[0]; @@ -177,6 +179,7 @@ export const createSavePipeline = (info: CobaltLocalProcessingResponse, request: worker: workerType, workerId: crypto.randomUUID(), parentId, + dependsOn: pipeline.map(w => w.workerId), workerArgs: { files: [], ffargs, @@ -200,3 +203,24 @@ export const createSavePipeline = (info: CobaltLocalProcessingResponse, request: openQueuePopover(); } + +export const getProgress = (item: CobaltQueueItem): number => { + if (item.state === 'done' || item.state === 'error') { + return 1; + } else if (item.state === 'waiting') { + return 0; + } + + const runningTasks = get(currentTasks); + let sum = 0; + for (const worker of item.pipeline) { + if (item.completedWorkers.has(worker.workerId)) { + sum += 1; + } else { + const task = runningTasks[worker.workerId]; + sum += (task?.progress?.percentage || 0) / 100; + } + } + + return sum / item.pipeline.length; +} diff --git a/web/src/lib/task-manager/scheduler.ts b/web/src/lib/task-manager/scheduler.ts index 4cb0ec53..bcb5cab7 100644 --- a/web/src/lib/task-manager/scheduler.ts +++ b/web/src/lib/task-manager/scheduler.ts @@ -11,11 +11,7 @@ const startPipeline = (pipelineItem: CobaltPipelineItem) => { parentId: pipelineItem.parentId, }); - itemRunning( - pipelineItem.parentId, - pipelineItem.workerId, - ); - + itemRunning(pipelineItem.parentId); startWorker(pipelineItem); } @@ -23,18 +19,9 @@ export const schedule = () => { const queueItems = get(queue); const ongoingTasks = get(currentTasks); - // TODO (?): task concurrency - if (Object.keys(ongoingTasks).length > 0) { - return; - } - for (const task of Object.values(queueItems)) { if (task.state === "running") { - // if the running worker isn't completed, wait - // to be called again on worker completion - if (!task.completedWorkers.has(task.runningWorker)) { - break; - } + const finalWorker = task.pipeline[task.pipeline.length - 1]; // if all workers are completed, then return the // the final file and go to the next task @@ -44,7 +31,7 @@ export const schedule = () => { if (finalFile) { itemDone(task.id, finalFile); } else { - itemError(task.id, task.runningWorker, "queue.no_final_file"); + itemError(task.id, finalWorker.workerId, "queue.no_final_file"); } continue; @@ -53,10 +40,16 @@ export const schedule = () => { // if current worker is completed, but there are more workers, // then start the next one and wait to be called again for (const worker of task.pipeline) { - if (!task.completedWorkers.has(worker.workerId)) { - startPipeline(worker); + if (task.completedWorkers.has(worker.workerId) || ongoingTasks[worker.workerId]) { + continue; + } + + const needsToWait = worker.dependsOn?.some(id => !task.completedWorkers.has(id)); + if (needsToWait) { break; } + + startPipeline(worker); } // break because we don't want to start next tasks before this one is done diff --git a/web/src/lib/types/queue.ts b/web/src/lib/types/queue.ts index fe5dd6b9..d11993a5 100644 --- a/web/src/lib/types/queue.ts +++ b/web/src/lib/types/queue.ts @@ -1,11 +1,8 @@ import type { CobaltSaveRequestBody } from "$lib/types/api"; import type { CobaltPipelineItem, CobaltPipelineResultFileType } from "$lib/types/workers"; -export type CobaltQueueItemState = "waiting" | "running" | "done" | "error"; - -export type CobaltQueueBaseItem = { +type CobaltQueueBaseItem = { id: string, - state: CobaltQueueItemState, pipeline: CobaltPipelineItem[], canRetry?: boolean, originalRequest?: CobaltSaveRequestBody, @@ -14,28 +11,30 @@ export type CobaltQueueBaseItem = { mediaType: CobaltPipelineResultFileType, }; -export type CobaltQueueItemWaiting = CobaltQueueBaseItem & { +type CobaltQueueItemWaiting = CobaltQueueBaseItem & { state: "waiting", }; export type CobaltQueueItemRunning = CobaltQueueBaseItem & { state: "running", - runningWorker: string, completedWorkers: Set, pipelineResults: File[], }; -export type CobaltQueueItemDone = CobaltQueueBaseItem & { +type CobaltQueueItemDone = CobaltQueueBaseItem & { state: "done", resultFile: File, }; -export type CobaltQueueItemError = CobaltQueueBaseItem & { +type CobaltQueueItemError = CobaltQueueBaseItem & { state: "error", errorCode: string, }; -export type CobaltQueueItem = CobaltQueueItemWaiting | CobaltQueueItemRunning | CobaltQueueItemDone | CobaltQueueItemError; +export type CobaltQueueItem = CobaltQueueItemWaiting + | CobaltQueueItemRunning + | CobaltQueueItemDone + | CobaltQueueItemError; export type CobaltQueue = { [id: string]: CobaltQueueItem, diff --git a/web/src/lib/types/workers.ts b/web/src/lib/types/workers.ts index c6647f2f..58ab9b0a 100644 --- a/web/src/lib/types/workers.ts +++ b/web/src/lib/types/workers.ts @@ -8,17 +8,18 @@ export type CobaltWorkerProgress = { percentage?: number, speed?: number, size: number, -} +}; type CobaltFFmpegWorkerArgs = { files: File[], ffargs: string[], output: FileInfo, -} +}; type CobaltPipelineItemBase = { workerId: string, parentId: string, + dependsOn?: string[], }; type CobaltRemuxPipelineItem = CobaltPipelineItemBase & {