web: rename queen-bee to task-manager
Some checks failed
CodeQL / Analyze (${{ matrix.language }}) (none, javascript-typescript) (push) Has been cancelled
Run tests / check lockfile correctness (push) Has been cancelled
Run tests / web sanity check (push) Has been cancelled
Run tests / api sanity check (push) Has been cancelled

less corny and less cryptic
This commit is contained in:
wukko
2025-04-02 14:57:45 +06:00
parent f4f7032062
commit 0fb4cd7888
13 changed files with 24 additions and 24 deletions

View File

@@ -0,0 +1,136 @@
import { get } from "svelte/store";
import { t } from "$lib/i18n/translations";
import { addItem } from "$lib/state/task-manager/queue";
import { createDialog } from "$lib/state/dialogs";
import { openQueuePopover } from "$lib/state/queue-visibility";
import { ffmpegMetadataArgs } from "$lib/util";
import type { CobaltPipelineItem } from "$lib/types/workers";
import type { CobaltLocalProcessingResponse, CobaltSaveRequestBody } from "$lib/types/api";
export const getMediaType = (type: string) => {
const kind = type.split('/')[0];
// can't use .includes() here for some reason
if (kind === "video" || kind === "audio" || kind === "image") {
return kind;
}
}
export const createRemuxPipeline = (file: File) => {
// chopped khia
const parentId = crypto.randomUUID();
const mediaType = getMediaType(file.type);
const pipeline: CobaltPipelineItem[] = [{
worker: "remux",
workerId: crypto.randomUUID(),
parentId,
workerArgs: {
files: [{
file,
type: file.type,
}],
ffargs: [
"-c", "copy",
"-map", "0"
],
output: {
type: file.type,
format: file.name.split(".").pop(),
},
},
}];
if (mediaType) {
addItem({
id: parentId,
state: "waiting",
pipeline,
filename: file.name,
mimeType: file.type,
mediaType,
});
openQueuePopover();
}
}
export const createSavePipeline = (info: CobaltLocalProcessingResponse, request: CobaltSaveRequestBody) => {
// TODO: proper error here
if (!(info.output?.filename && info.output?.type)) return;
const parentId = crypto.randomUUID();
const pipeline: CobaltPipelineItem[] = [];
// reverse is needed for audio (second item) to be downloaded first
const tunnels = info.tunnel.reverse();
for (const tunnel of tunnels) {
pipeline.push({
worker: "fetch",
workerId: crypto.randomUUID(),
parentId,
workerArgs: {
url: tunnel,
},
});
}
if (["merge", "mute"].includes(info.type)) {
const ffargs = ["-c:v", "copy"];
if (info.type === "merge") {
ffargs.push("-c:a", "copy");
} else if (info.type === "mute") {
ffargs.push("-an");
}
ffargs.push(
...(info.output.metadata ? ffmpegMetadataArgs(info.output.metadata) : [])
);
pipeline.push({
worker: "remux",
workerId: crypto.randomUUID(),
parentId,
workerArgs: {
files: [],
ffargs,
output: {
type: info.output.type,
format: info.output.filename.split(".").pop(),
},
},
});
}
if (["audio", "gif"].includes(info.type)) {
return createDialog({
id: "save-error",
type: "small",
meowbalt: "error",
buttons: [
{
text: get(t)("button.gotit"),
main: true,
action: () => { },
},
],
bodyText: "audio and gif processing isn't implemented yet!",
});
}
addItem({
id: parentId,
state: "waiting",
pipeline,
canRetry: true,
originalRequest: request,
filename: info.output.filename,
mimeType: info.output.type,
mediaType: "video",
});
openQueuePopover();
}

View File

@@ -0,0 +1,48 @@
import { get } from "svelte/store";
import { queue } from "$lib/state/task-manager/queue";
import { runRemuxWorker } from "$lib/task-manager/runners/remux";
import { runFetchWorker } from "$lib/task-manager/runners/fetch";
import type { CobaltPipelineItem } from "$lib/types/workers";
import type { CobaltFileReference } from "$lib/types/storage";
export const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.Timeout) => {
unsubscribe();
worker.terminate();
if (interval) clearInterval(interval);
}
export const startWorker = async ({ worker, workerId, parentId, workerArgs }: CobaltPipelineItem) => {
let files: CobaltFileReference[] = [];
switch (worker) {
case "remux":
if (workerArgs.files) {
files = workerArgs.files;
}
if (files.length === 0) {
const parent = get(queue)[parentId];
if (parent.state === "running" && parent.pipelineResults.length) {
files = parent.pipelineResults;
}
}
if (files.length > 0 && workerArgs.ffargs && workerArgs.output) {
await runRemuxWorker(
workerId,
parentId,
files,
workerArgs.ffargs,
workerArgs.output,
/*resetStartCounter=*/true,
);
}
break;
case "fetch":
await runFetchWorker(workerId, parentId, workerArgs.url)
break;
}
}

View File

@@ -0,0 +1,51 @@
import FetchWorker from "$lib/workers/fetch?worker";
import { killWorker } from "$lib/task-manager/run-worker";
import { updateWorkerProgress } from "$lib/state/task-manager/current-tasks";
import { pipelineTaskDone, itemError, queue } from "$lib/state/task-manager/queue";
import type { CobaltQueue } from "$lib/types/queue";
export const runFetchWorker = async (workerId: string, parentId: string, url: string) => {
const worker = new FetchWorker();
const unsubscribe = queue.subscribe((queue: CobaltQueue) => {
if (!queue[parentId]) {
// TODO: remove logging
console.log("worker's parent is gone, so it killed itself");
killWorker(worker, unsubscribe);
}
});
worker.postMessage({
cobaltFetchWorker: {
url
}
});
worker.onmessage = (event) => {
const eventData = event.data.cobaltFetchWorker;
if (!eventData) return;
if (eventData.progress) {
updateWorkerProgress(workerId, {
percentage: eventData.progress,
size: eventData.size,
})
}
if (eventData.result) {
killWorker(worker, unsubscribe);
return pipelineTaskDone(
parentId,
workerId,
eventData.result,
);
}
if (eventData.error) {
killWorker(worker, unsubscribe);
return itemError(parentId, workerId, eventData.error);
}
}
}

View File

@@ -0,0 +1,109 @@
import RemuxWorker from "$lib/workers/remux?worker";
import { killWorker } from "$lib/task-manager/run-worker";
import { updateWorkerProgress } from "$lib/state/task-manager/current-tasks";
import { pipelineTaskDone, itemError, queue } from "$lib/state/task-manager/queue";
import type { FileInfo } from "$lib/types/libav";
import type { CobaltQueue } from "$lib/types/queue";
import type { CobaltFileReference } from "$lib/types/storage";
let startAttempts = 0;
export const runRemuxWorker = async (
workerId: string,
parentId: string,
files: CobaltFileReference[],
args: string[],
output: FileInfo,
resetStartCounter = false
) => {
const worker = new RemuxWorker();
// sometimes chrome refuses to start libav wasm,
// so we check if it started, try 10 more times if not, and kill self if it still doesn't work
// TODO: fix the underlying issue because this is ridiculous
if (resetStartCounter) startAttempts = 0;
let bumpAttempts = 0;
const startCheck = setInterval(async () => {
bumpAttempts++;
if (bumpAttempts === 10) {
startAttempts++;
if (startAttempts <= 10) {
killWorker(worker, unsubscribe, startCheck);
console.error("worker didn't start after 5 seconds, so it was killed and started again");
return await runRemuxWorker(workerId, parentId, files, args, output);
} else {
killWorker(worker, unsubscribe, startCheck);
console.error("worker didn't start after 10 attempts, so we're giving up");
// TODO: proper error code
return itemError(parentId, workerId, "worker didn't start");
}
}
}, 500);
const unsubscribe = queue.subscribe((queue: CobaltQueue) => {
if (!queue[parentId]) {
// TODO: remove logging
console.log("worker's parent is gone, so it killed itself");
killWorker(worker, unsubscribe, startCheck);
}
});
worker.postMessage({
cobaltRemuxWorker: {
files,
args,
output,
}
});
worker.onerror = (e) => {
console.error("remux worker exploded:", e);
killWorker(worker, unsubscribe, startCheck);
// TODO: proper error code
return itemError(parentId, workerId, "internal error");
};
let totalDuration: number | null = null;
worker.onmessage = (event) => {
const eventData = event.data.cobaltRemuxWorker;
if (!eventData) return;
clearInterval(startCheck);
// temporary debug logging
console.log(JSON.stringify(eventData, null, 2));
if (eventData.progress) {
if (eventData.progress.duration) {
totalDuration = eventData.progress.duration;
}
updateWorkerProgress(workerId, {
percentage: totalDuration ? (eventData.progress.durationProcessed / totalDuration) * 100 : 0,
size: eventData.progress.size,
})
}
if (eventData.render) {
killWorker(worker, unsubscribe, startCheck);
return pipelineTaskDone(
parentId,
workerId,
eventData.render,
);
}
if (eventData.error) {
killWorker(worker, unsubscribe, startCheck);
return itemError(parentId, workerId, eventData.error);
}
};
}

View File

@@ -0,0 +1,76 @@
import { get } from "svelte/store";
import { startWorker } from "$lib/task-manager/run-worker";
import { addWorkerToQueue, currentTasks } from "$lib/state/task-manager/current-tasks";
import { itemDone, itemError, itemRunning, queue } from "$lib/state/task-manager/queue";
import type { CobaltPipelineItem } from "$lib/types/workers";
const startPipeline = (pipelineItem: CobaltPipelineItem) => {
addWorkerToQueue(pipelineItem.workerId, {
type: pipelineItem.worker,
parentId: pipelineItem.parentId,
});
itemRunning(
pipelineItem.parentId,
pipelineItem.workerId,
);
startWorker(pipelineItem);
}
export const schedule = () => {
const queueItems = get(queue);
const ongoingTasks = get(currentTasks);
// TODO (?): task concurrency
if (Object.keys(ongoingTasks).length > 0) {
return;
}
for (const task of Object.values(queueItems)) {
if (task.state === "running") {
// if the running worker isn't completed, wait
// to be called again on worker completion
if (!task.completedWorkers.has(task.runningWorker)) {
break;
}
// if all workers are completed, then return the
// the final file and go to the next task
if (task.completedWorkers.size === task.pipeline.length) {
const finalFile = task.pipelineResults.pop();
if (finalFile) {
itemDone(task.id, finalFile);
} else {
itemError(task.id, task.runningWorker, "no final file");
}
continue;
}
// if current worker is completed, but there are more workers,
// then start the next one and wait to be called again
for (const worker of task.pipeline) {
if (!task.completedWorkers.has(worker.workerId)) {
startPipeline(worker);
break;
}
}
// break because we don't want to start next tasks before this one is done
// it's necessary because some tasks might take some time before being marked as running
break;
}
// start the nearest waiting task and wait to be called again
else if (task.state === "waiting" && task.pipeline.length > 0) {
startPipeline(task.pipeline[0]);
// break because we don't want to start next tasks before this one is done
// it's necessary because some tasks might take some time before being marked as running
break;
}
}
}