mirror of
https://github.com/imputnet/cobalt.git
synced 2025-06-28 09:28:29 +00:00
add log to check 0 byte error
This commit is contained in:
parent
92b8bd82ee
commit
f572d99cc2
@ -8,11 +8,18 @@ const min = (a, b) => a < b ? a : b;
|
|||||||
|
|
||||||
async function* readChunks(streamInfo, size) {
|
async function* readChunks(streamInfo, size) {
|
||||||
let read = 0n, chunksSinceTransplant = 0;
|
let read = 0n, chunksSinceTransplant = 0;
|
||||||
|
console.log(`[readChunks] Starting chunk download - Total size: ${size}, URL: ${streamInfo.url}`);
|
||||||
|
|
||||||
while (read < size) {
|
while (read < size) {
|
||||||
if (streamInfo.controller.signal.aborted) {
|
if (streamInfo.controller.signal.aborted) {
|
||||||
|
console.log(`[readChunks] Controller aborted at read=${read}/${size}`);
|
||||||
throw new Error("controller aborted");
|
throw new Error("controller aborted");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const rangeStart = read;
|
||||||
|
const rangeEnd = read + CHUNK_SIZE;
|
||||||
|
console.log(`[readChunks] Requesting chunk: bytes=${rangeStart}-${rangeEnd}, read=${read}/${size}`);
|
||||||
|
|
||||||
const chunk = await request(streamInfo.url, {
|
const chunk = await request(streamInfo.url, {
|
||||||
headers: {
|
headers: {
|
||||||
...getHeaders('youtube'),
|
...getHeaders('youtube'),
|
||||||
@ -23,12 +30,18 @@ async function* readChunks(streamInfo, size) {
|
|||||||
maxRedirections: 4
|
maxRedirections: 4
|
||||||
});
|
});
|
||||||
|
|
||||||
|
console.log(`[readChunks] Chunk response: status=${chunk.statusCode}, content-length=${chunk.headers['content-length']}`);
|
||||||
|
|
||||||
if (chunk.statusCode === 403 && chunksSinceTransplant >= 3 && streamInfo.transplant) {
|
if (chunk.statusCode === 403 && chunksSinceTransplant >= 3 && streamInfo.transplant) {
|
||||||
chunksSinceTransplant = 0;
|
chunksSinceTransplant = 0;
|
||||||
|
console.log(`[readChunks] 403 error after ${chunksSinceTransplant} chunks, attempting transplant`);
|
||||||
try {
|
try {
|
||||||
await streamInfo.transplant(streamInfo.dispatcher);
|
await streamInfo.transplant(streamInfo.dispatcher);
|
||||||
|
console.log(`[readChunks] Transplant successful, retrying`);
|
||||||
continue;
|
continue;
|
||||||
} catch {}
|
} catch (error) {
|
||||||
|
console.log(`[readChunks] Transplant failed: ${error}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
chunksSinceTransplant++;
|
chunksSinceTransplant++;
|
||||||
@ -36,24 +49,39 @@ async function* readChunks(streamInfo, size) {
|
|||||||
const expected = min(CHUNK_SIZE, size - read);
|
const expected = min(CHUNK_SIZE, size - read);
|
||||||
const received = BigInt(chunk.headers['content-length']);
|
const received = BigInt(chunk.headers['content-length']);
|
||||||
|
|
||||||
|
console.log(`[readChunks] Chunk validation: expected=${expected}, received=${received}, threshold=${expected / 2n}`);
|
||||||
|
|
||||||
if (received < expected / 2n) {
|
if (received < expected / 2n) {
|
||||||
|
console.log(`[readChunks] CRITICAL: Received size (${received}) < expected/2 (${expected / 2n}), closing controller`);
|
||||||
closeRequest(streamInfo.controller);
|
closeRequest(streamInfo.controller);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let chunkDataSize = 0;
|
||||||
for await (const data of chunk.body) {
|
for await (const data of chunk.body) {
|
||||||
|
chunkDataSize += data.length;
|
||||||
yield data;
|
yield data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
console.log(`[readChunks] Chunk processed: data size=${chunkDataSize}, header size=${received}, read progress=${read + received}/${size}`);
|
||||||
read += received;
|
read += received;
|
||||||
}
|
}
|
||||||
|
console.log(`[readChunks] Download completed: total read=${read}/${size}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function handleYoutubeStream(streamInfo, res) {
|
async function handleYoutubeStream(streamInfo, res) {
|
||||||
const { signal } = streamInfo.controller;
|
const { signal } = streamInfo.controller;
|
||||||
const cleanup = () => (res.end(), closeRequest(streamInfo.controller));
|
const cleanup = () => {
|
||||||
|
console.log(`[handleYoutubeStream] Cleanup called`);
|
||||||
|
res.end();
|
||||||
|
closeRequest(streamInfo.controller);
|
||||||
|
};
|
||||||
|
|
||||||
|
console.log(`[handleYoutubeStream] Starting YouTube stream for URL: ${streamInfo.url}`);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
let req, attempts = 3;
|
let req, attempts = 3;
|
||||||
|
console.log(`[handleYoutubeStream] Starting HEAD request with ${attempts} attempts`);
|
||||||
|
|
||||||
while (attempts--) {
|
while (attempts--) {
|
||||||
req = await fetch(streamInfo.url, {
|
req = await fetch(streamInfo.url, {
|
||||||
headers: getHeaders('youtube'),
|
headers: getHeaders('youtube'),
|
||||||
@ -62,25 +90,34 @@ async function handleYoutubeStream(streamInfo, res) {
|
|||||||
signal
|
signal
|
||||||
});
|
});
|
||||||
|
|
||||||
|
console.log(`[handleYoutubeStream] HEAD response: status=${req.status}, url=${req.url}`);
|
||||||
|
|
||||||
streamInfo.url = req.url;
|
streamInfo.url = req.url;
|
||||||
if (req.status === 403 && streamInfo.transplant) {
|
if (req.status === 403 && streamInfo.transplant) {
|
||||||
|
console.log(`[handleYoutubeStream] Got 403, attempting transplant`);
|
||||||
try {
|
try {
|
||||||
await streamInfo.transplant(streamInfo.dispatcher);
|
await streamInfo.transplant(streamInfo.dispatcher);
|
||||||
} catch {
|
console.log(`[handleYoutubeStream] Transplant successful`);
|
||||||
|
} catch (error) {
|
||||||
|
console.log(`[handleYoutubeStream] Transplant failed: ${error}`);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else break;
|
} else break;
|
||||||
}
|
}
|
||||||
|
|
||||||
const size = BigInt(req.headers.get('content-length'));
|
const size = BigInt(req.headers.get('content-length'));
|
||||||
|
console.log(`[handleYoutubeStream] Content length: ${size}, status: ${req.status}`);
|
||||||
|
|
||||||
if (req.status !== 200 || !size) {
|
if (req.status !== 200 || !size) {
|
||||||
|
console.log(`[handleYoutubeStream] Invalid response - status: ${req.status}, size: ${size}, calling cleanup`);
|
||||||
return cleanup();
|
return cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
console.log(`[handleYoutubeStream] Creating generator for size: ${size}`);
|
||||||
const generator = readChunks(streamInfo, size);
|
const generator = readChunks(streamInfo, size);
|
||||||
|
|
||||||
const abortGenerator = () => {
|
const abortGenerator = () => {
|
||||||
|
console.log(`[handleYoutubeStream] Abort generator called`);
|
||||||
generator.return();
|
generator.return();
|
||||||
signal.removeEventListener('abort', abortGenerator);
|
signal.removeEventListener('abort', abortGenerator);
|
||||||
}
|
}
|
||||||
@ -88,14 +125,21 @@ async function handleYoutubeStream(streamInfo, res) {
|
|||||||
signal.addEventListener('abort', abortGenerator);
|
signal.addEventListener('abort', abortGenerator);
|
||||||
|
|
||||||
const stream = Readable.from(generator);
|
const stream = Readable.from(generator);
|
||||||
|
console.log(`[handleYoutubeStream] Created readable stream`);
|
||||||
|
|
||||||
|
// Set response headers
|
||||||
for (const headerName of ['content-type', 'content-length']) {
|
for (const headerName of ['content-type', 'content-length']) {
|
||||||
const headerValue = req.headers.get(headerName);
|
const headerValue = req.headers.get(headerName);
|
||||||
if (headerValue) res.setHeader(headerName, headerValue);
|
if (headerValue) {
|
||||||
|
res.setHeader(headerName, headerValue);
|
||||||
|
console.log(`[handleYoutubeStream] Set header ${headerName}: ${headerValue}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
console.log(`[handleYoutubeStream] Starting pipe operation`);
|
||||||
pipe(stream, res, cleanup);
|
pipe(stream, res, cleanup);
|
||||||
} catch {
|
} catch (error) {
|
||||||
|
console.log(`[handleYoutubeStream] Error occurred: ${error}`);
|
||||||
cleanup();
|
cleanup();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -142,14 +186,18 @@ async function handleGenericStream(streamInfo, res) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export function internalStream(streamInfo, res) {
|
export function internalStream(streamInfo, res) {
|
||||||
|
console.log(`[internalStream] Starting stream - service: ${streamInfo.service}, isHLS: ${streamInfo.isHLS}, URL: ${streamInfo.url}`);
|
||||||
|
|
||||||
if (streamInfo.headers) {
|
if (streamInfo.headers) {
|
||||||
streamInfo.headers.delete('icy-metadata');
|
streamInfo.headers.delete('icy-metadata');
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamInfo.service === 'youtube' && !streamInfo.isHLS) {
|
if (streamInfo.service === 'youtube' && !streamInfo.isHLS) {
|
||||||
|
console.log(`[internalStream] Routing to handleYoutubeStream`);
|
||||||
return handleYoutubeStream(streamInfo, res);
|
return handleYoutubeStream(streamInfo, res);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
console.log(`[internalStream] Routing to handleGenericStream`);
|
||||||
return handleGenericStream(streamInfo, res);
|
return handleGenericStream(streamInfo, res);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,13 +41,38 @@ export function getHeaders(service) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export function pipe(from, to, done) {
|
export function pipe(from, to, done) {
|
||||||
from.on('error', done)
|
let bytesTransferred = 0;
|
||||||
.on('close', done);
|
let startTime = Date.now();
|
||||||
|
console.log(`[pipe] Starting pipe operation`);
|
||||||
|
|
||||||
to.on('error', done)
|
from.on('error', (error) => {
|
||||||
.on('close', done);
|
console.log(`[pipe] Source stream error after ${bytesTransferred} bytes: ${error}`);
|
||||||
|
done(error);
|
||||||
|
})
|
||||||
|
.on('close', () => {
|
||||||
|
const duration = Date.now() - startTime;
|
||||||
|
console.log(`[pipe] Source stream closed after ${bytesTransferred} bytes in ${duration}ms`);
|
||||||
|
done();
|
||||||
|
}) .on('data', (chunk) => {
|
||||||
|
bytesTransferred += chunk.length;
|
||||||
|
// Log every 8MB (chunk size) or first few chunks
|
||||||
|
if (bytesTransferred % (8 * 1024 * 1024) < chunk.length || bytesTransferred < 32 * 1024) {
|
||||||
|
console.log(`[pipe] Data transferred: ${bytesTransferred} bytes`);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
to.on('error', (error) => {
|
||||||
|
console.log(`[pipe] Destination stream error after ${bytesTransferred} bytes: ${error}`);
|
||||||
|
done(error);
|
||||||
|
})
|
||||||
|
.on('close', () => {
|
||||||
|
const duration = Date.now() - startTime;
|
||||||
|
console.log(`[pipe] Destination stream closed after ${bytesTransferred} bytes in ${duration}ms`);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
|
||||||
from.pipe(to);
|
from.pipe(to);
|
||||||
|
console.log(`[pipe] Pipe established between streams`);
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function estimateTunnelLength(streamInfo, multiplier = 1.1) {
|
export async function estimateTunnelLength(streamInfo, multiplier = 1.1) {
|
||||||
|
Loading…
Reference in New Issue
Block a user