From 0061546a8208d6322d0bbd02ecce808f79c12acb Mon Sep 17 00:00:00 2001 From: c-basalt <117849907+c-basalt@users.noreply.github.com> Date: Mon, 28 Apr 2025 23:43:07 -0400 Subject: [PATCH] remove features and prepare for plugin --- test/test_download.py | 19 +-- test/test_jsi_external.py | 277 ++++++++++++++++------------------ yt_dlp/extractor/iqiyi.py | 4 +- yt_dlp/globals.py | 2 + yt_dlp/jsinterp/__init__.py | 22 ++- yt_dlp/jsinterp/_deno.py | 11 +- yt_dlp/jsinterp/_phantomjs.py | 4 +- yt_dlp/jsinterp/common.py | 184 +++++++++------------- 8 files changed, 236 insertions(+), 287 deletions(-) diff --git a/test/test_download.py b/test/test_download.py index 7731726636..8bc5658ef2 100755 --- a/test/test_download.py +++ b/test/test_download.py @@ -25,7 +25,7 @@ import yt_dlp.YoutubeDL # isort: split from yt_dlp.extractor import get_info_extractor -from yt_dlp.jsinterp.common import filter_jsi_keys +from yt_dlp.jsinterp.common import get_included_jsi from yt_dlp.networking.exceptions import HTTPError, TransportError from yt_dlp.utils import ( DownloadError, @@ -85,16 +85,16 @@ def __str__(self): def generator(test_case, tname): - # setting `jsi_matrix` to True, `jsi_matrix_features` to list, or - # setting `jsi_matrix_only_include` or `jsi_matrix_exclude` to non-empty list + # setting `jsi_matrix` to True, or `jsi_matrix_only_include`, `jsi_matrix_exclude` to non-empty list # to trigger matrix behavior for JSI - if isinstance(test_case.get('jsi_matrix_features'), list) or any(test_case.get(key) for key in [ + if any(test_case.get(key) for key in [ 'jsi_matrix', 'jsi_matrix_only_include', 'jsi_matrix_exclude', ]): - jsi_keys = filter_jsi_keys( - test_case.get('jsi_matrix_features'), test_case.get('jsi_matrix_only_include'), - test_case.get('jsi_matrix_exclude')) + jsi_keys = list(get_included_jsi(only_include=test_case.get('jsi_matrix_only_include'), + exclude=test_case.get('jsi_matrix_exclude'))) + # use jsi_preference here, instead of force blocking other jsi runtimes + # exclusion, if needed, should be specified in test case to optimize testing def generate_jsi_sub_case(jsi_key): sub_case = filter_dict(test_case, lambda k, _: not k.startswith('jsi_matrix')) sub_case['params'] = {**test_case.get('params', {}), 'jsi_preference': [jsi_key]} @@ -102,8 +102,9 @@ def generate_jsi_sub_case(jsi_key): def run_sub_cases(self): for i, jsi_key in enumerate(jsi_keys): - print(f'Running case {tname} using JSI: {jsi_key} ({i + 1}/{len(jsi_keys)})') - generate_jsi_sub_case(jsi_key)(self) + with self.subTest(jsi_key): + print(f'Running case {tname} using JSI: {jsi_key} ({i + 1}/{len(jsi_keys)})') + generate_jsi_sub_case(jsi_key)(self) return run_sub_cases def test_template(self): diff --git a/test/test_jsi_external.py b/test/test_jsi_external.py index 02098a6a30..20f0b0c124 100644 --- a/test/test_jsi_external.py +++ b/test/test_jsi_external.py @@ -8,25 +8,21 @@ import sys import unittest import http.cookiejar +import functools +import typing sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from test.helper import ( - FakeYDL, -) -from yt_dlp.utils import ( - variadic, -) +from test.helper import FakeYDL from yt_dlp.cookies import YoutubeDLCookieJar -from yt_dlp.jsinterp import _JSI_HANDLERS -assert set(_JSI_HANDLERS) == {'Deno', 'DenoJSDom', 'PhantomJS'} -from yt_dlp.jsinterp.common import ExternalJSI, _ALL_FEATURES -from yt_dlp.jsinterp._deno import DenoJSI, DenoJSDomJSI -from yt_dlp.jsinterp._phantomjs import PhantomJSJSI +from yt_dlp.jsinterp.common import get_included_jsi from yt_dlp.jsinterp._helper import prepare_wasm_jsmodule +if typing.TYPE_CHECKING: + from yt_dlp.jsinterp.common import JSI + @dataclasses.dataclass class NetscapeFields: @@ -55,169 +51,148 @@ def __eq__(self, other: NetscapeFields | http.cookiejar.Cookie): return all(getattr(self, attr) == getattr(other, attr) for attr in ['name', 'value', 'domain', 'path', 'secure', 'expires']) -covered_features = set() - - -def requires_feature(features): - covered_features.update(variadic(features)) - - def outer(func): - def wrapper(self, *args, **kwargs): - if not self.jsi._SUPPORTED_FEATURES.issuperset(variadic(features)): - print(f'{self._JSI_CLASS.__name__} does not support {features!r}, skipping') - self.skipTest(f'{"&".join(variadic(features))} not supported') - return func(self, *args, **kwargs) +def test_jsi_rumtimes(exclude=[]): + def inner(func: typing.Callable[[unittest.TestCase, type[JSI]], None]): + @functools.wraps(func) + def wrapper(self: unittest.TestCase): + for key, jsi in get_included_jsi(exclude=exclude).items(): + with self.subTest(key): + func(self, jsi) return wrapper - return outer + return inner -class Base: - class TestExternalJSI(unittest.TestCase): - _JSI_CLASS: type[ExternalJSI] = None - _TESTDATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'testdata', 'jsi_external') - maxDiff = 2000 +class TestExternalJSI(unittest.TestCase): + _TESTDATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'testdata', 'jsi_external') + maxDiff = 2000 - def setUp(self): - print() - self.ydl = FakeYDL() - self.url_param = '' - if not self._JSI_CLASS.exe_version: - print(f'{self._JSI_CLASS.__name__} is not installed, skipping') - self.skipTest('Not available') + def setUp(self): + self.ydl = FakeYDL() - @property - def jsi(self): - return self._JSI_CLASS(self.ydl, self.url_param, 10, {}) + @test_jsi_rumtimes() + def test_execute(self, jsi_cls: type[JSI]): + jsi = jsi_cls(self.ydl, '', 10) + self.assertEqual(jsi.execute('console.log("Hello, world!");'), 'Hello, world!') - def test_execute(self): - self.assertEqual(self.jsi.execute('console.log("Hello, world!");'), 'Hello, world!') + @test_jsi_rumtimes() + def test_user_agent(self, jsi_cls: type[JSI]): + ua = self.ydl.params['http_headers']['User-Agent'] - def test_user_agent(self): - ua = self.ydl.params['http_headers']['User-Agent'] - self.assertEqual(self.jsi.execute('console.log(navigator.userAgent);'), ua) - self.assertNotEqual(self.jsi.execute('console.log(JSON.stringify(navigator.webdriver));'), 'true') + jsi = jsi_cls(self.ydl, '', 10) + self.assertEqual(jsi.execute('console.log(navigator.userAgent);'), ua) + self.assertNotEqual(jsi.execute('console.log(JSON.stringify(navigator.webdriver));'), 'true') - jsi = self._JSI_CLASS(self.ydl, self.url_param, 10, {}, user_agent='test/ua') - self.assertEqual(jsi.execute('console.log(navigator.userAgent);'), 'test/ua') + jsi = jsi_cls(self.ydl, '', 10, user_agent='test/ua') + self.assertEqual(jsi.execute('console.log(navigator.userAgent);'), 'test/ua') - @requires_feature('location') - def test_location(self): - self.url_param = 'https://example.com/123/456' - self.assertEqual(self.jsi.execute('console.log(JSON.stringify([location.href, location.hostname]));'), - '["https://example.com/123/456","example.com"]') + @test_jsi_rumtimes() + def test_location(self, jsi_cls: type[JSI]): + jsi = jsi_cls(self.ydl, 'https://example.com/123/456', 10) + self.assertEqual(jsi.execute('console.log(JSON.stringify([location.href, location.hostname]));'), + '["https://example.com/123/456","example.com"]') - @requires_feature('dom') - def test_execute_dom_parse(self): - self.assertEqual(self.jsi.execute( - 'console.log(document.getElementById("test-div").innerHTML);', - html='
Hello, world!
'), - 'Hello, world!') + @test_jsi_rumtimes(exclude=['Deno']) + def test_execute_dom_parse(self, jsi_cls: type[JSI]): + jsi = jsi_cls(self.ydl, '', 10) + self.assertEqual(jsi.execute( + 'console.log(document.getElementById("test-div").innerHTML);', + html='
Hello, world!
'), + 'Hello, world!') - @requires_feature('dom') - def test_execute_dom_script(self): - self.assertEqual(self.jsi.execute( - 'console.log(document.getElementById("test-div").innerHTML);', - html='''Hello, world! -
- - - '''), - 'Hello, world!') + @test_jsi_rumtimes(exclude=['Deno']) + def test_execute_dom_script(self, jsi_cls: type[JSI]): + jsi = jsi_cls(self.ydl, '', 10) + self.assertEqual(jsi.execute( + 'console.log(document.getElementById("test-div").innerHTML);', + html='''Hello, world! +
+ + + '''), + 'Hello, world!') - @requires_feature(['dom', 'location']) - def test_dom_location(self): - self.url_param = 'https://example.com/123/456' - self.assertEqual(self.jsi.execute( - 'console.log(document.getElementById("test-div").innerHTML);', - html=''' -
Hello, world!
'''), - 'example.com') + @test_jsi_rumtimes(exclude=['Deno']) + def test_dom_location(self, jsi_cls: type[JSI]): + jsi = jsi_cls(self.ydl, 'https://example.com/123/456', 10) + self.assertEqual(jsi.execute( + 'console.log(document.getElementById("test-div").innerHTML);', + html=''' +
Hello, world!
'''), + 'example.com') - @requires_feature('cookies') - def test_execute_cookiejar(self): - cookiejar = YoutubeDLCookieJar() - ref_cookiejar = YoutubeDLCookieJar() + @test_jsi_rumtimes(exclude=['Deno']) + def test_execute_cookiejar(self, jsi_cls: type[JSI]): + cookiejar = YoutubeDLCookieJar() + ref_cookiejar = YoutubeDLCookieJar() - def _assert_expected_execute(cookie_str, ref_cookie_str): - self.assertEqual(set(cookie_str.split('; ')), set(ref_cookie_str.split('; '))) - for cookie in cookiejar: - ref_cookie = next((c for c in ref_cookiejar if c.name == cookie.name - and c.domain == cookie.domain), None) - self.assertEqual(repr(cookie), repr(ref_cookie)) + def _assert_expected_execute(cookie_str, ref_cookie_str): + self.assertEqual(set(cookie_str.split('; ')), set(ref_cookie_str.split('; '))) + for cookie in cookiejar: + ref_cookie = next((c for c in ref_cookiejar if c.name == cookie.name + and c.domain == cookie.domain), None) + self.assertEqual(repr(cookie), repr(ref_cookie)) - for test_cookie in [ - NetscapeFields('test1', 'test1', '.example.com', '/', False, int(time.time()) + 1000), - NetscapeFields('test2', 'test2', '.example.com', '/', True, int(time.time()) + 1000), - NetscapeFields('test3', 'test3', '.example.com', '/123', False, int(time.time()) + 1000), - NetscapeFields('test4', 'test4', '.example.com', '/456', False, int(time.time()) + 1000), - NetscapeFields('test5', 'test5', '.example.com', '/123', True, int(time.time()) + 1000), - NetscapeFields('test6', 'test6', '.example.com', '/456', True, int(time.time()) + 1000), - NetscapeFields('test1', 'other1', '.other.com', '/', False, int(time.time()) + 1000), - NetscapeFields('test2', 'other2', '.other.com', '/', False, int(time.time()) + 1000), - NetscapeFields('test7', 'other7', '.other.com', '/', False, int(time.time()) + 1000), - ]: - cookiejar.set_cookie(test_cookie.to_cookie()) - ref_cookiejar.set_cookie(test_cookie.to_cookie()) + for test_cookie in [ + NetscapeFields('test1', 'test1', '.example.com', '/', False, int(time.time()) + 1000), + NetscapeFields('test2', 'test2', '.example.com', '/', True, int(time.time()) + 1000), + NetscapeFields('test3', 'test3', '.example.com', '/123', False, int(time.time()) + 1000), + NetscapeFields('test4', 'test4', '.example.com', '/456', False, int(time.time()) + 1000), + NetscapeFields('test5', 'test5', '.example.com', '/123', True, int(time.time()) + 1000), + NetscapeFields('test6', 'test6', '.example.com', '/456', True, int(time.time()) + 1000), + NetscapeFields('test1', 'other1', '.other.com', '/', False, int(time.time()) + 1000), + NetscapeFields('test2', 'other2', '.other.com', '/', False, int(time.time()) + 1000), + NetscapeFields('test7', 'other7', '.other.com', '/', False, int(time.time()) + 1000), + ]: + cookiejar.set_cookie(test_cookie.to_cookie()) + ref_cookiejar.set_cookie(test_cookie.to_cookie()) - # test identity without modification from js - self.url_param = 'http://example.com/123/456' - _assert_expected_execute(self.jsi.execute( - 'console.log(document.cookie);', cookiejar=cookiejar), - 'test1=test1; test3=test3') + # test identity without modification from js + jsi = jsi_cls(self.ydl, 'http://example.com/123/456', 10) + _assert_expected_execute(jsi.execute( + 'console.log(document.cookie);', cookiejar=cookiejar), + 'test1=test1; test3=test3') - # test modification of existing cookie from js - new_cookie_1 = NetscapeFields('test1', 'new1', '.example.com', '/', True, int(time.time()) + 900) - new_cookie_2 = NetscapeFields('test2', 'new2', '.example.com', '/', True, int(time.time()) + 900) - ref_cookiejar.set_cookie(new_cookie_1.to_cookie()) - ref_cookiejar.set_cookie(new_cookie_2.to_cookie()) - self.url_param = 'https://example.com/123/456' - _assert_expected_execute(self.jsi.execute( - f'''document.cookie = "test1=new1; secure; expires={new_cookie_1.expire_str()}; domain=.example.com; path=/"; - console.log(document.cookie);''', - html=f'''
Hello, world!
- - ''', - cookiejar=cookiejar), - 'test1=new1; test2=new2; test3=test3; test5=test5') + # test modification of existing cookie from js + new_cookie_1 = NetscapeFields('test1', 'new1', '.example.com', '/', True, int(time.time()) + 900) + new_cookie_2 = NetscapeFields('test2', 'new2', '.example.com', '/', True, int(time.time()) + 900) + ref_cookiejar.set_cookie(new_cookie_1.to_cookie()) + ref_cookiejar.set_cookie(new_cookie_2.to_cookie()) - @requires_feature('wasm') - def test_wasm(self): - with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm.js')) as f: - js_mod = f.read() - with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm_bg.wasm'), 'rb') as f: - wasm = f.read() + # change to https url to test secure-domain behavior + jsi = jsi_cls(self.ydl, 'https://example.com/123/456', 10) + _assert_expected_execute(jsi.execute( + f'''document.cookie = "test1=new1; secure; expires={new_cookie_1.expire_str()}; domain=.example.com; path=/"; + console.log(document.cookie);''', + html=f'''
Hello, world!
+ + ''', + cookiejar=cookiejar), + 'test1=new1; test2=new2; test3=test3; test5=test5') - js_base = prepare_wasm_jsmodule(js_mod, wasm) + @test_jsi_rumtimes(exclude=['PhantomJS']) + def test_wasm(self, jsi_cls: type[JSI]): + with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm.js')) as f: + js_mod = f.read() + with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm_bg.wasm'), 'rb') as f: + wasm = f.read() - js_code = js_base + '''; - console.log(add(1, 2)); - greet('world'); - ''' + js_base = prepare_wasm_jsmodule(js_mod, wasm) - self.assertEqual(self.jsi.execute(js_code), '3\nHello, world!') + js_code = js_base + '''; + console.log(add(1, 2)); + greet('world'); + ''' + jsi = jsi_cls(self.ydl, '', 10) + self.assertEqual(jsi.execute(js_code), '3\nHello, world!') -class TestDeno(Base.TestExternalJSI): - _JSI_CLASS = DenoJSI - - -class TestDenoDom(Base.TestExternalJSI): - _JSI_CLASS = DenoJSDomJSI - - -class TestPhantomJS(Base.TestExternalJSI): - _JSI_CLASS = PhantomJSJSI - - -expect_covered_features = set(_ALL_FEATURES) -assert covered_features.issuperset(expect_covered_features), f'Missing tests for features: {expect_covered_features - covered_features}' if __name__ == '__main__': unittest.main() diff --git a/yt_dlp/extractor/iqiyi.py b/yt_dlp/extractor/iqiyi.py index 813984769f..81b18e3e45 100644 --- a/yt_dlp/extractor/iqiyi.py +++ b/yt_dlp/extractor/iqiyi.py @@ -417,7 +417,7 @@ class IqIE(InfoExtractor): 'cast': ['Sangmin Choi', 'Ratana Aiamsaart'], }, 'expected_warnings': ['format is restricted'], - 'jsi_matrix_features': ['dom'], + 'jsi_matrix': True, }, { 'url': 'https://www.iq.com/play/one-piece-episode-1000-1ma1i6ferf4', 'md5': '2d7caf6eeca8a32b407094b33b757d39', @@ -616,7 +616,7 @@ def _real_extract(self, url): else: ut_list = ['0'] - jsi = JSIWrapper(self, url, ['dom'], timeout=120) + jsi = JSIWrapper(self, url, timeout=120) # bid 0 as an initial format checker dash_paths = self._parse_json(jsi.execute(self._DASH_JS % { diff --git a/yt_dlp/globals.py b/yt_dlp/globals.py index 0cf276cc9e..a5a3b228d1 100644 --- a/yt_dlp/globals.py +++ b/yt_dlp/globals.py @@ -15,6 +15,7 @@ def __repr__(self, /): postprocessors = Indirect({}) extractors = Indirect({}) +jsi_runtimes = Indirect({}) # Plugins all_plugins_loaded = Indirect(False) @@ -23,6 +24,7 @@ def __repr__(self, /): plugin_ies = Indirect({}) plugin_pps = Indirect({}) +plugin_jsis = Indirect({}) plugin_ies_overrides = Indirect(defaultdict(list)) # Misc diff --git a/yt_dlp/jsinterp/__init__.py b/yt_dlp/jsinterp/__init__.py index 8133cfeef7..0001ee294d 100644 --- a/yt_dlp/jsinterp/__init__.py +++ b/yt_dlp/jsinterp/__init__.py @@ -1,14 +1,28 @@ # flake8: noqa: F401 from .native import JSInterpreter -from .common import _JSI_PREFERENCES, _JSI_HANDLERS, JSIWrapper -from ._phantomjs import PhantomJSwrapper -from . import _deno # ensure jsi registration +from .common import _JSI_PREFERENCES, JSIWrapper +from ._phantomjs import PhantomJSJSI, PhantomJSwrapper +from ._deno import DenoJSI, DenoJSDomJSI +from ..globals import jsi_runtimes, plugin_jsis +from ..plugins import PluginSpec, register_plugin_spec +jsi_runtimes.value.update({ + name: value + for name, value in globals().items() + if name.endswith('JSI') +}) + +plugin_spec = PluginSpec( + module_name='jsinterp', + suffix='JSI', + destination=jsi_runtimes, + plugin_destination=plugin_jsis, +) +register_plugin_spec(plugin_spec) __all__ = [ JSInterpreter, PhantomJSwrapper, - _JSI_HANDLERS, _JSI_PREFERENCES, JSIWrapper, ] diff --git a/yt_dlp/jsinterp/_deno.py b/yt_dlp/jsinterp/_deno.py index 03e241bf54..8b13646f5b 100644 --- a/yt_dlp/jsinterp/_deno.py +++ b/yt_dlp/jsinterp/_deno.py @@ -16,13 +16,11 @@ unified_timestamp, ) from ._helper import TempFileWrapper, random_string, override_navigator_js, extract_script_tags -from .common import ExternalJSI, register_jsi +from .common import ExternalJSI -@register_jsi class DenoJSI(ExternalJSI): """JS interpreter class using Deno binary""" - _SUPPORTED_FEATURES = {'wasm', 'location'} _BASE_PREFERENCE = 5 _EXE_NAME = 'deno' _DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check'] @@ -58,9 +56,7 @@ def execute(self, jscode, video_id=None, note='Executing JS in Deno'): return self._run_deno(cmd) -@register_jsi class DenoJSDomJSI(DenoJSI): - _SUPPORTED_FEATURES = {'wasm', 'location', 'dom', 'cookies'} _BASE_PREFERENCE = 4 _DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check'] _JSDOM_IMPORT_CHECKED = False @@ -112,8 +108,7 @@ def apply_cookies(cookiejar: YoutubeDLCookieJar | None, cookies: list[dict]): def _ensure_jsdom(self): if self._JSDOM_IMPORT_CHECKED: return - cmd = [self.exe, 'cache', self._JSDOM_URL] - self._run_deno(cmd) + self._run_deno([self.exe, 'cache', self._JSDOM_URL]) self._JSDOM_IMPORT_CHECKED = True def execute(self, jscode, video_id=None, note='Executing JS in Deno with jsdom', html='', cookiejar=None): @@ -180,7 +175,7 @@ def execute(self, jscode, video_id=None, note='Executing JS in Deno with jsdom', ''' # https://github.com/prebuild/node-gyp-build/blob/6822ec5/node-gyp-build.js#L196-L198 - # This jsdom dependency raises fatal error on linux unless read permission is provided + # This jsdom dependency raises fatal error on linux unless read for this file is allowed read_flag = ['--allow-read=/etc/alpine-release'] if platform.system() == 'Linux' else [] location_args = ['--location', self._url] if self._url else [] diff --git a/yt_dlp/jsinterp/_phantomjs.py b/yt_dlp/jsinterp/_phantomjs.py index e48ded44d4..ccd2550529 100644 --- a/yt_dlp/jsinterp/_phantomjs.py +++ b/yt_dlp/jsinterp/_phantomjs.py @@ -17,13 +17,11 @@ shell_quote, ) from ._helper import TempFileWrapper, random_string, extract_script_tags -from .common import ExternalJSI, register_jsi +from .common import ExternalJSI -@register_jsi class PhantomJSJSI(ExternalJSI): _EXE_NAME = 'phantomjs' - _SUPPORTED_FEATURES = {'location', 'cookies', 'dom'} _BASE_PREFERENCE = 3 _BASE_JS = R''' diff --git a/yt_dlp/jsinterp/common.py b/yt_dlp/jsinterp/common.py index 248fbe5569..8292fda81c 100644 --- a/yt_dlp/jsinterp/common.py +++ b/yt_dlp/jsinterp/common.py @@ -2,60 +2,41 @@ import abc import typing -import functools +import inspect +from ..globals import jsi_runtimes from ..extractor.common import InfoExtractor from ..utils import ( classproperty, format_field, filter_dict, get_exe_version, - variadic, url_or_none, sanitize_url, ExtractorError, ) - -_JSI_HANDLERS: dict[str, type[JSI]] = {} _JSI_PREFERENCES: set[JSIPreference] = set() -_ALL_FEATURES = { - 'wasm', - 'location', - 'dom', - 'cookies', -} -def get_jsi_keys(jsi_or_keys: typing.Iterable[str | type[JSI] | JSI]) -> list[str]: +def all_handlers() -> dict[str, type[JSI]]: + return {jsi.JSI_KEY: jsi for jsi in jsi_runtimes.value.values()} + + +def to_jsi_keys(jsi_or_keys: typing.Iterable[str | type[JSI] | JSI]) -> list[str]: return [jok if isinstance(jok, str) else jok.JSI_KEY for jok in jsi_or_keys] -def filter_jsi_keys(features=None, only_include=None, exclude=None): - keys = list(_JSI_HANDLERS) - if features: - keys = [key for key in keys if key in _JSI_HANDLERS - and _JSI_HANDLERS[key]._SUPPORTED_FEATURES.issuperset(features)] - if only_include: - keys = [key for key in keys if key in get_jsi_keys(only_include)] - if exclude: - keys = [key for key in keys if key not in get_jsi_keys(exclude)] - return keys - - -def filter_jsi_include(only_include: typing.Iterable[str] | None, exclude: typing.Iterable[str] | None): - keys = get_jsi_keys(only_include) if only_include else _JSI_HANDLERS.keys() - return [key for key in keys if key not in (exclude or [])] - - -def filter_jsi_feature(features: typing.Iterable[str], keys=None): - keys = keys if keys is not None else _JSI_HANDLERS.keys() - return [key for key in keys if key in _JSI_HANDLERS - and _JSI_HANDLERS[key]._SUPPORTED_FEATURES.issuperset(features)] +def get_included_jsi(only_include=None, exclude=None): + return { + key: value for key, value in all_handlers().items() + if (not only_include or key in to_jsi_keys(only_include)) + and (not exclude or key not in to_jsi_keys(exclude)) + } def order_to_pref(jsi_order: typing.Iterable[str | type[JSI] | JSI], multiplier: int) -> JSIPreference: - jsi_order = reversed(get_jsi_keys(jsi_order)) + jsi_order = reversed(to_jsi_keys(jsi_order)) pref_score = {jsi_cls: (i + 1) * multiplier for i, jsi_cls in enumerate(jsi_order)} def _pref(jsi: JSI, *args): @@ -63,20 +44,6 @@ def _pref(jsi: JSI, *args): return _pref -def require_features(param_features: dict[str, str | typing.Iterable[str]]): - assert all(_ALL_FEATURES.issuperset(variadic(kw_feature)) for kw_feature in param_features.values()) - - def outer(func): - @functools.wraps(func) - def inner(self: JSIWrapper, *args, **kwargs): - for kw_name, kw_feature in param_features.items(): - if kw_name in kwargs and not self._features.issuperset(variadic(kw_feature)): - raise ExtractorError(f'feature {kw_feature} is required for `{kw_name}` param but not declared') - return func(self, *args, **kwargs) - return inner - return outer - - class JSIWrapper: """ Helper class to forward JS interp request to a JSI that supports it. @@ -85,25 +52,17 @@ class JSIWrapper: ``` def _real_extract(self, url): ... - jsi = JSIWrapper(self, url, features=['js']) + jsi = JSIWrapper(self, url) result = jsi.execute(jscode, video_id) ... ``` - Features: - - `wasm`: supports window.WebAssembly - - `location`: supports mocking window.location - - `dom`: supports DOM interface (not necessarily rendering) - - `cookies`: supports document.cookie read & write - @param dl_or_ie: `YoutubeDL` or `InfoExtractor` instance. - @param url: setting url context, used by JSI that supports `location` feature - @param features: only JSI that supports all of these features will be selected + @param url: setting url context @param only_include: limit JSI to choose from. @param exclude: JSI to avoid using. @param jsi_params: extra kwargs to pass to `JSI.__init__()` for each JSI, using jsi key as dict key. @param preferred_order: list of JSI to use. First in list is tested first. - @param fallback_jsi: list of JSI that may fail and should act non-fatal and fallback to other JSI. Pass `"all"` to always fallback @param timeout: timeout parameter for all chosen JSI @param user_agent: override user-agent to use for supported JSI """ @@ -112,46 +71,57 @@ def __init__( self, dl_or_ie: YoutubeDL | InfoExtractor, url: str = '', - features: typing.Iterable[str] = [], only_include: typing.Iterable[str | type[JSI]] = [], exclude: typing.Iterable[str | type[JSI]] = [], jsi_params: dict[str, dict] = {}, preferred_order: typing.Iterable[str | type[JSI]] = [], - fallback_jsi: typing.Iterable[str | type[JSI]] | typing.Literal['all'] = [], timeout: float | int = 10, user_agent: str | None = None, ): - self._downloader: YoutubeDL = dl_or_ie._downloader if isinstance(dl_or_ie, InfoExtractor) else dl_or_ie - self._url = sanitize_url(url_or_none(url)) or '' - self._features = set(features) - if url and not self._url: - self.report_warning(f'Invalid URL: "{url}", using empty string instead') + if isinstance(dl_or_ie, InfoExtractor): + self._downloader = dl_or_ie._downloader + self._ie_key = dl_or_ie.ie_key() + else: + self._downloader = dl_or_ie + self._ie_key = None - if unsupported_features := self._features - _ALL_FEATURES: - raise ExtractorError(f'Unsupported features: {unsupported_features}, allowed features: {_ALL_FEATURES}') + self._url = self._sanitize_url(url) + self.preferences: set[JSIPreference] = { + order_to_pref(self._load_pref_from_option(), 10000), + order_to_pref(preferred_order, 100) + } | _JSI_PREFERENCES - user_prefs = self._downloader.params.get('jsi_preference', []) - for invalid_key in [jsi_key for jsi_key in user_prefs if jsi_key not in _JSI_HANDLERS]: - self.report_warning(f'`{invalid_key}` is not a valid JSI, ignoring preference setting') - user_prefs.remove(invalid_key) - - handler_classes = [_JSI_HANDLERS[key] for key in filter_jsi_keys(self._features, only_include, exclude)] - self.write_debug(f'Select JSI for features={self._features}: {get_jsi_keys(handler_classes)}, ' - f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}') + handler_classes = self._load_allowed_jsi_cls(only_include, exclude) if not handler_classes: - raise ExtractorError(f'No JSI supports features={self._features}') + raise ExtractorError('No JSI is allowed to use') self._handler_dict = {cls.JSI_KEY: cls( - self._downloader, url=self._url, timeout=timeout, features=self._features, + self._downloader, url=self._url, timeout=timeout, user_agent=user_agent, **jsi_params.get(cls.JSI_KEY, {}), - ) for cls in handler_classes} + ) for cls in handler_classes.values()} - self.preferences: set[JSIPreference] = { - order_to_pref(user_prefs, 10000), order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES - - self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi) self._is_test = self._downloader.params.get('test', False) + def _sanitize_url(self, url): + sanitized = sanitize_url(url_or_none(url)) or '' + if url and not sanitized: + self.report_warning(f'Invalid URL: "{url}", using empty string instead') + return sanitized + + def _load_pref_from_option(self): + user_prefs = self._downloader.params.get('jsi_preference', []) + valid_handlers = list(all_handlers()) + for invalid_key in [jsi_key for jsi_key in user_prefs if jsi_key not in valid_handlers]: + self.report_warning(f'`{invalid_key}` is not a valid JSI, ignoring preference setting') + user_prefs.remove(invalid_key) + return user_prefs + + def _load_allowed_jsi_cls(self, only_include, exclude): + handler_classes = get_included_jsi(only_include, exclude) + self.write_debug(f'Select JSI: {to_jsi_keys(handler_classes)}, ' + f'included: {to_jsi_keys(only_include) or "all"}, excluded: {to_jsi_keys(exclude)}') + return handler_classes + def write_debug(self, message, only_once=False): return self._downloader.write_debug(f'[JSIDirector] {message}', only_once=only_once) @@ -159,11 +129,19 @@ def report_warning(self, message, only_once=False): return self._downloader.report_warning(f'[JSIDirector] {message}', only_once=only_once) def _get_handlers(self, method_name: str, *args, **kwargs) -> list[JSI]: - handlers = [h for h in self._handler_dict.values() if callable(getattr(h, method_name, None))] - self.write_debug(f'Choosing handlers for method `{method_name}`: {get_jsi_keys(handlers)}') + def _supports(jsi: JSI): + if not callable(method := getattr(jsi, method_name, None)): + return False + method_params = inspect.signature(method).parameters + return all(key in method_params for key in kwargs) + + handlers = [h for h in self._handler_dict.values() if _supports(h)] + self.write_debug(f'Choosing handlers for method `{method_name}` with kwargs {list(kwargs)}' + f': {to_jsi_keys(handlers)}') + if not handlers: - raise ExtractorError(f'No JSI supports method `{method_name}`, ' - f'included handlers: {get_jsi_keys(self._handler_dict.values())}') + raise ExtractorError(f'No JSI supports method `{method_name}` with kwargs {list(kwargs)}, ' + f'included handlers: {to_jsi_keys(self._handler_dict.values())}') preferences = { handler.JSI_KEY: sum(pref_func(handler, method_name, args, kwargs) for pref_func in self.preferences) @@ -188,25 +166,25 @@ def _dispatch_request(self, method_name: str, *args, **kwargs): self.write_debug(f'{handler.JSI_KEY} is not available') unavailable.append(handler.JSI_NAME) continue + try: self.write_debug(f'Dispatching `{method_name}` task to {handler.JSI_NAME}') return getattr(handler, method_name)(*args, **kwargs) except ExtractorError as e: - if handler.JSI_KEY not in self._fallback_jsi: - raise - else: - exceptions.append((handler, e)) - self.write_debug(f'{handler.JSI_NAME} encountered error, fallback to next handler: {e}') + if self._is_test: + raise ExtractorError(f'{handler.JSI_NAME} got error while evaluating js, ' + f'add "{handler.JSI_KEY}" in `exclude` if it should not be used') + exceptions.append((handler, e)) + self.write_debug(f'{handler.JSI_NAME} encountered error, fallback to next handler: {e}') if not exceptions: msg = f'No available JSI installed, please install one of: {", ".join(unavailable)}' else: msg = f'Failed to perform {method_name}, total {len(exceptions)} errors' if unavailable: - msg = f'{msg}. You can try installing one of unavailable JSI: {", ".join(unavailable)}' + msg = f'{msg}. You may try installing one of unavailable JSI: {", ".join(unavailable)}' raise ExtractorError(msg) - @require_features({'location': 'location', 'html': 'dom', 'cookiejar': 'cookies'}) def execute(self, jscode: str, video_id: str | None, note: str | None = None, html: str | None = None, cookiejar: YoutubeDLCookieJar | None = None) -> str: """ @@ -215,24 +193,20 @@ def execute(self, jscode: str, video_id: str | None, note: str | None = None, @param jscode: JS code to execute @param video_id @param note - @param html: html to load as document, requires `dom` feature - @param cookiejar: cookiejar to read and set cookies, requires `cookies` feature, pass `InfoExtractor.cookiejar` if you want to read and write cookies + @param html: html to load as document + @param cookiejar: cookiejar to read and set cookies, pass `InfoExtractor.cookiejar` if you want to read and write cookies """ return self._dispatch_request('execute', jscode, video_id, **filter_dict({ 'note': note, 'html': html, 'cookiejar': cookiejar})) class JSI(abc.ABC): - _SUPPORTED_FEATURES: set[str] = set() _BASE_PREFERENCE: int = 0 - def __init__(self, downloader: YoutubeDL, url: str, timeout: float | int, features: set[str], user_agent=None): - if not self._SUPPORTED_FEATURES.issuperset(features): - raise ExtractorError(f'{self.JSI_NAME} does not support all required features: {features}') + def __init__(self, downloader: YoutubeDL, url: str, timeout: float | int, user_agent=None): self._downloader = downloader self._url = url self.timeout = timeout - self.features = features self.user_agent: str = user_agent or self._downloader.params['http_headers']['User-Agent'] @abc.abstractmethod @@ -277,15 +251,6 @@ def is_available(cls): return bool(cls.exe) -def register_jsi(jsi_cls: JsiClass) -> JsiClass: - """Register a JS interpreter class""" - assert issubclass(jsi_cls, JSI), f'{jsi_cls} must be a subclass of JSI' - assert jsi_cls.JSI_KEY not in _JSI_HANDLERS, f'JSI {jsi_cls.JSI_KEY} already registered' - assert jsi_cls._SUPPORTED_FEATURES.issubset(_ALL_FEATURES), f'{jsi_cls._SUPPORTED_FEATURES - _ALL_FEATURES} not declared in `_All_FEATURES`' - _JSI_HANDLERS[jsi_cls.JSI_KEY] = jsi_cls - return jsi_cls - - def register_jsi_preference(*handlers: type[JSI]): assert all(issubclass(handler, JSI) for handler in handlers), f'{handlers} must all be a subclass of JSI' @@ -301,13 +266,12 @@ def inner(handler: JSI, *args): @register_jsi_preference() def _base_preference(handler: JSI, *args): - return getattr(handler, '_BASE_PREFERENCE', 0) + return min(10, getattr(handler, '_BASE_PREFERENCE', 0)) if typing.TYPE_CHECKING: from ..YoutubeDL import YoutubeDL from ..cookies import YoutubeDLCookieJar - JsiClass = typing.TypeVar('JsiClass', bound=type[JSI]) class JSIPreference(typing.Protocol): def __call__(self, handler: JSI, method_name: str, *args, **kwargs) -> int: