1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-07-08 14:28:35 +00:00

remove features and prepare for plugin

This commit is contained in:
c-basalt 2025-04-28 23:43:07 -04:00
parent 17f18868a1
commit 0061546a82
8 changed files with 236 additions and 287 deletions

View File

@ -25,7 +25,7 @@
import yt_dlp.YoutubeDL # isort: split
from yt_dlp.extractor import get_info_extractor
from yt_dlp.jsinterp.common import filter_jsi_keys
from yt_dlp.jsinterp.common import get_included_jsi
from yt_dlp.networking.exceptions import HTTPError, TransportError
from yt_dlp.utils import (
DownloadError,
@ -85,16 +85,16 @@ def __str__(self):
def generator(test_case, tname):
# setting `jsi_matrix` to True, `jsi_matrix_features` to list, or
# setting `jsi_matrix_only_include` or `jsi_matrix_exclude` to non-empty list
# set `jsi_matrix` to True, or set `jsi_matrix_only_include` / `jsi_matrix_exclude` to a non-empty list,
# to trigger matrix behavior for JSI
if isinstance(test_case.get('jsi_matrix_features'), list) or any(test_case.get(key) for key in [
if any(test_case.get(key) for key in [
'jsi_matrix', 'jsi_matrix_only_include', 'jsi_matrix_exclude',
]):
jsi_keys = filter_jsi_keys(
test_case.get('jsi_matrix_features'), test_case.get('jsi_matrix_only_include'),
test_case.get('jsi_matrix_exclude'))
jsi_keys = list(get_included_jsi(only_include=test_case.get('jsi_matrix_only_include'),
exclude=test_case.get('jsi_matrix_exclude')))
# use jsi_preference here, instead of force blocking other jsi runtimes
# exclusion, if needed, should be specified in test case to optimize testing
def generate_jsi_sub_case(jsi_key):
sub_case = filter_dict(test_case, lambda k, _: not k.startswith('jsi_matrix'))
sub_case['params'] = {**test_case.get('params', {}), 'jsi_preference': [jsi_key]}
@ -102,8 +102,9 @@ def generate_jsi_sub_case(jsi_key):
def run_sub_cases(self):
for i, jsi_key in enumerate(jsi_keys):
print(f'Running case {tname} using JSI: {jsi_key} ({i + 1}/{len(jsi_keys)})')
generate_jsi_sub_case(jsi_key)(self)
with self.subTest(jsi_key):
print(f'Running case {tname} using JSI: {jsi_key} ({i + 1}/{len(jsi_keys)})')
generate_jsi_sub_case(jsi_key)(self)
return run_sub_cases
def test_template(self):

View File

@ -8,25 +8,21 @@
import sys
import unittest
import http.cookiejar
import functools
import typing
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from test.helper import (
FakeYDL,
)
from yt_dlp.utils import (
variadic,
)
from test.helper import FakeYDL
from yt_dlp.cookies import YoutubeDLCookieJar
from yt_dlp.jsinterp import _JSI_HANDLERS
assert set(_JSI_HANDLERS) == {'Deno', 'DenoJSDom', 'PhantomJS'}
from yt_dlp.jsinterp.common import ExternalJSI, _ALL_FEATURES
from yt_dlp.jsinterp._deno import DenoJSI, DenoJSDomJSI
from yt_dlp.jsinterp._phantomjs import PhantomJSJSI
from yt_dlp.jsinterp.common import get_included_jsi
from yt_dlp.jsinterp._helper import prepare_wasm_jsmodule
if typing.TYPE_CHECKING:
from yt_dlp.jsinterp.common import JSI
@dataclasses.dataclass
class NetscapeFields:
@ -55,169 +51,148 @@ def __eq__(self, other: NetscapeFields | http.cookiejar.Cookie):
return all(getattr(self, attr) == getattr(other, attr) for attr in ['name', 'value', 'domain', 'path', 'secure', 'expires'])
covered_features = set()
def requires_feature(features):
covered_features.update(variadic(features))
def outer(func):
def wrapper(self, *args, **kwargs):
if not self.jsi._SUPPORTED_FEATURES.issuperset(variadic(features)):
print(f'{self._JSI_CLASS.__name__} does not support {features!r}, skipping')
self.skipTest(f'{"&".join(variadic(features))} not supported')
return func(self, *args, **kwargs)
def test_jsi_rumtimes(exclude=[]):
def inner(func: typing.Callable[[unittest.TestCase, type[JSI]], None]):
@functools.wraps(func)
def wrapper(self: unittest.TestCase):
for key, jsi in get_included_jsi(exclude=exclude).items():
with self.subTest(key):
func(self, jsi)
return wrapper
return outer
return inner
class Base:
class TestExternalJSI(unittest.TestCase):
_JSI_CLASS: type[ExternalJSI] = None
_TESTDATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'testdata', 'jsi_external')
maxDiff = 2000
class TestExternalJSI(unittest.TestCase):
_TESTDATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'testdata', 'jsi_external')
maxDiff = 2000
def setUp(self):
print()
self.ydl = FakeYDL()
self.url_param = ''
if not self._JSI_CLASS.exe_version:
print(f'{self._JSI_CLASS.__name__} is not installed, skipping')
self.skipTest('Not available')
def setUp(self):
self.ydl = FakeYDL()
@property
def jsi(self):
return self._JSI_CLASS(self.ydl, self.url_param, 10, {})
@test_jsi_rumtimes()
def test_execute(self, jsi_cls: type[JSI]):
jsi = jsi_cls(self.ydl, '', 10)
self.assertEqual(jsi.execute('console.log("Hello, world!");'), 'Hello, world!')
def test_execute(self):
self.assertEqual(self.jsi.execute('console.log("Hello, world!");'), 'Hello, world!')
@test_jsi_rumtimes()
def test_user_agent(self, jsi_cls: type[JSI]):
ua = self.ydl.params['http_headers']['User-Agent']
def test_user_agent(self):
ua = self.ydl.params['http_headers']['User-Agent']
self.assertEqual(self.jsi.execute('console.log(navigator.userAgent);'), ua)
self.assertNotEqual(self.jsi.execute('console.log(JSON.stringify(navigator.webdriver));'), 'true')
jsi = jsi_cls(self.ydl, '', 10)
self.assertEqual(jsi.execute('console.log(navigator.userAgent);'), ua)
self.assertNotEqual(jsi.execute('console.log(JSON.stringify(navigator.webdriver));'), 'true')
jsi = self._JSI_CLASS(self.ydl, self.url_param, 10, {}, user_agent='test/ua')
self.assertEqual(jsi.execute('console.log(navigator.userAgent);'), 'test/ua')
jsi = jsi_cls(self.ydl, '', 10, user_agent='test/ua')
self.assertEqual(jsi.execute('console.log(navigator.userAgent);'), 'test/ua')
@requires_feature('location')
def test_location(self):
self.url_param = 'https://example.com/123/456'
self.assertEqual(self.jsi.execute('console.log(JSON.stringify([location.href, location.hostname]));'),
'["https://example.com/123/456","example.com"]')
@test_jsi_rumtimes()
def test_location(self, jsi_cls: type[JSI]):
jsi = jsi_cls(self.ydl, 'https://example.com/123/456', 10)
self.assertEqual(jsi.execute('console.log(JSON.stringify([location.href, location.hostname]));'),
'["https://example.com/123/456","example.com"]')
@requires_feature('dom')
def test_execute_dom_parse(self):
self.assertEqual(self.jsi.execute(
'console.log(document.getElementById("test-div").innerHTML);',
html='<html><body><div id="test-div">Hello, world!</div></body></html>'),
'Hello, world!')
@test_jsi_rumtimes(exclude=['Deno'])
def test_execute_dom_parse(self, jsi_cls: type[JSI]):
jsi = jsi_cls(self.ydl, '', 10)
self.assertEqual(jsi.execute(
'console.log(document.getElementById("test-div").innerHTML);',
html='<html><body><div id="test-div">Hello, world!</div></body></html>'),
'Hello, world!')
@requires_feature('dom')
def test_execute_dom_script(self):
self.assertEqual(self.jsi.execute(
'console.log(document.getElementById("test-div").innerHTML);',
html='''<html><head><title>Hello, world!</title><body>
<div id="test-div"></div>
<script src="https://example.com/script.js"></script>
<script type="text/javascript">
document.getElementById("test-div").innerHTML = document.title;
console.log('this should not show up');
a = b; // Errors should be ignored
</script>
</body></html>'''),
'Hello, world!')
@test_jsi_rumtimes(exclude=['Deno'])
def test_execute_dom_script(self, jsi_cls: type[JSI]):
jsi = jsi_cls(self.ydl, '', 10)
self.assertEqual(jsi.execute(
'console.log(document.getElementById("test-div").innerHTML);',
html='''<html><head><title>Hello, world!</title><body>
<div id="test-div"></div>
<script src="https://example.com/script.js"></script>
<script type="text/javascript">
document.getElementById("test-div").innerHTML = document.title;
console.log('this should not show up');
a = b; // Errors should be ignored
</script>
</body></html>'''),
'Hello, world!')
@requires_feature(['dom', 'location'])
def test_dom_location(self):
self.url_param = 'https://example.com/123/456'
self.assertEqual(self.jsi.execute(
'console.log(document.getElementById("test-div").innerHTML);',
html='''<html><head><script>
document.querySelector("#test-div").innerHTML = document.domain</script></head>
<body><div id="test-div">Hello, world!</div></body></html>'''),
'example.com')
@test_jsi_rumtimes(exclude=['Deno'])
def test_dom_location(self, jsi_cls: type[JSI]):
jsi = jsi_cls(self.ydl, 'https://example.com/123/456', 10)
self.assertEqual(jsi.execute(
'console.log(document.getElementById("test-div").innerHTML);',
html='''<html><head><script>
document.querySelector("#test-div").innerHTML = document.domain</script></head>
<body><div id="test-div">Hello, world!</div></body></html>'''),
'example.com')
@requires_feature('cookies')
def test_execute_cookiejar(self):
cookiejar = YoutubeDLCookieJar()
ref_cookiejar = YoutubeDLCookieJar()
@test_jsi_rumtimes(exclude=['Deno'])
def test_execute_cookiejar(self, jsi_cls: type[JSI]):
cookiejar = YoutubeDLCookieJar()
ref_cookiejar = YoutubeDLCookieJar()
def _assert_expected_execute(cookie_str, ref_cookie_str):
self.assertEqual(set(cookie_str.split('; ')), set(ref_cookie_str.split('; ')))
for cookie in cookiejar:
ref_cookie = next((c for c in ref_cookiejar if c.name == cookie.name
and c.domain == cookie.domain), None)
self.assertEqual(repr(cookie), repr(ref_cookie))
def _assert_expected_execute(cookie_str, ref_cookie_str):
self.assertEqual(set(cookie_str.split('; ')), set(ref_cookie_str.split('; ')))
for cookie in cookiejar:
ref_cookie = next((c for c in ref_cookiejar if c.name == cookie.name
and c.domain == cookie.domain), None)
self.assertEqual(repr(cookie), repr(ref_cookie))
for test_cookie in [
NetscapeFields('test1', 'test1', '.example.com', '/', False, int(time.time()) + 1000),
NetscapeFields('test2', 'test2', '.example.com', '/', True, int(time.time()) + 1000),
NetscapeFields('test3', 'test3', '.example.com', '/123', False, int(time.time()) + 1000),
NetscapeFields('test4', 'test4', '.example.com', '/456', False, int(time.time()) + 1000),
NetscapeFields('test5', 'test5', '.example.com', '/123', True, int(time.time()) + 1000),
NetscapeFields('test6', 'test6', '.example.com', '/456', True, int(time.time()) + 1000),
NetscapeFields('test1', 'other1', '.other.com', '/', False, int(time.time()) + 1000),
NetscapeFields('test2', 'other2', '.other.com', '/', False, int(time.time()) + 1000),
NetscapeFields('test7', 'other7', '.other.com', '/', False, int(time.time()) + 1000),
]:
cookiejar.set_cookie(test_cookie.to_cookie())
ref_cookiejar.set_cookie(test_cookie.to_cookie())
for test_cookie in [
NetscapeFields('test1', 'test1', '.example.com', '/', False, int(time.time()) + 1000),
NetscapeFields('test2', 'test2', '.example.com', '/', True, int(time.time()) + 1000),
NetscapeFields('test3', 'test3', '.example.com', '/123', False, int(time.time()) + 1000),
NetscapeFields('test4', 'test4', '.example.com', '/456', False, int(time.time()) + 1000),
NetscapeFields('test5', 'test5', '.example.com', '/123', True, int(time.time()) + 1000),
NetscapeFields('test6', 'test6', '.example.com', '/456', True, int(time.time()) + 1000),
NetscapeFields('test1', 'other1', '.other.com', '/', False, int(time.time()) + 1000),
NetscapeFields('test2', 'other2', '.other.com', '/', False, int(time.time()) + 1000),
NetscapeFields('test7', 'other7', '.other.com', '/', False, int(time.time()) + 1000),
]:
cookiejar.set_cookie(test_cookie.to_cookie())
ref_cookiejar.set_cookie(test_cookie.to_cookie())
# test identity without modification from js
self.url_param = 'http://example.com/123/456'
_assert_expected_execute(self.jsi.execute(
'console.log(document.cookie);', cookiejar=cookiejar),
'test1=test1; test3=test3')
# test identity without modification from js
jsi = jsi_cls(self.ydl, 'http://example.com/123/456', 10)
_assert_expected_execute(jsi.execute(
'console.log(document.cookie);', cookiejar=cookiejar),
'test1=test1; test3=test3')
# test modification of existing cookie from js
new_cookie_1 = NetscapeFields('test1', 'new1', '.example.com', '/', True, int(time.time()) + 900)
new_cookie_2 = NetscapeFields('test2', 'new2', '.example.com', '/', True, int(time.time()) + 900)
ref_cookiejar.set_cookie(new_cookie_1.to_cookie())
ref_cookiejar.set_cookie(new_cookie_2.to_cookie())
self.url_param = 'https://example.com/123/456'
_assert_expected_execute(self.jsi.execute(
f'''document.cookie = "test1=new1; secure; expires={new_cookie_1.expire_str()}; domain=.example.com; path=/";
console.log(document.cookie);''',
html=f'''<html><body><div id="test-div">Hello, world!</div>
<script>
document.cookie = "test2=new2; secure; expires={new_cookie_2.expire_str()}; domain=.example.com; path=/";
</script>
</body></html>''',
cookiejar=cookiejar),
'test1=new1; test2=new2; test3=test3; test5=test5')
# test modification of existing cookie from js
new_cookie_1 = NetscapeFields('test1', 'new1', '.example.com', '/', True, int(time.time()) + 900)
new_cookie_2 = NetscapeFields('test2', 'new2', '.example.com', '/', True, int(time.time()) + 900)
ref_cookiejar.set_cookie(new_cookie_1.to_cookie())
ref_cookiejar.set_cookie(new_cookie_2.to_cookie())
@requires_feature('wasm')
def test_wasm(self):
with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm.js')) as f:
js_mod = f.read()
with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm_bg.wasm'), 'rb') as f:
wasm = f.read()
# change to https url to test secure-domain behavior
jsi = jsi_cls(self.ydl, 'https://example.com/123/456', 10)
_assert_expected_execute(jsi.execute(
f'''document.cookie = "test1=new1; secure; expires={new_cookie_1.expire_str()}; domain=.example.com; path=/";
console.log(document.cookie);''',
html=f'''<html><body><div id="test-div">Hello, world!</div>
<script>
document.cookie = "test2=new2; secure; expires={new_cookie_2.expire_str()}; domain=.example.com; path=/";
</script>
</body></html>''',
cookiejar=cookiejar),
'test1=new1; test2=new2; test3=test3; test5=test5')
js_base = prepare_wasm_jsmodule(js_mod, wasm)
@test_jsi_rumtimes(exclude=['PhantomJS'])
def test_wasm(self, jsi_cls: type[JSI]):
with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm.js')) as f:
js_mod = f.read()
with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm_bg.wasm'), 'rb') as f:
wasm = f.read()
js_code = js_base + ''';
console.log(add(1, 2));
greet('world');
'''
js_base = prepare_wasm_jsmodule(js_mod, wasm)
self.assertEqual(self.jsi.execute(js_code), '3\nHello, world!')
js_code = js_base + ''';
console.log(add(1, 2));
greet('world');
'''
jsi = jsi_cls(self.ydl, '', 10)
self.assertEqual(jsi.execute(js_code), '3\nHello, world!')
class TestDeno(Base.TestExternalJSI):
_JSI_CLASS = DenoJSI
class TestDenoDom(Base.TestExternalJSI):
_JSI_CLASS = DenoJSDomJSI
class TestPhantomJS(Base.TestExternalJSI):
_JSI_CLASS = PhantomJSJSI
expect_covered_features = set(_ALL_FEATURES)
assert covered_features.issuperset(expect_covered_features), f'Missing tests for features: {expect_covered_features - covered_features}'
if __name__ == '__main__':
unittest.main()

View File

@ -417,7 +417,7 @@ class IqIE(InfoExtractor):
'cast': ['Sangmin Choi', 'Ratana Aiamsaart'],
},
'expected_warnings': ['format is restricted'],
'jsi_matrix_features': ['dom'],
'jsi_matrix': True,
}, {
'url': 'https://www.iq.com/play/one-piece-episode-1000-1ma1i6ferf4',
'md5': '2d7caf6eeca8a32b407094b33b757d39',
@ -616,7 +616,7 @@ def _real_extract(self, url):
else:
ut_list = ['0']
jsi = JSIWrapper(self, url, ['dom'], timeout=120)
jsi = JSIWrapper(self, url, timeout=120)
# bid 0 as an initial format checker
dash_paths = self._parse_json(jsi.execute(self._DASH_JS % {

View File

@ -15,6 +15,7 @@ def __repr__(self, /):
postprocessors = Indirect({})
extractors = Indirect({})
jsi_runtimes = Indirect({})
# Plugins
all_plugins_loaded = Indirect(False)
@ -23,6 +24,7 @@ def __repr__(self, /):
plugin_ies = Indirect({})
plugin_pps = Indirect({})
plugin_jsis = Indirect({})
plugin_ies_overrides = Indirect(defaultdict(list))
# Misc

View File

@ -1,14 +1,28 @@
# flake8: noqa: F401
from .native import JSInterpreter
from .common import _JSI_PREFERENCES, _JSI_HANDLERS, JSIWrapper
from ._phantomjs import PhantomJSwrapper
from . import _deno # ensure jsi registration
from .common import _JSI_PREFERENCES, JSIWrapper
from ._phantomjs import PhantomJSJSI, PhantomJSwrapper
from ._deno import DenoJSI, DenoJSDomJSI
from ..globals import jsi_runtimes, plugin_jsis
from ..plugins import PluginSpec, register_plugin_spec
# Register the built-in runtimes: every name currently in this module's globals
# that ends with `JSI` is added to the shared `jsi_runtimes` mapping, keyed by that name
jsi_runtimes.value.update({
name: value
for name, value in globals().items()
if name.endswith('JSI')
})
# Build the plugin spec for the `jsinterp` module and register it.
# NOTE(review): presumably plugin classes are discovered by the `JSI` suffix and stored in
# `plugin_jsis` / merged into `jsi_runtimes` - confirm against `PluginSpec`
plugin_spec = PluginSpec(
module_name='jsinterp',
suffix='JSI',
destination=jsi_runtimes,
plugin_destination=plugin_jsis,
)
register_plugin_spec(plugin_spec)
__all__ = [
JSInterpreter,
PhantomJSwrapper,
_JSI_HANDLERS,
_JSI_PREFERENCES,
JSIWrapper,
]

View File

@ -16,13 +16,11 @@
unified_timestamp,
)
from ._helper import TempFileWrapper, random_string, override_navigator_js, extract_script_tags
from .common import ExternalJSI, register_jsi
from .common import ExternalJSI
@register_jsi
class DenoJSI(ExternalJSI):
"""JS interpreter class using Deno binary"""
_SUPPORTED_FEATURES = {'wasm', 'location'}
_BASE_PREFERENCE = 5
_EXE_NAME = 'deno'
_DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check']
@ -58,9 +56,7 @@ def execute(self, jscode, video_id=None, note='Executing JS in Deno'):
return self._run_deno(cmd)
@register_jsi
class DenoJSDomJSI(DenoJSI):
_SUPPORTED_FEATURES = {'wasm', 'location', 'dom', 'cookies'}
_BASE_PREFERENCE = 4
_DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check']
_JSDOM_IMPORT_CHECKED = False
@ -112,8 +108,7 @@ def apply_cookies(cookiejar: YoutubeDLCookieJar | None, cookies: list[dict]):
def _ensure_jsdom(self):
if self._JSDOM_IMPORT_CHECKED:
return
cmd = [self.exe, 'cache', self._JSDOM_URL]
self._run_deno(cmd)
self._run_deno([self.exe, 'cache', self._JSDOM_URL])
self._JSDOM_IMPORT_CHECKED = True
def execute(self, jscode, video_id=None, note='Executing JS in Deno with jsdom', html='', cookiejar=None):
@ -180,7 +175,7 @@ def execute(self, jscode, video_id=None, note='Executing JS in Deno with jsdom',
'''
# https://github.com/prebuild/node-gyp-build/blob/6822ec5/node-gyp-build.js#L196-L198
# This jsdom dependency raises fatal error on linux unless read permission is provided
# This jsdom dependency raises a fatal error on Linux unless read access to this file is allowed
read_flag = ['--allow-read=/etc/alpine-release'] if platform.system() == 'Linux' else []
location_args = ['--location', self._url] if self._url else []

View File

@ -17,13 +17,11 @@
shell_quote,
)
from ._helper import TempFileWrapper, random_string, extract_script_tags
from .common import ExternalJSI, register_jsi
from .common import ExternalJSI
@register_jsi
class PhantomJSJSI(ExternalJSI):
_EXE_NAME = 'phantomjs'
_SUPPORTED_FEATURES = {'location', 'cookies', 'dom'}
_BASE_PREFERENCE = 3
_BASE_JS = R'''

View File

@ -2,60 +2,41 @@
import abc
import typing
import functools
import inspect
from ..globals import jsi_runtimes
from ..extractor.common import InfoExtractor
from ..utils import (
classproperty,
format_field,
filter_dict,
get_exe_version,
variadic,
url_or_none,
sanitize_url,
ExtractorError,
)
_JSI_HANDLERS: dict[str, type[JSI]] = {}
_JSI_PREFERENCES: set[JSIPreference] = set()
_ALL_FEATURES = {
'wasm',
'location',
'dom',
'cookies',
}
def get_jsi_keys(jsi_or_keys: typing.Iterable[str | type[JSI] | JSI]) -> list[str]:
def all_handlers() -> dict[str, type[JSI]]:
    """Return the JSI classes found in `jsi_runtimes`, indexed by their `JSI_KEY`."""
    handlers = {}
    for jsi_cls in jsi_runtimes.value.values():
        handlers[jsi_cls.JSI_KEY] = jsi_cls
    return handlers
def to_jsi_keys(jsi_or_keys: typing.Iterable[str | type[JSI] | JSI]) -> list[str]:
    """Normalize a mix of JSI keys, JSI classes and JSI instances into a list of keys."""
    keys = []
    for item in jsi_or_keys:
        # Strings are taken as keys as-is; anything else is expected to expose `JSI_KEY`
        keys.append(item if isinstance(item, str) else item.JSI_KEY)
    return keys
def filter_jsi_keys(features=None, only_include=None, exclude=None):
keys = list(_JSI_HANDLERS)
if features:
keys = [key for key in keys if key in _JSI_HANDLERS
and _JSI_HANDLERS[key]._SUPPORTED_FEATURES.issuperset(features)]
if only_include:
keys = [key for key in keys if key in get_jsi_keys(only_include)]
if exclude:
keys = [key for key in keys if key not in get_jsi_keys(exclude)]
return keys
def filter_jsi_include(only_include: typing.Iterable[str] | None, exclude: typing.Iterable[str] | None):
keys = get_jsi_keys(only_include) if only_include else _JSI_HANDLERS.keys()
return [key for key in keys if key not in (exclude or [])]
def filter_jsi_feature(features: typing.Iterable[str], keys=None):
keys = keys if keys is not None else _JSI_HANDLERS.keys()
return [key for key in keys if key in _JSI_HANDLERS
and _JSI_HANDLERS[key]._SUPPORTED_FEATURES.issuperset(features)]
def get_included_jsi(only_include=None, exclude=None):
    """
    Select the JSI runtimes allowed by the include/exclude filters.

    @param only_include: JSI keys, classes or instances to limit the selection to;
                         every runtime is eligible when this is empty or None
    @param exclude: JSI keys, classes or instances to leave out;
                    nothing is left out when this is empty or None
    @returns dict mapping JSI key to JSI class, in the order given by `all_handlers()`
    """
    # Resolve both filters once up front instead of once per handler; this also keeps
    # one-shot iterables (e.g. generators) from being exhausted after the first handler
    include_keys = set(to_jsi_keys(only_include)) if only_include else None
    exclude_keys = set(to_jsi_keys(exclude)) if exclude else set()
    return {
        key: value for key, value in all_handlers().items()
        if (include_keys is None or key in include_keys) and key not in exclude_keys
    }
def order_to_pref(jsi_order: typing.Iterable[str | type[JSI] | JSI], multiplier: int) -> JSIPreference:
jsi_order = reversed(get_jsi_keys(jsi_order))
jsi_order = reversed(to_jsi_keys(jsi_order))
pref_score = {jsi_cls: (i + 1) * multiplier for i, jsi_cls in enumerate(jsi_order)}
def _pref(jsi: JSI, *args):
@ -63,20 +44,6 @@ def _pref(jsi: JSI, *args):
return _pref
def require_features(param_features: dict[str, str | typing.Iterable[str]]):
assert all(_ALL_FEATURES.issuperset(variadic(kw_feature)) for kw_feature in param_features.values())
def outer(func):
@functools.wraps(func)
def inner(self: JSIWrapper, *args, **kwargs):
for kw_name, kw_feature in param_features.items():
if kw_name in kwargs and not self._features.issuperset(variadic(kw_feature)):
raise ExtractorError(f'feature {kw_feature} is required for `{kw_name}` param but not declared')
return func(self, *args, **kwargs)
return inner
return outer
class JSIWrapper:
"""
Helper class to forward JS interp request to a JSI that supports it.
@ -85,25 +52,17 @@ class JSIWrapper:
```
def _real_extract(self, url):
...
jsi = JSIWrapper(self, url, features=['js'])
jsi = JSIWrapper(self, url)
result = jsi.execute(jscode, video_id)
...
```
Features:
- `wasm`: supports window.WebAssembly
- `location`: supports mocking window.location
- `dom`: supports DOM interface (not necessarily rendering)
- `cookies`: supports document.cookie read & write
@param dl_or_ie: `YoutubeDL` or `InfoExtractor` instance.
@param url: setting url context, used by JSI that supports `location` feature
@param features: only JSI that supports all of these features will be selected
@param url: setting url context
@param only_include: limit JSI to choose from.
@param exclude: JSI to avoid using.
@param jsi_params: extra kwargs to pass to `JSI.__init__()` for each JSI, using jsi key as dict key.
@param preferred_order: list of JSI to use. First in list is tested first.
@param fallback_jsi: list of JSI that may fail and should act non-fatal and fallback to other JSI. Pass `"all"` to always fallback
@param timeout: timeout parameter for all chosen JSI
@param user_agent: override user-agent to use for supported JSI
"""
@ -112,46 +71,57 @@ def __init__(
self,
dl_or_ie: YoutubeDL | InfoExtractor,
url: str = '',
features: typing.Iterable[str] = [],
only_include: typing.Iterable[str | type[JSI]] = [],
exclude: typing.Iterable[str | type[JSI]] = [],
jsi_params: dict[str, dict] = {},
preferred_order: typing.Iterable[str | type[JSI]] = [],
fallback_jsi: typing.Iterable[str | type[JSI]] | typing.Literal['all'] = [],
timeout: float | int = 10,
user_agent: str | None = None,
):
self._downloader: YoutubeDL = dl_or_ie._downloader if isinstance(dl_or_ie, InfoExtractor) else dl_or_ie
self._url = sanitize_url(url_or_none(url)) or ''
self._features = set(features)
if url and not self._url:
self.report_warning(f'Invalid URL: "{url}", using empty string instead')
if isinstance(dl_or_ie, InfoExtractor):
self._downloader = dl_or_ie._downloader
self._ie_key = dl_or_ie.ie_key()
else:
self._downloader = dl_or_ie
self._ie_key = None
if unsupported_features := self._features - _ALL_FEATURES:
raise ExtractorError(f'Unsupported features: {unsupported_features}, allowed features: {_ALL_FEATURES}')
self._url = self._sanitize_url(url)
self.preferences: set[JSIPreference] = {
order_to_pref(self._load_pref_from_option(), 10000),
order_to_pref(preferred_order, 100)
} | _JSI_PREFERENCES
user_prefs = self._downloader.params.get('jsi_preference', [])
for invalid_key in [jsi_key for jsi_key in user_prefs if jsi_key not in _JSI_HANDLERS]:
self.report_warning(f'`{invalid_key}` is not a valid JSI, ignoring preference setting')
user_prefs.remove(invalid_key)
handler_classes = [_JSI_HANDLERS[key] for key in filter_jsi_keys(self._features, only_include, exclude)]
self.write_debug(f'Select JSI for features={self._features}: {get_jsi_keys(handler_classes)}, '
f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}')
handler_classes = self._load_allowed_jsi_cls(only_include, exclude)
if not handler_classes:
raise ExtractorError(f'No JSI supports features={self._features}')
raise ExtractorError('No JSI is allowed to use')
self._handler_dict = {cls.JSI_KEY: cls(
self._downloader, url=self._url, timeout=timeout, features=self._features,
self._downloader, url=self._url, timeout=timeout,
user_agent=user_agent, **jsi_params.get(cls.JSI_KEY, {}),
) for cls in handler_classes}
) for cls in handler_classes.values()}
self.preferences: set[JSIPreference] = {
order_to_pref(user_prefs, 10000), order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES
self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi)
self._is_test = self._downloader.params.get('test', False)
def _sanitize_url(self, url):
    """Return `url` passed through `url_or_none` and `sanitize_url`, or '' (with a warning) if a non-empty `url` is rejected."""
    cleaned = sanitize_url(url_or_none(url))
    if cleaned:
        return cleaned
    if url:
        # Something was passed in, but it did not survive validation
        self.report_warning(f'Invalid URL: "{url}", using empty string instead')
    return ''
def _load_pref_from_option(self):
    """
    Read the `jsi_preference` option and return the keys that name a known JSI, in order.

    Unknown keys are reported with a warning and dropped from the returned list.
    The sequence stored in the downloader params is never modified, so it may be
    a list, a tuple, or missing/None.

    @returns list of valid JSI keys, in the order the user gave them
    """
    user_prefs = self._downloader.params.get('jsi_preference') or []
    valid_handlers = list(all_handlers())
    valid_prefs = []
    for jsi_key in user_prefs:
        if jsi_key in valid_handlers:
            valid_prefs.append(jsi_key)
        else:
            # The params are no longer pruned, so use only_once to avoid
            # repeating the same warning for every wrapper that gets created
            self.report_warning(
                f'`{jsi_key}` is not a valid JSI, ignoring preference setting', only_once=True)
    return valid_prefs
def _load_allowed_jsi_cls(self, only_include, exclude):
    """Return the result of `get_included_jsi(only_include, exclude)` after writing the selection to the debug log."""
    allowed = get_included_jsi(only_include, exclude)
    selected_keys = to_jsi_keys(allowed)
    included_keys = to_jsi_keys(only_include) or "all"
    excluded_keys = to_jsi_keys(exclude)
    self.write_debug(
        f'Select JSI: {selected_keys}, included: {included_keys}, excluded: {excluded_keys}')
    return allowed
def write_debug(self, message, only_once=False):
return self._downloader.write_debug(f'[JSIDirector] {message}', only_once=only_once)
@ -159,11 +129,19 @@ def report_warning(self, message, only_once=False):
return self._downloader.report_warning(f'[JSIDirector] {message}', only_once=only_once)
def _get_handlers(self, method_name: str, *args, **kwargs) -> list[JSI]:
handlers = [h for h in self._handler_dict.values() if callable(getattr(h, method_name, None))]
self.write_debug(f'Choosing handlers for method `{method_name}`: {get_jsi_keys(handlers)}')
def _supports(jsi: JSI):
if not callable(method := getattr(jsi, method_name, None)):
return False
method_params = inspect.signature(method).parameters
return all(key in method_params for key in kwargs)
handlers = [h for h in self._handler_dict.values() if _supports(h)]
self.write_debug(f'Choosing handlers for method `{method_name}` with kwargs {list(kwargs)}'
f': {to_jsi_keys(handlers)}')
if not handlers:
raise ExtractorError(f'No JSI supports method `{method_name}`, '
f'included handlers: {get_jsi_keys(self._handler_dict.values())}')
raise ExtractorError(f'No JSI supports method `{method_name}` with kwargs {list(kwargs)}, '
f'included handlers: {to_jsi_keys(self._handler_dict.values())}')
preferences = {
handler.JSI_KEY: sum(pref_func(handler, method_name, args, kwargs) for pref_func in self.preferences)
@ -188,25 +166,25 @@ def _dispatch_request(self, method_name: str, *args, **kwargs):
self.write_debug(f'{handler.JSI_KEY} is not available')
unavailable.append(handler.JSI_NAME)
continue
try:
self.write_debug(f'Dispatching `{method_name}` task to {handler.JSI_NAME}')
return getattr(handler, method_name)(*args, **kwargs)
except ExtractorError as e:
if handler.JSI_KEY not in self._fallback_jsi:
raise
else:
exceptions.append((handler, e))
self.write_debug(f'{handler.JSI_NAME} encountered error, fallback to next handler: {e}')
if self._is_test:
raise ExtractorError(f'{handler.JSI_NAME} got error while evaluating js, '
f'add "{handler.JSI_KEY}" in `exclude` if it should not be used')
exceptions.append((handler, e))
self.write_debug(f'{handler.JSI_NAME} encountered error, fallback to next handler: {e}')
if not exceptions:
msg = f'No available JSI installed, please install one of: {", ".join(unavailable)}'
else:
msg = f'Failed to perform {method_name}, total {len(exceptions)} errors'
if unavailable:
msg = f'{msg}. You can try installing one of unavailable JSI: {", ".join(unavailable)}'
msg = f'{msg}. You may try installing one of unavailable JSI: {", ".join(unavailable)}'
raise ExtractorError(msg)
@require_features({'location': 'location', 'html': 'dom', 'cookiejar': 'cookies'})
def execute(self, jscode: str, video_id: str | None, note: str | None = None,
html: str | None = None, cookiejar: YoutubeDLCookieJar | None = None) -> str:
"""
@ -215,24 +193,20 @@ def execute(self, jscode: str, video_id: str | None, note: str | None = None,
@param jscode: JS code to execute
@param video_id
@param note
@param html: html to load as document, requires `dom` feature
@param cookiejar: cookiejar to read and set cookies, requires `cookies` feature, pass `InfoExtractor.cookiejar` if you want to read and write cookies
@param html: html to load as document
@param cookiejar: cookiejar to read and set cookies, pass `InfoExtractor.cookiejar` if you want to read and write cookies
"""
return self._dispatch_request('execute', jscode, video_id, **filter_dict({
'note': note, 'html': html, 'cookiejar': cookiejar}))
class JSI(abc.ABC):
_SUPPORTED_FEATURES: set[str] = set()
_BASE_PREFERENCE: int = 0
def __init__(self, downloader: YoutubeDL, url: str, timeout: float | int, features: set[str], user_agent=None):
if not self._SUPPORTED_FEATURES.issuperset(features):
raise ExtractorError(f'{self.JSI_NAME} does not support all required features: {features}')
def __init__(self, downloader: YoutubeDL, url: str, timeout: float | int, user_agent=None):
self._downloader = downloader
self._url = url
self.timeout = timeout
self.features = features
self.user_agent: str = user_agent or self._downloader.params['http_headers']['User-Agent']
@abc.abstractmethod
@ -277,15 +251,6 @@ def is_available(cls):
return bool(cls.exe)
def register_jsi(jsi_cls: JsiClass) -> JsiClass:
"""Register a JS interpreter class"""
assert issubclass(jsi_cls, JSI), f'{jsi_cls} must be a subclass of JSI'
assert jsi_cls.JSI_KEY not in _JSI_HANDLERS, f'JSI {jsi_cls.JSI_KEY} already registered'
assert jsi_cls._SUPPORTED_FEATURES.issubset(_ALL_FEATURES), f'{jsi_cls._SUPPORTED_FEATURES - _ALL_FEATURES} not declared in `_All_FEATURES`'
_JSI_HANDLERS[jsi_cls.JSI_KEY] = jsi_cls
return jsi_cls
def register_jsi_preference(*handlers: type[JSI]):
assert all(issubclass(handler, JSI) for handler in handlers), f'{handlers} must all be a subclass of JSI'
@ -301,13 +266,12 @@ def inner(handler: JSI, *args):
@register_jsi_preference()
def _base_preference(handler: JSI, *args):
return getattr(handler, '_BASE_PREFERENCE', 0)
return min(10, getattr(handler, '_BASE_PREFERENCE', 0))
if typing.TYPE_CHECKING:
from ..YoutubeDL import YoutubeDL
from ..cookies import YoutubeDLCookieJar
JsiClass = typing.TypeVar('JsiClass', bound=type[JSI])
class JSIPreference(typing.Protocol):
def __call__(self, handler: JSI, method_name: str, *args, **kwargs) -> int: