diff --git a/web/src/components/queue/ProcessingQueueItem.svelte b/web/src/components/queue/ProcessingQueueItem.svelte
index f3da210d..ac6aac65 100644
--- a/web/src/components/queue/ProcessingQueueItem.svelte
+++ b/web/src/components/queue/ProcessingQueueItem.svelte
@@ -65,7 +65,7 @@
{#if info.state === "done"}
- {formatFileSize(info.resultFile?.size)}
+ {formatFileSize(info.resultFile?.file?.size)}
{/if}
{#if info.state === "running"}
@@ -95,7 +95,7 @@
{#if info.state === "done" && info.resultFile}
diff --git a/web/src/lib/libav.ts b/web/src/lib/libav.ts
index b6275f26..6beac0c5 100644
--- a/web/src/lib/libav.ts
+++ b/web/src/lib/libav.ts
@@ -1,3 +1,4 @@
+import { OPFSStorage } from "$lib/storage";
import LibAV, { type LibAV as LibAVInstance } from "@imput/libav.js-remux-cli";
import type { FfprobeData } from "fluent-ffmpeg";
@@ -69,27 +70,18 @@ export default class LibAVWrapper {
try {
for (let i = 0; i < files.length; i++) {
- await libav.mkreadaheadfile(`input${i}`, files[i]);
+ const file = files[i].file;
+
+ await libav.mkreadaheadfile(`input${i}`, file);
ffInputs.push('-i', `input${i}`);
}
await libav.mkwriterdev(outputName);
await libav.mkwriterdev('progress.txt');
- const totalInputSize = files.reduce((a, b) => a + b.size, 0);
+ const storage = await OPFSStorage.init();
- const MB = 1024 * 1024;
- const chunks: Uint8Array[] = [];
- const chunkSize = Math.min(512 * MB, totalInputSize);
-
- // since we expect the output file to be roughly the same size
- // as inputs, preallocate its size for the output
- for (let toAllocate = totalInputSize; toAllocate > 0; toAllocate -= chunkSize) {
- chunks.push(new Uint8Array(chunkSize));
- }
-
- let actualSize = 0;
- libav.onwrite = (name, pos, data) => {
+ libav.onwrite = async (name, pos, data) => {
if (name === 'progress.txt') {
try {
return this.#emitProgress(data);
@@ -98,26 +90,7 @@ export default class LibAVWrapper {
}
} else if (name !== outputName) return;
- const writeEnd = pos + data.length;
- if (writeEnd > chunkSize * chunks.length) {
- chunks.push(new Uint8Array(chunkSize));
- }
-
- const chunkIndex = pos / chunkSize | 0;
- const offset = pos - (chunkSize * chunkIndex);
-
- if (offset + data.length > chunkSize) {
- chunks[chunkIndex].set(
- data.subarray(0, chunkSize - offset), offset
- );
- chunks[chunkIndex + 1].set(
- data.subarray(chunkSize - offset), 0
- );
- } else {
- chunks[chunkIndex].set(data, offset);
- }
-
- actualSize = Math.max(writeEnd, actualSize);
+ await storage.write(data, pos);
};
await libav.ffmpeg([
@@ -130,30 +103,14 @@ export default class LibAVWrapper {
outputName
]);
- // if we didn't need as much space as we allocated for some reason,
- // shrink the buffers so that we don't inflate the file with zeroes
- const outputView: Uint8Array[] = [];
+ const file = await storage.res();
- for (let i = 0; i < chunks.length; ++i) {
- outputView.push(
- chunks[i].subarray(
- 0, Math.min(chunkSize, actualSize)
- )
- );
+ if (file.size === 0) return;
- actualSize -= chunkSize;
- if (actualSize <= 0) {
- break;
- }
+ return {
+ file,
+ type: output.type,
}
-
- const renderBlob = new Blob(
- outputView,
- { type: output.type }
- );
-
- if (renderBlob.size === 0) return;
- return renderBlob;
} finally {
try {
await libav.unlink(outputName);
diff --git a/web/src/lib/queen-bee/queue.ts b/web/src/lib/queen-bee/queue.ts
index 10a9fef8..141d800c 100644
--- a/web/src/lib/queen-bee/queue.ts
+++ b/web/src/lib/queen-bee/queue.ts
@@ -23,7 +23,10 @@ export const createRemuxPipeline = (file: File) => {
workerId: crypto.randomUUID(),
parentId,
workerArgs: {
- files: [file],
+ files: [{
+ file,
+ type: file.type,
+ }],
ffargs: [
"-c", "copy",
"-map", "0"
diff --git a/web/src/lib/queen-bee/run-worker.ts b/web/src/lib/queen-bee/run-worker.ts
index 4c05072f..47746806 100644
--- a/web/src/lib/queen-bee/run-worker.ts
+++ b/web/src/lib/queen-bee/run-worker.ts
@@ -8,6 +8,7 @@ import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue";
import type { FileInfo } from "$lib/types/libav";
import type { CobaltQueue } from "$lib/types/queue";
import type { CobaltPipelineItem } from "$lib/types/workers";
+import type { CobaltFileReference } from "$lib/types/storage";
const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.Timeout) => {
unsubscribe();
@@ -15,7 +16,14 @@ const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.T
if (interval) clearInterval(interval);
}
-export const runRemuxWorker = async (workerId: string, parentId: string, files: File[], args: string[], output: FileInfo, filename: string) => {
+export const runRemuxWorker = async (
+ workerId: string,
+ parentId: string,
+ files: CobaltFileReference[],
+ args: string[],
+ output: FileInfo,
+ filename: string
+) => {
const worker = new RemuxWorker();
// sometimes chrome refuses to start libav wasm,
@@ -86,9 +94,7 @@ export const runRemuxWorker = async (workerId: string, parentId: string, files:
return pipelineTaskDone(
parentId,
workerId,
- new File([eventData.render], eventData.filename, {
- type: eventData.render.type,
- })
+ eventData.render,
);
}
@@ -127,12 +133,12 @@ export const runFetchWorker = async (workerId: string, parentId: string, url: st
})
}
- if (eventData.file) {
+ if (eventData.result) {
killWorker(worker, unsubscribe);
return pipelineTaskDone(
parentId,
workerId,
- eventData.file,
+ eventData.result,
);
}
@@ -144,7 +150,7 @@ export const runFetchWorker = async (workerId: string, parentId: string, url: st
}
export const startWorker = async ({ worker, workerId, parentId, workerArgs }: CobaltPipelineItem) => {
- let files: File[] = [];
+ let files: CobaltFileReference[] = [];
switch (worker) {
case "remux":
diff --git a/web/src/lib/state/queen-bee/queue.ts b/web/src/lib/state/queen-bee/queue.ts
index 1ff45162..5151551b 100644
--- a/web/src/lib/state/queen-bee/queue.ts
+++ b/web/src/lib/state/queen-bee/queue.ts
@@ -1,8 +1,10 @@
import { readable, type Updater } from "svelte/store";
import { checkTasks } from "$lib/queen-bee/scheduler";
-import type { CobaltQueue, CobaltQueueItem } from "$lib/types/queue";
import { clearCurrentTasks, removeWorkerFromQueue } from "$lib/state/queen-bee/current-tasks";
+import type { CobaltFileReference } from "$lib/types/storage";
+import type { CobaltQueue, CobaltQueueItem } from "$lib/types/queue";
+
let update: (_: Updater) => void;
const queue = readable(
@@ -39,7 +41,7 @@ export function itemError(id: string, workerId: string, error: string) {
checkTasks();
}
-export function itemDone(id: string, file: File) {
+export function itemDone(id: string, file: CobaltFileReference) {
update(queueData => {
if (queueData[id]) {
if (queueData[id].state === "running" && queueData[id].pipelineResults) {
@@ -58,7 +60,7 @@ export function itemDone(id: string, file: File) {
checkTasks();
}
-export function pipelineTaskDone(id: string, workerId: string, file: File) {
+export function pipelineTaskDone(id: string, workerId: string, file: CobaltFileReference) {
update(queueData => {
if (queueData[id] && queueData[id].state === "running") {
queueData[id].pipelineResults = [...queueData[id].pipelineResults || [], file];
diff --git a/web/src/lib/storage.ts b/web/src/lib/storage.ts
new file mode 100644
index 00000000..68a5982c
--- /dev/null
+++ b/web/src/lib/storage.ts
@@ -0,0 +1,53 @@
+export class OPFSStorage {
+ #root;
+ #handle;
+ #io;
+
+ constructor(root: FileSystemDirectoryHandle, handle: FileSystemFileHandle, reader: FileSystemSyncAccessHandle) {
+ this.#root = root;
+ this.#handle = handle;
+ this.#io = reader;
+ }
+
+ static async init() {
+ const root = await navigator.storage.getDirectory();
+ const cobaltDir = await root.getDirectoryHandle('cobalt-processing-data', { create: true });
+ const handle = await cobaltDir.getFileHandle(crypto.randomUUID(), { create: true });
+ const reader = await handle.createSyncAccessHandle();
+
+ return new this(cobaltDir, handle, reader);
+ }
+
+ async res() {
+ // await for compat with ios 15
+ await this.#io.flush();
+ await this.#io.close();
+ return await this.#handle.getFile();
+ }
+
+ read(size: number, offset: number) {
+ const out = new Uint8Array(size);
+ const bytesRead = this.#io.read(out, { at: offset });
+
+ return out.subarray(0, bytesRead);
+ }
+
+ async write(data: Uint8Array | Int8Array, offset: number) {
+ const writ = this.#io.write(data, { at: offset });
+
+ if (data.length !== writ) {
+ console.log(data.length, writ);
+ }
+
+ return writ;
+ }
+
+ async destroy() {
+ await this.#root.removeEntry(this.#handle.name);
+ }
+
+ static isAvailable() {
+ return !!navigator.storage?.getDirectory;
+ }
+}
+
diff --git a/web/src/lib/types/libav.ts b/web/src/lib/types/libav.ts
index 92a07a7d..9871e4cb 100644
--- a/web/src/lib/types/libav.ts
+++ b/web/src/lib/types/libav.ts
@@ -1,10 +1,12 @@
+import type { CobaltFileReference } from "$lib/types/storage";
+
export type FileInfo = {
type?: string,
extension?: string,
}
export type RenderParams = {
- files: File[],
+ files: CobaltFileReference[],
output: FileInfo,
args: string[],
}
diff --git a/web/src/lib/types/queue.ts b/web/src/lib/types/queue.ts
index f50093a4..c09e0d6b 100644
--- a/web/src/lib/types/queue.ts
+++ b/web/src/lib/types/queue.ts
@@ -1,3 +1,4 @@
+import type { CobaltFileReference } from "$lib/types/storage";
import type { CobaltPipelineItem, CobaltPipelineResultFileType } from "$lib/types/workers";
export type CobaltQueueItemState = "waiting" | "running" | "done" | "error";
@@ -19,12 +20,12 @@ export type CobaltQueueItemRunning = CobaltQueueBaseItem & {
state: "running",
runningWorker: string,
completedWorkers?: string[],
- pipelineResults?: File[],
+ pipelineResults?: CobaltFileReference[],
};
export type CobaltQueueItemDone = CobaltQueueBaseItem & {
state: "done",
- resultFile: File,
+ resultFile: CobaltFileReference,
};
export type CobaltQueueItemError = CobaltQueueBaseItem & {
diff --git a/web/src/lib/types/storage.ts b/web/src/lib/types/storage.ts
new file mode 100644
index 00000000..a9bb13a7
--- /dev/null
+++ b/web/src/lib/types/storage.ts
@@ -0,0 +1,4 @@
+export type CobaltFileReference = {
+ file: File,
+ type: string,
+}
diff --git a/web/src/lib/types/workers.ts b/web/src/lib/types/workers.ts
index 6a6e2c98..e42d8ba7 100644
--- a/web/src/lib/types/workers.ts
+++ b/web/src/lib/types/workers.ts
@@ -1,4 +1,5 @@
import type { FileInfo } from "$lib/types/libav";
+import type { CobaltFileReference } from "$lib/types/storage";
export const resultFileTypes = ["video", "audio", "image"] as const;
@@ -12,7 +13,7 @@ export type CobaltWorkerProgress = {
}
export type CobaltWorkerArgs = {
- files?: File[],
+ files?: CobaltFileReference[],
url?: string,
ffargs?: string[],
output?: FileInfo,
diff --git a/web/src/lib/workers/fetch.ts b/web/src/lib/workers/fetch.ts
index 0965c51d..aa30a4c4 100644
--- a/web/src/lib/workers/fetch.ts
+++ b/web/src/lib/workers/fetch.ts
@@ -1,3 +1,5 @@
+import { OPFSStorage } from "$lib/storage";
+
const error = (code: string) => {
// TODO: return proper errors and code here
self.postMessage({
@@ -22,20 +24,21 @@ const fetchFile = async (url: string) => {
const totalBytes = contentLength ? parseInt(contentLength, 10) : null;
const reader = response.body?.getReader();
+ const storage = await OPFSStorage.init();
+
if (!reader) {
error("no reader");
return self.close();
}
let receivedBytes = 0;
- const chunks = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
+ await storage.write(value, receivedBytes);
receivedBytes += value.length;
- chunks.push(value);
if (totalBytes) {
self.postMessage({
@@ -52,15 +55,22 @@ const fetchFile = async (url: string) => {
return self.close();
}
- const file = new File(chunks, "file", { type: contentType });
+ const file = await storage.res();
+
+ if (Number(contentLength) !== file.size) {
+ error("file is not downloaded fully");
+ }
self.postMessage({
cobaltFetchWorker: {
- file
+ result: {
+ file,
+ type: contentType,
+ }
}
});
} catch (e) {
- console.log(e)
+ console.log(e);
error("error when downloading the file");
return self.close();
}
diff --git a/web/src/lib/workers/remux.ts b/web/src/lib/workers/remux.ts
index 61f06bfc..1f3f6aff 100644
--- a/web/src/lib/workers/remux.ts
+++ b/web/src/lib/workers/remux.ts
@@ -1,5 +1,7 @@
import LibAVWrapper from "$lib/libav";
+
import type { FileInfo } from "$lib/types/libav";
+import type { CobaltFileReference } from "$lib/types/storage";
const error = (code: string) => {
self.postMessage({
@@ -25,14 +27,17 @@ const ff = new LibAVWrapper((progress) => {
ff.init();
-const remux = async (files: File[], args: string[], output: FileInfo, filename: string) => {
+const remux = async (files: CobaltFileReference[], args: string[], output: FileInfo, filename: string) => {
if (!(files && output && args)) return;
await ff.init();
try {
// probing just the first file in files array (usually audio) for duration progress
- const file_info = await ff.probe(files[0]).catch((e) => {
+ const probeFile = files[0]?.file;
+ if (!probeFile) return error("couldn't probe one of files");
+
+ const file_info = await ff.probe(probeFile).catch((e) => {
if (e?.message?.toLowerCase().includes("out of memory")) {
console.error("uh oh! out of memory");
console.error(e);
@@ -87,6 +92,7 @@ const remux = async (files: File[], args: string[], output: FileInfo, filename:
});
} catch (e) {
console.log(e);
+ return error("remux.crashed");
}
}