From aa376d76f6649982d360f0b173d073a9d731dbc5 Mon Sep 17 00:00:00 2001 From: wukko Date: Tue, 24 Jun 2025 19:55:50 +0600 Subject: [PATCH] api/stream/types: huge refactor & simplification of code - created render() which handles ffmpeg & piping stuff - merged remux() and merge() into one function - simplified and cleaned up arguments - removed headers since they're handled by internal streams now - removed outdated arguments --- api/src/stream/stream.js | 2 - api/src/stream/types.js | 326 ++++++++++++--------------------------- 2 files changed, 98 insertions(+), 230 deletions(-) diff --git a/api/src/stream/stream.js b/api/src/stream/stream.js index e714f38e..3050c08b 100644 --- a/api/src/stream/stream.js +++ b/api/src/stream/stream.js @@ -13,8 +13,6 @@ export default async function(res, streamInfo) { return await internalStream(streamInfo.data, res); case "merge": - return await stream.merge(streamInfo, res); - case "remux": case "mute": return await stream.remux(streamInfo, res); diff --git a/api/src/stream/types.js b/api/src/stream/types.js index 7b5a5774..746557bc 100644 --- a/api/src/stream/types.js +++ b/api/src/stream/types.js @@ -22,7 +22,7 @@ const metadataTags = [ ]; const convertMetadataToFFmpeg = (metadata) => { - let args = []; + const args = []; for (const [ name, value ] of Object.entries(metadata)) { if (metadataTags.includes(name)) { @@ -39,12 +39,6 @@ const convertMetadataToFFmpeg = (metadata) => { return args; } -const toRawHeaders = (headers) => { - return Object.entries(headers) - .map(([key, value]) => `${key}: ${value}\r\n`) - .join(''); -} - const killProcess = (p) => { p?.kill('SIGTERM'); // ask the process to terminate itself gracefully @@ -99,64 +93,27 @@ const proxy = async (streamInfo, res) => { } } -const merge = async (streamInfo, res) => { +const render = async (res, streamInfo, ffargs, multiplier) => { let process; + const urls = Array.isArray(streamInfo.urls) ? streamInfo.urls : [streamInfo.urls]; const shutdown = () => ( killProcess(process), closeResponse(res), - streamInfo.urls.map(destroyInternalStream) + urls.map(destroyInternalStream) ); try { - if (streamInfo.urls.length !== 2) return shutdown(); + // if the streamInfo.urls is an array but doesn't have 2 urls, + // then something went wrong + if (Array.isArray(streamInfo.urls) && streamInfo.urls.length !== 2) { + return shutdown(); + } - const format = streamInfo.filename.split('.').pop(); - - let args = [ + const args = [ '-loglevel', '-8', - '-i', streamInfo.urls[0], - '-i', streamInfo.urls[1], + ...ffargs, ]; - if (streamInfo.subtitles) { - args.push( - '-i', streamInfo.subtitles, - '-map', '2:s', - '-c:s', format === 'mp4' ? 'mov_text' : 'webvtt', - ); - }; - - args.push( - '-map', '0:v', - '-map', '1:a', - '-c:v', 'copy', - '-c:a', 'copy', - ); - - if (format === "mp4") { - args.push( - '-movflags', - 'faststart+frag_keyframe+empty_moov', - ) - } - - if (hlsExceptions.includes(streamInfo.service) && streamInfo.isHLS) { - if (streamInfo.service === "youtube" && format === "webm") { - args.push('-c:a', 'libopus'); - } else { - args.push('-c:a', 'aac', '-bsf:a', 'aac_adtstoasc'); - } - } - - if (streamInfo.metadata) { - args = args.concat(convertMetadataToFFmpeg(streamInfo.metadata)); - } - - args.push( - '-f', format === "mkv" ? "matroska" : format, - 'pipe:3' - ); - process = spawn(...getCommand(args), { windowsHide: true, stdio: [ @@ -169,7 +126,7 @@ const merge = async (streamInfo, res) => { res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); - res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo)); + res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo, multiplier)); pipe(muxOutput, res, shutdown); @@ -181,201 +138,114 @@ const merge = async (streamInfo, res) => { } const remux = async (streamInfo, res) => { - let process; - const shutdown = () => ( - killProcess(process), - closeResponse(res), - destroyInternalStream(streamInfo.urls) - ); + const format = streamInfo.filename.split('.').pop(); + const urls = Array.isArray(streamInfo.urls) ? streamInfo.urls : [streamInfo.urls]; + const args = [...urls.flatMap(url => ['-i', url])]; - try { - const format = streamInfo.filename.split('.').pop(); - - let args = [ - '-loglevel', '-8', - '-headers', toRawHeaders(getHeaders(streamInfo.service)), - ] - - if (streamInfo.service === "twitter") { - args.push('-seekable', '0') - } - - args.push('-i', streamInfo.urls); - - if (streamInfo.subtitles) { - args.push( - '-i', streamInfo.subtitles, - '-c:s', format === 'mp4' ? 'mov_text' : 'webvtt', - ); - }; + if (streamInfo.subtitles) { + args.push( + '-i', streamInfo.subtitles, + '-map', `${urls.length}:s`, + '-c:s', format === 'mp4' ? 'mov_text' : 'webvtt', + ); + } + if (urls.length === 2) { + args.push( + '-map', '0:v', + '-map', '1:a', + '-c:v', 'copy', + '-c:a', 'copy' + ); + } else { args.push('-c:v', 'copy'); - if (streamInfo.type === "mute") { + if (streamInfo.type === 'mute') { args.push('-an'); + } else { + args.push('-c:a', 'copy'); } - - if (hlsExceptions.includes(streamInfo.service)) { - if (streamInfo.type !== "mute") { - args.push('-c:a', 'aac'); - } - args.push('-bsf:a', 'aac_adtstoasc'); - } - - if (format === "mp4") { - args.push('-movflags', 'faststart+frag_keyframe+empty_moov') - } - - if (streamInfo.metadata) { - args = args.concat(convertMetadataToFFmpeg(streamInfo.metadata)); - } - - args.push('-f', format, 'pipe:3'); - - process = spawn(...getCommand(args), { - windowsHide: true, - stdio: [ - 'inherit', 'inherit', 'inherit', - 'pipe' - ], - }); - - const [,,, muxOutput] = process.stdio; - - res.setHeader('Connection', 'keep-alive'); - res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); - res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo)); - - pipe(muxOutput, res, shutdown); - - process.on('close', shutdown); - res.on('finish', shutdown); - } catch { - shutdown(); } + + if (format === 'mp4') { + args.push('-movflags', 'faststart+frag_keyframe+empty_moov'); + } + + if (streamInfo.type !== 'mute' && streamInfo.isHLS && hlsExceptions.includes(streamInfo.service)) { + if (streamInfo.service === 'youtube' && format === 'webm') { + args.push('-c:a', 'libopus'); + } else { + args.push('-c:a', 'aac', '-bsf:a', 'aac_adtstoasc'); + } + } + + if (streamInfo.metadata) { + args.push(...convertMetadataToFFmpeg(streamInfo.metadata)); + } + + args.push('-f', format === 'mkv' ? 'matroska' : format, 'pipe:3'); + + await render(res, streamInfo, args); } const convertAudio = async (streamInfo, res) => { - let process; - const shutdown = () => ( - killProcess(process), - closeResponse(res), - destroyInternalStream(streamInfo.urls) + const args = [ + '-i', streamInfo.urls, + '-vn', + ...(streamInfo.audioCopy ? ['-c:a', 'copy'] : ['-b:a', `${streamInfo.audioBitrate}k`]), + ]; + + if (streamInfo.audioFormat === 'mp3' && streamInfo.audioBitrate === '8') { + args.push('-ar', '12000'); + } + + if (streamInfo.audioFormat === 'opus') { + args.push('-vbr', 'off'); + } + + if (streamInfo.audioFormat === 'mp4a') { + args.push('-movflags', 'frag_keyframe+empty_moov'); + } + + if (streamInfo.metadata) { + args.push(...convertMetadataToFFmpeg(streamInfo.metadata)); + } + + args.push( + '-f', + streamInfo.audioFormat === 'm4a' ? 'ipod' : streamInfo.audioFormat, + 'pipe:3' ); - try { - let args = [ - '-loglevel', '-8', - '-headers', toRawHeaders(getHeaders(streamInfo.service)), - ] - - if (streamInfo.service === "twitter") { - args.push('-seekable', '0'); - } - - args.push( - '-i', streamInfo.urls, - '-vn' - ) - - if (streamInfo.audioCopy) { - args.push("-c:a", "copy") - } else { - args.push("-b:a", `${streamInfo.audioBitrate}k`) - } - - if (streamInfo.audioFormat === "mp3" && streamInfo.audioBitrate === "8") { - args.push("-ar", "12000"); - } - - if (streamInfo.audioFormat === "opus") { - args.push("-vbr", "off"); - } - - if (streamInfo.audioFormat === "mp4a") { - args.push("-movflags", "frag_keyframe+empty_moov"); - } - - if (streamInfo.metadata) { - args = args.concat(convertMetadataToFFmpeg(streamInfo.metadata)) - } - - args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3'); - - process = spawn(...getCommand(args), { - windowsHide: true, - stdio: [ - 'inherit', 'inherit', 'inherit', - 'pipe' - ], - }); - - const [,,, muxOutput] = process.stdio; - - res.setHeader('Connection', 'keep-alive'); - res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); - res.setHeader( - 'Estimated-Content-Length', - await estimateTunnelLength( - streamInfo, - estimateAudioMultiplier(streamInfo) * 1.1 - ) - ); - - pipe(muxOutput, res, shutdown); - res.on('finish', shutdown); - } catch { - shutdown(); - } + await render( + res, + streamInfo, + args, + estimateAudioMultiplier(streamInfo) * 1.1, + ); } const convertGif = async (streamInfo, res) => { - let process; - const shutdown = () => (killProcess(process), closeResponse(res)); + const args = [ + '-i', streamInfo.urls, - try { - let args = [ - '-loglevel', '-8' - ] + '-vf', + 'scale=-1:-1:flags=lanczos,split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse', + '-loop', '0', - if (streamInfo.service === "twitter") { - args.push('-seekable', '0') - } + '-f', "gif", 'pipe:3', + ]; - args.push('-i', streamInfo.urls); - args.push( - '-vf', - 'scale=-1:-1:flags=lanczos,split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse', - '-loop', '0' - ); - args.push('-f', "gif", 'pipe:3'); - - process = spawn(...getCommand(args), { - windowsHide: true, - stdio: [ - 'inherit', 'inherit', 'inherit', - 'pipe' - ], - }); - - const [,,, muxOutput] = process.stdio; - - res.setHeader('Connection', 'keep-alive'); - res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); - res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo, 60)); - - pipe(muxOutput, res, shutdown); - - process.on('close', shutdown); - res.on('finish', shutdown); - } catch { - shutdown(); - } + await render( + res, + streamInfo, + args, + 60, + ); } export default { proxy, - merge, remux, convertAudio, convertGif,