diff --git a/test/test_download.py b/test/test_download.py
index 7731726636..8bc5658ef2 100755
--- a/test/test_download.py
+++ b/test/test_download.py
@@ -25,7 +25,7 @@
import yt_dlp.YoutubeDL # isort: split
from yt_dlp.extractor import get_info_extractor
-from yt_dlp.jsinterp.common import filter_jsi_keys
+from yt_dlp.jsinterp.common import get_included_jsi
from yt_dlp.networking.exceptions import HTTPError, TransportError
from yt_dlp.utils import (
DownloadError,
@@ -85,16 +85,16 @@ def __str__(self):
def generator(test_case, tname):
- # setting `jsi_matrix` to True, `jsi_matrix_features` to list, or
- # setting `jsi_matrix_only_include` or `jsi_matrix_exclude` to non-empty list
+ # setting `jsi_matrix` to True, or `jsi_matrix_only_include`, `jsi_matrix_exclude` to non-empty list
# to trigger matrix behavior for JSI
- if isinstance(test_case.get('jsi_matrix_features'), list) or any(test_case.get(key) for key in [
+ if any(test_case.get(key) for key in [
'jsi_matrix', 'jsi_matrix_only_include', 'jsi_matrix_exclude',
]):
- jsi_keys = filter_jsi_keys(
- test_case.get('jsi_matrix_features'), test_case.get('jsi_matrix_only_include'),
- test_case.get('jsi_matrix_exclude'))
+ jsi_keys = list(get_included_jsi(only_include=test_case.get('jsi_matrix_only_include'),
+ exclude=test_case.get('jsi_matrix_exclude')))
+ # set `jsi_preference` for each sub-case here instead of forcibly blocking the other JSI runtimes;
+ # if a runtime must be excluded, specify that in the test case itself to optimize testing
def generate_jsi_sub_case(jsi_key):
sub_case = filter_dict(test_case, lambda k, _: not k.startswith('jsi_matrix'))
sub_case['params'] = {**test_case.get('params', {}), 'jsi_preference': [jsi_key]}
@@ -102,8 +102,9 @@ def generate_jsi_sub_case(jsi_key):
def run_sub_cases(self):
for i, jsi_key in enumerate(jsi_keys):
- print(f'Running case {tname} using JSI: {jsi_key} ({i + 1}/{len(jsi_keys)})')
- generate_jsi_sub_case(jsi_key)(self)
+ with self.subTest(jsi_key):
+ print(f'Running case {tname} using JSI: {jsi_key} ({i + 1}/{len(jsi_keys)})')
+ generate_jsi_sub_case(jsi_key)(self)
return run_sub_cases
def test_template(self):
diff --git a/test/test_jsi_external.py b/test/test_jsi_external.py
index 02098a6a30..20f0b0c124 100644
--- a/test/test_jsi_external.py
+++ b/test/test_jsi_external.py
@@ -8,25 +8,21 @@
import sys
import unittest
import http.cookiejar
+import functools
+import typing
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-from test.helper import (
- FakeYDL,
-)
-from yt_dlp.utils import (
- variadic,
-)
+from test.helper import FakeYDL
from yt_dlp.cookies import YoutubeDLCookieJar
-from yt_dlp.jsinterp import _JSI_HANDLERS
-assert set(_JSI_HANDLERS) == {'Deno', 'DenoJSDom', 'PhantomJS'}
-from yt_dlp.jsinterp.common import ExternalJSI, _ALL_FEATURES
-from yt_dlp.jsinterp._deno import DenoJSI, DenoJSDomJSI
-from yt_dlp.jsinterp._phantomjs import PhantomJSJSI
+from yt_dlp.jsinterp.common import get_included_jsi
from yt_dlp.jsinterp._helper import prepare_wasm_jsmodule
+if typing.TYPE_CHECKING:
+ from yt_dlp.jsinterp.common import JSI
+
@dataclasses.dataclass
class NetscapeFields:
@@ -55,169 +51,148 @@ def __eq__(self, other: NetscapeFields | http.cookiejar.Cookie):
return all(getattr(self, attr) == getattr(other, attr) for attr in ['name', 'value', 'domain', 'path', 'secure', 'expires'])
-covered_features = set()
-
-
-def requires_feature(features):
- covered_features.update(variadic(features))
-
- def outer(func):
- def wrapper(self, *args, **kwargs):
- if not self.jsi._SUPPORTED_FEATURES.issuperset(variadic(features)):
- print(f'{self._JSI_CLASS.__name__} does not support {features!r}, skipping')
- self.skipTest(f'{"&".join(variadic(features))} not supported')
- return func(self, *args, **kwargs)
+def test_jsi_rumtimes(exclude=()):
+    """Decorator factory: run the wrapped test once for every included JSI runtime.
+
+    @param exclude: JSI to leave out for this test, forwarded to `get_included_jsi`
+
+    The wrapped test is called with the JSI class as its second argument, and
+    each runtime runs inside its own `subTest` labelled with the JSI key.
+    """
+    # NOTE: the default is an immutable tuple so it cannot be shared and mutated across calls
+    def inner(func: typing.Callable[[unittest.TestCase, type[JSI]], None]):
+        @functools.wraps(func)
+        def wrapper(self: unittest.TestCase):
+            for key, jsi in get_included_jsi(exclude=exclude).items():
+                with self.subTest(key):
+                    func(self, jsi)
         return wrapper
-    return outer
+    return inner
-class Base:
- class TestExternalJSI(unittest.TestCase):
- _JSI_CLASS: type[ExternalJSI] = None
- _TESTDATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'testdata', 'jsi_external')
- maxDiff = 2000
+class TestExternalJSI(unittest.TestCase):
+ _TESTDATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'testdata', 'jsi_external')
+ maxDiff = 2000
- def setUp(self):
- print()
- self.ydl = FakeYDL()
- self.url_param = ''
- if not self._JSI_CLASS.exe_version:
- print(f'{self._JSI_CLASS.__name__} is not installed, skipping')
- self.skipTest('Not available')
+ def setUp(self):
+ self.ydl = FakeYDL()
- @property
- def jsi(self):
- return self._JSI_CLASS(self.ydl, self.url_param, 10, {})
+ @test_jsi_rumtimes()
+ def test_execute(self, jsi_cls: type[JSI]):
+ jsi = jsi_cls(self.ydl, '', 10)
+ self.assertEqual(jsi.execute('console.log("Hello, world!");'), 'Hello, world!')
- def test_execute(self):
- self.assertEqual(self.jsi.execute('console.log("Hello, world!");'), 'Hello, world!')
+ @test_jsi_rumtimes()
+ def test_user_agent(self, jsi_cls: type[JSI]):
+ ua = self.ydl.params['http_headers']['User-Agent']
- def test_user_agent(self):
- ua = self.ydl.params['http_headers']['User-Agent']
- self.assertEqual(self.jsi.execute('console.log(navigator.userAgent);'), ua)
- self.assertNotEqual(self.jsi.execute('console.log(JSON.stringify(navigator.webdriver));'), 'true')
+ jsi = jsi_cls(self.ydl, '', 10)
+ self.assertEqual(jsi.execute('console.log(navigator.userAgent);'), ua)
+ self.assertNotEqual(jsi.execute('console.log(JSON.stringify(navigator.webdriver));'), 'true')
- jsi = self._JSI_CLASS(self.ydl, self.url_param, 10, {}, user_agent='test/ua')
- self.assertEqual(jsi.execute('console.log(navigator.userAgent);'), 'test/ua')
+ jsi = jsi_cls(self.ydl, '', 10, user_agent='test/ua')
+ self.assertEqual(jsi.execute('console.log(navigator.userAgent);'), 'test/ua')
- @requires_feature('location')
- def test_location(self):
- self.url_param = 'https://example.com/123/456'
- self.assertEqual(self.jsi.execute('console.log(JSON.stringify([location.href, location.hostname]));'),
- '["https://example.com/123/456","example.com"]')
+ @test_jsi_rumtimes()
+ def test_location(self, jsi_cls: type[JSI]):
+ jsi = jsi_cls(self.ydl, 'https://example.com/123/456', 10)
+ self.assertEqual(jsi.execute('console.log(JSON.stringify([location.href, location.hostname]));'),
+ '["https://example.com/123/456","example.com"]')
- @requires_feature('dom')
- def test_execute_dom_parse(self):
- self.assertEqual(self.jsi.execute(
- 'console.log(document.getElementById("test-div").innerHTML);',
- html='
Hello, world!
'),
- 'Hello, world!')
+ @test_jsi_rumtimes(exclude=['Deno'])
+ def test_execute_dom_parse(self, jsi_cls: type[JSI]):
+ jsi = jsi_cls(self.ydl, '', 10)
+ self.assertEqual(jsi.execute(
+ 'console.log(document.getElementById("test-div").innerHTML);',
+ html='Hello, world!
'),
+ 'Hello, world!')
- @requires_feature('dom')
- def test_execute_dom_script(self):
- self.assertEqual(self.jsi.execute(
- 'console.log(document.getElementById("test-div").innerHTML);',
- html='''Hello, world!
-
-
-
- '''),
- 'Hello, world!')
+ @test_jsi_rumtimes(exclude=['Deno'])
+ def test_execute_dom_script(self, jsi_cls: type[JSI]):
+ jsi = jsi_cls(self.ydl, '', 10)
+ self.assertEqual(jsi.execute(
+ 'console.log(document.getElementById("test-div").innerHTML);',
+ html='''Hello, world!
+
+
+
+ '''),
+ 'Hello, world!')
- @requires_feature(['dom', 'location'])
- def test_dom_location(self):
- self.url_param = 'https://example.com/123/456'
- self.assertEqual(self.jsi.execute(
- 'console.log(document.getElementById("test-div").innerHTML);',
- html='''
- Hello, world!
'''),
- 'example.com')
+ @test_jsi_rumtimes(exclude=['Deno'])
+ def test_dom_location(self, jsi_cls: type[JSI]):
+ jsi = jsi_cls(self.ydl, 'https://example.com/123/456', 10)
+ self.assertEqual(jsi.execute(
+ 'console.log(document.getElementById("test-div").innerHTML);',
+ html='''
+ Hello, world!
'''),
+ 'example.com')
- @requires_feature('cookies')
- def test_execute_cookiejar(self):
- cookiejar = YoutubeDLCookieJar()
- ref_cookiejar = YoutubeDLCookieJar()
+ @test_jsi_rumtimes(exclude=['Deno'])
+ def test_execute_cookiejar(self, jsi_cls: type[JSI]):
+ cookiejar = YoutubeDLCookieJar()
+ ref_cookiejar = YoutubeDLCookieJar()
- def _assert_expected_execute(cookie_str, ref_cookie_str):
- self.assertEqual(set(cookie_str.split('; ')), set(ref_cookie_str.split('; ')))
- for cookie in cookiejar:
- ref_cookie = next((c for c in ref_cookiejar if c.name == cookie.name
- and c.domain == cookie.domain), None)
- self.assertEqual(repr(cookie), repr(ref_cookie))
+ def _assert_expected_execute(cookie_str, ref_cookie_str):
+ self.assertEqual(set(cookie_str.split('; ')), set(ref_cookie_str.split('; ')))
+ for cookie in cookiejar:
+ ref_cookie = next((c for c in ref_cookiejar if c.name == cookie.name
+ and c.domain == cookie.domain), None)
+ self.assertEqual(repr(cookie), repr(ref_cookie))
- for test_cookie in [
- NetscapeFields('test1', 'test1', '.example.com', '/', False, int(time.time()) + 1000),
- NetscapeFields('test2', 'test2', '.example.com', '/', True, int(time.time()) + 1000),
- NetscapeFields('test3', 'test3', '.example.com', '/123', False, int(time.time()) + 1000),
- NetscapeFields('test4', 'test4', '.example.com', '/456', False, int(time.time()) + 1000),
- NetscapeFields('test5', 'test5', '.example.com', '/123', True, int(time.time()) + 1000),
- NetscapeFields('test6', 'test6', '.example.com', '/456', True, int(time.time()) + 1000),
- NetscapeFields('test1', 'other1', '.other.com', '/', False, int(time.time()) + 1000),
- NetscapeFields('test2', 'other2', '.other.com', '/', False, int(time.time()) + 1000),
- NetscapeFields('test7', 'other7', '.other.com', '/', False, int(time.time()) + 1000),
- ]:
- cookiejar.set_cookie(test_cookie.to_cookie())
- ref_cookiejar.set_cookie(test_cookie.to_cookie())
+ for test_cookie in [
+ NetscapeFields('test1', 'test1', '.example.com', '/', False, int(time.time()) + 1000),
+ NetscapeFields('test2', 'test2', '.example.com', '/', True, int(time.time()) + 1000),
+ NetscapeFields('test3', 'test3', '.example.com', '/123', False, int(time.time()) + 1000),
+ NetscapeFields('test4', 'test4', '.example.com', '/456', False, int(time.time()) + 1000),
+ NetscapeFields('test5', 'test5', '.example.com', '/123', True, int(time.time()) + 1000),
+ NetscapeFields('test6', 'test6', '.example.com', '/456', True, int(time.time()) + 1000),
+ NetscapeFields('test1', 'other1', '.other.com', '/', False, int(time.time()) + 1000),
+ NetscapeFields('test2', 'other2', '.other.com', '/', False, int(time.time()) + 1000),
+ NetscapeFields('test7', 'other7', '.other.com', '/', False, int(time.time()) + 1000),
+ ]:
+ cookiejar.set_cookie(test_cookie.to_cookie())
+ ref_cookiejar.set_cookie(test_cookie.to_cookie())
- # test identity without modification from js
- self.url_param = 'http://example.com/123/456'
- _assert_expected_execute(self.jsi.execute(
- 'console.log(document.cookie);', cookiejar=cookiejar),
- 'test1=test1; test3=test3')
+ # test identity without modification from js
+ jsi = jsi_cls(self.ydl, 'http://example.com/123/456', 10)
+ _assert_expected_execute(jsi.execute(
+ 'console.log(document.cookie);', cookiejar=cookiejar),
+ 'test1=test1; test3=test3')
- # test modification of existing cookie from js
- new_cookie_1 = NetscapeFields('test1', 'new1', '.example.com', '/', True, int(time.time()) + 900)
- new_cookie_2 = NetscapeFields('test2', 'new2', '.example.com', '/', True, int(time.time()) + 900)
- ref_cookiejar.set_cookie(new_cookie_1.to_cookie())
- ref_cookiejar.set_cookie(new_cookie_2.to_cookie())
- self.url_param = 'https://example.com/123/456'
- _assert_expected_execute(self.jsi.execute(
- f'''document.cookie = "test1=new1; secure; expires={new_cookie_1.expire_str()}; domain=.example.com; path=/";
- console.log(document.cookie);''',
- html=f'''Hello, world!
-
- ''',
- cookiejar=cookiejar),
- 'test1=new1; test2=new2; test3=test3; test5=test5')
+ # test modification of existing cookie from js
+ new_cookie_1 = NetscapeFields('test1', 'new1', '.example.com', '/', True, int(time.time()) + 900)
+ new_cookie_2 = NetscapeFields('test2', 'new2', '.example.com', '/', True, int(time.time()) + 900)
+ ref_cookiejar.set_cookie(new_cookie_1.to_cookie())
+ ref_cookiejar.set_cookie(new_cookie_2.to_cookie())
- @requires_feature('wasm')
- def test_wasm(self):
- with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm.js')) as f:
- js_mod = f.read()
- with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm_bg.wasm'), 'rb') as f:
- wasm = f.read()
+ # change to https url to test secure-domain behavior
+ jsi = jsi_cls(self.ydl, 'https://example.com/123/456', 10)
+ _assert_expected_execute(jsi.execute(
+ f'''document.cookie = "test1=new1; secure; expires={new_cookie_1.expire_str()}; domain=.example.com; path=/";
+ console.log(document.cookie);''',
+ html=f'''Hello, world!
+
+ ''',
+ cookiejar=cookiejar),
+ 'test1=new1; test2=new2; test3=test3; test5=test5')
- js_base = prepare_wasm_jsmodule(js_mod, wasm)
+ @test_jsi_rumtimes(exclude=['PhantomJS'])
+ def test_wasm(self, jsi_cls: type[JSI]):
+ with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm.js')) as f:
+ js_mod = f.read()
+ with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm_bg.wasm'), 'rb') as f:
+ wasm = f.read()
- js_code = js_base + ''';
- console.log(add(1, 2));
- greet('world');
- '''
+ js_base = prepare_wasm_jsmodule(js_mod, wasm)
- self.assertEqual(self.jsi.execute(js_code), '3\nHello, world!')
+ js_code = js_base + ''';
+ console.log(add(1, 2));
+ greet('world');
+ '''
+ jsi = jsi_cls(self.ydl, '', 10)
+ self.assertEqual(jsi.execute(js_code), '3\nHello, world!')
-class TestDeno(Base.TestExternalJSI):
- _JSI_CLASS = DenoJSI
-
-
-class TestDenoDom(Base.TestExternalJSI):
- _JSI_CLASS = DenoJSDomJSI
-
-
-class TestPhantomJS(Base.TestExternalJSI):
- _JSI_CLASS = PhantomJSJSI
-
-
-expect_covered_features = set(_ALL_FEATURES)
-assert covered_features.issuperset(expect_covered_features), f'Missing tests for features: {expect_covered_features - covered_features}'
if __name__ == '__main__':
unittest.main()
diff --git a/yt_dlp/extractor/iqiyi.py b/yt_dlp/extractor/iqiyi.py
index 813984769f..81b18e3e45 100644
--- a/yt_dlp/extractor/iqiyi.py
+++ b/yt_dlp/extractor/iqiyi.py
@@ -417,7 +417,7 @@ class IqIE(InfoExtractor):
'cast': ['Sangmin Choi', 'Ratana Aiamsaart'],
},
'expected_warnings': ['format is restricted'],
- 'jsi_matrix_features': ['dom'],
+ 'jsi_matrix': True,
}, {
'url': 'https://www.iq.com/play/one-piece-episode-1000-1ma1i6ferf4',
'md5': '2d7caf6eeca8a32b407094b33b757d39',
@@ -616,7 +616,7 @@ def _real_extract(self, url):
else:
ut_list = ['0']
- jsi = JSIWrapper(self, url, ['dom'], timeout=120)
+ jsi = JSIWrapper(self, url, timeout=120)
# bid 0 as an initial format checker
dash_paths = self._parse_json(jsi.execute(self._DASH_JS % {
diff --git a/yt_dlp/globals.py b/yt_dlp/globals.py
index 0cf276cc9e..a5a3b228d1 100644
--- a/yt_dlp/globals.py
+++ b/yt_dlp/globals.py
@@ -15,6 +15,7 @@ def __repr__(self, /):
postprocessors = Indirect({})
extractors = Indirect({})
+jsi_runtimes = Indirect({})
# Plugins
all_plugins_loaded = Indirect(False)
@@ -23,6 +24,7 @@ def __repr__(self, /):
plugin_ies = Indirect({})
plugin_pps = Indirect({})
+plugin_jsis = Indirect({})
plugin_ies_overrides = Indirect(defaultdict(list))
# Misc
diff --git a/yt_dlp/jsinterp/__init__.py b/yt_dlp/jsinterp/__init__.py
index 8133cfeef7..0001ee294d 100644
--- a/yt_dlp/jsinterp/__init__.py
+++ b/yt_dlp/jsinterp/__init__.py
@@ -1,14 +1,28 @@
# flake8: noqa: F401
from .native import JSInterpreter
-from .common import _JSI_PREFERENCES, _JSI_HANDLERS, JSIWrapper
-from ._phantomjs import PhantomJSwrapper
-from . import _deno # ensure jsi registration
+from .common import _JSI_PREFERENCES, JSIWrapper
+from ._phantomjs import PhantomJSJSI, PhantomJSwrapper
+from ._deno import DenoJSI, DenoJSDomJSI
+from ..globals import jsi_runtimes, plugin_jsis
+from ..plugins import PluginSpec, register_plugin_spec
+jsi_runtimes.value.update({
+ name: value
+ for name, value in globals().items()
+ if name.endswith('JSI')
+})
+
+plugin_spec = PluginSpec(
+ module_name='jsinterp',
+ suffix='JSI',
+ destination=jsi_runtimes,
+ plugin_destination=plugin_jsis,
+)
+register_plugin_spec(plugin_spec)
__all__ = [
JSInterpreter,
PhantomJSwrapper,
- _JSI_HANDLERS,
_JSI_PREFERENCES,
JSIWrapper,
]
diff --git a/yt_dlp/jsinterp/_deno.py b/yt_dlp/jsinterp/_deno.py
index 03e241bf54..8b13646f5b 100644
--- a/yt_dlp/jsinterp/_deno.py
+++ b/yt_dlp/jsinterp/_deno.py
@@ -16,13 +16,11 @@
unified_timestamp,
)
from ._helper import TempFileWrapper, random_string, override_navigator_js, extract_script_tags
-from .common import ExternalJSI, register_jsi
+from .common import ExternalJSI
-@register_jsi
class DenoJSI(ExternalJSI):
"""JS interpreter class using Deno binary"""
- _SUPPORTED_FEATURES = {'wasm', 'location'}
_BASE_PREFERENCE = 5
_EXE_NAME = 'deno'
_DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check']
@@ -58,9 +56,7 @@ def execute(self, jscode, video_id=None, note='Executing JS in Deno'):
return self._run_deno(cmd)
-@register_jsi
class DenoJSDomJSI(DenoJSI):
- _SUPPORTED_FEATURES = {'wasm', 'location', 'dom', 'cookies'}
_BASE_PREFERENCE = 4
_DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check']
_JSDOM_IMPORT_CHECKED = False
@@ -112,8 +108,7 @@ def apply_cookies(cookiejar: YoutubeDLCookieJar | None, cookies: list[dict]):
def _ensure_jsdom(self):
if self._JSDOM_IMPORT_CHECKED:
return
- cmd = [self.exe, 'cache', self._JSDOM_URL]
- self._run_deno(cmd)
+ self._run_deno([self.exe, 'cache', self._JSDOM_URL])
self._JSDOM_IMPORT_CHECKED = True
def execute(self, jscode, video_id=None, note='Executing JS in Deno with jsdom', html='', cookiejar=None):
@@ -180,7 +175,7 @@ def execute(self, jscode, video_id=None, note='Executing JS in Deno with jsdom',
'''
# https://github.com/prebuild/node-gyp-build/blob/6822ec5/node-gyp-build.js#L196-L198
- # This jsdom dependency raises fatal error on linux unless read permission is provided
+ # This jsdom dependency raises a fatal error on Linux unless read access to this file is allowed
read_flag = ['--allow-read=/etc/alpine-release'] if platform.system() == 'Linux' else []
location_args = ['--location', self._url] if self._url else []
diff --git a/yt_dlp/jsinterp/_phantomjs.py b/yt_dlp/jsinterp/_phantomjs.py
index e48ded44d4..ccd2550529 100644
--- a/yt_dlp/jsinterp/_phantomjs.py
+++ b/yt_dlp/jsinterp/_phantomjs.py
@@ -17,13 +17,11 @@
shell_quote,
)
from ._helper import TempFileWrapper, random_string, extract_script_tags
-from .common import ExternalJSI, register_jsi
+from .common import ExternalJSI
-@register_jsi
class PhantomJSJSI(ExternalJSI):
_EXE_NAME = 'phantomjs'
- _SUPPORTED_FEATURES = {'location', 'cookies', 'dom'}
_BASE_PREFERENCE = 3
_BASE_JS = R'''
diff --git a/yt_dlp/jsinterp/common.py b/yt_dlp/jsinterp/common.py
index 248fbe5569..8292fda81c 100644
--- a/yt_dlp/jsinterp/common.py
+++ b/yt_dlp/jsinterp/common.py
@@ -2,60 +2,41 @@
import abc
import typing
-import functools
+import inspect
+from ..globals import jsi_runtimes
from ..extractor.common import InfoExtractor
from ..utils import (
classproperty,
format_field,
filter_dict,
get_exe_version,
- variadic,
url_or_none,
sanitize_url,
ExtractorError,
)
-
-_JSI_HANDLERS: dict[str, type[JSI]] = {}
_JSI_PREFERENCES: set[JSIPreference] = set()
-_ALL_FEATURES = {
- 'wasm',
- 'location',
- 'dom',
- 'cookies',
-}
-def get_jsi_keys(jsi_or_keys: typing.Iterable[str | type[JSI] | JSI]) -> list[str]:
+def all_handlers() -> dict[str, type[JSI]]:
+ return {jsi.JSI_KEY: jsi for jsi in jsi_runtimes.value.values()}
+
+
+def to_jsi_keys(jsi_or_keys: typing.Iterable[str | type[JSI] | JSI]) -> list[str]:
return [jok if isinstance(jok, str) else jok.JSI_KEY for jok in jsi_or_keys]
-def filter_jsi_keys(features=None, only_include=None, exclude=None):
- keys = list(_JSI_HANDLERS)
- if features:
- keys = [key for key in keys if key in _JSI_HANDLERS
- and _JSI_HANDLERS[key]._SUPPORTED_FEATURES.issuperset(features)]
- if only_include:
- keys = [key for key in keys if key in get_jsi_keys(only_include)]
- if exclude:
- keys = [key for key in keys if key not in get_jsi_keys(exclude)]
- return keys
-
-
-def filter_jsi_include(only_include: typing.Iterable[str] | None, exclude: typing.Iterable[str] | None):
- keys = get_jsi_keys(only_include) if only_include else _JSI_HANDLERS.keys()
- return [key for key in keys if key not in (exclude or [])]
-
-
-def filter_jsi_feature(features: typing.Iterable[str], keys=None):
- keys = keys if keys is not None else _JSI_HANDLERS.keys()
- return [key for key in keys if key in _JSI_HANDLERS
- and _JSI_HANDLERS[key]._SUPPORTED_FEATURES.issuperset(features)]
+def get_included_jsi(only_include=None, exclude=None):
+    """Return the known JSI classes that pass the include/exclude filters.
+
+    @param only_include: JSI keys, classes or instances to restrict the result to;
+                         a falsy value means no restriction
+    @param exclude: JSI keys, classes or instances to drop from the result
+    @returns: dict mapping JSI key to JSI class
+    """
+    # Normalize each filter exactly once: this avoids rebuilding the key lists for
+    # every handler and keeps one-shot iterables from being exhausted mid-filter
+    included_keys = to_jsi_keys(only_include or ())
+    excluded_keys = to_jsi_keys(exclude or ())
+    return {
+        key: value for key, value in all_handlers().items()
+        if (not included_keys or key in included_keys)
+        and key not in excluded_keys
+    }
def order_to_pref(jsi_order: typing.Iterable[str | type[JSI] | JSI], multiplier: int) -> JSIPreference:
- jsi_order = reversed(get_jsi_keys(jsi_order))
+ jsi_order = reversed(to_jsi_keys(jsi_order))
pref_score = {jsi_cls: (i + 1) * multiplier for i, jsi_cls in enumerate(jsi_order)}
def _pref(jsi: JSI, *args):
@@ -63,20 +44,6 @@ def _pref(jsi: JSI, *args):
return _pref
-def require_features(param_features: dict[str, str | typing.Iterable[str]]):
- assert all(_ALL_FEATURES.issuperset(variadic(kw_feature)) for kw_feature in param_features.values())
-
- def outer(func):
- @functools.wraps(func)
- def inner(self: JSIWrapper, *args, **kwargs):
- for kw_name, kw_feature in param_features.items():
- if kw_name in kwargs and not self._features.issuperset(variadic(kw_feature)):
- raise ExtractorError(f'feature {kw_feature} is required for `{kw_name}` param but not declared')
- return func(self, *args, **kwargs)
- return inner
- return outer
-
-
class JSIWrapper:
"""
Helper class to forward JS interp request to a JSI that supports it.
@@ -85,25 +52,17 @@ class JSIWrapper:
```
def _real_extract(self, url):
...
- jsi = JSIWrapper(self, url, features=['js'])
+ jsi = JSIWrapper(self, url)
result = jsi.execute(jscode, video_id)
...
```
- Features:
- - `wasm`: supports window.WebAssembly
- - `location`: supports mocking window.location
- - `dom`: supports DOM interface (not necessarily rendering)
- - `cookies`: supports document.cookie read & write
-
@param dl_or_ie: `YoutubeDL` or `InfoExtractor` instance.
- @param url: setting url context, used by JSI that supports `location` feature
- @param features: only JSI that supports all of these features will be selected
+ @param url: setting url context
@param only_include: limit JSI to choose from.
@param exclude: JSI to avoid using.
@param jsi_params: extra kwargs to pass to `JSI.__init__()` for each JSI, using jsi key as dict key.
@param preferred_order: list of JSI to use. First in list is tested first.
- @param fallback_jsi: list of JSI that may fail and should act non-fatal and fallback to other JSI. Pass `"all"` to always fallback
@param timeout: timeout parameter for all chosen JSI
@param user_agent: override user-agent to use for supported JSI
"""
@@ -112,46 +71,57 @@ def __init__(
self,
dl_or_ie: YoutubeDL | InfoExtractor,
url: str = '',
- features: typing.Iterable[str] = [],
only_include: typing.Iterable[str | type[JSI]] = [],
exclude: typing.Iterable[str | type[JSI]] = [],
jsi_params: dict[str, dict] = {},
preferred_order: typing.Iterable[str | type[JSI]] = [],
- fallback_jsi: typing.Iterable[str | type[JSI]] | typing.Literal['all'] = [],
timeout: float | int = 10,
user_agent: str | None = None,
):
- self._downloader: YoutubeDL = dl_or_ie._downloader if isinstance(dl_or_ie, InfoExtractor) else dl_or_ie
- self._url = sanitize_url(url_or_none(url)) or ''
- self._features = set(features)
- if url and not self._url:
- self.report_warning(f'Invalid URL: "{url}", using empty string instead')
+ if isinstance(dl_or_ie, InfoExtractor):
+ self._downloader = dl_or_ie._downloader
+ self._ie_key = dl_or_ie.ie_key()
+ else:
+ self._downloader = dl_or_ie
+ self._ie_key = None
- if unsupported_features := self._features - _ALL_FEATURES:
- raise ExtractorError(f'Unsupported features: {unsupported_features}, allowed features: {_ALL_FEATURES}')
+ self._url = self._sanitize_url(url)
+ self.preferences: set[JSIPreference] = {
+ order_to_pref(self._load_pref_from_option(), 10000),
+ order_to_pref(preferred_order, 100)
+ } | _JSI_PREFERENCES
- user_prefs = self._downloader.params.get('jsi_preference', [])
- for invalid_key in [jsi_key for jsi_key in user_prefs if jsi_key not in _JSI_HANDLERS]:
- self.report_warning(f'`{invalid_key}` is not a valid JSI, ignoring preference setting')
- user_prefs.remove(invalid_key)
-
- handler_classes = [_JSI_HANDLERS[key] for key in filter_jsi_keys(self._features, only_include, exclude)]
- self.write_debug(f'Select JSI for features={self._features}: {get_jsi_keys(handler_classes)}, '
- f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}')
+ handler_classes = self._load_allowed_jsi_cls(only_include, exclude)
if not handler_classes:
- raise ExtractorError(f'No JSI supports features={self._features}')
+ raise ExtractorError('No JSI is allowed to use')
self._handler_dict = {cls.JSI_KEY: cls(
- self._downloader, url=self._url, timeout=timeout, features=self._features,
+ self._downloader, url=self._url, timeout=timeout,
user_agent=user_agent, **jsi_params.get(cls.JSI_KEY, {}),
- ) for cls in handler_classes}
+ ) for cls in handler_classes.values()}
- self.preferences: set[JSIPreference] = {
- order_to_pref(user_prefs, 10000), order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES
-
- self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi)
self._is_test = self._downloader.params.get('test', False)
+ def _sanitize_url(self, url):
+ sanitized = sanitize_url(url_or_none(url)) or ''
+ if url and not sanitized:
+ self.report_warning(f'Invalid URL: "{url}", using empty string instead')
+ return sanitized
+
+    def _load_pref_from_option(self):
+        """Read the `jsi_preference` option, keeping only keys of known JSI.
+
+        Unknown keys are reported with a warning and left out of the result.
+        A new list is returned; the value stored in the downloader params is
+        never modified.
+        """
+        # `or []` also covers an option that is explicitly set to None
+        user_prefs = self._downloader.params.get('jsi_preference') or []
+        valid_handlers = all_handlers()
+        valid_prefs = []
+        for jsi_key in user_prefs:
+            if jsi_key in valid_handlers:
+                valid_prefs.append(jsi_key)
+            else:
+                # only_once: the option is no longer mutated, so avoid repeating
+                # the same warning for every wrapper that gets created
+                self.report_warning(
+                    f'`{jsi_key}` is not a valid JSI, ignoring preference setting', only_once=True)
+        return valid_prefs
+
+ def _load_allowed_jsi_cls(self, only_include, exclude):
+ handler_classes = get_included_jsi(only_include, exclude)
+ self.write_debug(f'Select JSI: {to_jsi_keys(handler_classes)}, '
+ f'included: {to_jsi_keys(only_include) or "all"}, excluded: {to_jsi_keys(exclude)}')
+ return handler_classes
+
def write_debug(self, message, only_once=False):
return self._downloader.write_debug(f'[JSIDirector] {message}', only_once=only_once)
@@ -159,11 +129,19 @@ def report_warning(self, message, only_once=False):
return self._downloader.report_warning(f'[JSIDirector] {message}', only_once=only_once)
def _get_handlers(self, method_name: str, *args, **kwargs) -> list[JSI]:
- handlers = [h for h in self._handler_dict.values() if callable(getattr(h, method_name, None))]
- self.write_debug(f'Choosing handlers for method `{method_name}`: {get_jsi_keys(handlers)}')
+        def _supports(jsi: JSI):
+            """Whether `jsi` implements `method_name` and accepts every requested kwarg."""
+            if not callable(method := getattr(jsi, method_name, None)):
+                return False
+            method_params = inspect.signature(method).parameters
+            # A `**kwargs` catch-all accepts any keyword, so it counts as supporting all of them
+            if any(param.kind is inspect.Parameter.VAR_KEYWORD for param in method_params.values()):
+                return True
+            return all(key in method_params for key in kwargs)
+
+ handlers = [h for h in self._handler_dict.values() if _supports(h)]
+ self.write_debug(f'Choosing handlers for method `{method_name}` with kwargs {list(kwargs)}'
+ f': {to_jsi_keys(handlers)}')
+
if not handlers:
- raise ExtractorError(f'No JSI supports method `{method_name}`, '
- f'included handlers: {get_jsi_keys(self._handler_dict.values())}')
+ raise ExtractorError(f'No JSI supports method `{method_name}` with kwargs {list(kwargs)}, '
+ f'included handlers: {to_jsi_keys(self._handler_dict.values())}')
preferences = {
handler.JSI_KEY: sum(pref_func(handler, method_name, args, kwargs) for pref_func in self.preferences)
@@ -188,25 +166,25 @@ def _dispatch_request(self, method_name: str, *args, **kwargs):
self.write_debug(f'{handler.JSI_KEY} is not available')
unavailable.append(handler.JSI_NAME)
continue
+
try:
self.write_debug(f'Dispatching `{method_name}` task to {handler.JSI_NAME}')
return getattr(handler, method_name)(*args, **kwargs)
except ExtractorError as e:
- if handler.JSI_KEY not in self._fallback_jsi:
- raise
- else:
- exceptions.append((handler, e))
- self.write_debug(f'{handler.JSI_NAME} encountered error, fallback to next handler: {e}')
+ if self._is_test:
+ raise ExtractorError(f'{handler.JSI_NAME} got error while evaluating js, '
+ f'add "{handler.JSI_KEY}" in `exclude` if it should not be used')
+ exceptions.append((handler, e))
+ self.write_debug(f'{handler.JSI_NAME} encountered error, fallback to next handler: {e}')
if not exceptions:
msg = f'No available JSI installed, please install one of: {", ".join(unavailable)}'
else:
msg = f'Failed to perform {method_name}, total {len(exceptions)} errors'
if unavailable:
- msg = f'{msg}. You can try installing one of unavailable JSI: {", ".join(unavailable)}'
+ msg = f'{msg}. You may try installing one of unavailable JSI: {", ".join(unavailable)}'
raise ExtractorError(msg)
- @require_features({'location': 'location', 'html': 'dom', 'cookiejar': 'cookies'})
def execute(self, jscode: str, video_id: str | None, note: str | None = None,
html: str | None = None, cookiejar: YoutubeDLCookieJar | None = None) -> str:
"""
@@ -215,24 +193,20 @@ def execute(self, jscode: str, video_id: str | None, note: str | None = None,
@param jscode: JS code to execute
@param video_id
@param note
- @param html: html to load as document, requires `dom` feature
- @param cookiejar: cookiejar to read and set cookies, requires `cookies` feature, pass `InfoExtractor.cookiejar` if you want to read and write cookies
+ @param html: html to load as document
+ @param cookiejar: cookiejar to read and set cookies, pass `InfoExtractor.cookiejar` if you want to read and write cookies
"""
return self._dispatch_request('execute', jscode, video_id, **filter_dict({
'note': note, 'html': html, 'cookiejar': cookiejar}))
class JSI(abc.ABC):
- _SUPPORTED_FEATURES: set[str] = set()
_BASE_PREFERENCE: int = 0
- def __init__(self, downloader: YoutubeDL, url: str, timeout: float | int, features: set[str], user_agent=None):
- if not self._SUPPORTED_FEATURES.issuperset(features):
- raise ExtractorError(f'{self.JSI_NAME} does not support all required features: {features}')
+ def __init__(self, downloader: YoutubeDL, url: str, timeout: float | int, user_agent=None):
self._downloader = downloader
self._url = url
self.timeout = timeout
- self.features = features
self.user_agent: str = user_agent or self._downloader.params['http_headers']['User-Agent']
@abc.abstractmethod
@@ -277,15 +251,6 @@ def is_available(cls):
return bool(cls.exe)
-def register_jsi(jsi_cls: JsiClass) -> JsiClass:
- """Register a JS interpreter class"""
- assert issubclass(jsi_cls, JSI), f'{jsi_cls} must be a subclass of JSI'
- assert jsi_cls.JSI_KEY not in _JSI_HANDLERS, f'JSI {jsi_cls.JSI_KEY} already registered'
- assert jsi_cls._SUPPORTED_FEATURES.issubset(_ALL_FEATURES), f'{jsi_cls._SUPPORTED_FEATURES - _ALL_FEATURES} not declared in `_All_FEATURES`'
- _JSI_HANDLERS[jsi_cls.JSI_KEY] = jsi_cls
- return jsi_cls
-
-
def register_jsi_preference(*handlers: type[JSI]):
assert all(issubclass(handler, JSI) for handler in handlers), f'{handlers} must all be a subclass of JSI'
@@ -301,13 +266,12 @@ def inner(handler: JSI, *args):
@register_jsi_preference()
def _base_preference(handler: JSI, *args):
- return getattr(handler, '_BASE_PREFERENCE', 0)
+ return min(10, getattr(handler, '_BASE_PREFERENCE', 0))
if typing.TYPE_CHECKING:
from ..YoutubeDL import YoutubeDL
from ..cookies import YoutubeDLCookieJar
- JsiClass = typing.TypeVar('JsiClass', bound=type[JSI])
class JSIPreference(typing.Protocol):
def __call__(self, handler: JSI, method_name: str, *args, **kwargs) -> int: