web/run-worker: subscribe to queue & kill worker when removed from store

& also clear the interval
This commit is contained in:
wukko 2025-01-25 23:59:45 +06:00
parent d4684fa1f7
commit de66ac6b08
No known key found for this signature in database
GPG Key ID: 3E30B3F26C7B4AA2

View File

@ -1,31 +1,20 @@
import RemuxWorker from "$lib/workers/remux?worker"; import RemuxWorker from "$lib/workers/remux?worker";
import type { CobaltPipelineItem } from "$lib/types/workers"; import { itemDone, itemError, queue } from "$lib/state/queen-bee/queue";
import { itemDone, itemError } from "$lib/state/queen-bee/queue";
import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks"; import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks";
const workerError = (parentId: string, workerId: string, worker: Worker, error: string) => { import type { CobaltQueue } from "$lib/types/queue";
itemError(parentId, workerId, error); import type { CobaltPipelineItem } from "$lib/types/workers";
worker.terminate();
}
const workerSuccess = (parentId: string, workerId: string, worker: Worker, file: File) => { const killWorker = (worker: Worker, unsubscribe: () => void, interval: NodeJS.Timeout) => {
itemDone(parentId, workerId, file); unsubscribe();
worker.terminate(); worker.terminate();
clearInterval(interval);
} }
export const runRemuxWorker = async (workerId: string, parentId: string, file: File) => { export const runRemuxWorker = async (workerId: string, parentId: string, file: File) => {
const worker = new RemuxWorker(); const worker = new RemuxWorker();
worker.postMessage({ file });
worker.onerror = (e) => {
console.error("remux worker exploded:", e);
// TODO: proper error code
workerError(parentId, workerId, worker, "internal error");
};
// sometimes chrome refuses to start libav wasm, // sometimes chrome refuses to start libav wasm,
// so we check the health and kill self if it doesn't spawn // so we check the health and kill self if it doesn't spawn
@ -34,25 +23,43 @@ export const runRemuxWorker = async (workerId: string, parentId: string, file: F
bumpAttempts++; bumpAttempts++;
if (bumpAttempts === 8) { if (bumpAttempts === 8) {
worker.terminate(); killWorker(worker, unsubscribe, startCheck);
console.error("worker didn't start after 4 seconds, so it was killed"); console.error("worker didn't start after 4 seconds, so it was killed");
// TODO: proper error code // TODO: proper error code
return workerError(parentId, workerId, worker, "worker didn't start"); return itemError(parentId, workerId, "worker didn't start");
} }
}, 500); }, 500);
const unsubscribe = queue.subscribe((queue: CobaltQueue) => {
if (!queue[parentId]) {
// TODO: remove logging
console.log("worker's parent is gone, so it killed itself");
killWorker(worker, unsubscribe, startCheck);
}
});
worker.postMessage({ file });
worker.onerror = (e) => {
console.error("remux worker exploded:", e);
killWorker(worker, unsubscribe, startCheck);
// TODO: proper error code
return itemError(parentId, workerId, "internal error");
};
let totalDuration: number | null = null; let totalDuration: number | null = null;
worker.onmessage = (event) => { worker.onmessage = (event) => {
const eventData = event.data.cobaltRemuxWorker; const eventData = event.data.cobaltRemuxWorker;
if (!eventData) return; if (!eventData) return;
clearInterval(startCheck);
// temporary debug logging // temporary debug logging
console.log(JSON.stringify(eventData, null, 2)); console.log(JSON.stringify(eventData, null, 2));
clearInterval(startCheck);
if (eventData.progress) { if (eventData.progress) {
if (eventData.progress.duration) { if (eventData.progress.duration) {
totalDuration = eventData.progress.duration; totalDuration = eventData.progress.duration;
@ -65,10 +72,10 @@ export const runRemuxWorker = async (workerId: string, parentId: string, file: F
} }
if (eventData.render) { if (eventData.render) {
return workerSuccess( killWorker(worker, unsubscribe, startCheck);
return itemDone(
parentId, parentId,
workerId, workerId,
worker,
new File([eventData.render], eventData.filename, { new File([eventData.render], eventData.filename, {
type: eventData.render.type, type: eventData.render.type,
}) })
@ -76,7 +83,8 @@ export const runRemuxWorker = async (workerId: string, parentId: string, file: F
} }
if (eventData.error) { if (eventData.error) {
return workerError(parentId, workerId, worker, eventData.error); killWorker(worker, unsubscribe, startCheck);
return itemError(parentId, workerId, eventData.error);
} }
}; };
} }