diff --git a/yt_dlp/jsinterp/__init__.py b/yt_dlp/jsinterp/__init__.py index 3daaba894..94b471329 100644 --- a/yt_dlp/jsinterp/__init__.py +++ b/yt_dlp/jsinterp/__init__.py @@ -1,6 +1,6 @@ from .native import JSInterpreter as NativeJSI from .external import PhantomJSwrapper, DenoJSI, PuppeteerJSI -from .common import _JSI_PREFERENCES, _JSI_HANDLERS, JSIDirector +from .common import _JSI_PREFERENCES, _JSI_HANDLERS, JSInterp __all__ = [ @@ -10,5 +10,5 @@ PuppeteerJSI, _JSI_HANDLERS, _JSI_PREFERENCES, - JSIDirector, + JSInterp, ] diff --git a/yt_dlp/jsinterp/common.py b/yt_dlp/jsinterp/common.py index 6651ee648..b898a2a21 100644 --- a/yt_dlp/jsinterp/common.py +++ b/yt_dlp/jsinterp/common.py @@ -2,8 +2,10 @@ import abc import typing +import functools -from ..utils import classproperty +from ..utils import classproperty, variadic, ExtractorError +from ..extractor.common import InfoExtractor DEFAULT_TIMEOUT = 10000 @@ -33,29 +35,36 @@ def join_jsi_name(jsi_list: typing.Iterable[str | type[JSI] | JSI], sep=', '): return sep.join(get_jsi_keys(jok if isinstance(jok, str) else jok.JSI_NAME for jok in jsi_list)) -class JSIExec(typing.Protocol): - @abc.abstractmethod - def execute(self, jscode: str) -> str: - """Execute JS code and return console.log contents, using `html` requires `dom` feature""" +def require_features(param_features: dict[str, str | typing.Iterable[str]]): + assert all(_ALL_FEATURES.issuperset(variadic(kw_feature)) for kw_feature in param_features.values()) + + def outer(func): + @functools.wraps(func) + def inner(self: JSInterp, *args, **kwargs): + for kw_name, kw_feature in param_features.items(): + if kw_name in kwargs and not self._features.issuperset(variadic(kw_feature)): + raise ExtractorError(f'feature {kw_feature} is required for `{kw_name}` param but not declared') + return func(self, *args, **kwargs) + return inner + return outer -class JSIDirector(JSIExec): - """JSIDirector class +class JSInterp: + """ + Helper class to forward JS interp request to a concrete JSI that supports it. - Helper class to forward JS interpretation need to a JSI that supports it. - - @param downloader: downloader instance. + @param dl_or_ie: `YoutubeDL` or `InfoExtractor` instance. @param features: list of features that JSI must support. - @param only_include: list of JSI to choose from. - @param exclude: list of JSI to avoid using. - @param jsi_params: extra parameters to pass to `JSI.__init__()`. + @param only_include: limit JSI to choose from. + @param exclude: JSI to avoid using. + @param jsi_params: extra kwargs to pass to `JSI.__init__()` for each JSI, using jsi key as dict key. @param preferred_order: list of JSI to use. First in list is tested first. @param fallback_jsi: list of JSI that may fail and should act non-fatal and fallback to other JSI. Pass `"all"` to always fallback - @param timeout: timeout in miliseconds for JS interpretation + @param timeout: explicit timeout parameter in miliseconds for all chosen JSI """ def __init__( self, - downloader: YoutubeDL, + dl_or_ie: YoutubeDL | InfoExtractor, features: typing.Iterable[str] = [], only_include: typing.Iterable[str | type[JSI]] = [], exclude: typing.Iterable[str | type[JSI]] = [], @@ -63,140 +72,135 @@ def __init__( preferred_order: typing.Iterable[str | type[JSI]] = [], fallback_jsi: typing.Iterable[str | type[JSI]] | typing.Literal['all'] = [], timeout: float | None = None, - verbose=False, ): - self._downloader = downloader - self._verbose = verbose - self._features = features + self._downloader: YoutubeDL = dl_or_ie._downloader if isinstance(dl_or_ie, InfoExtractor) else dl_or_ie + self._features = set(features) - jsi_keys = set(get_jsi_keys(only_include or _JSI_HANDLERS)) - set(get_jsi_keys(exclude)) + if unsupported_features := self._features - _ALL_FEATURES: + raise ExtractorError(f'Unsupported features: {unsupported_features}, allowed features: {_ALL_FEATURES}') + + jsi_keys = [key for key in get_jsi_keys(only_include or _JSI_HANDLERS) if key not in get_jsi_keys(exclude)] + self.write_debug(f'Allowed JSI keys: {jsi_keys}') handler_classes = [_JSI_HANDLERS[key] for key in jsi_keys - if _JSI_HANDLERS[key]._SUPPORTED_FEATURES.issuperset(features)] - if not handler_classes: - raise Exception(f'No JSI can be selected for features: {features}, ' - f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}') + if _JSI_HANDLERS[key]._SUPPORT_FEATURES.issuperset(self._features)] + self.write_debug(f'Selected JSI classes for given features: {get_jsi_keys(handler_classes)}, ' + f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}') - self._handler_dict = {cls.JSI_KEY: cls(downloader, timeout, **jsi_params.get(cls.JSI_KEY, {})) + self._handler_dict = {cls.JSI_KEY: cls(self._downloader, timeout, **jsi_params.get(cls.JSI_KEY, {})) for cls in handler_classes} self.preferences: set[JSIPreference] = {order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi) + self._is_test = self._downloader.params.get('test', False) def add_handler(self, handler: JSI): """Add a handler. If a handler of the same JSI_KEY exists, it will overwrite it""" assert isinstance(handler, JSI), 'handler must be a JSI instance' + if not handler._SUPPORT_FEATURES.issuperset(self._features): + raise ExtractorError(f'{handler.JSI_NAME} does not support all required features: {self._features}') self._handler_dict[handler.JSI_KEY] = handler - @property - def write_debug(self): - return self._downloader.write_debug + def write_debug(self, message, only_once=False): + return self._downloader.write_debug(f'[JSIDirector] {message}', only_once=only_once) - @property - def report_warning(self): - return self._downloader.report_warning + def report_warning(self, message, only_once=False): + return self._downloader.report_warning(f'[JSIDirector] {message}', only_once=only_once) - def _get_handlers(self, method: str, *args, **kwargs) -> list[JSI]: - handlers = [h for h in self._handler_dict.values() if getattr(h, method, None)] - self.write_debug(f'JSIDirector has handlers for `{method}`: {handlers}') + def _get_handlers(self, method_name: str, *args, **kwargs) -> list[JSI]: + handlers = [h for h in self._handler_dict.values() if callable(getattr(h, method_name, None))] + self.write_debug(f'Choosing handlers for method `{method_name}`: {get_jsi_keys(handlers)}') if not handlers: - raise Exception(f'No JSI supports method `{method}`, ' - f'included handlers: {[handler.JSI_KEY for handler in self._handler_dict.values()]}') + raise ExtractorError(f'No JSI supports method `{method_name}`, ' + f'included handlers: {get_jsi_keys(self._handler_dict.values())}') preferences = { - handler: sum(pref_func(handler, method, args, kwargs) for pref_func in self.preferences) + handler.JSI_KEY: sum(pref_func(handler, method_name, args, kwargs) for pref_func in self.preferences) for handler in handlers } - self._downloader.write_debug('JSI preferences for this request: {}'.format(', '.join( - f'{jsi.JSI_NAME}={pref}' for jsi, pref in preferences.items()))) + self.write_debug('JSI preferences for `{}` request: {}'.format( + method_name, ', '.join(f'{key}={pref}' for key, pref in preferences.items()))) - return sorted(self._handler_dict.values(), key=preferences.get, reverse=True) + return sorted(handlers, key=lambda h: preferences[h.JSI_KEY], reverse=True) - # def _send(self, request: JSIRequest): - # unavailable_handlers = [] - # exec_errors = [] - # for handler in self._get_handlers(request): - # if not handler.is_available: - # unavailable_handlers.append(handler) - # continue - # try: - # return handler.handle(request) - # except Exception as e: - # exec_errors.append(e) - # if not request.fallback: - # raise - # raise EvaluationError + def _dispatch_request(self, method_name: str, *args, **kwargs): + handlers = self._get_handlers(method_name, *args, **kwargs) - def _get_handler_method(method_name: str, feature_params: dict): - assert all(feature in _ALL_FEATURES for feature in feature_params.values()) + unavailable: list[JSI] = [] + exceptions: list[tuple[JSI, Exception]] = [] + test_results: list[tuple[JSI, typing.Any]] = [] - def handler(self: JSIDirector, *args, **kwargs): - for name, feature in feature_params.items(): - if name in kwargs and feature not in self._features: - raise Exception(f'feature {feature} is required for using {name} params ' - f'but not provided when instantiating {self.__class__.__name__}') - - unavailable: list[JSI] = [] - exceptions: list[tuple[JSI, Exception]] = [] - is_test = self._downloader.params.get('test', False) - results: list[tuple[JSI, typing.Any]] = [] - - for handler in self._get_handlers(method_name, *args, **kwargs): - if not handler.is_available: - if is_test: - raise Exception(f'{handler.JSI_NAME} is not available for testing, ' - f'add "{handler.JSI_KEY}" in `exclude` if it should not be used') - self.write_debug(f'{handler.JSI_NAME} is not available') - unavailable.append(handler) - continue - try: - self.write_debug(f'Dispatching `{method_name}` task to {handler.JSI_NAME}') - result = getattr(handler, method_name)(*args, **kwargs) - if is_test: - results.append((handler, result)) - else: - return result - except Exception as e: - if handler.JSI_KEY not in self._fallback_jsi: - raise - else: - exceptions.append((handler, e)) - self.write_debug(f'{handler.JSI_NAME} encountered error, fallback to next handler: {e}') - - if not is_test or not results: - if not exceptions: - msg = f'No available JSI installed, please install one of: {join_jsi_name(unavailable)}' + for handler in handlers: + if not handler.is_available(): + if self._is_test: + raise Exception(f'{handler.JSI_NAME} is not available for testing, ' + f'add "{handler.JSI_KEY}" in `exclude` if it should not be used') + self.write_debug(f'{handler.JSI_NAME} is not available') + unavailable.append(handler) + continue + try: + self.write_debug(f'Dispatching `{method_name}` task to {handler.JSI_NAME}') + result = getattr(handler, method_name)(*args, **kwargs) + if self._is_test: + test_results.append((handler, result)) else: - msg = f'Failed to perform {method_name}, total {len(exceptions)} errors' - if unavailable: - msg = f'{msg}. You can try installing one of unavailable JSI: {join_jsi_name(unavailable)}' - raise Exception(msg) + return result + except Exception as e: + if handler.JSI_KEY not in self._fallback_jsi: + raise + else: + exceptions.append((handler, e)) + self.write_debug(f'{handler.JSI_NAME} encountered error, fallback to next handler: {e}') - if is_test: - ref_handler, ref_result = results[0] - for handler, result in results[1:]: - if result != ref_result: - self.report_warning( - f'Different JSI results produced from {ref_handler.JSI_NAME} and {handler.JSI_NAME}') - return ref_result + if self._is_test and test_results: + ref_handler, ref_result = test_results[0] + for handler, result in test_results[1:]: + if result != ref_result: + self.report_warning( + f'Different JSI results produced from {ref_handler.JSI_NAME} and {handler.JSI_NAME}') + return ref_result - return handler + if not exceptions: + msg = f'No available JSI installed, please install one of: {join_jsi_name(unavailable)}' + else: + msg = f'Failed to perform {method_name}, total {len(exceptions)} errors' + if unavailable: + msg = f'{msg}. You can try installing one of unavailable JSI: {join_jsi_name(unavailable)}' + raise ExtractorError(msg) - execute = _get_handler_method('execute', {'html': 'dom'}) - evaluate = _get_handler_method('evaluate', {'html': 'dom'}) + @require_features({'html': 'dom'}) + def execute(self, jscode: str, url: str | None = None, html: str | None = None) -> str: + """ + Execute JS code and return stdout from console.log + `html` requires `dom` feature + """ + return self._dispatch_request('execute', jscode, url=url, html=html) + + @require_features({'html': 'dom'}) + def evaluate(self, jscode: str, url: str | None = None, html: str | None = None) -> typing.Any: + """ + Evaluate JS code and return result + `html` requires `dom` feature + """ + return self._dispatch_request('evaluate', jscode, url=url, html=html) class JSI(abc.ABC): - _SUPPORTED_FEATURES: set[str] = set() + _SUPPORT_FEATURES: set[str] = set() _BASE_PREFERENCE: int = 0 def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None): self._downloader = downloader self.timeout = float(timeout or DEFAULT_TIMEOUT) - @property @abc.abstractmethod def is_available(self) -> bool: raise NotImplementedError + def write_debug(self, message, only_once=False): + return self._downloader.write_debug(f'[{self.JSI_KEY}] {message}', only_once=only_once) + + def report_warning(self, message, only_once=False): + return self._downloader.report_warning(f'[{self.JSI_KEY}] {message}', only_once=only_once) + @classproperty def JSI_NAME(cls) -> str: return cls.__name__[:-3] @@ -207,13 +211,13 @@ def JSI_KEY(cls) -> str: return cls.__name__[:-3] -def register_jsi(handler_cls: TYPE_JSI) -> TYPE_JSI: +def register_jsi(jsi_cls: JsiClass) -> JsiClass: """Register a JS interpreter class""" - assert issubclass(handler_cls, JSI), f'{handler_cls} must be a subclass of JSI' - assert handler_cls.JSI_KEY not in _JSI_HANDLERS, f'JSI {handler_cls.JSI_KEY} already registered' - assert handler_cls._SUPPORTED_FEATURES.issubset(_ALL_FEATURES), f'{handler_cls._SUPPORTED_FEATURES - _ALL_FEATURES} is not declared in `_All_FEATURES`' - _JSI_HANDLERS[handler_cls.JSI_KEY] = handler_cls - return handler_cls + assert issubclass(jsi_cls, JSI), f'{jsi_cls} must be a subclass of JSI' + assert jsi_cls.JSI_KEY not in _JSI_HANDLERS, f'JSI {jsi_cls.JSI_KEY} already registered' + assert jsi_cls._SUPPORT_FEATURES.issubset(_ALL_FEATURES), f'{jsi_cls._SUPPORT_FEATURES - _ALL_FEATURES} not declared in `_All_FEATURES`' + _JSI_HANDLERS[jsi_cls.JSI_KEY] = jsi_cls + return jsi_cls def register_jsi_preference(*handlers: type[JSI]): @@ -236,5 +240,8 @@ def _base_preference(handler: JSI, *args): if typing.TYPE_CHECKING: from ..YoutubeDL import YoutubeDL - JSIPreference = typing.Callable[[JSI, str, list, dict], int] - TYPE_JSI = typing.TypeVar('TYPE_JSI') + JsiClass = typing.TypeVar('JsiClass', bound=typing.Type[JSI]) + + class JSIPreference(typing.Protocol): + def __call__(self, handler: JSI, method_name: str, *args, **kwargs) -> int: + ... diff --git a/yt_dlp/jsinterp/external.py b/yt_dlp/jsinterp/external.py index b02b5e25c..36c668938 100644 --- a/yt_dlp/jsinterp/external.py +++ b/yt_dlp/jsinterp/external.py @@ -113,8 +113,8 @@ def exe(cls): return cls._EXE_NAME if cls.version else None @classproperty - def is_available(self): - return bool(self.exe) + def is_available(cls): + return bool(cls.exe) @register_jsi