1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-12-29 02:51:30 +00:00

Merge remote-tracking branch 'upstream/master' into wait-retries

This commit is contained in:
Paul Storkman
2025-03-14 23:30:13 +01:00
106 changed files with 7106 additions and 5172 deletions

View File

@@ -8,6 +8,7 @@ import contextlib
import datetime as dt
import email.header
import email.utils
import enum
import errno
import functools
import hashlib
@@ -51,6 +52,7 @@ from ..compat import (
compat_HTMLParseError,
)
from ..dependencies import xattr
from ..globals import IN_CLI
__name__ = __name__.rsplit('.', 1)[0] # noqa: A001: Pretend to be the parent module
@@ -1495,8 +1497,7 @@ def write_string(s, out=None, encoding=None):
# TODO: Use global logger
def deprecation_warning(msg, *, printer=None, stacklevel=0, **kwargs):
from .. import _IN_CLI
if _IN_CLI:
if IN_CLI.value:
if msg in deprecation_warning._cache:
return
deprecation_warning._cache.add(msg)
@@ -4899,10 +4900,6 @@ class Config:
filename = None
__initialized = False
# Internal only, do not use! Hack to enable --plugin-dirs
# TODO(coletdjnz): remove when plugin globals system is implemented
_plugin_dirs = None
def __init__(self, parser, label=None):
self.parser, self.label = parser, label
self._loaded_paths, self.configs = set(), []
@@ -5640,6 +5637,24 @@ def filesize_from_tbr(tbr, duration):
return int(duration * tbr * (1000 / 8))
def _request_dump_filename(url, video_id, data=None, trim_length=None):
if data is not None:
data = hashlib.md5(data).hexdigest()
basen = join_nonempty(video_id, data, url, delim='_')
trim_length = trim_length or 240
if len(basen) > trim_length:
h = '___' + hashlib.md5(basen.encode()).hexdigest()
basen = basen[:trim_length - len(h)] + h
filename = sanitize_filename(f'{basen}.dump', restricted=True)
# Working around MAX_PATH limitation on Windows (see
# http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx)
if os.name == 'nt':
absfilepath = os.path.abspath(filename)
if len(absfilepath) > 259:
filename = fR'\\?\{absfilepath}'
return filename
# XXX: Temporary
class _YDLLogger:
def __init__(self, ydl=None):
@@ -5668,3 +5683,32 @@ class _YDLLogger:
def stderr(self, message):
if self._ydl:
self._ydl.to_stderr(message)
class _ProgressState(enum.Enum):
"""
Represents a state for a progress bar.
See: https://conemu.github.io/en/AnsiEscapeCodes.html#ConEmu_specific_OSC
"""
HIDDEN = 0
INDETERMINATE = 3
VISIBLE = 1
WARNING = 4
ERROR = 2
@classmethod
def from_dict(cls, s, /):
if s['status'] == 'finished':
return cls.INDETERMINATE
# Not currently used
if s['status'] == 'error':
return cls.ERROR
return cls.INDETERMINATE if s.get('_percent') is None else cls.VISIBLE
def get_ansi_escape(self, /, percent=None):
percent = 0 if percent is None else int(percent)
return f'\033]9;4;{self.value};{percent}\007'

View File

