mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2025-10-30 22:25:19 +00:00
[cleanup] Misc cleanup and refactor (#2173)
This commit is contained in:
103
yt_dlp/utils.py
103
yt_dlp/utils.py
@@ -70,6 +70,7 @@ from .socks import ProxyType, sockssocket
|
||||
|
||||
try:
|
||||
import certifi
|
||||
|
||||
# The certificate may not be bundled in executable
|
||||
has_certifi = os.path.exists(certifi.where())
|
||||
except ImportError:
|
||||
@@ -282,22 +283,16 @@ def write_json_file(obj, fn):
|
||||
if sys.platform == 'win32':
|
||||
# Need to remove existing file on Windows, else os.rename raises
|
||||
# WindowsError or FileExistsError.
|
||||
try:
|
||||
with contextlib.suppress(OSError):
|
||||
os.unlink(fn)
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
with contextlib.suppress(OSError):
|
||||
mask = os.umask(0)
|
||||
os.umask(mask)
|
||||
os.chmod(tf.name, 0o666 & ~mask)
|
||||
except OSError:
|
||||
pass
|
||||
os.rename(tf.name, fn)
|
||||
except Exception:
|
||||
try:
|
||||
with contextlib.suppress(OSError):
|
||||
os.remove(tf.name)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
|
||||
|
||||
@@ -575,12 +570,9 @@ def extract_attributes(html_element):
|
||||
}.
|
||||
"""
|
||||
parser = HTMLAttributeParser()
|
||||
try:
|
||||
with contextlib.suppress(compat_HTMLParseError):
|
||||
parser.feed(html_element)
|
||||
parser.close()
|
||||
# Older Python may throw HTMLParseError in case of malformed HTML
|
||||
except compat_HTMLParseError:
|
||||
pass
|
||||
return parser.attrs
|
||||
|
||||
|
||||
@@ -800,10 +792,8 @@ def _htmlentity_transform(entity_with_semicolon):
|
||||
else:
|
||||
base = 10
|
||||
# See https://github.com/ytdl-org/youtube-dl/issues/7518
|
||||
try:
|
||||
with contextlib.suppress(ValueError):
|
||||
return compat_chr(int(numstr, base))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Unknown entity in name, return its literal representation
|
||||
return '&%s;' % entity
|
||||
@@ -812,7 +802,7 @@ def _htmlentity_transform(entity_with_semicolon):
|
||||
def unescapeHTML(s):
|
||||
if s is None:
|
||||
return None
|
||||
assert type(s) == compat_str
|
||||
assert isinstance(s, str)
|
||||
|
||||
return re.sub(
|
||||
r'&([^&;]+;)', lambda m: _htmlentity_transform(m.group(1)), s)
|
||||
@@ -865,7 +855,7 @@ def get_subprocess_encoding():
|
||||
|
||||
|
||||
def encodeFilename(s, for_subprocess=False):
|
||||
assert type(s) == str
|
||||
assert isinstance(s, str)
|
||||
return s
|
||||
|
||||
|
||||
@@ -924,10 +914,8 @@ def _ssl_load_windows_store_certs(ssl_context, storename):
|
||||
except PermissionError:
|
||||
return
|
||||
for cert in certs:
|
||||
try:
|
||||
with contextlib.suppress(ssl.SSLError):
|
||||
ssl_context.load_verify_locations(cadata=cert)
|
||||
except ssl.SSLError:
|
||||
pass
|
||||
|
||||
|
||||
def make_HTTPS_handler(params, **kwargs):
|
||||
@@ -1391,7 +1379,7 @@ def make_socks_conn_class(base_class, socks_proxy):
|
||||
def connect(self):
|
||||
self.sock = sockssocket()
|
||||
self.sock.setproxy(*proxy_args)
|
||||
if type(self.timeout) in (int, float):
|
||||
if isinstance(self.timeout, (int, float)):
|
||||
self.sock.settimeout(self.timeout)
|
||||
self.sock.connect((self.host, self.port))
|
||||
|
||||
@@ -1526,9 +1514,7 @@ class YoutubeDLCookieJar(compat_cookiejar.MozillaCookieJar):
|
||||
try:
|
||||
cf.write(prepare_line(line))
|
||||
except compat_cookiejar.LoadError as e:
|
||||
write_string(
|
||||
'WARNING: skipping cookie file entry due to %s: %r\n'
|
||||
% (e, line), sys.stderr)
|
||||
write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
|
||||
continue
|
||||
cf.seek(0)
|
||||
self._really_load(cf, filename, ignore_discard, ignore_expires)
|
||||
@@ -1646,12 +1632,10 @@ def parse_iso8601(date_str, delimiter='T', timezone=None):
|
||||
if timezone is None:
|
||||
timezone, date_str = extract_timezone(date_str)
|
||||
|
||||
try:
|
||||
with contextlib.suppress(ValueError):
|
||||
date_format = f'%Y-%m-%d{delimiter}%H:%M:%S'
|
||||
dt = datetime.datetime.strptime(date_str, date_format) - timezone
|
||||
return calendar.timegm(dt.timetuple())
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def date_formats(day_first=True):
|
||||
@@ -1671,17 +1655,13 @@ def unified_strdate(date_str, day_first=True):
|
||||
_, date_str = extract_timezone(date_str)
|
||||
|
||||
for expression in date_formats(day_first):
|
||||
try:
|
||||
with contextlib.suppress(ValueError):
|
||||
upload_date = datetime.datetime.strptime(date_str, expression).strftime('%Y%m%d')
|
||||
except ValueError:
|
||||
pass
|
||||
if upload_date is None:
|
||||
timetuple = email.utils.parsedate_tz(date_str)
|
||||
if timetuple:
|
||||
try:
|
||||
with contextlib.suppress(ValueError):
|
||||
upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
|
||||
except ValueError:
|
||||
pass
|
||||
if upload_date is not None:
|
||||
return compat_str(upload_date)
|
||||
|
||||
@@ -1709,11 +1689,9 @@ def unified_timestamp(date_str, day_first=True):
|
||||
date_str = m.group(1)
|
||||
|
||||
for expression in date_formats(day_first):
|
||||
try:
|
||||
with contextlib.suppress(ValueError):
|
||||
dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
|
||||
return calendar.timegm(dt.timetuple())
|
||||
except ValueError:
|
||||
pass
|
||||
timetuple = email.utils.parsedate_tz(date_str)
|
||||
if timetuple:
|
||||
return calendar.timegm(timetuple) + pm_delta * 3600
|
||||
@@ -1879,9 +1857,8 @@ def get_windows_version():
|
||||
|
||||
|
||||
def write_string(s, out=None, encoding=None):
|
||||
if out is None:
|
||||
out = sys.stderr
|
||||
assert type(s) == compat_str
|
||||
assert isinstance(s, str)
|
||||
out = out or sys.stderr
|
||||
|
||||
if 'b' in getattr(out, 'mode', ''):
|
||||
byt = s.encode(encoding or preferredencoding(), 'ignore')
|
||||
@@ -2483,18 +2460,10 @@ def parse_duration(s):
|
||||
else:
|
||||
return None
|
||||
|
||||
duration = 0
|
||||
if secs:
|
||||
duration += float(secs)
|
||||
if mins:
|
||||
duration += float(mins) * 60
|
||||
if hours:
|
||||
duration += float(hours) * 60 * 60
|
||||
if days:
|
||||
duration += float(days) * 24 * 60 * 60
|
||||
if ms:
|
||||
duration += float(ms.replace(':', '.'))
|
||||
return duration
|
||||
ms = ms.replace(':', '.')
|
||||
return sum(float(part or 0) * mult for part, mult in (
|
||||
(days, 86400), (hours, 3600), (mins, 60), (secs, 1), (ms, 1)))
|
||||
|
||||
|
||||
def prepend_extension(filename, ext, expected_real_ext=None):
|
||||
@@ -2957,9 +2926,10 @@ TV_PARENTAL_GUIDELINES = {
|
||||
|
||||
|
||||
def parse_age_limit(s):
|
||||
if type(s) == int:
|
||||
# isinstance(False, int) is True. So type() must be used instead
|
||||
if type(s) is int:
|
||||
return s if 0 <= s <= 21 else None
|
||||
if not isinstance(s, str):
|
||||
elif not isinstance(s, str):
|
||||
return None
|
||||
m = re.match(r'^(?P<age>\d{1,2})\+?$', s)
|
||||
if m:
|
||||
@@ -3227,7 +3197,7 @@ def parse_codecs(codecs_str):
|
||||
if not tcodec:
|
||||
tcodec = full_codec
|
||||
else:
|
||||
write_string('WARNING: Unknown codec %s\n' % full_codec, sys.stderr)
|
||||
write_string(f'WARNING: Unknown codec {full_codec}\n')
|
||||
if vcodec or acodec or tcodec:
|
||||
return {
|
||||
'vcodec': vcodec or 'none',
|
||||
@@ -4934,7 +4904,7 @@ def get_executable_path():
|
||||
|
||||
def load_plugins(name, suffix, namespace):
|
||||
classes = {}
|
||||
try:
|
||||
with contextlib.suppress(FileNotFoundError):
|
||||
plugins_spec = importlib.util.spec_from_file_location(
|
||||
name, os.path.join(get_executable_path(), 'ytdlp_plugins', name, '__init__.py'))
|
||||
plugins = importlib.util.module_from_spec(plugins_spec)
|
||||
@@ -4947,8 +4917,6 @@ def load_plugins(name, suffix, namespace):
|
||||
continue
|
||||
klass = getattr(plugins, name)
|
||||
classes[name] = namespace[name] = klass
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
return classes
|
||||
|
||||
|
||||
@@ -4957,13 +4925,14 @@ def traverse_obj(
|
||||
casesense=True, is_user_input=False, traverse_string=False):
|
||||
''' Traverse nested list/dict/tuple
|
||||
@param path_list A list of paths which are checked one by one.
|
||||
Each path is a list of keys where each key is a string,
|
||||
a function, a tuple of strings/None or "...".
|
||||
When a fuction is given, it takes the key and value as arguments
|
||||
and returns whether the key matches or not. When a tuple is given,
|
||||
all the keys given in the tuple are traversed, and
|
||||
"..." traverses all the keys in the object
|
||||
"None" returns the object without traversal
|
||||
Each path is a list of keys where each key is a:
|
||||
- None: Do nothing
|
||||
- string: A dictionary key
|
||||
- int: An index into a list
|
||||
- tuple: A list of keys all of which will be traversed
|
||||
- Ellipsis: Fetch all values in the object
|
||||
- Function: Takes the key and value as arguments
|
||||
and returns whether the key matches or not
|
||||
@param default Default value to return
|
||||
@param expected_type Only accept final value of this type (Can also be any callable)
|
||||
@param get_all Return all the values obtained from a path or only the first one
|
||||
@@ -5253,7 +5222,7 @@ class Config:
|
||||
yield from self.own_args or []
|
||||
|
||||
def parse_args(self):
|
||||
return self._parser.parse_args(list(self.all_args))
|
||||
return self._parser.parse_args(self.all_args)
|
||||
|
||||
|
||||
class WebSocketsWrapper():
|
||||
@@ -5339,3 +5308,7 @@ class classproperty:
|
||||
|
||||
def __get__(self, _, cls):
|
||||
return self.f(cls)
|
||||
|
||||
|
||||
def Namespace(**kwargs):
|
||||
return collections.namedtuple('Namespace', kwargs)(**kwargs)
|
||||
|
||||
Reference in New Issue
Block a user