From 2ab1a01e910845d36390a59c08bebd70d4beb4d1 Mon Sep 17 00:00:00 2001 From: dumbmoron Date: Fri, 14 Jun 2024 08:44:44 +0000 Subject: [PATCH] stream/internal: return from generator when AbortController aborts --- src/modules/stream/internal.js | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/modules/stream/internal.js b/src/modules/stream/internal.js index 535bba2d..ef8ec6f7 100644 --- a/src/modules/stream/internal.js +++ b/src/modules/stream/internal.js @@ -1,6 +1,5 @@ import { request } from 'undici'; import { Readable } from 'node:stream'; -import { assert } from 'console'; import { getHeaders, pipe } from './shared.js'; import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js'; @@ -36,21 +35,17 @@ async function* readChunks(streamInfo, size) { read += received; } -} - -function chunkedStream(streamInfo, size) { - assert(streamInfo.controller instanceof AbortController); - const stream = Readable.from(readChunks(streamInfo, size)); - return stream; } async function handleYoutubeStream(streamInfo, res) { + const { signal } = streamInfo.controller; + try { const req = await fetch(streamInfo.url, { headers: getHeaders('youtube'), method: 'HEAD', dispatcher: streamInfo.dispatcher, - signal: streamInfo.controller.signal + signal }); streamInfo.url = req.url; @@ -60,7 +55,16 @@ async function handleYoutubeStream(streamInfo, res) { return res.end(); } - const stream = chunkedStream(streamInfo, size); + const generator = readChunks(streamInfo, size); + + const abortGenerator = () => { + generator.return(); + signal.removeEventListener('abort', abortGenerator); + } + + signal.addEventListener('abort', abortGenerator); + + const stream = Readable.from(generator); for (const headerName of ['content-type', 'content-length']) { const headerValue = req.headers.get(headerName); @@ -69,6 +73,7 @@ async function handleYoutubeStream(streamInfo, res) { pipe(stream, res, () => res.end()); } catch { + signal.abort(); res.end(); } }