web: parallel queue item processing
Some checks failed
CodeQL / Analyze (${{ matrix.language }}) (none, javascript-typescript) (push) Has been cancelled
Run tests / check lockfile correctness (push) Has been cancelled
Run tests / web sanity check (push) Has been cancelled
Run tests / api sanity check (push) Has been cancelled

This commit is contained in:
jj 2025-05-16 16:42:40 +00:00
parent 426c073d5f
commit 398681857b
No known key found for this signature in database
8 changed files with 112 additions and 94 deletions

View File

@ -8,6 +8,7 @@
import { queueVisible } from "$lib/state/queue-visibility"; import { queueVisible } from "$lib/state/queue-visibility";
import { currentTasks } from "$lib/state/task-manager/current-tasks"; import { currentTasks } from "$lib/state/task-manager/current-tasks";
import { clearQueue, queue as readableQueue } from "$lib/state/task-manager/queue"; import { clearQueue, queue as readableQueue } from "$lib/state/task-manager/queue";
import { getProgress } from "$lib/task-manager/queue";
import SectionHeading from "$components/misc/SectionHeading.svelte"; import SectionHeading from "$components/misc/SectionHeading.svelte";
import PopoverContainer from "$components/misc/PopoverContainer.svelte"; import PopoverContainer from "$components/misc/PopoverContainer.svelte";
@ -21,23 +22,10 @@
$queueVisible = !$queueVisible; $queueVisible = !$queueVisible;
}; };
const totalItemProgress = (completed: number, current: number, total: number) => {
return (completed * 100 + current) / total
}
$: queue = Object.entries($readableQueue); $: queue = Object.entries($readableQueue);
$: totalProgress = queue.length ? queue.map(([, item]) => { $: totalProgress = queue.length ? queue.map(([, item]) => {
if (item.state === "done" || item.state === "error") { return getProgress(item) * 100;
return 100;
} else if (item.state === "running") {
return totalItemProgress(
item.completedWorkers.size,
$currentTasks[item.runningWorker]?.progress?.percentage || 0,
item.pipeline.length || 0
);
}
return 0;
}).reduce((a, b) => a + b) / (100 * queue.length) : 0; }).reduce((a, b) => a + b) / (100 * queue.length) : 0;
$: indeterminate = queue.length > 0 && totalProgress === 0; $: indeterminate = queue.length > 0 && totalProgress === 0;
@ -93,16 +81,7 @@
<div id="processing-list" role="list" aria-labelledby="queue-title"> <div id="processing-list" role="list" aria-labelledby="queue-title">
{#each queue as [id, item]} {#each queue as [id, item]}
<ProcessingQueueItem <ProcessingQueueItem {id} info={item} />
{id}
info={item}
runningWorker={
item.state === "running" ? $currentTasks[item.runningWorker] : undefined
}
runningWorkerId={
item.state === "running" ? item.runningWorker : undefined
}
/>
{/each} {/each}
{#if queue.length === 0} {#if queue.length === 0}
<ProcessingQueueStub /> <ProcessingQueueStub />

View File

@ -4,10 +4,11 @@
import { downloadFile } from "$lib/download"; import { downloadFile } from "$lib/download";
import { removeItem } from "$lib/state/task-manager/queue"; import { removeItem } from "$lib/state/task-manager/queue";
import { savingHandler } from "$lib/api/saving-handler"; import { savingHandler } from "$lib/api/saving-handler";
import { getProgress } from "$lib/task-manager/queue";
import { currentTasks } from "$lib/state/task-manager/current-tasks";
import type { CobaltQueueItem } from "$lib/types/queue"; import type { CobaltQueueItem } from "$lib/types/queue";
import type { CobaltWorkerProgress } from "$lib/types/workers"; import type { CobaltCurrentTasks } from "$lib/types/task-manager";
import type { CobaltCurrentTaskItem } from "$lib/types/task-manager";
import ProgressBar from "$components/queue/ProgressBar.svelte"; import ProgressBar from "$components/queue/ProgressBar.svelte";
@ -30,8 +31,6 @@
export let id: string; export let id: string;
export let info: CobaltQueueItem; export let info: CobaltQueueItem;
export let runningWorker: CobaltCurrentTaskItem | undefined;
export let runningWorkerId: string | undefined;
let retrying = false; let retrying = false;
@ -52,41 +51,51 @@
}), }),
}); });
$: progress = runningWorker?.progress;
type StatusText = { type StatusText = {
info: CobaltQueueItem; info: CobaltQueueItem;
runningWorker: CobaltCurrentTaskItem | undefined; currentTasks: CobaltCurrentTasks;
progress: CobaltWorkerProgress | undefined;
retrying: boolean; retrying: boolean;
}; };
const generateStatusText = ({ info, runningWorker, progress, retrying }: StatusText) => { const generateStatusText = ({ info, retrying, currentTasks }: StatusText) => {
switch (info.state) { switch (info.state) {
case "running": case "running":
if (runningWorker) { const progress = getProgress(info);
const running = $t(`queue.state.running.${runningWorker.type}`);
const formattedSize = formatFileSize(progress?.size);
if (progress && progress.percentage) { const runningWorkers = info.pipeline.filter(w => w.workerId in currentTasks);
return `${running}: ${Math.floor(progress.percentage)}%, ${formattedSize}`; const running = [...new Set(runningWorkers.map(task => $t(`queue.state.running.${task.worker}`)))].join(', ');
const progresses = runningWorkers.map(w => currentTasks[w.workerId])
.map(t => t.progress)
.filter(p => p);
const totalSize = progresses.reduce((s, p) => s + (p?.size ?? 0), 0);
if (runningWorkers.length && totalSize > 0) {
const formattedSize = formatFileSize(totalSize);
return `${running}: ${Math.floor(progress * 100)}%, ${formattedSize}`;
} }
else if (runningWorker && progress) {
if (progress.size > 0) { const firstUnstarted = info.pipeline.find(w => {
return `${running}: ${formattedSize}`; if (info.completedWorkers.has(w.workerId))
return false;
const task = currentTasks[w.workerId];
if (!task || !task.progress?.percentage) {
return true;
} }
return running; });
}
else if (runningWorker?.type) { if (firstUnstarted) {
const starting = $t(`queue.state.starting.${runningWorker.type}`); const starting = $t(`queue.state.starting.${firstUnstarted.worker}`);
if (info.pipeline.length > 1) { if (info.pipeline.length > 1) {
const currentPipeline = info.completedWorkers.size + 1; const currentPipeline = info.completedWorkers.size + 1;
return `${starting} (${currentPipeline}/${info.pipeline.length})`; return `${starting} (${currentPipeline}/${info.pipeline.length})`;
} }
return starting; return starting;
} }
}
return $t("queue.state.starting"); return $t("queue.state.starting");
case "done": case "done":
@ -100,6 +109,23 @@
} }
}; };
const getWorkerProgress = (item: CobaltQueueItem, workerId: string): number | undefined => {
if (item.state === 'running' && item.completedWorkers.has(workerId)) {
return 100;
}
const workerIndex = item.pipeline.findIndex(w => w.workerId === workerId);
if (workerIndex === -1) {
return;
}
const worker = item.pipeline[workerIndex];
const task = $currentTasks[worker.workerId];
if (task?.progress) {
return task.progress.percentage;
}
}
/* /*
params are passed here because svelte will re-run params are passed here because svelte will re-run
the function every time either of them is changed, the function every time either of them is changed,
@ -107,9 +133,8 @@
*/ */
$: statusText = generateStatusText({ $: statusText = generateStatusText({
info, info,
runningWorker,
progress,
retrying, retrying,
currentTasks: $currentTasks
}); });
</script> </script>
@ -126,11 +151,10 @@
{#if info.state === "running"} {#if info.state === "running"}
<div class="progress-holder"> <div class="progress-holder">
{#each info.pipeline as pipeline} {#each info.pipeline as task}
<ProgressBar <ProgressBar
percentage={progress?.percentage} percentage={getWorkerProgress(info, task.workerId) || 0}
workerId={pipeline.workerId} workerId={task.workerId}
{runningWorkerId}
completedWorkers={info.completedWorkers} completedWorkers={info.completedWorkers}
/> />
{/each} {/each}

View File

@ -3,22 +3,21 @@
export let percentage: number = 0; export let percentage: number = 0;
export let workerId: string; export let workerId: string;
export let runningWorkerId: string | undefined;
export let completedWorkers: Set<string>; export let completedWorkers: Set<string>;
</script> </script>
<div class="file-progress"> <div class="file-progress">
{#if percentage && workerId === runningWorkerId} {#if percentage}
<div <div
class="progress" class="progress"
style="width: {Math.min(100, percentage || 0)}%" style="width: {Math.min(100, percentage)}%"
></div> ></div>
{:else if completedWorkers.has(workerId)} {:else if completedWorkers.has(workerId)}
<div <div
class="progress" class="progress"
style="width: 100%" style="width: 100%"
></div> ></div>
{:else if workerId === runningWorkerId} {:else}
<Skeleton <Skeleton
height="6px" height="6px"
width="100%" width="100%"

View File

@ -86,13 +86,12 @@ export function pipelineTaskDone(id: string, workerId: string, file: File) {
schedule(); schedule();
} }
export function itemRunning(id: string, workerId: string) { export function itemRunning(id: string) {
update(queueData => { update(queueData => {
const data = queueData[id] as CobaltQueueItemRunning; const data = queueData[id] as CobaltQueueItemRunning;
if (data) { if (data) {
data.state = 'running'; data.state = 'running';
data.runningWorker = workerId;
data.completedWorkers ??= new Set(); data.completedWorkers ??= new Set();
data.pipelineResults ??= []; data.pipelineResults ??= [];
} }

View File

@ -4,9 +4,11 @@ import { ffmpegMetadataArgs } from "$lib/util";
import { createDialog } from "$lib/state/dialogs"; import { createDialog } from "$lib/state/dialogs";
import { addItem } from "$lib/state/task-manager/queue"; import { addItem } from "$lib/state/task-manager/queue";
import { openQueuePopover } from "$lib/state/queue-visibility"; import { openQueuePopover } from "$lib/state/queue-visibility";
import { currentTasks } from "$lib/state/task-manager/current-tasks";
import type { CobaltPipelineItem, CobaltPipelineResultFileType } from "$lib/types/workers"; import type { CobaltPipelineItem, CobaltPipelineResultFileType } from "$lib/types/workers";
import type { CobaltLocalProcessingResponse, CobaltSaveRequestBody } from "$lib/types/api"; import type { CobaltLocalProcessingResponse, CobaltSaveRequestBody } from "$lib/types/api";
import type { CobaltQueueItem } from "$lib/types/queue";
export const getMediaType = (type: string) => { export const getMediaType = (type: string) => {
const kind = type.split('/')[0]; const kind = type.split('/')[0];
@ -177,6 +179,7 @@ export const createSavePipeline = (info: CobaltLocalProcessingResponse, request:
worker: workerType, worker: workerType,
workerId: crypto.randomUUID(), workerId: crypto.randomUUID(),
parentId, parentId,
dependsOn: pipeline.map(w => w.workerId),
workerArgs: { workerArgs: {
files: [], files: [],
ffargs, ffargs,
@ -200,3 +203,24 @@ export const createSavePipeline = (info: CobaltLocalProcessingResponse, request:
openQueuePopover(); openQueuePopover();
} }
export const getProgress = (item: CobaltQueueItem): number => {
if (item.state === 'done' || item.state === 'error') {
return 1;
} else if (item.state === 'waiting') {
return 0;
}
const runningTasks = get(currentTasks);
let sum = 0;
for (const worker of item.pipeline) {
if (item.completedWorkers.has(worker.workerId)) {
sum += 1;
} else {
const task = runningTasks[worker.workerId];
sum += (task?.progress?.percentage || 0) / 100;
}
}
return sum / item.pipeline.length;
}

View File

@ -11,11 +11,7 @@ const startPipeline = (pipelineItem: CobaltPipelineItem) => {
parentId: pipelineItem.parentId, parentId: pipelineItem.parentId,
}); });
itemRunning( itemRunning(pipelineItem.parentId);
pipelineItem.parentId,
pipelineItem.workerId,
);
startWorker(pipelineItem); startWorker(pipelineItem);
} }
@ -23,18 +19,9 @@ export const schedule = () => {
const queueItems = get(queue); const queueItems = get(queue);
const ongoingTasks = get(currentTasks); const ongoingTasks = get(currentTasks);
// TODO (?): task concurrency
if (Object.keys(ongoingTasks).length > 0) {
return;
}
for (const task of Object.values(queueItems)) { for (const task of Object.values(queueItems)) {
if (task.state === "running") { if (task.state === "running") {
// if the running worker isn't completed, wait const finalWorker = task.pipeline[task.pipeline.length - 1];
// to be called again on worker completion
if (!task.completedWorkers.has(task.runningWorker)) {
break;
}
// if all workers are completed, then return the // if all workers are completed, then return the
// the final file and go to the next task // the final file and go to the next task
@ -44,7 +31,7 @@ export const schedule = () => {
if (finalFile) { if (finalFile) {
itemDone(task.id, finalFile); itemDone(task.id, finalFile);
} else { } else {
itemError(task.id, task.runningWorker, "queue.no_final_file"); itemError(task.id, finalWorker.workerId, "queue.no_final_file");
} }
continue; continue;
@ -53,10 +40,16 @@ export const schedule = () => {
// if current worker is completed, but there are more workers, // if current worker is completed, but there are more workers,
// then start the next one and wait to be called again // then start the next one and wait to be called again
for (const worker of task.pipeline) { for (const worker of task.pipeline) {
if (!task.completedWorkers.has(worker.workerId)) { if (task.completedWorkers.has(worker.workerId) || ongoingTasks[worker.workerId]) {
startPipeline(worker); continue;
}
const needsToWait = worker.dependsOn?.some(id => !task.completedWorkers.has(id));
if (needsToWait) {
break; break;
} }
startPipeline(worker);
} }
// break because we don't want to start next tasks before this one is done // break because we don't want to start next tasks before this one is done

View File

@ -1,11 +1,8 @@
import type { CobaltSaveRequestBody } from "$lib/types/api"; import type { CobaltSaveRequestBody } from "$lib/types/api";
import type { CobaltPipelineItem, CobaltPipelineResultFileType } from "$lib/types/workers"; import type { CobaltPipelineItem, CobaltPipelineResultFileType } from "$lib/types/workers";
export type CobaltQueueItemState = "waiting" | "running" | "done" | "error"; type CobaltQueueBaseItem = {
export type CobaltQueueBaseItem = {
id: string, id: string,
state: CobaltQueueItemState,
pipeline: CobaltPipelineItem[], pipeline: CobaltPipelineItem[],
canRetry?: boolean, canRetry?: boolean,
originalRequest?: CobaltSaveRequestBody, originalRequest?: CobaltSaveRequestBody,
@ -14,28 +11,30 @@ export type CobaltQueueBaseItem = {
mediaType: CobaltPipelineResultFileType, mediaType: CobaltPipelineResultFileType,
}; };
export type CobaltQueueItemWaiting = CobaltQueueBaseItem & { type CobaltQueueItemWaiting = CobaltQueueBaseItem & {
state: "waiting", state: "waiting",
}; };
export type CobaltQueueItemRunning = CobaltQueueBaseItem & { export type CobaltQueueItemRunning = CobaltQueueBaseItem & {
state: "running", state: "running",
runningWorker: string,
completedWorkers: Set<string>, completedWorkers: Set<string>,
pipelineResults: File[], pipelineResults: File[],
}; };
export type CobaltQueueItemDone = CobaltQueueBaseItem & { type CobaltQueueItemDone = CobaltQueueBaseItem & {
state: "done", state: "done",
resultFile: File, resultFile: File,
}; };
export type CobaltQueueItemError = CobaltQueueBaseItem & { type CobaltQueueItemError = CobaltQueueBaseItem & {
state: "error", state: "error",
errorCode: string, errorCode: string,
}; };
export type CobaltQueueItem = CobaltQueueItemWaiting | CobaltQueueItemRunning | CobaltQueueItemDone | CobaltQueueItemError; export type CobaltQueueItem = CobaltQueueItemWaiting
| CobaltQueueItemRunning
| CobaltQueueItemDone
| CobaltQueueItemError;
export type CobaltQueue = { export type CobaltQueue = {
[id: string]: CobaltQueueItem, [id: string]: CobaltQueueItem,

View File

@ -8,17 +8,18 @@ export type CobaltWorkerProgress = {
percentage?: number, percentage?: number,
speed?: number, speed?: number,
size: number, size: number,
} };
type CobaltFFmpegWorkerArgs = { type CobaltFFmpegWorkerArgs = {
files: File[], files: File[],
ffargs: string[], ffargs: string[],
output: FileInfo, output: FileInfo,
} };
type CobaltPipelineItemBase = { type CobaltPipelineItemBase = {
workerId: string, workerId: string,
parentId: string, parentId: string,
dependsOn?: string[],
}; };
type CobaltRemuxPipelineItem = CobaltPipelineItemBase & { type CobaltRemuxPipelineItem = CobaltPipelineItemBase & {