mirror of
https://github.com/imputnet/cobalt.git
synced 2026-02-16 21:46:00 +00:00
web/task-manager: move workers next to runners
This commit is contained in:
94
web/src/lib/task-manager/workers/fetch.ts
Normal file
94
web/src/lib/task-manager/workers/fetch.ts
Normal file
@@ -0,0 +1,94 @@
|
||||
import { OPFSStorage } from "$lib/storage";
|
||||
|
||||
let attempts = 0;
|
||||
|
||||
const fetchFile = async (url: string) => {
|
||||
const error = async (code: string) => {
|
||||
attempts++;
|
||||
|
||||
if (attempts <= 5) {
|
||||
// try 5 more times before actually failing
|
||||
|
||||
console.log("fetch attempt", attempts);
|
||||
await fetchFile(url);
|
||||
} else {
|
||||
// if it still fails, then throw an error and quit
|
||||
self.postMessage({
|
||||
cobaltFetchWorker: {
|
||||
// TODO: return proper error code here
|
||||
// (error.code and not just random shit i typed up)
|
||||
error: code,
|
||||
}
|
||||
});
|
||||
self.close();
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
const response = await fetch(url);
|
||||
|
||||
if (!response.ok) {
|
||||
return error("file response wasn't ok");
|
||||
}
|
||||
|
||||
const contentType = response.headers.get('Content-Type') || 'application/octet-stream';
|
||||
const contentLength = response.headers.get('Content-Length');
|
||||
|
||||
const totalBytes = contentLength ? parseInt(contentLength, 10) : null;
|
||||
const reader = response.body?.getReader();
|
||||
|
||||
const storage = await OPFSStorage.init();
|
||||
|
||||
if (!reader) {
|
||||
return error("no reader");
|
||||
}
|
||||
|
||||
let receivedBytes = 0;
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
|
||||
await storage.write(value, receivedBytes);
|
||||
receivedBytes += value.length;
|
||||
|
||||
if (totalBytes) {
|
||||
self.postMessage({
|
||||
cobaltFetchWorker: {
|
||||
progress: Math.round((receivedBytes / totalBytes) * 100),
|
||||
size: receivedBytes,
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (receivedBytes === 0) {
|
||||
return error("tunnel is broken");
|
||||
}
|
||||
|
||||
const file = await storage.res();
|
||||
|
||||
if (Number(contentLength) !== file.size) {
|
||||
return error("file was not downloaded fully");
|
||||
}
|
||||
|
||||
self.postMessage({
|
||||
cobaltFetchWorker: {
|
||||
result: {
|
||||
file,
|
||||
type: contentType,
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (e) {
|
||||
console.log(e);
|
||||
return error("error when downloading the file");
|
||||
}
|
||||
}
|
||||
|
||||
self.onmessage = async (event: MessageEvent) => {
|
||||
if (event.data.cobaltFetchWorker) {
|
||||
await fetchFile(event.data.cobaltFetchWorker.url);
|
||||
self.close();
|
||||
}
|
||||
}
|
||||
105
web/src/lib/task-manager/workers/remux.ts
Normal file
105
web/src/lib/task-manager/workers/remux.ts
Normal file
@@ -0,0 +1,105 @@
|
||||
import LibAVWrapper from "$lib/libav";
|
||||
|
||||
import type { FileInfo } from "$lib/types/libav";
|
||||
import type { CobaltFileReference } from "$lib/types/storage";
|
||||
|
||||
const error = (code: string) => {
|
||||
self.postMessage({
|
||||
cobaltRemuxWorker: {
|
||||
error: `error.${code}`,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
const remux = async (files: CobaltFileReference[], args: string[], output: FileInfo) => {
|
||||
if (!(files && output && args)) return;
|
||||
|
||||
const ff = new LibAVWrapper((progress) => {
|
||||
self.postMessage({
|
||||
cobaltRemuxWorker: {
|
||||
progress: {
|
||||
durationProcessed: progress.out_time_sec,
|
||||
speed: progress.speed,
|
||||
size: progress.total_size,
|
||||
currentFrame: progress.frame,
|
||||
fps: progress.fps,
|
||||
}
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
ff.init();
|
||||
|
||||
try {
|
||||
// probing just the first file in files array (usually audio) for duration progress
|
||||
const probeFile = files[0]?.file;
|
||||
if (!probeFile) return error("couldn't probe one of files");
|
||||
|
||||
const file_info = await ff.probe(probeFile).catch((e) => {
|
||||
if (e?.message?.toLowerCase().includes("out of memory")) {
|
||||
console.error("uh oh! out of memory");
|
||||
console.error(e);
|
||||
|
||||
error("remux.out_of_resources");
|
||||
self.close();
|
||||
}
|
||||
});
|
||||
|
||||
if (!file_info?.format) {
|
||||
return error("remux.corrupted");
|
||||
}
|
||||
|
||||
self.postMessage({
|
||||
cobaltRemuxWorker: {
|
||||
progress: {
|
||||
duration: Number(file_info.format.duration),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (const file of files) {
|
||||
if (!file.type) {
|
||||
// TODO: better & more appropriate error code
|
||||
return error("remux.corrupted");
|
||||
}
|
||||
}
|
||||
|
||||
const render = await ff
|
||||
.render({
|
||||
files,
|
||||
output,
|
||||
args,
|
||||
})
|
||||
.catch((e) => {
|
||||
console.error("uh-oh! render error");
|
||||
console.error(e);
|
||||
// TODO: better error codes, there are more reasons for a crash
|
||||
error("remux.out_of_resources");
|
||||
});
|
||||
|
||||
if (!render) {
|
||||
console.log("not a valid file");
|
||||
return error("incorrect input or output");
|
||||
}
|
||||
|
||||
await ff.terminate();
|
||||
|
||||
self.postMessage({
|
||||
cobaltRemuxWorker: {
|
||||
render
|
||||
}
|
||||
});
|
||||
} catch (e) {
|
||||
console.log(e);
|
||||
return error("remux.crashed");
|
||||
}
|
||||
}
|
||||
|
||||
self.onmessage = async (event: MessageEvent) => {
|
||||
const ed = event.data.cobaltRemuxWorker;
|
||||
if (ed) {
|
||||
if (ed.files && ed.args && ed.output) {
|
||||
await remux(ed.files, ed.args, ed.output);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user