1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-10-31 14:45:14 +00:00

[cleanup] Add more ruff rules (#10149)

Authored by: seproDev

Reviewed-by: bashonly <88596187+bashonly@users.noreply.github.com>
Reviewed-by: Simon Sawicki <contact@grub4k.xyz>
This commit is contained in:
sepro
2024-06-12 01:09:58 +02:00
committed by GitHub
parent db50f19d76
commit add96eb9f8
915 changed files with 7027 additions and 7246 deletions

View File

@@ -109,7 +109,6 @@ from .utils import (
determine_protocol,
encode_compat_str,
encodeFilename,
error_to_compat_str,
escapeHTML,
expand_path,
extract_basic_auth,
@@ -583,7 +582,7 @@ class YoutubeDL:
'player_url', 'protocol', 'fragment_base_url', 'fragments', 'is_from_start', 'is_dash_periods', 'request_data',
'preference', 'language', 'language_preference', 'quality', 'source_preference', 'cookies',
'http_headers', 'stretched_ratio', 'no_resume', 'has_drm', 'extra_param_to_segment_url', 'hls_aes', 'downloader_options',
'page_url', 'app', 'play_path', 'tc_url', 'flash_version', 'rtmp_live', 'rtmp_conn', 'rtmp_protocol', 'rtmp_real_time'
'page_url', 'app', 'play_path', 'tc_url', 'flash_version', 'rtmp_live', 'rtmp_conn', 'rtmp_protocol', 'rtmp_real_time',
}
_deprecated_multivalue_fields = {
'album_artist': 'album_artists',
@@ -594,7 +593,7 @@ class YoutubeDL:
}
_format_selection_exts = {
'audio': set(MEDIA_EXTENSIONS.common_audio),
'video': set(MEDIA_EXTENSIONS.common_video + ('3gp', )),
'video': {*MEDIA_EXTENSIONS.common_video, '3gp'},
'storyboards': set(MEDIA_EXTENSIONS.storyboards),
}
@@ -628,7 +627,7 @@ class YoutubeDL:
error=sys.stderr,
screen=sys.stderr if self.params.get('quiet') else stdout,
console=None if compat_os_name == 'nt' else next(
filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None)
filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None),
)
try:
@@ -679,9 +678,9 @@ class YoutubeDL:
width_args = [] if width is None else ['-w', str(width)]
sp_kwargs = {'stdin': subprocess.PIPE, 'stdout': slave, 'stderr': self._out_files.error}
try:
self._output_process = Popen(['bidiv'] + width_args, **sp_kwargs)
self._output_process = Popen(['bidiv', *width_args], **sp_kwargs)
except OSError:
self._output_process = Popen(['fribidi', '-c', 'UTF-8'] + width_args, **sp_kwargs)
self._output_process = Popen(['fribidi', '-c', 'UTF-8', *width_args], **sp_kwargs)
self._output_channel = os.fdopen(master, 'rb')
except OSError as ose:
if ose.errno == errno.ENOENT:
@@ -822,8 +821,7 @@ class YoutubeDL:
)
self.report_warning(
'Long argument string detected. '
'Use -- to separate parameters and URLs, like this:\n%s' %
shell_quote(correct_argv))
f'Use -- to separate parameters and URLs, like this:\n{shell_quote(correct_argv)}')
def add_info_extractor(self, ie):
"""Add an InfoExtractor object to the end of the list."""
@@ -922,7 +920,7 @@ class YoutubeDL:
if (self.params.get('quiet') if quiet is None else quiet) and not self.params.get('verbose'):
return
self._write_string(
'%s%s' % (self._bidi_workaround(message), ('' if skip_eol else '\n')),
'{}{}'.format(self._bidi_workaround(message), ('' if skip_eol else '\n')),
self._out_files.screen, only_once=only_once)
def to_stderr(self, message, only_once=False):
@@ -1045,10 +1043,10 @@ class YoutubeDL:
return self._format_text(self._out_files.error, self._allow_colors.error, *args, **kwargs)
def report_warning(self, message, only_once=False):
'''
"""
Print the message to stderr, it will be prefixed with 'WARNING:'
If stderr is a tty file the 'WARNING:' will be colored
'''
"""
if self.params.get('logger') is not None:
self.params['logger'].warning(message)
else:
@@ -1066,14 +1064,14 @@ class YoutubeDL:
self.to_stderr(f'{self._format_err("Deprecated Feature:", self.Styles.ERROR)} {message}', True)
def report_error(self, message, *args, **kwargs):
'''
"""
Do the same as trouble, but prefixes the message with 'ERROR:', colored
in red if stderr is a tty file.
'''
"""
self.trouble(f'{self._format_err("ERROR:", self.Styles.ERROR)} {message}', *args, **kwargs)
def write_debug(self, message, only_once=False):
'''Log debug message or Print message to stderr'''
"""Log debug message or Print message to stderr"""
if not self.params.get('verbose', False):
return
message = f'[debug] {message}'
@@ -1085,14 +1083,14 @@ class YoutubeDL:
def report_file_already_downloaded(self, file_name):
"""Report file has already been fully downloaded."""
try:
self.to_screen('[download] %s has already been downloaded' % file_name)
self.to_screen(f'[download] {file_name} has already been downloaded')
except UnicodeEncodeError:
self.to_screen('[download] The file has already been downloaded')
def report_file_delete(self, file_name):
"""Report that existing file will be deleted."""
try:
self.to_screen('Deleting existing file %s' % file_name)
self.to_screen(f'Deleting existing file {file_name}')
except UnicodeEncodeError:
self.to_screen('Deleting existing file')
@@ -1147,7 +1145,7 @@ class YoutubeDL:
@staticmethod
def escape_outtmpl(outtmpl):
''' Escape any remaining strings like %s, %abc% etc. '''
""" Escape any remaining strings like %s, %abc% etc. """
return re.sub(
STR_FORMAT_RE_TMPL.format('', '(?![%(\0])'),
lambda mobj: ('' if mobj.group('has_key') else '%') + mobj.group(0),
@@ -1155,7 +1153,7 @@ class YoutubeDL:
@classmethod
def validate_outtmpl(cls, outtmpl):
''' @return None or Exception object '''
""" @return None or Exception object """
outtmpl = re.sub(
STR_FORMAT_RE_TMPL.format('[^)]*', '[ljhqBUDS]'),
lambda mobj: f'{mobj.group(0)[:-1]}s',
@@ -1208,13 +1206,13 @@ class YoutubeDL:
}
# Field is of the form key1.key2...
# where keys (except first) can be string, int, slice or "{field, ...}"
FIELD_INNER_RE = r'(?:\w+|%(num)s|%(num)s?(?::%(num)s?){1,2})' % {'num': r'(?:-?\d+)'}
FIELD_RE = r'\w*(?:\.(?:%(inner)s|{%(field)s(?:,%(field)s)*}))*' % {
FIELD_INNER_RE = r'(?:\w+|%(num)s|%(num)s?(?::%(num)s?){1,2})' % {'num': r'(?:-?\d+)'} # noqa: UP031
FIELD_RE = r'\w*(?:\.(?:%(inner)s|{%(field)s(?:,%(field)s)*}))*' % { # noqa: UP031
'inner': FIELD_INNER_RE,
'field': rf'\w*(?:\.{FIELD_INNER_RE})*'
'field': rf'\w*(?:\.{FIELD_INNER_RE})*',
}
MATH_FIELD_RE = rf'(?:{FIELD_RE}|-?{NUMBER_RE})'
MATH_OPERATORS_RE = r'(?:%s)' % '|'.join(map(re.escape, MATH_FUNCTIONS.keys()))
MATH_OPERATORS_RE = r'(?:{})'.format('|'.join(map(re.escape, MATH_FUNCTIONS.keys())))
INTERNAL_FORMAT_RE = re.compile(rf'''(?xs)
(?P<negate>-)?
(?P<fields>{FIELD_RE})
@@ -1337,7 +1335,7 @@ class YoutubeDL:
value, default = None, na
fmt = outer_mobj.group('format')
if fmt == 's' and last_field in field_size_compat_map.keys() and isinstance(value, int):
if fmt == 's' and last_field in field_size_compat_map and isinstance(value, int):
fmt = f'0{field_size_compat_map[last_field]:d}d'
flags = outer_mobj.group('conversion') or ''
@@ -1362,7 +1360,7 @@ class YoutubeDL:
elif fmt[-1] == 'U': # unicode normalized
value, fmt = unicodedata.normalize(
# "+" = compatibility equivalence, "#" = NFD
'NF%s%s' % ('K' if '+' in flags else '', 'D' if '#' in flags else 'C'),
'NF{}{}'.format('K' if '+' in flags else '', 'D' if '#' in flags else 'C'),
value), str_fmt
elif fmt[-1] == 'D': # decimal suffix
num_fmt, fmt = fmt[:-1].replace('#', ''), 's'
@@ -1390,7 +1388,7 @@ class YoutubeDL:
if fmt[-1] in 'csra':
value = sanitizer(last_field, value)
key = '%s\0%s' % (key.replace('%', '%\0'), outer_mobj.group('format'))
key = '{}\0{}'.format(key.replace('%', '%\0'), outer_mobj.group('format'))
TMPL_DICT[key] = value
return '{prefix}%({key}){fmt}'.format(key=key, fmt=fmt, prefix=outer_mobj.group('prefix'))
@@ -1479,9 +1477,9 @@ class YoutubeDL:
date = info_dict.get('upload_date')
if date is not None:
dateRange = self.params.get('daterange', DateRange())
if date not in dateRange:
return f'{date_from_str(date).isoformat()} upload date is not in range {dateRange}'
date_range = self.params.get('daterange', DateRange())
if date not in date_range:
return f'{date_from_str(date).isoformat()} upload date is not in range {date_range}'
view_count = info_dict.get('view_count')
if view_count is not None:
min_views = self.params.get('min_views')
@@ -1491,7 +1489,7 @@ class YoutubeDL:
if max_views is not None and view_count > max_views:
return 'Skipping %s, because it has exceeded the maximum view count (%d/%d)' % (video_title, view_count, max_views)
if age_restricted(info_dict.get('age_limit'), self.params.get('age_limit')):
return 'Skipping "%s" because it is age restricted' % video_title
return f'Skipping "{video_title}" because it is age restricted'
match_filter = self.params.get('match_filter')
if match_filter is None:
@@ -1544,7 +1542,7 @@ class YoutubeDL:
@staticmethod
def add_extra_info(info_dict, extra_info):
'''Set the keys from extra_info in info dict if they are missing'''
"""Set the keys from extra_info in info dict if they are missing"""
for key, value in extra_info.items():
info_dict.setdefault(key, value)
@@ -1590,7 +1588,7 @@ class YoutubeDL:
self.to_screen(f'[download] {self._format_screen(temp_id, self.Styles.ID)}: '
'has already been recorded in the archive')
if self.params.get('break_on_existing', False):
raise ExistingVideoReached()
raise ExistingVideoReached
break
return self.__extract_info(url, self.get_info_extractor(key), download, extra_info, process)
else:
@@ -1616,8 +1614,8 @@ class YoutubeDL:
except GeoRestrictedError as e:
msg = e.msg
if e.countries:
msg += '\nThis video is available in %s.' % ', '.join(
map(ISO3166Utils.short2full, e.countries))
msg += '\nThis video is available in {}.'.format(', '.join(
map(ISO3166Utils.short2full, e.countries)))
msg += '\nYou might want to use a VPN or a proxy server (with --proxy) to workaround.'
self.report_error(msg)
except ExtractorError as e: # An error we somewhat expected
@@ -1826,8 +1824,8 @@ class YoutubeDL:
if isinstance(additional_urls, str):
additional_urls = [additional_urls]
self.to_screen(
'[info] %s: %d additional URL(s) requested' % (ie_result['id'], len(additional_urls)))
self.write_debug('Additional URLs: "%s"' % '", "'.join(additional_urls))
'[info] {}: {} additional URL(s) requested'.format(ie_result['id'], len(additional_urls)))
self.write_debug('Additional URLs: "{}"'.format('", "'.join(additional_urls)))
ie_result['additional_entries'] = [
self.extract_info(
url, download, extra_info=extra_info,
@@ -1879,8 +1877,8 @@ class YoutubeDL:
webpage_url = ie_result.get('webpage_url') # Playlists maynot have webpage_url
if webpage_url and webpage_url in self._playlist_urls:
self.to_screen(
'[download] Skipping already downloaded playlist: %s'
% ie_result.get('title') or ie_result.get('id'))
'[download] Skipping already downloaded playlist: {}'.format(
ie_result.get('title')) or ie_result.get('id'))
return
self._playlist_level += 1
@@ -1895,8 +1893,8 @@ class YoutubeDL:
self._playlist_urls.clear()
elif result_type == 'compat_list':
self.report_warning(
'Extractor %s returned a compat_list result. '
'It needs to be updated.' % ie_result.get('extractor'))
'Extractor {} returned a compat_list result. '
'It needs to be updated.'.format(ie_result.get('extractor')))
def _fixup(r):
self.add_extra_info(r, {
@@ -1913,7 +1911,7 @@ class YoutubeDL:
]
return ie_result
else:
raise Exception('Invalid result type: %s' % result_type)
raise Exception(f'Invalid result type: {result_type}')
def _ensure_dir_exists(self, path):
return make_dir(path, self.report_error)
@@ -2029,8 +2027,9 @@ class YoutubeDL:
resolved_entries[i] = (playlist_index, NO_DEFAULT)
continue
self.to_screen('[download] Downloading item %s of %s' % (
self._format_screen(i + 1, self.Styles.ID), self._format_screen(n_entries, self.Styles.EMPHASIS)))
self.to_screen(
f'[download] Downloading item {self._format_screen(i + 1, self.Styles.ID)} '
f'of {self._format_screen(n_entries, self.Styles.EMPHASIS)}')
entry_result = self.__process_iterable_entry(entry, download, collections.ChainMap({
'playlist_index': playlist_index,
@@ -2080,9 +2079,9 @@ class YoutubeDL:
}
operator_rex = re.compile(r'''(?x)\s*
(?P<key>[\w.-]+)\s*
(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
(?P<op>{})(?P<none_inclusive>\s*\?)?\s*
(?P<value>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)\s*
''' % '|'.join(map(re.escape, OPERATORS.keys())))
'''.format('|'.join(map(re.escape, OPERATORS.keys()))))
m = operator_rex.fullmatch(filter_spec)
if m:
try:
@@ -2093,7 +2092,7 @@ class YoutubeDL:
comparison_value = parse_filesize(m.group('value') + 'B')
if comparison_value is None:
raise ValueError(
'Invalid value %r in format specification %r' % (
'Invalid value {!r} in format specification {!r}'.format(
m.group('value'), filter_spec))
op = OPERATORS[m.group('op')]
@@ -2103,15 +2102,15 @@ class YoutubeDL:
'^=': lambda attr, value: attr.startswith(value),
'$=': lambda attr, value: attr.endswith(value),
'*=': lambda attr, value: value in attr,
'~=': lambda attr, value: value.search(attr) is not None
'~=': lambda attr, value: value.search(attr) is not None,
}
str_operator_rex = re.compile(r'''(?x)\s*
(?P<key>[a-zA-Z0-9._-]+)\s*
(?P<negation>!\s*)?(?P<op>%s)\s*(?P<none_inclusive>\?\s*)?
(?P<negation>!\s*)?(?P<op>{})\s*(?P<none_inclusive>\?\s*)?
(?P<quote>["'])?
(?P<value>(?(quote)(?:(?!(?P=quote))[^\\]|\\.)+|[\w.-]+))
(?(quote)(?P=quote))\s*
''' % '|'.join(map(re.escape, STR_OPERATORS.keys())))
'''.format('|'.join(map(re.escape, STR_OPERATORS.keys()))))
m = str_operator_rex.fullmatch(filter_spec)
if m:
if m.group('op') == '~=':
@@ -2125,7 +2124,7 @@ class YoutubeDL:
op = str_op
if not m:
raise SyntaxError('Invalid filter specification %r' % filter_spec)
raise SyntaxError(f'Invalid filter specification {filter_spec!r}')
def _filter(f):
actual_value = f.get(m.group('key'))
@@ -2141,7 +2140,7 @@ class YoutubeDL:
if working:
yield f
continue
self.to_screen('[info] Testing format %s' % f['format_id'])
self.to_screen('[info] Testing format {}'.format(f['format_id']))
path = self.get_output_path('temp')
if not self._ensure_dir_exists(f'{path}/'):
continue
@@ -2149,19 +2148,19 @@ class YoutubeDL:
temp_file.close()
try:
success, _ = self.dl(temp_file.name, f, test=True)
except (DownloadError, OSError, ValueError) + network_exceptions:
except (DownloadError, OSError, ValueError, *network_exceptions):
success = False
finally:
if os.path.exists(temp_file.name):
try:
os.remove(temp_file.name)
except OSError:
self.report_warning('Unable to delete temporary file "%s"' % temp_file.name)
self.report_warning(f'Unable to delete temporary file "{temp_file.name}"')
f['__working'] = success
if success:
yield f
else:
self.to_screen('[info] Unable to download format %s. Skipping...' % f['format_id'])
self.to_screen('[info] Unable to download format {}. Skipping...'.format(f['format_id']))
def _select_formats(self, formats, selector):
return list(selector({
@@ -2214,8 +2213,8 @@ class YoutubeDL:
def _parse_filter(tokens):
filter_parts = []
for type, string_, start, _, _ in tokens:
if type == tokenize.OP and string_ == ']':
for type_, string_, _start, _, _ in tokens:
if type_ == tokenize.OP and string_ == ']':
return ''.join(filter_parts)
else:
filter_parts.append(string_)
@@ -2225,23 +2224,23 @@ class YoutubeDL:
# E.g. 'mp4' '-' 'baseline' '-' '16x9' is converted to 'mp4-baseline-16x9'
ALLOWED_OPS = ('/', '+', ',', '(', ')')
last_string, last_start, last_end, last_line = None, None, None, None
for type, string_, start, end, line in tokens:
if type == tokenize.OP and string_ == '[':
for type_, string_, start, end, line in tokens:
if type_ == tokenize.OP and string_ == '[':
if last_string:
yield tokenize.NAME, last_string, last_start, last_end, last_line
last_string = None
yield type, string_, start, end, line
yield type_, string_, start, end, line
# everything inside brackets will be handled by _parse_filter
for type, string_, start, end, line in tokens:
yield type, string_, start, end, line
if type == tokenize.OP and string_ == ']':
for type_, string_, start, end, line in tokens:
yield type_, string_, start, end, line
if type_ == tokenize.OP and string_ == ']':
break
elif type == tokenize.OP and string_ in ALLOWED_OPS:
elif type_ == tokenize.OP and string_ in ALLOWED_OPS:
if last_string:
yield tokenize.NAME, last_string, last_start, last_end, last_line
last_string = None
yield type, string_, start, end, line
elif type in [tokenize.NAME, tokenize.NUMBER, tokenize.OP]:
yield type_, string_, start, end, line
elif type_ in [tokenize.NAME, tokenize.NUMBER, tokenize.OP]:
if not last_string:
last_string = string_
last_start = start
@@ -2254,13 +2253,13 @@ class YoutubeDL:
def _parse_format_selection(tokens, inside_merge=False, inside_choice=False, inside_group=False):
selectors = []
current_selector = None
for type, string_, start, _, _ in tokens:
for type_, string_, start, _, _ in tokens:
# ENCODING is only defined in Python 3.x
if type == getattr(tokenize, 'ENCODING', None):
if type_ == getattr(tokenize, 'ENCODING', None):
continue
elif type in [tokenize.NAME, tokenize.NUMBER]:
elif type_ in [tokenize.NAME, tokenize.NUMBER]:
current_selector = FormatSelector(SINGLE, string_, [])
elif type == tokenize.OP:
elif type_ == tokenize.OP:
if string_ == ')':
if not inside_group:
# ')' will be handled by the parentheses group
@@ -2303,7 +2302,7 @@ class YoutubeDL:
current_selector = FormatSelector(MERGE, (selector_1, selector_2), [])
else:
raise syntax_error(f'Operator not recognized: "{string_}"', start)
elif type == tokenize.ENDMARKER:
elif type_ == tokenize.ENDMARKER:
break
if current_selector:
selectors.append(current_selector)
@@ -2378,7 +2377,7 @@ class YoutubeDL:
'acodec': the_only_audio.get('acodec'),
'abr': the_only_audio.get('abr'),
'asr': the_only_audio.get('asr'),
'audio_channels': the_only_audio.get('audio_channels')
'audio_channels': the_only_audio.get('audio_channels'),
})
return new_dict
@@ -2459,9 +2458,9 @@ class YoutubeDL:
format_fallback = not format_type and not format_modified # for b, w
_filter_f = (
(lambda f: f.get('%scodec' % format_type) != 'none')
(lambda f: f.get(f'{format_type}codec') != 'none')
if format_type and format_modified # bv*, ba*, wv*, wa*
else (lambda f: f.get('%scodec' % not_format_type) == 'none')
else (lambda f: f.get(f'{not_format_type}codec') == 'none')
if format_type # bv, ba, wv, wa
else (lambda f: f.get('vcodec') != 'none' and f.get('acodec') != 'none')
if not format_modified # b, w
@@ -2529,7 +2528,7 @@ class YoutubeDL:
def __next__(self):
if self.counter >= len(self.tokens):
raise StopIteration()
raise StopIteration
value = self.tokens[self.counter]
self.counter += 1
return value
@@ -2612,7 +2611,7 @@ class YoutubeDL:
self._sort_thumbnails(thumbnails)
for i, t in enumerate(thumbnails):
if t.get('id') is None:
t['id'] = '%d' % i
t['id'] = str(i)
if t.get('width') and t.get('height'):
t['resolution'] = '%dx%d' % (t['width'], t['height'])
t['url'] = sanitize_url(t['url'])
@@ -2673,8 +2672,8 @@ class YoutubeDL:
# Auto generate title fields corresponding to the *_number fields when missing
# in order to always have clean titles. This is very common for TV series.
for field in ('chapter', 'season', 'episode'):
if final and info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field])
if final and info_dict.get(f'{field}_number') is not None and not info_dict.get(field):
info_dict[field] = '%s %d' % (field.capitalize(), info_dict[f'{field}_number'])
for old_key, new_key in self._deprecated_multivalue_fields.items():
if new_key in info_dict and old_key in info_dict:
@@ -2706,8 +2705,8 @@ class YoutubeDL:
def report_force_conversion(field, field_not, conversion):
self.report_warning(
'"%s" field is not %s - forcing %s conversion, there is an error in extractor'
% (field, field_not, conversion))
f'"{field}" field is not {field_not} - forcing {conversion} conversion, '
'there is an error in extractor')
def sanitize_string_field(info, string_field):
field = info.get(string_field)
@@ -2824,28 +2823,28 @@ class YoutubeDL:
if not formats:
self.raise_no_formats(info_dict)
for format in formats:
sanitize_string_field(format, 'format_id')
sanitize_numeric_fields(format)
format['url'] = sanitize_url(format['url'])
if format.get('ext') is None:
format['ext'] = determine_ext(format['url']).lower()
if format['ext'] in ('aac', 'opus', 'mp3', 'flac', 'vorbis'):
if format.get('acodec') is None:
format['acodec'] = format['ext']
if format.get('protocol') is None:
format['protocol'] = determine_protocol(format)
if format.get('resolution') is None:
format['resolution'] = self.format_resolution(format, default=None)
if format.get('dynamic_range') is None and format.get('vcodec') != 'none':
format['dynamic_range'] = 'SDR'
if format.get('aspect_ratio') is None:
format['aspect_ratio'] = try_call(lambda: round(format['width'] / format['height'], 2))
for fmt in formats:
sanitize_string_field(fmt, 'format_id')
sanitize_numeric_fields(fmt)
fmt['url'] = sanitize_url(fmt['url'])
if fmt.get('ext') is None:
fmt['ext'] = determine_ext(fmt['url']).lower()
if fmt['ext'] in ('aac', 'opus', 'mp3', 'flac', 'vorbis'):
if fmt.get('acodec') is None:
fmt['acodec'] = fmt['ext']
if fmt.get('protocol') is None:
fmt['protocol'] = determine_protocol(fmt)
if fmt.get('resolution') is None:
fmt['resolution'] = self.format_resolution(fmt, default=None)
if fmt.get('dynamic_range') is None and fmt.get('vcodec') != 'none':
fmt['dynamic_range'] = 'SDR'
if fmt.get('aspect_ratio') is None:
fmt['aspect_ratio'] = try_call(lambda: round(fmt['width'] / fmt['height'], 2))
# For fragmented formats, "tbr" is often max bitrate and not average
if (('manifest-filesize-approx' in self.params['compat_opts'] or not format.get('manifest_url'))
and not format.get('filesize') and not format.get('filesize_approx')):
format['filesize_approx'] = filesize_from_tbr(format.get('tbr'), info_dict.get('duration'))
format['http_headers'] = self._calc_headers(collections.ChainMap(format, info_dict), load_cookies=True)
if (('manifest-filesize-approx' in self.params['compat_opts'] or not fmt.get('manifest_url'))
and not fmt.get('filesize') and not fmt.get('filesize_approx')):
fmt['filesize_approx'] = filesize_from_tbr(fmt.get('tbr'), info_dict.get('duration'))
fmt['http_headers'] = self._calc_headers(collections.ChainMap(fmt, info_dict), load_cookies=True)
# Safeguard against old/insecure infojson when using --load-info-json
if info_dict.get('http_headers'):
@@ -2858,36 +2857,36 @@ class YoutubeDL:
self.sort_formats({
'formats': formats,
'_format_sort_fields': info_dict.get('_format_sort_fields')
'_format_sort_fields': info_dict.get('_format_sort_fields'),
})
# Sanitize and group by format_id
formats_dict = {}
for i, format in enumerate(formats):
if not format.get('format_id'):
format['format_id'] = str(i)
for i, fmt in enumerate(formats):
if not fmt.get('format_id'):
fmt['format_id'] = str(i)
else:
# Sanitize format_id from characters used in format selector expression
format['format_id'] = re.sub(r'[\s,/+\[\]()]', '_', format['format_id'])
formats_dict.setdefault(format['format_id'], []).append(format)
fmt['format_id'] = re.sub(r'[\s,/+\[\]()]', '_', fmt['format_id'])
formats_dict.setdefault(fmt['format_id'], []).append(fmt)
# Make sure all formats have unique format_id
common_exts = set(itertools.chain(*self._format_selection_exts.values()))
for format_id, ambiguous_formats in formats_dict.items():
ambigious_id = len(ambiguous_formats) > 1
for i, format in enumerate(ambiguous_formats):
for i, fmt in enumerate(ambiguous_formats):
if ambigious_id:
format['format_id'] = '%s-%d' % (format_id, i)
fmt['format_id'] = f'{format_id}-{i}'
# Ensure there is no conflict between id and ext in format selection
# See https://github.com/yt-dlp/yt-dlp/issues/1282
if format['format_id'] != format['ext'] and format['format_id'] in common_exts:
format['format_id'] = 'f%s' % format['format_id']
if fmt['format_id'] != fmt['ext'] and fmt['format_id'] in common_exts:
fmt['format_id'] = 'f{}'.format(fmt['format_id'])
if format.get('format') is None:
format['format'] = '{id} - {res}{note}'.format(
id=format['format_id'],
res=self.format_resolution(format),
note=format_field(format, 'format_note', ' (%s)'),
if fmt.get('format') is None:
fmt['format'] = '{id} - {res}{note}'.format(
id=fmt['format_id'],
res=self.format_resolution(fmt),
note=format_field(fmt, 'format_note', ' (%s)'),
)
if self.params.get('check_formats') is True:
@@ -3009,7 +3008,7 @@ class YoutubeDL:
info_dict['requested_downloads'] = downloaded_formats
info_dict = self.run_all_pps('after_video', info_dict)
if max_downloads_reached:
raise MaxDownloadsReached()
raise MaxDownloadsReached
# We update the info dict with the selected best quality format (backwards compatibility)
info_dict.update(best_format)
@@ -3070,8 +3069,8 @@ class YoutubeDL:
else:
f = formats[-1]
self.report_warning(
'No subtitle format found matching "%s" for language %s, '
'using %s. Use --list-subs for a list of available subtitles' % (formats_query, lang, f['ext']))
'No subtitle format found matching "{}" for language {}, '
'using {}. Use --list-subs for a list of available subtitles'.format(formats_query, lang, f['ext']))
subs[lang] = f
return subs
@@ -3226,7 +3225,7 @@ class YoutubeDL:
def check_max_downloads():
if self._num_downloads >= float(self.params.get('max_downloads') or 'inf'):
raise MaxDownloadsReached()
raise MaxDownloadsReached
if self.params.get('simulate'):
info_dict['__write_download_archive'] = self.params.get('force_write_download_archive')
@@ -3400,7 +3399,7 @@ class YoutubeDL:
for f in info_dict['requested_formats'] if fd != FFmpegFD else []:
f['filepath'] = fname = prepend_extension(
correct_ext(temp_filename, info_dict['ext']),
'f%s' % f['format_id'], info_dict['ext'])
'f{}'.format(f['format_id']), info_dict['ext'])
downloaded.append(fname)
info_dict['url'] = '\n'.join(f['url'] for f in info_dict['requested_formats'])
success, real_download = self.dl(temp_filename, info_dict)
@@ -3433,7 +3432,7 @@ class YoutubeDL:
if temp_filename != '-':
fname = prepend_extension(
correct_ext(temp_filename, new_info['ext']),
'f%s' % f['format_id'], new_info['ext'])
'f{}'.format(f['format_id']), new_info['ext'])
if not self._ensure_dir_exists(fname):
return
f['filepath'] = fname
@@ -3465,11 +3464,11 @@ class YoutubeDL:
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(encodeFilename(full_filename)))
except network_exceptions as err:
self.report_error('unable to download video data: %s' % error_to_compat_str(err))
self.report_error(f'unable to download video data: {err}')
return
except OSError as err:
raise UnavailableVideoError(err)
except (ContentTooShortError, ) as err:
except ContentTooShortError as err:
self.report_error(f'content too short (expected {err.expected} bytes and served {err.downloaded})')
return
@@ -3536,13 +3535,13 @@ class YoutubeDL:
try:
replace_info_dict(self.post_process(dl_filename, info_dict, files_to_move))
except PostProcessingError as err:
self.report_error('Postprocessing: %s' % str(err))
self.report_error(f'Postprocessing: {err}')
return
try:
for ph in self._post_hooks:
ph(info_dict['filepath'])
except Exception as err:
self.report_error('post hooks: %s' % str(err))
self.report_error(f'post hooks: {err}')
return
info_dict['__write_download_archive'] = True
@@ -3609,7 +3608,7 @@ class YoutubeDL:
@staticmethod
def sanitize_info(info_dict, remove_private_keys=False):
''' Sanitize the infodict for converting to json '''
""" Sanitize the infodict for converting to json """
if info_dict is None:
return info_dict
info_dict.setdefault('epoch', int(time.time()))
@@ -3644,7 +3643,7 @@ class YoutubeDL:
@staticmethod
def filter_requested_info(info_dict, actually_filter=True):
''' Alias of sanitize_info for backward compatibility '''
""" Alias of sanitize_info for backward compatibility """
return YoutubeDL.sanitize_info(info_dict, actually_filter)
def _delete_downloaded_files(self, *files_to_delete, info={}, msg=None):
@@ -3666,7 +3665,7 @@ class YoutubeDL:
actual_post_extract(video_dict or {})
return
post_extractor = info_dict.pop('__post_extractor', None) or (lambda: {})
post_extractor = info_dict.pop('__post_extractor', None) or dict
info_dict.update(post_extractor())
actual_post_extract(info_dict or {})
@@ -3771,7 +3770,7 @@ class YoutubeDL:
if format.get('width') and format.get('height'):
return '%dx%d' % (format['width'], format['height'])
elif format.get('height'):
return '%sp' % format['height']
return '{}p'.format(format['height'])
elif format.get('width'):
return '%dx?' % format['width']
return default
@@ -3788,7 +3787,7 @@ class YoutubeDL:
if fdict.get('language'):
if res:
res += ' '
res += '[%s]' % fdict['language']
res += '[{}]'.format(fdict['language'])
if fdict.get('format_note') is not None:
if res:
res += ' '
@@ -3800,7 +3799,7 @@ class YoutubeDL:
if fdict.get('container') is not None:
if res:
res += ', '
res += '%s container' % fdict['container']
res += '{} container'.format(fdict['container'])
if (fdict.get('vcodec') is not None
and fdict.get('vcodec') != 'none'):
if res:
@@ -3815,7 +3814,7 @@ class YoutubeDL:
if fdict.get('fps') is not None:
if res:
res += ', '
res += '%sfps' % fdict['fps']
res += '{}fps'.format(fdict['fps'])
if fdict.get('acodec') is not None:
if res:
res += ', '
@@ -3858,7 +3857,7 @@ class YoutubeDL:
format_field(f, 'format_id'),
format_field(f, 'ext'),
self.format_resolution(f),
self._format_note(f)
self._format_note(f),
] for f in formats if (f.get('preference') or 0) >= -1000]
return render_table(['format code', 'extension', 'resolution', 'note'], table, extra_gap=1)
@@ -3964,11 +3963,11 @@ class YoutubeDL:
from .extractor.extractors import _LAZY_LOADER
from .extractor.extractors import (
_PLUGIN_CLASSES as plugin_ies,
_PLUGIN_OVERRIDES as plugin_ie_overrides
_PLUGIN_OVERRIDES as plugin_ie_overrides,
)
def get_encoding(stream):
ret = str(getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__))
ret = str(getattr(stream, 'encoding', f'missing ({type(stream).__name__})'))
additional_info = []
if os.environ.get('TERM', '').lower() == 'dumb':
additional_info.append('dumb')
@@ -3979,13 +3978,13 @@ class YoutubeDL:
ret = f'{ret} ({",".join(additional_info)})'
return ret
encoding_str = 'Encodings: locale %s, fs %s, pref %s, %s' % (
encoding_str = 'Encodings: locale {}, fs {}, pref {}, {}'.format(
locale.getpreferredencoding(),
sys.getfilesystemencoding(),
self.get_encoding(),
', '.join(
f'{key} {get_encoding(stream)}' for key, stream in self._out_files.items_
if stream is not None and key != 'console')
if stream is not None and key != 'console'),
)
logger = self.params.get('logger')
@@ -4017,7 +4016,7 @@ class YoutubeDL:
else:
write_debug('Lazy loading extractors is disabled')
if self.params['compat_opts']:
write_debug('Compatibility options: %s' % ', '.join(self.params['compat_opts']))
write_debug('Compatibility options: {}'.format(', '.join(self.params['compat_opts'])))
if current_git_head():
write_debug(f'Git HEAD: {current_git_head()}')
@@ -4026,14 +4025,14 @@ class YoutubeDL:
exe_versions, ffmpeg_features = FFmpegPostProcessor.get_versions_and_features(self)
ffmpeg_features = {key for key, val in ffmpeg_features.items() if val}
if ffmpeg_features:
exe_versions['ffmpeg'] += ' (%s)' % ','.join(sorted(ffmpeg_features))
exe_versions['ffmpeg'] += ' ({})'.format(','.join(sorted(ffmpeg_features)))
exe_versions['rtmpdump'] = rtmpdump_version()
exe_versions['phantomjs'] = PhantomJSwrapper._version()
exe_str = ', '.join(
f'{exe} {v}' for exe, v in sorted(exe_versions.items()) if v
) or 'none'
write_debug('exe versions: %s' % exe_str)
write_debug(f'exe versions: {exe_str}')
from .compat.compat_utils import get_package_info
from .dependencies import available_dependencies
@@ -4045,7 +4044,7 @@ class YoutubeDL:
write_debug(f'Proxy map: {self.proxies}')
write_debug(f'Request Handlers: {", ".join(rh.RH_NAME for rh in self._request_director.handlers.values())}')
for plugin_type, plugins in {'Extractor': plugin_ies, 'Post-Processor': plugin_pps}.items():
display_list = ['%s%s' % (
display_list = ['{}{}'.format(
klass.__name__, '' if klass.__name__ == name else f' as {name}')
for name, klass in plugins.items()]
if plugin_type == 'Extractor':
@@ -4062,14 +4061,13 @@ class YoutubeDL:
# Not implemented
if False and self.params.get('call_home'):
ipaddr = self.urlopen('https://yt-dl.org/ip').read().decode()
write_debug('Public IP address: %s' % ipaddr)
write_debug(f'Public IP address: {ipaddr}')
latest_version = self.urlopen(
'https://yt-dl.org/latest/version').read().decode()
if version_tuple(latest_version) > version_tuple(__version__):
self.report_warning(
'You are using an outdated version (newest version: %s)! '
'See https://yt-dl.org/update if you need help updating.' %
latest_version)
f'You are using an outdated version (newest version: {latest_version})! '
'See https://yt-dl.org/update if you need help updating.')
@functools.cached_property
def proxies(self):
@@ -4103,7 +4101,7 @@ class YoutubeDL:
return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
def _get_available_impersonate_targets(self):
# todo(future): make available as public API
# TODO(future): make available as public API
return [
(target, rh.RH_NAME)
for rh in self._request_director.handlers.values()
@@ -4112,7 +4110,7 @@ class YoutubeDL:
]
def _impersonate_target_available(self, target):
# todo(future): make available as public API
# TODO(future): make available as public API
return any(
rh.is_supported_target(target)
for rh in self._request_director.handlers.values()
@@ -4238,7 +4236,7 @@ class YoutubeDL:
return encoding
def _write_info_json(self, label, ie_result, infofn, overwrite=None):
''' Write infojson and returns True = written, 'exists' = Already exists, False = skip, None = error '''
""" Write infojson and returns True = written, 'exists' = Already exists, False = skip, None = error """
if overwrite is None:
overwrite = self.params.get('overwrites', True)
if not self.params.get('writeinfojson'):
@@ -4261,7 +4259,7 @@ class YoutubeDL:
return None
def _write_description(self, label, ie_result, descfn):
''' Write description and returns True = written, False = skip, None = error '''
""" Write description and returns True = written, False = skip, None = error """
if not self.params.get('writedescription'):
return False
elif not descfn:
@@ -4285,7 +4283,7 @@ class YoutubeDL:
return True
def _write_subtitles(self, info_dict, filename):
''' Write subtitles to file and return list of (sub_filename, final_sub_filename); or None if error'''
""" Write subtitles to file and return list of (sub_filename, final_sub_filename); or None if error"""
ret = []
subtitles = info_dict.get('requested_subtitles')
if not (self.params.get('writesubtitles') or self.params.get('writeautomaticsub')):
@@ -4331,7 +4329,7 @@ class YoutubeDL:
self.dl(sub_filename, sub_copy, subtitle=True)
sub_info['filepath'] = sub_filename
ret.append((sub_filename, sub_filename_final))
except (DownloadError, ExtractorError, IOError, OSError, ValueError) + network_exceptions as err:
except (DownloadError, ExtractorError, OSError, ValueError, *network_exceptions) as err:
msg = f'Unable to download video subtitles for {sub_lang!r}: {err}'
if self.params.get('ignoreerrors') is not True: # False or 'only_download'
if not self.params.get('ignoreerrors'):
@@ -4341,7 +4339,7 @@ class YoutubeDL:
return ret
def _write_thumbnails(self, label, info_dict, filename, thumb_filename_base=None):
''' Write thumbnails to file and return list of (thumb_filename, final_thumb_filename); or None if error '''
""" Write thumbnails to file and return list of (thumb_filename, final_thumb_filename); or None if error """
write_all = self.params.get('write_all_thumbnails', False)
thumbnails, ret = [], []
if write_all or self.params.get('writethumbnail', False):
@@ -4368,8 +4366,8 @@ class YoutubeDL:
existing_thumb = self.existing_file((thumb_filename_final, thumb_filename))
if existing_thumb:
self.to_screen('[info] %s is already present' % (
thumb_display_id if multiple else f'{label} thumbnail').capitalize())
self.to_screen('[info] {} is already present'.format((
thumb_display_id if multiple else f'{label} thumbnail').capitalize()))
t['filepath'] = existing_thumb
ret.append((existing_thumb, thumb_filename_final))
else: