From 92b8bd82ee2c772a0dfe361d075a28bf0f7c1128 Mon Sep 17 00:00:00 2001 From: celebrateyang Date: Sun, 8 Jun 2025 23:07:56 +0800 Subject: [PATCH] roll back --- api/src/stream/internal.js | 97 ++++++++++---------------------------- api/src/stream/shared.js | 40 ++-------------- 2 files changed, 28 insertions(+), 109 deletions(-) diff --git a/api/src/stream/internal.js b/api/src/stream/internal.js index 3e54085b..6d4ce318 100644 --- a/api/src/stream/internal.js +++ b/api/src/stream/internal.js @@ -13,55 +13,31 @@ async function* readChunks(streamInfo, size) { throw new Error("controller aborted"); } - let retries = 3; - let chunk; - - // Retry mechanism for failed chunks - while (retries > 0) { - try { - chunk = await request(streamInfo.url, { - headers: { - ...getHeaders('youtube'), - Range: `bytes=${read}-${read + CHUNK_SIZE}` - }, - dispatcher: streamInfo.dispatcher, - signal: streamInfo.controller.signal, - maxRedirections: 4 - }); + const chunk = await request(streamInfo.url, { + headers: { + ...getHeaders('youtube'), + Range: `bytes=${read}-${read + CHUNK_SIZE}` + }, + dispatcher: streamInfo.dispatcher, + signal: streamInfo.controller.signal, + maxRedirections: 4 + }); - // Check for valid response before processing - if (chunk.statusCode === 206 || chunk.statusCode === 200) { - break; // Success, exit retry loop - } else if (chunk.statusCode === 403 && chunksSinceTransplant >= 3 && streamInfo.transplant) { - chunksSinceTransplant = 0; - try { - await streamInfo.transplant(streamInfo.dispatcher); - continue; // Retry with transplanted connection - } catch {} - } - - // For other status codes, retry - throw new Error(`HTTP ${chunk.statusCode} response`); - - } catch (error) { - retries--; - if (retries === 0) { - throw new Error(`Failed to fetch chunk after 3 attempts: ${error.message}`); - } - - // Wait before retry (exponential backoff) - await new Promise(resolve => setTimeout(resolve, (4 - retries) * 1000)); - } + if (chunk.statusCode === 403 && chunksSinceTransplant >= 3 && streamInfo.transplant) { + chunksSinceTransplant = 0; + try { + await streamInfo.transplant(streamInfo.dispatcher); + continue; + } catch {} } chunksSinceTransplant++; - + const expected = min(CHUNK_SIZE, size - read); const received = BigInt(chunk.headers['content-length']); - // Validate we received some data - if (received === 0n && expected > 0n) { - throw new Error("Received empty chunk when data was expected"); + if (received < expected / 2n) { + closeRequest(streamInfo.controller); } for await (const data of chunk.body) { @@ -74,13 +50,7 @@ async function* readChunks(streamInfo, size) { async function handleYoutubeStream(streamInfo, res) { const { signal } = streamInfo.controller; - const cleanup = () => { - // Only end response if headers haven't been sent to prevent 0-byte files - if (!res.headersSent) { - res.end(); - } - closeRequest(streamInfo.controller); - }; + const cleanup = () => (res.end(), closeRequest(streamInfo.controller)); try { let req, attempts = 3; @@ -108,21 +78,6 @@ async function handleYoutubeStream(streamInfo, res) { return cleanup(); } - // Check if client sent a Range header for partial content request - const rangeHeader = streamInfo.headers?.get('range'); - - if (rangeHeader && req.headers.get('accept-ranges') === 'bytes') { - // Handle range request - delegate to handleGenericStream for proper range handling - return await handleGenericStream(streamInfo, res); - } - - // Set headers before starting stream to ensure they're sent - for (const headerName of ['content-type', 'content-length', 'accept-ranges']) { - const headerValue = req.headers.get(headerName); - if (headerValue) res.setHeader(headerName, headerValue); - } - - // Full file request - use chunked download approach const generator = readChunks(streamInfo, size); const abortGenerator = () => { @@ -134,17 +89,13 @@ async function handleYoutubeStream(streamInfo, res) { const stream = Readable.from(generator); - // Add error handling to prevent 0-byte files - stream.on('error', (error) => { - console.error('YouTube stream error:', error); - if (!res.headersSent) { - res.status(500).end(); - } - }); + for (const headerName of ['content-type', 'content-length']) { + const headerValue = req.headers.get(headerName); + if (headerValue) res.setHeader(headerName, headerValue); + } pipe(stream, res, cleanup); - } catch (error) { - console.error('YouTube stream handling error:', error); + } catch { cleanup(); } } diff --git a/api/src/stream/shared.js b/api/src/stream/shared.js index 4b091c84..ec06339d 100644 --- a/api/src/stream/shared.js +++ b/api/src/stream/shared.js @@ -41,43 +41,11 @@ export function getHeaders(service) { } export function pipe(from, to, done) { - let finished = false; - let bytesWritten = 0; - - const cleanup = (error) => { - if (finished) return; - finished = true; - - // Log if we got 0 bytes to help debug production issues - if (bytesWritten === 0 && !error) { - console.warn('Stream completed with 0 bytes written - potential issue'); - } - - done(error); - }; + from.on('error', done) + .on('close', done); - from.on('error', (error) => { - console.error('Stream source error:', error); - cleanup(error); - }) - .on('close', () => { - if (!finished) { - cleanup(); - } - }) - .on('data', (chunk) => { - bytesWritten += chunk.length; - }); - - to.on('error', (error) => { - console.error('Stream destination error:', error); - cleanup(error); - }) - .on('close', () => { - if (!finished) { - cleanup(); - } - }); + to.on('error', done) + .on('close', done); from.pipe(to); }