From f54d2d82cb591a08ed6adf279e2a3981ef4aee1c Mon Sep 17 00:00:00 2001 From: dumbmoron Date: Sun, 23 Jun 2024 13:54:16 +0000 Subject: [PATCH] stream: use closeRequest instead of abort() directly --- src/modules/stream/internal.js | 11 ++++++----- src/modules/stream/manage.js | 3 ++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/modules/stream/internal.js b/src/modules/stream/internal.js index 5a28af93..4587d48a 100644 --- a/src/modules/stream/internal.js +++ b/src/modules/stream/internal.js @@ -1,6 +1,6 @@ import { request } from 'undici'; import { Readable } from 'node:stream'; -import { getHeaders, pipe } from './shared.js'; +import { closeRequest, getHeaders, pipe } from './shared.js'; import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js'; const CHUNK_SIZE = BigInt(8e6); // 8 MB @@ -26,7 +26,7 @@ async function* readChunks(streamInfo, size) { const received = BigInt(chunk.headers['content-length']); if (received < expected / 2n) { - streamInfo.controller.abort(); + closeRequest(streamInfo.controller); } for await (const data of chunk.body) { @@ -39,7 +39,7 @@ async function* readChunks(streamInfo, size) { async function handleYoutubeStream(streamInfo, res) { const { signal } = streamInfo.controller; - const cleanup = () => (res.end(), streamInfo.controller.abort()); + const cleanup = () => (res.end(), closeRequest(streamInfo.controller)); try { const req = await fetch(streamInfo.url, { @@ -80,7 +80,7 @@ async function handleYoutubeStream(streamInfo, res) { async function handleGenericStream(streamInfo, res) { const { signal } = streamInfo.controller; - const cleanup = () => (res.end(), streamInfo.controller.abort()); + const cleanup = () => (res.end(), closeRequest(streamInfo.controller)); try { const req = await request(streamInfo.url, { @@ -99,7 +99,7 @@ async function handleGenericStream(streamInfo, res) { res.setHeader(name, value) if (req.statusCode < 200 || req.statusCode > 299) - return res.end(); + return cleanup(); if (isHlsRequest(req)) { await handleHlsPlaylist(streamInfo, req, res); @@ -107,6 +107,7 @@ async function handleGenericStream(streamInfo, res) { pipe(req.body, res, cleanup); } } catch { + closeRequest(streamInfo.controller); cleanup(); } } diff --git a/src/modules/stream/manage.js b/src/modules/stream/manage.js index b02c6084..031d6711 100644 --- a/src/modules/stream/manage.js +++ b/src/modules/stream/manage.js @@ -5,6 +5,7 @@ import { nanoid } from "nanoid"; import { decryptStream, encryptStream, generateHmac } from "../sub/crypto.js"; import { env } from "../config.js"; import { strict as assert } from "assert"; +import { closeRequest } from "./shared.js"; // optional dependency const freebind = env.freebindCIDR && await import('freebind').catch(() => {}); @@ -109,7 +110,7 @@ export function destroyInternalStream(url) { const id = url.searchParams.get('id'); if (internalStreamCache[id]) { - internalStreamCache[id].controller.abort(); + closeRequest(internalStreamCache[id].controller); delete internalStreamCache[id]; } }