diff --git a/package.json b/package.json index 84a250e8..64b19358 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "express": "^4.18.1", "express-rate-limit": "^6.3.0", "ffmpeg-static": "^5.1.0", + "ffprobe-static": "^3.1.0", "hls-parser": "^0.10.7", "ipaddr.js": "2.1.0", "nanoid": "^4.0.2", diff --git a/src/modules/processing/matchActionDecider.js b/src/modules/processing/matchActionDecider.js index 7aa154bd..4116f161 100644 --- a/src/modules/processing/matchActionDecider.js +++ b/src/modules/processing/matchActionDecider.js @@ -18,6 +18,7 @@ export default function(r, host, userFormat, isAudioOnly, lang, isAudioMuted, di if (r.isPhoto) action = "photo"; else if (r.picker) action = "picker" + else if (r.isClip) action = "clip"; // TODO: user-specified clips else if (r.isGif && toGif) action = "gif"; else if (isAudioMuted) action = "muteVideo"; else if (isAudioOnly) action = "audio"; @@ -40,7 +41,11 @@ export default function(r, host, userFormat, isAudioOnly, lang, isAudioMuted, di case "photo": responseType = 1; break; - + + case "clip": + params = { type: "clip" } + break; + case "gif": params = { type: "gif" } break; diff --git a/src/modules/stream/cut.js b/src/modules/stream/cut.js new file mode 100644 index 00000000..09799b31 --- /dev/null +++ b/src/modules/stream/cut.js @@ -0,0 +1,272 @@ +import ffmpeg from 'ffmpeg-static'; +import { strict as assert } from 'node:assert'; +import { path as ffprobe } from 'ffprobe-static'; + +import { spawn } from './shared.js'; + +function mapFormat(format) { + if (format?.includes('webm')) + return 'webm'; + + if (format?.includes('mp4')) + return 'mp4'; + + throw new Error(`unknown format: ${format}`); +} + +const close_enough = (a, b) => Math.abs(a - b) < 0.02; +const last = arr => arr[arr.length - 1]; +const output = process => last(process.stdio); +const lerp = (a, b, α) => Number(a) + α * (b - a); + +function getKeyframesAround(url, { start, end }) { + start = Math.floor(start), end = Math.ceil(end); + assert(end > start); + + const ε = 30; + return new Promise((resolve, reject) => { + let process; + try { + process = spawn(ffprobe, [ + '-loglevel', '-8', + '-of', 'json', + '-skip_frame', 'nokey', + '-show_entries', [ + 'stream=duration,codec_name,bit_rate', + 'format=bit_rate,format_name', + 'frame=pts_time' + ].join(':'), + '-select_streams', 'v:0', + '-read_intervals', Object.values({ + startInterval: `${start}%+${ε}`, + endInterval: `${end - ε}%+${ε * 2}` + }).join(','), + url + ]); + + const bufs = []; + process.stdout.on('data', buf => bufs.push(buf)) + process.on('exit', (code) => { + if (code !== 0) { + return reject('non-zero/unexpected return value'); + } + + const data = JSON.parse(Buffer.concat(bufs)); + + if (!data?.streams?.[0]?.duration) { + return reject('could not get video duration'); + } + + if (Number(data?.streams[0]?.duration) < Number(end)) { + return reject( + 'cut out of bounds: ' + + `(duration ${data?.streams[0]?.duration} < end ${start})` + ) + } + + const stream = data.streams[0]; + const out = { + keyframes: data.frames.map(f => f.pts_time), + codec: stream?.codec_name, + format: mapFormat(data.format?.format_name), + bitrate: data?.format?.bit_rate + }; + + if (!out.codec || !out.bitrate || !out.format) { + return reject('could not get stream info'); + } + + return resolve(out); + }); + + process.on('error', reject); + process.stdout.on('error', reject); + } catch { + process.kill('SIGKILL'); + } + }) +} + +async function getBoundingKeyframes(url, { start, end }) { + const { keyframes, ...info } = await getKeyframesAround(url, { start, end }); + const afterStart = keyframes.find(k => Number(k) >= Number(start)), + beforeEnd = keyframes.findLast(k => Number(k) <= Number(end)); + + const afterStartNext = keyframes[keyframes.indexOf(afterStart) + 1]; + const beforeEndNext = keyframes[keyframes.indexOf(beforeEnd) + 1]; + + if (!afterStartNext || !beforeEndNext) + throw 'not enough keyframes' + + return { + afterStart, beforeEnd, + cleanCut: { + start: close_enough(afterStart, start), + end: close_enough(beforeEnd, end) + }, + shifted: { + /* we need to provide a timestamp that is little bit after the actual keyframe + * this is due to the fact that ffmpeg's lossless cut actually cuts on the + * "previous" keyframe, so if we supply the exact keyframe timestamp, it won't + * line up and the start/end transcodes will not sync up with it */ + afterStart: lerp(afterStart, afterStartNext, 0.2), + beforeEnd: lerp(beforeEnd, beforeEndNext, 0.2), + }, + ...info + }; +} + +function spawnStream(args, inputs = 0) { + return spawn( + ffmpeg, + [ '-loglevel', '-8', ...args, 'pipe:' + (inputs + 3) ], + { + stdio: [ + 'inherit', 'inherit', 'inherit', + ...Array(inputs).fill('pipe'), + 'pipe' + ] + } + ) +} + +function makeStream(...args) { + const process = spawnStream(...args); + output(process).process = process; + return output(process); +} + +function transcode(url, from, to, { bitrate, format, codec, filter }) { + return makeStream([ + '-ss', from, + '-i', url, + '-copyts', + '-to', to, + ...(filter ? ['-filter:v', filter] : []), + '-an', + '-b:v', (bitrate * 1.2).toFixed(6), + '-f', format, + '-movflags', 'frag_keyframe+empty_moov', + '-vcodec', codec + ]); +} + +function cut(type, url, from, to, info) { + let toggles; + + switch (type) { + case 'audio': + toggles = ['-vn']; + break; + case 'video': + toggles = [ '-c:v', 'copy', '-an' ]; + from = info.shifted.afterStart; + to = info.shifted.beforeEnd; + break; + default: + throw `invalid cut type: ${type}`; + } + + return makeStream([ + '-ss', from, + '-i', url, + '-copyts', + '-to', to, + '-f', info.format, + '-movflags', 'frag_keyframe+empty_moov', + ...toggles + ]); +} + +function mergeVideoOnly(streams, info) { + let fd = 3; + const input = streams.map( + () => `file 'pipe:${++fd}'` + ).join('\n') + + const args = [ + '-f', 'concat', + '-safe', '0', + '-protocol_whitelist', 'file,pipe', + '-i', 'pipe:3', + '-c', 'copy', + '-movflags', 'frag_keyframe+empty_moov', + '-f', info.format + ]; + + const process = spawnStream(args, fd - 2); + process.stdio[3].write(input); + process.stdio[3].end(); + + return process; +} + +function mergeAudioVideo(info) { + return spawnStream([ + '-i', 'pipe:3', + '-i', 'pipe:4', + '-movflags', 'frag_keyframe+empty_moov', + '-f', info.format, + '-c', 'copy', + ], 2); +} + +export async function makeCut(url, { start, end }, audio) { + let processes = []; + + try { + const { + afterStart, beforeEnd, + cleanCut, ...info + } = await getBoundingKeyframes(url, { start, end }); + + let streams = []; + if (!cleanCut.start) { + streams.push( + transcode(url, start, afterStart, { + filter: `select=not(eq(t\\,${afterStart}))`, + ...info + }) + ) + } + + streams.push(cut('video', url, afterStart, beforeEnd, info)) + + if (!cleanCut.end) { + streams.push( + transcode(url, beforeEnd, end, { + filter: 'select=not(eq(n\\,0))', + ...info + }) + ) + } + + processes.push(...streams.map(s => s.process)); + + if (audio) { + audio = cut('audio', audio, start, end, info); + processes.push(audio.process); + } + + const videoMerge = mergeVideoOnly(streams, info); + processes.push(videoMerge); + + for (let fd = 0; fd < streams.length; ++fd) { + streams[fd].pipe(videoMerge.stdio[4 + fd]); + } + + let finalMerge = videoMerge; + if (audio) { + finalMerge = mergeAudioVideo(info); + const [,,, audioIn, videoIn] = finalMerge.stdio; + + output(videoMerge).pipe(videoIn); + audio.pipe(audioIn); + } + + return output(finalMerge); + } catch { + for (const process of processes) + process.kill('SIGKILL'); + } +} \ No newline at end of file diff --git a/src/modules/stream/manage.js b/src/modules/stream/manage.js index 459e5a6b..1339f259 100644 --- a/src/modules/stream/manage.js +++ b/src/modules/stream/manage.js @@ -34,6 +34,10 @@ export function createStream(obj) { isAudioOnly: !!obj.isAudioOnly, audioFormat: obj.audioFormat, time: obj.time ? obj.time : false, + clip: obj.clip ? { + start: parseFloat(obj.clip.start).toFixed(6), + end: parseFloat(obj.clip.end).toFixed(6), + } : false, copy: !!obj.copy, mute: !!obj.mute, metadata: obj.fileMetadata ? obj.fileMetadata : false diff --git a/src/modules/stream/shared.js b/src/modules/stream/shared.js new file mode 100644 index 00000000..3e911eee --- /dev/null +++ b/src/modules/stream/shared.js @@ -0,0 +1,38 @@ +import { spawn as _node_spawn } from 'child_process' + +export function killProcess(p) { + // ask the process to terminate itself gracefully + p?.kill('SIGTERM'); + setTimeout(() => { + if (p?.exitCode === null) + // brutally murder the process if it didn't quit + p?.kill('SIGKILL'); + }, 5000); +} + +export function pipe(from, to, done) { + from.on('error', done) + .on('close', done); + + to.on('error', done) + .on('close', done); + + from.pipe(to); +} + +export function wrapCommand(command, args = []) { + if (process.env.PROCESSING_PRIORITY && process.platform !== "win32") { + return ['nice', ['-n', process.env.PROCESSING_PRIORITY, command, ...args]] + } + + return [command, args] +} + +export function spawn(command, args, opts) { + opts = { + ...opts, + windowsHide: true + }; + + return _node_spawn(...wrapCommand(command, args), opts); +} \ No newline at end of file diff --git a/src/modules/stream/stream.js b/src/modules/stream/stream.js index f254dacc..babf5616 100644 --- a/src/modules/stream/stream.js +++ b/src/modules/stream/stream.js @@ -1,4 +1,4 @@ -import { streamAudioOnly, streamDefault, streamLiveRender, streamVideoOnly, convertToGif } from "./types.js"; +import { streamAudioOnly, streamDefault, streamLiveRender, streamVideoOnly, streamClip, convertToGif } from "./types.js"; export default async function(res, streamInfo) { try { @@ -17,6 +17,9 @@ export default async function(res, streamInfo) { case "mute": streamVideoOnly(streamInfo, res); break; + case "clip": + streamClip(streamInfo, res); + break; default: await streamDefault(streamInfo, res); break; diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index cdfb4a05..f29c14a9 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -1,10 +1,12 @@ -import { spawn } from "child_process"; -import ffmpeg from "ffmpeg-static"; -import { ffmpegArgs, genericUserAgent } from "../config.js"; -import { metadataManager } from "../sub/utils.js"; -import { request } from "undici"; -import { create as contentDisposition } from "content-disposition-header"; +import { request } from "undici" +import ffmpeg from "ffmpeg-static" import { AbortController } from "abort-controller" +import { create as contentDisposition } from "content-disposition-header" + +import { makeCut } from "./cut.js" +import { metadataManager } from "../sub/utils.js" +import { spawn, pipe, killProcess } from "./shared.js" +import { ffmpegArgs, genericUserAgent } from "../config.js" function closeRequest(controller) { try { controller.abort() } catch {} @@ -15,33 +17,6 @@ function closeResponse(res) { return res.destroy(); } -function killProcess(p) { - // ask the process to terminate itself gracefully - p?.kill('SIGTERM'); - setTimeout(() => { - if (p?.exitCode === null) - // brutally murder the process if it didn't quit - p?.kill('SIGKILL'); - }, 5000); -} - -function pipe(from, to, done) { - from.on('error', done) - .on('close', done); - - to.on('error', done) - .on('close', done); - - from.pipe(to); -} - -function getCommand(args) { - if (process.env.PROCESSING_PRIORITY && process.platform !== "win32") { - return ['nice', ['-n', process.env.PROCESSING_PRIORITY, ffmpeg, ...args]] - } - return [ffmpeg, args] -} - export async function streamDefault(streamInfo, res) { const abortController = new AbortController(); const shutdown = () => (closeRequest(abortController), closeResponse(res)); @@ -98,8 +73,7 @@ export async function streamLiveRender(streamInfo, res) { } args.push('-f', format, 'pipe:4'); - process = spawn(...getCommand(args), { - windowsHide: true, + process = spawn(ffmpeg, args, { stdio: [ 'inherit', 'inherit', 'inherit', 'pipe', 'pipe' @@ -140,18 +114,18 @@ export function streamAudioOnly(streamInfo, res) { ) if (streamInfo.metadata) { - args = args.concat(metadataManager(streamInfo.metadata)) + args.push(...metadataManager(streamInfo.metadata)) } - let arg = streamInfo.copy ? ffmpegArgs["copy"] : ffmpegArgs["audio"]; - args = args.concat(arg); + + args.push(...ffmpegArgs[streamInfo.copy ? "copy" : "audio"]); if (ffmpegArgs[streamInfo.audioFormat]) { - args = args.concat(ffmpegArgs[streamInfo.audioFormat]) + args.push(...ffmpegArgs[streamInfo.audioFormat]); } + args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3'); - process = spawn(...getCommand(args), { - windowsHide: true, + process = spawn(ffmpeg, args, { stdio: [ 'inherit', 'inherit', 'inherit', 'pipe' @@ -198,8 +172,7 @@ export function streamVideoOnly(streamInfo, res) { } args.push('-f', format, 'pipe:3'); - process = spawn(...getCommand(args), { - windowsHide: true, + process = spawn(ffmpeg, args, { stdio: [ 'inherit', 'inherit', 'inherit', 'pipe' @@ -235,8 +208,7 @@ export function convertToGif(streamInfo, res) { args = args.concat(ffmpegArgs["gif"]); args.push('-f', "gif", 'pipe:3'); - process = spawn(...getCommand(args), { - windowsHide: true, + process = spawn(ffmpeg, args, { stdio: [ 'inherit', 'inherit', 'inherit', 'pipe' @@ -256,3 +228,30 @@ export function convertToGif(streamInfo, res) { shutdown(); } } + +export async function streamClip(streamInfo, res) { + if (typeof streamInfo.urls === 'string') + streamInfo.urls = [streamInfo.urls]; + + const shutdown = () => (killProcess(process), closeResponse(res)); + + try { + const [ video, audio ] = streamInfo.urls; + const { start, end } = streamInfo.clip; + if (!start || !end || start === 'NaN' || end === 'NaN') + return shutdown(); + + const stream = await makeCut(video, { start, end }, audio); + process = stream.process; + + res.setHeader('Connection', 'keep-alive'); + res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); + + pipe(stream, res, shutdown); + + process.on('close', shutdown); + res.on('finish', shutdown); + } catch { + shutdown(); + } +} \ No newline at end of file