mirror of
https://github.com/imputnet/cobalt.git
synced 2025-06-30 18:38:29 +00:00
roll back
This commit is contained in:
parent
7369bc3687
commit
92b8bd82ee
@ -13,13 +13,7 @@ async function* readChunks(streamInfo, size) {
|
|||||||
throw new Error("controller aborted");
|
throw new Error("controller aborted");
|
||||||
}
|
}
|
||||||
|
|
||||||
let retries = 3;
|
const chunk = await request(streamInfo.url, {
|
||||||
let chunk;
|
|
||||||
|
|
||||||
// Retry mechanism for failed chunks
|
|
||||||
while (retries > 0) {
|
|
||||||
try {
|
|
||||||
chunk = await request(streamInfo.url, {
|
|
||||||
headers: {
|
headers: {
|
||||||
...getHeaders('youtube'),
|
...getHeaders('youtube'),
|
||||||
Range: `bytes=${read}-${read + CHUNK_SIZE}`
|
Range: `bytes=${read}-${read + CHUNK_SIZE}`
|
||||||
@ -29,39 +23,21 @@ async function* readChunks(streamInfo, size) {
|
|||||||
maxRedirections: 4
|
maxRedirections: 4
|
||||||
});
|
});
|
||||||
|
|
||||||
// Check for valid response before processing
|
if (chunk.statusCode === 403 && chunksSinceTransplant >= 3 && streamInfo.transplant) {
|
||||||
if (chunk.statusCode === 206 || chunk.statusCode === 200) {
|
|
||||||
break; // Success, exit retry loop
|
|
||||||
} else if (chunk.statusCode === 403 && chunksSinceTransplant >= 3 && streamInfo.transplant) {
|
|
||||||
chunksSinceTransplant = 0;
|
chunksSinceTransplant = 0;
|
||||||
try {
|
try {
|
||||||
await streamInfo.transplant(streamInfo.dispatcher);
|
await streamInfo.transplant(streamInfo.dispatcher);
|
||||||
continue; // Retry with transplanted connection
|
continue;
|
||||||
} catch {}
|
} catch {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// For other status codes, retry
|
|
||||||
throw new Error(`HTTP ${chunk.statusCode} response`);
|
|
||||||
|
|
||||||
} catch (error) {
|
|
||||||
retries--;
|
|
||||||
if (retries === 0) {
|
|
||||||
throw new Error(`Failed to fetch chunk after 3 attempts: ${error.message}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait before retry (exponential backoff)
|
|
||||||
await new Promise(resolve => setTimeout(resolve, (4 - retries) * 1000));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
chunksSinceTransplant++;
|
chunksSinceTransplant++;
|
||||||
|
|
||||||
const expected = min(CHUNK_SIZE, size - read);
|
const expected = min(CHUNK_SIZE, size - read);
|
||||||
const received = BigInt(chunk.headers['content-length']);
|
const received = BigInt(chunk.headers['content-length']);
|
||||||
|
|
||||||
// Validate we received some data
|
if (received < expected / 2n) {
|
||||||
if (received === 0n && expected > 0n) {
|
closeRequest(streamInfo.controller);
|
||||||
throw new Error("Received empty chunk when data was expected");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for await (const data of chunk.body) {
|
for await (const data of chunk.body) {
|
||||||
@ -74,13 +50,7 @@ async function* readChunks(streamInfo, size) {
|
|||||||
|
|
||||||
async function handleYoutubeStream(streamInfo, res) {
|
async function handleYoutubeStream(streamInfo, res) {
|
||||||
const { signal } = streamInfo.controller;
|
const { signal } = streamInfo.controller;
|
||||||
const cleanup = () => {
|
const cleanup = () => (res.end(), closeRequest(streamInfo.controller));
|
||||||
// Only end response if headers haven't been sent to prevent 0-byte files
|
|
||||||
if (!res.headersSent) {
|
|
||||||
res.end();
|
|
||||||
}
|
|
||||||
closeRequest(streamInfo.controller);
|
|
||||||
};
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
let req, attempts = 3;
|
let req, attempts = 3;
|
||||||
@ -108,21 +78,6 @@ async function handleYoutubeStream(streamInfo, res) {
|
|||||||
return cleanup();
|
return cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if client sent a Range header for partial content request
|
|
||||||
const rangeHeader = streamInfo.headers?.get('range');
|
|
||||||
|
|
||||||
if (rangeHeader && req.headers.get('accept-ranges') === 'bytes') {
|
|
||||||
// Handle range request - delegate to handleGenericStream for proper range handling
|
|
||||||
return await handleGenericStream(streamInfo, res);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set headers before starting stream to ensure they're sent
|
|
||||||
for (const headerName of ['content-type', 'content-length', 'accept-ranges']) {
|
|
||||||
const headerValue = req.headers.get(headerName);
|
|
||||||
if (headerValue) res.setHeader(headerName, headerValue);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Full file request - use chunked download approach
|
|
||||||
const generator = readChunks(streamInfo, size);
|
const generator = readChunks(streamInfo, size);
|
||||||
|
|
||||||
const abortGenerator = () => {
|
const abortGenerator = () => {
|
||||||
@ -134,17 +89,13 @@ async function handleYoutubeStream(streamInfo, res) {
|
|||||||
|
|
||||||
const stream = Readable.from(generator);
|
const stream = Readable.from(generator);
|
||||||
|
|
||||||
// Add error handling to prevent 0-byte files
|
for (const headerName of ['content-type', 'content-length']) {
|
||||||
stream.on('error', (error) => {
|
const headerValue = req.headers.get(headerName);
|
||||||
console.error('YouTube stream error:', error);
|
if (headerValue) res.setHeader(headerName, headerValue);
|
||||||
if (!res.headersSent) {
|
|
||||||
res.status(500).end();
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
pipe(stream, res, cleanup);
|
pipe(stream, res, cleanup);
|
||||||
} catch (error) {
|
} catch {
|
||||||
console.error('YouTube stream handling error:', error);
|
|
||||||
cleanup();
|
cleanup();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,43 +41,11 @@ export function getHeaders(service) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export function pipe(from, to, done) {
|
export function pipe(from, to, done) {
|
||||||
let finished = false;
|
from.on('error', done)
|
||||||
let bytesWritten = 0;
|
.on('close', done);
|
||||||
|
|
||||||
const cleanup = (error) => {
|
to.on('error', done)
|
||||||
if (finished) return;
|
.on('close', done);
|
||||||
finished = true;
|
|
||||||
|
|
||||||
// Log if we got 0 bytes to help debug production issues
|
|
||||||
if (bytesWritten === 0 && !error) {
|
|
||||||
console.warn('Stream completed with 0 bytes written - potential issue');
|
|
||||||
}
|
|
||||||
|
|
||||||
done(error);
|
|
||||||
};
|
|
||||||
|
|
||||||
from.on('error', (error) => {
|
|
||||||
console.error('Stream source error:', error);
|
|
||||||
cleanup(error);
|
|
||||||
})
|
|
||||||
.on('close', () => {
|
|
||||||
if (!finished) {
|
|
||||||
cleanup();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.on('data', (chunk) => {
|
|
||||||
bytesWritten += chunk.length;
|
|
||||||
});
|
|
||||||
|
|
||||||
to.on('error', (error) => {
|
|
||||||
console.error('Stream destination error:', error);
|
|
||||||
cleanup(error);
|
|
||||||
})
|
|
||||||
.on('close', () => {
|
|
||||||
if (!finished) {
|
|
||||||
cleanup();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
from.pipe(to);
|
from.pipe(to);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user