@@ -1,9 +1,16 @@
from __future__ import annotations
import collections
import collections.abc
import random
import typing
import urllib.parse
import urllib.request
from ._utils import remove_start
if typing.TYPE_CHECKING:
T = typing.TypeVar('T')
from ._utils import NO_DEFAULT, remove_start
def random_user_agent():
@@ -51,32 +58,141 @@ def random_user_agent():
return _USER_AGENT_TPL % random.choice(_CHROME_VERSIONS)
class HTTPHeaderDict(collections.UserDict, dict):
class HTTPHeaderDict(dict):
"""
Store and access keys case-insensitively.
The constructor can take multiple dicts, in which keys in the latter are prioritised.
Retains a case sensitive mapping of the headers, which can be accessed via `.sensitive()`.
"""
def __new__(cls, *args: typing.Any, **kwargs: typing.Any) -> typing.Self:
obj = dict.__new__(cls, *args, **kwargs)
obj.__sensitive_map = {}
return obj
def __init__(self, *args, **kwargs):
def __init__(self, /, *args, **kwargs):
super().__init__()
for dct in args:
if dct is not None:
self.update(dct)
self.update(kwargs)
self.__sensitive_map = {}
def __setitem__(self, key, value):
if isinstance(value, bytes):
value = value.decode('latin-1')
super().__setitem__(key.title(), str(value).strip())
for dct in filter(None, args):
self.update(dct)
if kwargs:
self.update(kwargs)
def __getitem__(self, key):
def sensitive(self, /) -> dict[str, str]:
return {
self.__sensitive_map[key]: value
for key, value in self.items()
}
def __contains__(self, key: str, /) -> bool:
return super().__contains__(key.title() if isinstance(key, str) else key)
def __delitem__(self, key: str, /) -> None:
key = key.title()
del self.__sensitive_map[key]
super().__delitem__(key)
def __getitem__(self, key, /) -> str:
return super().__getitem__(key.title())
def __delitem__(self, key):
super().__delitem__(key.title())
def __ior__(self, other, /):
if isinstance(other, type(self)):
other = other.sensitive()
if isinstance(other, dict):
self.update(other)
return
return NotImplemented
def __contains__(self, key):
return super().__contains__(key.title() if isinstance(key, str) else key)
def __or__(self, other, /) -> typing.Self:
if isinstance(other, type(self)):
other = other.sensitive()
if isinstance(other, dict):
return type(self)(self.sensitive(), other)
return NotImplemented
def __ror__(self, other, /) -> typing.Self:
if isinstance(other, type(self)):
other = other.sensitive()
if isinstance(other, dict):
return type(self)(other, self.sensitive())
return NotImplemented
def __setitem__(self, key: str, value, /) -> None:
if isinstance(value, bytes):
value = value.decode('latin-1')
key_title = key.title()
self.__sensitive_map[key_title] = key
super().__setitem__(key_title, str(value).strip())
def clear(self, /) -> None:
self.__sensitive_map.clear()
super().clear()
def copy(self, /) -> typing.Self:
return type(self)(self.sensitive())
@typing.overload
def get(self, key: str, /) -> str | None: ...
@typing.overload
def get(self, key: str, /, default: T) -> str | T: ...
def get(self, key, /, default=NO_DEFAULT):
key = key.title()
if default is NO_DEFAULT:
return super().get(key)
return super().get(key, default)
@typing.overload
def pop(self, key: str, /) -> str: ...
@typing.overload
def pop(self, key: str, /, default: T) -> str | T: ...
def pop(self, key, /, default=NO_DEFAULT):
key = key.title()
if default is NO_DEFAULT:
self.__sensitive_map.pop(key)
return super().pop(key)
self.__sensitive_map.pop(key, default)
return super().pop(key, default)
def popitem(self) -> tuple[str, str]:
self.__sensitive_map.popitem()
return super().popitem()
@typing.overload
def setdefault(self, key: str, /) -> str: ...
@typing.overload
def setdefault(self, key: str, /, default) -> str: ...
def setdefault(self, key, /, default=None) -> str:
key = key.title()
if key in self.__sensitive_map:
return super().__getitem__(key)
self[key] = default or ''
return self[key]
def update(self, other, /, **kwargs) -> None:
if isinstance(other, type(self)):
other = other.sensitive()
if isinstance(other, collections.abc.Mapping):
for key, value in other.items():
self[key] = value
elif hasattr(other, 'keys'):
for key in other.keys(): # noqa: SIM118
self[key] = other[key]
else:
for key, value in other:
self[key] = value
for key, value in kwargs.items():
self[key] = value
std_headers = HTTPHeaderDict({