1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-07-10 07:18:33 +00:00

update interface

This commit is contained in:
c-basalt 2024-12-31 06:25:12 -05:00
parent f0c1da2528
commit b086b8635d
5 changed files with 91 additions and 70 deletions

View File

@ -56,47 +56,44 @@ class TestExternalJSI(unittest.TestCase):
def setUp(self):
self.ydl = FakeYDL()
self.jsi = self._JSI_CLASS(self.ydl, 19, {})
if not self.jsi_available():
self.url = ''
if not self._JSI_CLASS.exe_version:
print(f'{self._JSI_CLASS.__name__} is not installed, skipping')
self.skipTest('Not available')
def jsi_available(self):
return self._JSI_CLASS and self._JSI_CLASS.exe_version
@property
def jsi(self):
return self._JSI_CLASS(self.ydl, self.url, 10, {})
def test_execute(self):
self.assertEqual(self.jsi.execute('console.log("Hello, world!");'), 'Hello, world!')
def test_execute_dom_parse(self):
if 'dom' not in self.jsi._SUPPORTED_FEATURES:
print(f'{self._JSI_CLASS.__name__} does not support DOM, skipping')
self.skipTest('DOM not supported')
self.assertEqual(self.jsi.execute(
'console.log(document.getElementById("test-div").innerHTML);',
location='https://example.com',
html='<html><body><div id="test-div">Hello, world!</div></body></html>'),
'Hello, world!')
def test_execute_dom_script(self):
if 'dom' not in self.jsi._SUPPORTED_FEATURES:
print(f'{self._JSI_CLASS.__name__} does not support DOM, skipping')
self.skipTest('DOM not supported')
self.assertEqual(self.jsi.execute(
'console.log(document.getElementById("test-div").innerHTML);',
location='https://example.com',
html='''<html><body>
<div id="test-div"></div>
<script src="https://example.com/script.js"></script>
<script type="text/javascript">
<script>
document.getElementById("test-div").innerHTML = "Hello, world!"
console.log('this should not show up');
</script>
</body></html>'''),
'Hello, world!')
def test_execute_dom_script_with_error(self):
if 'dom' not in self.jsi._SUPPORTED_FEATURES:
self.skipTest('DOM not supported')
self.assertEqual(self.jsi.execute(
'console.log(document.getElementById("test-div").innerHTML);',
location='https://example.com',
html='''<html><body>
<div id="test-div"></div>
<script src="https://example.com/script.js"></script>
@ -108,20 +105,20 @@ def test_execute_dom_script_with_error(self):
</body></html>'''),
'Hello, world!')
def assert_cookiejar_equal(self, cookiejar: http.cookiejar.CookieJar, ref_cookiejar: http.cookiejar.CookieJar):
for cookie in cookiejar:
ref_cookie = next((c for c in ref_cookiejar if c.name == cookie.name and c.domain == cookie.domain), None)
self.assertEqual(repr(cookie), repr(ref_cookie))
def assert_cookie_str_equal(self, cookie_str, ref_cookie_str):
print([cookie_str, ref_cookie_str])
self.assertEqual(set(cookie_str.split('; ')), set(ref_cookie_str.split('; ')))
def test_execute_cookiejar(self):
if 'cookies' not in self.jsi._SUPPORTED_FEATURES:
print(f'{self._JSI_CLASS.__name__} does not support cookies, skipping')
self.skipTest('Cookies not supported')
cookiejar = YoutubeDLCookieJar()
ref_cookiejar = YoutubeDLCookieJar()
def _assert_expected_execute(cookie_str, ref_cookie_str):
self.assertEqual(set(cookie_str.split('; ')), set(ref_cookie_str.split('; ')))
for cookie in cookiejar:
ref_cookie = next((c for c in ref_cookiejar if c.name == cookie.name
and c.domain == cookie.domain), None)
self.assertEqual(repr(cookie), repr(ref_cookie))
for test_cookie in [
NetscapeFields('test1', 'test1', '.example.com', '/', False, int(time.time()) + 1000),
NetscapeFields('test2', 'test2', '.example.com', '/', True, int(time.time()) + 1000),
@ -137,23 +134,20 @@ def test_execute_cookiejar(self):
ref_cookiejar.set_cookie(test_cookie.to_cookie())
# test identity without modification from js
self.assert_cookie_str_equal(self.jsi.execute(
'console.log(document.cookie);',
location='http://example.com/123/456',
html='<html><body><div id="test-div">Hello, world!</div></body></html>',
cookiejar=cookiejar),
self.url = 'http://example.com/123/456'
_assert_expected_execute(self.jsi.execute(
'console.log(document.cookie);', cookiejar=cookiejar),
'test1=test1; test3=test3')
self.assert_cookiejar_equal(cookiejar, ref_cookiejar)
# test modification of existing cookie from js
new_cookie_1 = NetscapeFields('test1', 'new1', '.example.com', '/', True, int(time.time()) + 900)
new_cookie_2 = NetscapeFields('test2', 'new2', '.example.com', '/', True, int(time.time()) + 900)
ref_cookiejar.set_cookie(new_cookie_1.to_cookie())
ref_cookiejar.set_cookie(new_cookie_2.to_cookie())
self.assert_cookie_str_equal(self.jsi.execute(
self.url = 'https://example.com/123/456'
_assert_expected_execute(self.jsi.execute(
f'''document.cookie = "test1=new1; secure; expires={new_cookie_1.expire_str()}; domain=.example.com; path=/";
console.log(document.cookie);''',
location='https://example.com/123/456',
html=f'''<html><body><div id="test-div">Hello, world!</div>
<script>
document.cookie = "test2=new2; secure; expires={new_cookie_2.expire_str()}; domain=.example.com; path=/";
@ -161,7 +155,6 @@ def test_execute_cookiejar(self):
</body></html>''',
cookiejar=cookiejar),
'test1=new1; test2=new2; test3=test3; test5=test5')
self.assert_cookiejar_equal(cookiejar, ref_cookiejar)
class TestDeno(Base.TestExternalJSI):

View File

@ -1,14 +1,12 @@
from .native import JSInterpreter
from .common import _JSI_PREFERENCES, _JSI_HANDLERS, JSIWrapper
from ._phantomjs import PhantomJSwrapper
from ._deno import DenoJSI
from .common import _JSI_PREFERENCES, _JSI_HANDLERS, JSInterp
__all__ = [
JSInterpreter,
PhantomJSwrapper,
DenoJSI,
_JSI_HANDLERS,
_JSI_PREFERENCES,
JSInterp,
JSIWrapper,
]

View File

@ -50,9 +50,9 @@ def _run_deno(self, cmd):
self.report_warning(f'JS console error msg:\n{stderr.strip()}')
return stdout.strip()
def execute(self, jscode, video_id=None, note='Executing JS in Deno', location=None):
def execute(self, jscode, video_id=None, note='Executing JS in Deno'):
self.report_note(video_id, note)
location_args = ['--location', location] if location else []
location_args = ['--location', self._url] if self._url else []
with TempFileWrapper(f'{self._init_script};\n{self._override_navigator_js}\n{jscode}', suffix='.js') as js_file:
cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name]
return self._run_deno(cmd)
@ -128,21 +128,25 @@ def _ensure_jsdom(self):
self._run_deno(cmd)
self._JSDOM_IMPORT_CHECKED = True
def execute(self, jscode, video_id=None, note='Executing JS in Deno', location='', html='', cookiejar=None):
def execute(self, jscode, video_id=None, note='Executing JS in Deno', html='', cookiejar=None):
self.report_note(video_id, note)
self._ensure_jsdom()
callback_varname = f'__callback_{random_string()}'
if cookiejar and not self._url:
self.report_warning('No valid url scope provided, cookiejar is not applied')
cookiejar = None
html, inline_scripts = extract_script_tags(html)
wrapper_scripts = '\n'.join(['try { %s } catch (e) {}' % script for script in inline_scripts])
callback_varname = f'__callback_{random_string()}'
script = f'''{self._init_script};
{self._override_navigator_js};
import jsdom from "{self._JSDOM_URL}";
let {callback_varname} = (() => {{
const jar = jsdom.CookieJar.deserializeSync({json.dumps(self.serialize_cookie(cookiejar, location))});
const jar = jsdom.CookieJar.deserializeSync({json.dumps(self.serialize_cookie(cookiejar, self._url))});
const dom = new jsdom.JSDOM({json.dumps(str(html))}, {{
{'url: %s,' % json.dumps(str(location)) if location else ''}
{'url: %s,' % json.dumps(str(self._url)) if self._url else ''}
cookieJar: jar,
}});
Object.keys(dom.window).forEach((key) => {{try {{window[key] = dom.window[key]}} catch (e) {{}}}});
@ -166,7 +170,7 @@ def execute(self, jscode, video_id=None, note='Executing JS in Deno', location='
}}
'''
location_args = ['--location', location] if location else []
location_args = ['--location', self._url] if self._url else []
with TempFileWrapper(script, suffix='.js') as js_file:
cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name]
result = self._run_deno(cmd)

View File

@ -59,9 +59,11 @@ class PhantomJSJSI(ExternalJSI):
fs.write({cookies_fn}, JSON.stringify(phantom.cookies), write);
phantom.exit();
}};
var loaded = false;
page.onLoadFinished = function(status) {{
if(page.url === "") {{
if(page.url === "" && !loaded) {{
page.setContent(fs.read({html_fn}, read), {url});
loaded = true;
}}
else {{
JSON.parse(fs.read({cookies_fn}, read)).forEach(function(x) {{
@ -135,6 +137,10 @@ def _execute_html(self, jscode: str, url: str, html: str, cookiejar, video_id=No
if 'saveAndExit();' not in jscode:
raise ExtractorError('`saveAndExit();` not found in `jscode`')
if cookiejar and not url:
self.report_warning('No valid url scope provided, cookiejar is not applied')
cookiejar = None
html, inline_scripts = extract_script_tags(html)
wrapped_scripts = '\n'.join([
'page.evaluate(function() { try { %s } catch (e) {} });' % inline for inline in inline_scripts])
@ -157,9 +163,8 @@ def _execute_html(self, jscode: str, url: str, html: str, cookiejar, video_id=No
return new_html, stdout
def execute(self, jscode, video_id=None,
note='Executing JS in PhantomJS', location=None, html='', cookiejar=None):
if location:
def execute(self, jscode, video_id=None, note='Executing JS in PhantomJS', html='', cookiejar=None):
if self._url or html or cookiejar:
jscode = '''console.log(page.evaluate(function() {
var %(std_var)s = [];
console.log = function() {
@ -177,11 +182,7 @@ def execute(self, jscode, video_id=None,
'std_var': f'__stdout__values_{random_string()}',
'jscode': jscode,
}
return self._execute_html(jscode, location, html, cookiejar, video_id=video_id, note=note)[1].strip()
if html:
self.report_warning('`location` is required to use `html`')
if cookiejar:
self.report_warning('`location` and `html` are required to use `cookiejar`')
return self._execute_html(jscode, self._url, html, cookiejar, video_id=video_id, note=note)[1].strip()
return self._execute(jscode, video_id, note=note).strip()

View File

@ -8,8 +8,11 @@
from ..utils import (
classproperty,
format_field,
filter_dict,
get_exe_version,
variadic,
url_or_none,
sanitize_url,
ExtractorError,
)
@ -47,7 +50,7 @@ def require_features(param_features: dict[str, str | typing.Iterable[str]]):
def outer(func):
@functools.wraps(func)
def inner(self: JSInterp, *args, **kwargs):
def inner(self: JSIWrapper, *args, **kwargs):
for kw_name, kw_feature in param_features.items():
if kw_name in kwargs and not self._features.issuperset(variadic(kw_feature)):
raise ExtractorError(f'feature {kw_feature} is required for `{kw_name}` param but not declared')
@ -56,12 +59,29 @@ def inner(self: JSInterp, *args, **kwargs):
return outer
class JSInterp:
class JSIWrapper:
"""
Helper class to forward JS interp request to a concrete JSI that supports it.
Helper class to forward JS interp request to a JSI that supports it.
Usage:
```
def _real_extract(self, url):
...
jsi = JSIWrapper(self, url, features=['js'])
result = jsi.execute(jscode, video_id)
...
```
Features:
- `js`: supports js syntax
- `wasm`: supports WebAssembly interface
- `location`: supports setting window.location
- `dom`: supports DOM interface
- `cookies`: supports document.cookie read & write
@param dl_or_ie: `YoutubeDL` or `InfoExtractor` instance.
@param features: list of features that JSI must support.
@param url: setting url context, used by JSI that supports `location` feature
@param features: list of features that are necessary for JS interpretation.
@param only_include: limit JSI to choose from.
@param exclude: JSI to avoid using.
@param jsi_params: extra kwargs to pass to `JSI.__init__()` for each JSI, using jsi key as dict key.
@ -74,6 +94,7 @@ class JSInterp:
def __init__(
self,
dl_or_ie: YoutubeDL | InfoExtractor,
url: str,
features: typing.Iterable[str] = [],
only_include: typing.Iterable[str | type[JSI]] = [],
exclude: typing.Iterable[str | type[JSI]] = [],
@ -84,7 +105,10 @@ def __init__(
user_agent: str | None = None,
):
self._downloader: YoutubeDL = dl_or_ie._downloader if isinstance(dl_or_ie, InfoExtractor) else dl_or_ie
self._url = sanitize_url(url_or_none(url)) or ''
self._features = set(features)
if url and not self._url:
self.report_warning(f'Invalid URL: "{url}", using empty string instead')
if unsupported_features := self._features - _ALL_FEATURES:
raise ExtractorError(f'Unsupported features: {unsupported_features}, allowed features: {_ALL_FEATURES}')
@ -97,19 +121,13 @@ def __init__(
f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}')
self._handler_dict = {
cls.JSI_KEY: cls(self._downloader, timeout=timeout, features=self._features, user_agent=user_agent,
cls.JSI_KEY: cls(self._downloader, url=self._url, timeout=timeout,
features=self._features, user_agent=user_agent,
**jsi_params.get(cls.JSI_KEY, {})) for cls in handler_classes}
self.preferences: set[JSIPreference] = {order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES
self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi)
self._is_test = self._downloader.params.get('test', False)
def add_handler(self, handler: JSI):
"""Add a handler. If a handler of the same JSI_KEY exists, it will overwrite it"""
assert isinstance(handler, JSI), 'handler must be a JSI instance'
if not handler._SUPPORTED_FEATURES.issuperset(self._features):
raise ExtractorError(f'{handler.JSI_NAME} does not support all required features: {self._features}')
self._handler_dict[handler.JSI_KEY] = handler
def write_debug(self, message, only_once=False):
return self._downloader.write_debug(f'[JSIDirector] {message}', only_once=only_once)
@ -178,17 +196,22 @@ def _dispatch_request(self, method_name: str, *args, **kwargs):
raise ExtractorError(msg)
@require_features({'location': 'location', 'html': 'dom', 'cookiejar': 'cookies'})
def execute(self, jscode: str, video_id: str | None, **kwargs) -> str:
def execute(self, jscode: str, video_id: str | None, note: str | None = None,
html: str | None = None, cookiejar: YoutubeDLCookieJar | None = None) -> str:
"""
Execute JS code and return stdout from console.log
@param {str} jscode: JS code to execute
@param video_id: video id
@param note: note
@param {str} location: url to configure window.location, requires `location` feature
@param {str} html: html to load as document, requires `dom` feature
@param {YoutubeDLCookieJar} cookiejar: cookiejar to set cookies, requires url and `cookies` feature
@param jscode: JS code to execute
@param video_id
@param note
@param html: html to load as document, requires `dom` feature
@param cookiejar: cookiejar to read and set cookies, requires `cookies` feature, pass `InfoExtractor.cookiejar` if you want to read and write cookies
"""
kwargs = filter_dict({
'note': note,
'html': html,
'cookiejar': cookiejar,
})
return self._dispatch_request('execute', jscode, video_id, **kwargs)
@ -196,10 +219,11 @@ class JSI(abc.ABC):
_SUPPORTED_FEATURES: set[str] = set()
_BASE_PREFERENCE: int = 0
def __init__(self, downloader: YoutubeDL, timeout: float | int, features: set[str], user_agent=None):
def __init__(self, downloader: YoutubeDL, url: str, timeout: float | int, features: set[str], user_agent=None):
if not self._SUPPORTED_FEATURES.issuperset(features):
raise ExtractorError(f'{self.JSI_NAME} does not support all required features: {features}')
self._downloader = downloader
self._url = url
self.timeout = timeout
self.features = features
self.user_agent: str = user_agent or self._downloader.params['http_headers']['User-Agent']
@ -275,6 +299,7 @@ def _base_preference(handler: JSI, *args):
if typing.TYPE_CHECKING:
from ..YoutubeDL import YoutubeDL
from ..cookies import YoutubeDLCookieJar
JsiClass = typing.TypeVar('JsiClass', bound=type[JSI])
class JSIPreference(typing.Protocol):