diff --git a/api/src/stream/internal.js b/api/src/stream/internal.js index 6d4ce318..93eeca85 100644 --- a/api/src/stream/internal.js +++ b/api/src/stream/internal.js @@ -8,11 +8,18 @@ const min = (a, b) => a < b ? a : b; async function* readChunks(streamInfo, size) { let read = 0n, chunksSinceTransplant = 0; + console.log(`[readChunks] Starting chunk download - Total size: ${size}, URL: ${streamInfo.url}`); + while (read < size) { if (streamInfo.controller.signal.aborted) { + console.log(`[readChunks] Controller aborted at read=${read}/${size}`); throw new Error("controller aborted"); } + const rangeStart = read; + const rangeEnd = read + CHUNK_SIZE; + console.log(`[readChunks] Requesting chunk: bytes=${rangeStart}-${rangeEnd}, read=${read}/${size}`); + const chunk = await request(streamInfo.url, { headers: { ...getHeaders('youtube'), @@ -23,12 +30,18 @@ async function* readChunks(streamInfo, size) { maxRedirections: 4 }); + console.log(`[readChunks] Chunk response: status=${chunk.statusCode}, content-length=${chunk.headers['content-length']}`); + if (chunk.statusCode === 403 && chunksSinceTransplant >= 3 && streamInfo.transplant) { chunksSinceTransplant = 0; + console.log(`[readChunks] 403 error after ${chunksSinceTransplant} chunks, attempting transplant`); try { await streamInfo.transplant(streamInfo.dispatcher); + console.log(`[readChunks] Transplant successful, retrying`); continue; - } catch {} + } catch (error) { + console.log(`[readChunks] Transplant failed: ${error}`); + } } chunksSinceTransplant++; @@ -36,24 +49,39 @@ async function* readChunks(streamInfo, size) { const expected = min(CHUNK_SIZE, size - read); const received = BigInt(chunk.headers['content-length']); + console.log(`[readChunks] Chunk validation: expected=${expected}, received=${received}, threshold=${expected / 2n}`); + if (received < expected / 2n) { + console.log(`[readChunks] CRITICAL: Received size (${received}) < expected/2 (${expected / 2n}), closing controller`); closeRequest(streamInfo.controller); } + let chunkDataSize = 0; for await (const data of chunk.body) { + chunkDataSize += data.length; yield data; } + console.log(`[readChunks] Chunk processed: data size=${chunkDataSize}, header size=${received}, read progress=${read + received}/${size}`); read += received; - } + } + console.log(`[readChunks] Download completed: total read=${read}/${size}`); } async function handleYoutubeStream(streamInfo, res) { const { signal } = streamInfo.controller; - const cleanup = () => (res.end(), closeRequest(streamInfo.controller)); + const cleanup = () => { + console.log(`[handleYoutubeStream] Cleanup called`); + res.end(); + closeRequest(streamInfo.controller); + }; + + console.log(`[handleYoutubeStream] Starting YouTube stream for URL: ${streamInfo.url}`); try { let req, attempts = 3; + console.log(`[handleYoutubeStream] Starting HEAD request with ${attempts} attempts`); + while (attempts--) { req = await fetch(streamInfo.url, { headers: getHeaders('youtube'), @@ -62,25 +90,34 @@ async function handleYoutubeStream(streamInfo, res) { signal }); + console.log(`[handleYoutubeStream] HEAD response: status=${req.status}, url=${req.url}`); + streamInfo.url = req.url; if (req.status === 403 && streamInfo.transplant) { + console.log(`[handleYoutubeStream] Got 403, attempting transplant`); try { await streamInfo.transplant(streamInfo.dispatcher); - } catch { + console.log(`[handleYoutubeStream] Transplant successful`); + } catch (error) { + console.log(`[handleYoutubeStream] Transplant failed: ${error}`); break; } } else break; } const size = BigInt(req.headers.get('content-length')); + console.log(`[handleYoutubeStream] Content length: ${size}, status: ${req.status}`); if (req.status !== 200 || !size) { + console.log(`[handleYoutubeStream] Invalid response - status: ${req.status}, size: ${size}, calling cleanup`); return cleanup(); } + console.log(`[handleYoutubeStream] Creating generator for size: ${size}`); const generator = readChunks(streamInfo, size); const abortGenerator = () => { + console.log(`[handleYoutubeStream] Abort generator called`); generator.return(); signal.removeEventListener('abort', abortGenerator); } @@ -88,14 +125,21 @@ async function handleYoutubeStream(streamInfo, res) { signal.addEventListener('abort', abortGenerator); const stream = Readable.from(generator); + console.log(`[handleYoutubeStream] Created readable stream`); + // Set response headers for (const headerName of ['content-type', 'content-length']) { const headerValue = req.headers.get(headerName); - if (headerValue) res.setHeader(headerName, headerValue); + if (headerValue) { + res.setHeader(headerName, headerValue); + console.log(`[handleYoutubeStream] Set header ${headerName}: ${headerValue}`); + } } + console.log(`[handleYoutubeStream] Starting pipe operation`); pipe(stream, res, cleanup); - } catch { + } catch (error) { + console.log(`[handleYoutubeStream] Error occurred: ${error}`); cleanup(); } } @@ -142,14 +186,18 @@ async function handleGenericStream(streamInfo, res) { } export function internalStream(streamInfo, res) { + console.log(`[internalStream] Starting stream - service: ${streamInfo.service}, isHLS: ${streamInfo.isHLS}, URL: ${streamInfo.url}`); + if (streamInfo.headers) { streamInfo.headers.delete('icy-metadata'); } if (streamInfo.service === 'youtube' && !streamInfo.isHLS) { + console.log(`[internalStream] Routing to handleYoutubeStream`); return handleYoutubeStream(streamInfo, res); } + console.log(`[internalStream] Routing to handleGenericStream`); return handleGenericStream(streamInfo, res); } diff --git a/api/src/stream/shared.js b/api/src/stream/shared.js index ec06339d..e997b20c 100644 --- a/api/src/stream/shared.js +++ b/api/src/stream/shared.js @@ -41,13 +41,38 @@ export function getHeaders(service) { } export function pipe(from, to, done) { - from.on('error', done) - .on('close', done); + let bytesTransferred = 0; + let startTime = Date.now(); + console.log(`[pipe] Starting pipe operation`); - to.on('error', done) - .on('close', done); + from.on('error', (error) => { + console.log(`[pipe] Source stream error after ${bytesTransferred} bytes: ${error}`); + done(error); + }) + .on('close', () => { + const duration = Date.now() - startTime; + console.log(`[pipe] Source stream closed after ${bytesTransferred} bytes in ${duration}ms`); + done(); + }) .on('data', (chunk) => { + bytesTransferred += chunk.length; + // Log every 8MB (chunk size) or first few chunks + if (bytesTransferred % (8 * 1024 * 1024) < chunk.length || bytesTransferred < 32 * 1024) { + console.log(`[pipe] Data transferred: ${bytesTransferred} bytes`); + } + }); + + to.on('error', (error) => { + console.log(`[pipe] Destination stream error after ${bytesTransferred} bytes: ${error}`); + done(error); + }) + .on('close', () => { + const duration = Date.now() - startTime; + console.log(`[pipe] Destination stream closed after ${bytesTransferred} bytes in ${duration}ms`); + done(); + }); from.pipe(to); + console.log(`[pipe] Pipe established between streams`); } export async function estimateTunnelLength(streamInfo, multiplier = 1.1) {