1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-12-19 22:48:53 +00:00

Merge remote-tracking branch 'origin/master' into yt-live-from-start-range

This commit is contained in:
Elyse
2023-05-07 00:33:18 -06:00
74 changed files with 5099 additions and 1935 deletions

View File

@@ -21,7 +21,7 @@ import tokenize
import traceback
import unicodedata
import urllib.request
from string import ascii_letters
from string import Formatter, ascii_letters
from .cache import Cache
from .compat import compat_os_name, compat_shlex_quote
@@ -1161,7 +1161,7 @@ class YoutubeDL:
}
MATH_FIELD_RE = rf'(?:{FIELD_RE}|-?{NUMBER_RE})'
MATH_OPERATORS_RE = r'(?:%s)' % '|'.join(map(re.escape, MATH_FUNCTIONS.keys()))
INTERNAL_FORMAT_RE = re.compile(rf'''(?x)
INTERNAL_FORMAT_RE = re.compile(rf'''(?xs)
(?P<negate>-)?
(?P<fields>{FIELD_RE})
(?P<maths>(?:{MATH_OPERATORS_RE}{MATH_FIELD_RE})*)
@@ -1242,6 +1242,14 @@ class YoutubeDL:
return list(obj)
return repr(obj)
class _ReplacementFormatter(Formatter):
def get_field(self, field_name, args, kwargs):
if field_name.isdigit():
return args[0], -1
raise ValueError('Unsupported field')
replacement_formatter = _ReplacementFormatter()
def create_key(outer_mobj):
if not outer_mobj.group('has_key'):
return outer_mobj.group(0)
@@ -1263,7 +1271,13 @@ class YoutubeDL:
if fmt == 's' and value is not None and key in field_size_compat_map.keys():
fmt = f'0{field_size_compat_map[key]:d}d'
value = default if value is None else value if replacement is None else replacement
if value is None:
value = default
elif replacement is not None:
try:
value = replacement_formatter.format(replacement, value)
except ValueError:
value = na
flags = outer_mobj.group('conversion') or ''
str_fmt = f'{fmt[:-1]}s'
@@ -1668,7 +1682,7 @@ class YoutubeDL:
self.add_extra_info(info_copy, extra_info)
info_copy, _ = self.pre_process(info_copy)
self._fill_common_fields(info_copy, False)
self.__forced_printings(info_copy, self.prepare_filename(info_copy), incomplete=True)
self.__forced_printings(info_copy)
self._raise_pending_errors(info_copy)
if self.params.get('force_write_download_archive', False):
self.record_download_archive(info_copy)
@@ -1937,7 +1951,7 @@ class YoutubeDL:
'!=': operator.ne,
}
operator_rex = re.compile(r'''(?x)\s*
(?P<key>width|height|tbr|abr|vbr|asr|filesize|filesize_approx|fps)\s*
(?P<key>[\w.-]+)\s*
(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
(?P<value>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)\s*
''' % '|'.join(map(re.escape, OPERATORS.keys())))
@@ -2710,7 +2724,7 @@ class YoutubeDL:
self.list_formats(info_dict)
if list_only:
# Without this printing, -F --print-json will not work
self.__forced_printings(info_dict, self.prepare_filename(info_dict), incomplete=True)
self.__forced_printings(info_dict)
return info_dict
format_selector = self.format_selector
@@ -2870,6 +2884,12 @@ class YoutubeDL:
if info_dict is None:
return
info_copy = info_dict.copy()
info_copy.setdefault('filename', self.prepare_filename(info_dict))
if info_dict.get('requested_formats') is not None:
# For RTMP URLs, also include the playpath
info_copy['urls'] = '\n'.join(f['url'] + f.get('play_path', '') for f in info_dict['requested_formats'])
elif info_dict.get('url'):
info_copy['urls'] = info_dict['url'] + info_dict.get('play_path', '')
info_copy['formats_table'] = self.render_formats_table(info_dict)
info_copy['thumbnails_table'] = self.render_thumbnails_table(info_dict)
info_copy['subtitles_table'] = self.render_subtitles_table(info_dict.get('id'), info_dict.get('subtitles'))
@@ -2895,46 +2915,36 @@ class YoutubeDL:
tmpl = format_tmpl(tmpl)
self.to_screen(f'[info] Writing {tmpl!r} to: {filename}')
if self._ensure_dir_exists(filename):
with open(filename, 'a', encoding='utf-8') as f:
f.write(self.evaluate_outtmpl(tmpl, info_copy) + '\n')
with open(filename, 'a', encoding='utf-8', newline='') as f:
f.write(self.evaluate_outtmpl(tmpl, info_copy) + os.linesep)
def __forced_printings(self, info_dict, filename, incomplete):
def print_mandatory(field, actual_field=None):
if actual_field is None:
actual_field = field
if (self.params.get('force%s' % field, False)
and (not incomplete or info_dict.get(actual_field) is not None)):
self.to_stdout(info_dict[actual_field])
def print_optional(field):
if (self.params.get('force%s' % field, False)
and info_dict.get(field) is not None):
self.to_stdout(info_dict[field])
info_dict = info_dict.copy()
if filename is not None:
info_dict['filename'] = filename
if info_dict.get('requested_formats') is not None:
# For RTMP URLs, also include the playpath
info_dict['urls'] = '\n'.join(f['url'] + f.get('play_path', '') for f in info_dict['requested_formats'])
elif info_dict.get('url'):
info_dict['urls'] = info_dict['url'] + info_dict.get('play_path', '')
return info_copy
def __forced_printings(self, info_dict, filename=None, incomplete=True):
if (self.params.get('forcejson')
or self.params['forceprint'].get('video')
or self.params['print_to_file'].get('video')):
self.post_extract(info_dict)
self._forceprint('video', info_dict)
if filename:
info_dict['filename'] = filename
info_copy = self._forceprint('video', info_dict)
print_mandatory('title')
print_mandatory('id')
print_mandatory('url', 'urls')
print_optional('thumbnail')
print_optional('description')
print_optional('filename')
if self.params.get('forceduration') and info_dict.get('duration') is not None:
self.to_stdout(formatSeconds(info_dict['duration']))
print_mandatory('format')
def print_field(field, actual_field=None, optional=False):
if actual_field is None:
actual_field = field
if self.params.get(f'force{field}') and (
info_copy.get(field) is not None or (not optional and not incomplete)):
self.to_stdout(info_copy[actual_field])
print_field('title')
print_field('id')
print_field('url', 'urls')
print_field('thumbnail', optional=True)
print_field('description', optional=True)
print_field('filename', optional=True)
if self.params.get('forceduration') and info_copy.get('duration') is not None:
self.to_stdout(formatSeconds(info_copy['duration']))
print_field('format')
if self.params.get('forcejson'):
self.to_stdout(json.dumps(self.sanitize_info(info_dict)))
@@ -3316,7 +3326,7 @@ class YoutubeDL:
or info_dict.get('is_live') and self.params.get('hls_use_mpegts') is None,
'Possible MPEG-TS in MP4 container or malformed AAC timestamps',
FFmpegFixupM3u8PP)
ffmpeg_fixup(info_dict.get('is_live') and downloader == 'DashSegmentsFD',
ffmpeg_fixup(info_dict.get('is_live') and downloader == 'dashsegments',
'Possible duplicate MOOV atoms', FFmpegFixupDuplicateMoovPP)
ffmpeg_fixup(downloader == 'web_socket_fragment', 'Malformed timestamps detected', FFmpegFixupTimestampPP)
@@ -3482,7 +3492,7 @@ class YoutubeDL:
*files_to_delete, info=infodict, msg='Deleting original file %s (pass -k to keep)')
return infodict
def run_all_pps(self, key, info, *, additional_pps=None):
def run_all_pps(self, key, info, *, additional_pps=None, fatal=True):
if key != 'video':
self._forceprint(key, info)
for pp in (additional_pps or []) + self._pps[key]: