stream/internal: return from generator when AbortController aborts

This commit is contained in:
dumbmoron 2024-06-14 08:44:44 +00:00
parent 21d5b4b8d4
commit 2ab1a01e91
No known key found for this signature in database

View File

@ -1,6 +1,5 @@
import { request } from 'undici'; import { request } from 'undici';
import { Readable } from 'node:stream'; import { Readable } from 'node:stream';
import { assert } from 'console';
import { getHeaders, pipe } from './shared.js'; import { getHeaders, pipe } from './shared.js';
import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js'; import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js';
@ -36,21 +35,17 @@ async function* readChunks(streamInfo, size) {
read += received; read += received;
} }
}
function chunkedStream(streamInfo, size) {
assert(streamInfo.controller instanceof AbortController);
const stream = Readable.from(readChunks(streamInfo, size));
return stream;
} }
async function handleYoutubeStream(streamInfo, res) { async function handleYoutubeStream(streamInfo, res) {
const { signal } = streamInfo.controller;
try { try {
const req = await fetch(streamInfo.url, { const req = await fetch(streamInfo.url, {
headers: getHeaders('youtube'), headers: getHeaders('youtube'),
method: 'HEAD', method: 'HEAD',
dispatcher: streamInfo.dispatcher, dispatcher: streamInfo.dispatcher,
signal: streamInfo.controller.signal signal
}); });
streamInfo.url = req.url; streamInfo.url = req.url;
@ -60,7 +55,16 @@ async function handleYoutubeStream(streamInfo, res) {
return res.end(); return res.end();
} }
const stream = chunkedStream(streamInfo, size); const generator = readChunks(streamInfo, size);
const abortGenerator = () => {
generator.return();
signal.removeEventListener('abort', abortGenerator);
}
signal.addEventListener('abort', abortGenerator);
const stream = Readable.from(generator);
for (const headerName of ['content-type', 'content-length']) { for (const headerName of ['content-type', 'content-length']) {
const headerValue = req.headers.get(headerName); const headerValue = req.headers.get(headerName);
@ -69,6 +73,7 @@ async function handleYoutubeStream(streamInfo, res) {
pipe(stream, res, () => res.end()); pipe(stream, res, () => res.end());
} catch { } catch {
signal.abort();
res.end(); res.end();
} }
} }