From 33c3c398fc6361ddc4f684d8334e7a41482de7bf Mon Sep 17 00:00:00 2001 From: jj Date: Sun, 23 Jun 2024 17:37:02 +0200 Subject: [PATCH] stream/internal: don't abort immediately after close for generic streams (#584) * stream: move closeRequest to shared functions * stream: use closeRequest instead of abort() directly * stream/internal: don't abort immediately after close for generic streams --- src/modules/stream/internal.js | 12 +++++++----- src/modules/stream/manage.js | 3 ++- src/modules/stream/shared.js | 4 ++++ src/modules/stream/types.js | 6 +----- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/modules/stream/internal.js b/src/modules/stream/internal.js index 5a28af93..8ab2ec76 100644 --- a/src/modules/stream/internal.js +++ b/src/modules/stream/internal.js @@ -1,6 +1,6 @@ import { request } from 'undici'; import { Readable } from 'node:stream'; -import { getHeaders, pipe } from './shared.js'; +import { closeRequest, getHeaders, pipe } from './shared.js'; import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js'; const CHUNK_SIZE = BigInt(8e6); // 8 MB @@ -26,7 +26,7 @@ async function* readChunks(streamInfo, size) { const received = BigInt(chunk.headers['content-length']); if (received < expected / 2n) { - streamInfo.controller.abort(); + closeRequest(streamInfo.controller); } for await (const data of chunk.body) { @@ -39,7 +39,7 @@ async function* readChunks(streamInfo, size) { async function handleYoutubeStream(streamInfo, res) { const { signal } = streamInfo.controller; - const cleanup = () => (res.end(), streamInfo.controller.abort()); + const cleanup = () => (res.end(), closeRequest(streamInfo.controller)); try { const req = await fetch(streamInfo.url, { @@ -80,7 +80,7 @@ async function handleYoutubeStream(streamInfo, res) { async function handleGenericStream(streamInfo, res) { const { signal } = streamInfo.controller; - const cleanup = () => (res.end(), streamInfo.controller.abort()); + const cleanup = () => res.end(); try { const req = await request(streamInfo.url, { @@ -94,12 +94,13 @@ async function handleGenericStream(streamInfo, res) { }); res.status(req.statusCode); + req.body.on('error', () => {}); for (const [ name, value ] of Object.entries(req.headers)) res.setHeader(name, value) if (req.statusCode < 200 || req.statusCode > 299) - return res.end(); + return cleanup(); if (isHlsRequest(req)) { await handleHlsPlaylist(streamInfo, req, res); @@ -107,6 +108,7 @@ async function handleGenericStream(streamInfo, res) { pipe(req.body, res, cleanup); } } catch { + closeRequest(streamInfo.controller); cleanup(); } } diff --git a/src/modules/stream/manage.js b/src/modules/stream/manage.js index b02c6084..031d6711 100644 --- a/src/modules/stream/manage.js +++ b/src/modules/stream/manage.js @@ -5,6 +5,7 @@ import { nanoid } from "nanoid"; import { decryptStream, encryptStream, generateHmac } from "../sub/crypto.js"; import { env } from "../config.js"; import { strict as assert } from "assert"; +import { closeRequest } from "./shared.js"; // optional dependency const freebind = env.freebindCIDR && await import('freebind').catch(() => {}); @@ -109,7 +110,7 @@ export function destroyInternalStream(url) { const id = url.searchParams.get('id'); if (internalStreamCache[id]) { - internalStreamCache[id].controller.abort(); + closeRequest(internalStreamCache[id].controller); delete internalStreamCache[id]; } } diff --git a/src/modules/stream/shared.js b/src/modules/stream/shared.js index 1e2f2e25..fd7d1569 100644 --- a/src/modules/stream/shared.js +++ b/src/modules/stream/shared.js @@ -16,6 +16,10 @@ const serviceHeaders = { } } +export function closeRequest(controller) { + try { controller.abort() } catch {} +} + export function closeResponse(res) { if (!res.headersSent) { res.sendStatus(500); diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index 2372d6c1..000b7f7f 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -6,7 +6,7 @@ import { create as contentDisposition } from "content-disposition-header"; import { metadataManager } from "../sub/utils.js"; import { destroyInternalStream } from "./manage.js"; import { env, ffmpegArgs, hlsExceptions } from "../config.js"; -import { getHeaders, closeResponse, pipe } from "./shared.js"; +import { getHeaders, closeRequest, closeResponse, pipe } from "./shared.js"; function toRawHeaders(headers) { return Object.entries(headers) @@ -14,10 +14,6 @@ function toRawHeaders(headers) { .join(''); } -function closeRequest(controller) { - try { controller.abort() } catch {} -} - function killProcess(p) { // ask the process to terminate itself gracefully p?.kill('SIGTERM');