1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-12-20 06:58:57 +00:00

Merge branch 'master' of https://github.com/yt-dlp/yt-dlp into jsi

This commit is contained in:
c-basalt
2025-03-01 16:19:20 -05:00
58 changed files with 1648 additions and 647 deletions

View File

@@ -101,87 +101,109 @@ def getwebpagetestcases():
md5 = lambda s: hashlib.md5(s.encode()).hexdigest()
def expect_value(self, got, expected, field):
if isinstance(expected, str) and expected.startswith('re:'):
match_str = expected[len('re:'):]
match_rex = re.compile(match_str)
def _iter_differences(got, expected, field):
if isinstance(expected, str):
op, _, val = expected.partition(':')
if op in ('mincount', 'maxcount', 'count'):
if not isinstance(got, (list, dict)):
yield field, f'expected either {list.__name__} or {dict.__name__}, got {type(got).__name__}'
return
self.assertTrue(
isinstance(got, str),
f'Expected a {str.__name__} object, but got {type(got).__name__} for field {field}')
self.assertTrue(
match_rex.match(got),
f'field {field} (value: {got!r}) should match {match_str!r}')
elif isinstance(expected, str) and expected.startswith('startswith:'):
start_str = expected[len('startswith:'):]
self.assertTrue(
isinstance(got, str),
f'Expected a {str.__name__} object, but got {type(got).__name__} for field {field}')
self.assertTrue(
got.startswith(start_str),
f'field {field} (value: {got!r}) should start with {start_str!r}')
elif isinstance(expected, str) and expected.startswith('contains:'):
contains_str = expected[len('contains:'):]
self.assertTrue(
isinstance(got, str),
f'Expected a {str.__name__} object, but got {type(got).__name__} for field {field}')
self.assertTrue(
contains_str in got,
f'field {field} (value: {got!r}) should contain {contains_str!r}')
elif isinstance(expected, type):
self.assertTrue(
isinstance(got, expected),
f'Expected type {expected!r} for field {field}, but got value {got!r} of type {type(got)!r}')
elif isinstance(expected, dict) and isinstance(got, dict):
expect_dict(self, got, expected)
elif isinstance(expected, list) and isinstance(got, list):
self.assertEqual(
len(expected), len(got),
f'Expect a list of length {len(expected)}, but got a list of length {len(got)} for field {field}')
for index, (item_got, item_expected) in enumerate(zip(got, expected)):
type_got = type(item_got)
type_expected = type(item_expected)
self.assertEqual(
type_expected, type_got,
f'Type mismatch for list item at index {index} for field {field}, '
f'expected {type_expected!r}, got {type_got!r}')
expect_value(self, item_got, item_expected, field)
else:
if isinstance(expected, str) and expected.startswith('md5:'):
self.assertTrue(
isinstance(got, str),
f'Expected field {field} to be a unicode object, but got value {got!r} of type {type(got)!r}')
got = 'md5:' + md5(got)
elif isinstance(expected, str) and re.match(r'^(?:min|max)?count:\d+', expected):
self.assertTrue(
isinstance(got, (list, dict)),
f'Expected field {field} to be a list or a dict, but it is of type {type(got).__name__}')
op, _, expected_num = expected.partition(':')
expected_num = int(expected_num)
expected_num = int(val)
got_num = len(got)
if op == 'mincount':
assert_func = assertGreaterEqual
msg_tmpl = 'Expected %d items in field %s, but only got %d'
elif op == 'maxcount':
assert_func = assertLessEqual
msg_tmpl = 'Expected maximum %d items in field %s, but got %d'
elif op == 'count':
assert_func = assertEqual
msg_tmpl = 'Expected exactly %d items in field %s, but got %d'
else:
assert False
assert_func(
self, len(got), expected_num,
msg_tmpl % (expected_num, field, len(got)))
if got_num < expected_num:
yield field, f'expected at least {val} items, got {got_num}'
return
if op == 'maxcount':
if got_num > expected_num:
yield field, f'expected at most {val} items, got {got_num}'
return
assert op == 'count'
if got_num != expected_num:
yield field, f'expected exactly {val} items, got {got_num}'
return
self.assertEqual(
expected, got,
f'Invalid value for field {field}, expected {expected!r}, got {got!r}')
if not isinstance(got, str):
yield field, f'expected {str.__name__}, got {type(got).__name__}'
return
if op == 're':
if not re.match(val, got):
yield field, f'should match {val!r}, got {got!r}'
return
if op == 'startswith':
if not val.startswith(got):
yield field, f'should start with {val!r}, got {got!r}'
return
if op == 'contains':
if not val.startswith(got):
yield field, f'should contain {val!r}, got {got!r}'
return
if op == 'md5':
hash_val = md5(got)
if hash_val != val:
yield field, f'expected hash {val}, got {hash_val}'
return
if got != expected:
yield field, f'expected {expected!r}, got {got!r}'
return
if isinstance(expected, dict) and isinstance(got, dict):
for key, expected_val in expected.items():
if key not in got:
yield field, f'missing key: {key!r}'
continue
field_name = key if field is None else f'{field}.{key}'
yield from _iter_differences(got[key], expected_val, field_name)
return
if isinstance(expected, type):
if not isinstance(got, expected):
yield field, f'expected {expected.__name__}, got {type(got).__name__}'
return
if isinstance(expected, list) and isinstance(got, list):
# TODO: clever diffing algorithm lmao
if len(expected) != len(got):
yield field, f'expected length of {len(expected)}, got {len(got)}'
return
for index, (got_val, expected_val) in enumerate(zip(got, expected)):
field_name = str(index) if field is None else f'{field}.{index}'
yield from _iter_differences(got_val, expected_val, field_name)
return
if got != expected:
yield field, f'expected {expected!r}, got {got!r}'
def _expect_value(message, got, expected, field):
mismatches = list(_iter_differences(got, expected, field))
if not mismatches:
return
fields = [field for field, _ in mismatches if field is not None]
return ''.join((
message, f' ({", ".join(fields)})' if fields else '',
*(f'\n\t{field}: {message}' for field, message in mismatches)))
def expect_value(self, got, expected, field):
if message := _expect_value('values differ', got, expected, field):
self.fail(message)
def expect_dict(self, got_dict, expected_dict):
for info_field, expected in expected_dict.items():
got = got_dict.get(info_field)
expect_value(self, got, expected, info_field)
if message := _expect_value('dictionaries differ', got_dict, expected_dict, None):
self.fail(message)
def sanitize_got_info_dict(got_dict):

View File

@@ -6,6 +6,8 @@ import sys
import unittest
from unittest.mock import patch
from yt_dlp.globals import all_plugins_loaded
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -1427,6 +1429,12 @@ class TestYoutubeDL(unittest.TestCase):
self.assertFalse(result.get('cookies'), msg='Cookies set in cookies field for wrong domain')
self.assertFalse(ydl.cookiejar.get_cookie_header(fmt['url']), msg='Cookies set in cookiejar for wrong domain')
def test_load_plugins_compat(self):
# Should try to reload plugins if they haven't already been loaded
all_plugins_loaded.value = False
FakeYDL().close()
assert all_plugins_loaded.value
if __name__ == '__main__':
unittest.main()

View File

@@ -10,22 +10,71 @@ TEST_DATA_DIR = Path(os.path.dirname(os.path.abspath(__file__)), 'testdata')
sys.path.append(str(TEST_DATA_DIR))
importlib.invalidate_caches()
from yt_dlp.utils import Config
from yt_dlp.plugins import PACKAGE_NAME, directories, load_plugins
from yt_dlp.plugins import (
PACKAGE_NAME,
PluginSpec,
directories,
load_plugins,
load_all_plugins,
register_plugin_spec,
)
from yt_dlp.globals import (
extractors,
postprocessors,
plugin_dirs,
plugin_ies,
plugin_pps,
all_plugins_loaded,
plugin_specs,
)
EXTRACTOR_PLUGIN_SPEC = PluginSpec(
module_name='extractor',
suffix='IE',
destination=extractors,
plugin_destination=plugin_ies,
)
POSTPROCESSOR_PLUGIN_SPEC = PluginSpec(
module_name='postprocessor',
suffix='PP',
destination=postprocessors,
plugin_destination=plugin_pps,
)
def reset_plugins():
plugin_ies.value = {}
plugin_pps.value = {}
plugin_dirs.value = ['default']
plugin_specs.value = {}
all_plugins_loaded.value = False
# Clearing override plugins is probably difficult
for module_name in tuple(sys.modules):
for plugin_type in ('extractor', 'postprocessor'):
if module_name.startswith(f'{PACKAGE_NAME}.{plugin_type}.'):
del sys.modules[module_name]
importlib.invalidate_caches()
class TestPlugins(unittest.TestCase):
TEST_PLUGIN_DIR = TEST_DATA_DIR / PACKAGE_NAME
def setUp(self):
reset_plugins()
def tearDown(self):
reset_plugins()
def test_directories_containing_plugins(self):
self.assertIn(self.TEST_PLUGIN_DIR, map(Path, directories()))
def test_extractor_classes(self):
for module_name in tuple(sys.modules):
if module_name.startswith(f'{PACKAGE_NAME}.extractor'):
del sys.modules[module_name]
plugins_ie = load_plugins('extractor', 'IE')
plugins_ie = load_plugins(EXTRACTOR_PLUGIN_SPEC)
self.assertIn(f'{PACKAGE_NAME}.extractor.normal', sys.modules.keys())
self.assertIn('NormalPluginIE', plugins_ie.keys())
@@ -35,17 +84,29 @@ class TestPlugins(unittest.TestCase):
f'{PACKAGE_NAME}.extractor._ignore' in sys.modules,
'loaded module beginning with underscore')
self.assertNotIn('IgnorePluginIE', plugins_ie.keys())
self.assertNotIn('IgnorePluginIE', plugin_ies.value)
# Don't load extractors with underscore prefix
self.assertNotIn('_IgnoreUnderscorePluginIE', plugins_ie.keys())
self.assertNotIn('_IgnoreUnderscorePluginIE', plugin_ies.value)
# Don't load extractors not specified in __all__ (if supplied)
self.assertNotIn('IgnoreNotInAllPluginIE', plugins_ie.keys())
self.assertNotIn('IgnoreNotInAllPluginIE', plugin_ies.value)
self.assertIn('InAllPluginIE', plugins_ie.keys())
self.assertIn('InAllPluginIE', plugin_ies.value)
# Don't load override extractors
self.assertNotIn('OverrideGenericIE', plugins_ie.keys())
self.assertNotIn('OverrideGenericIE', plugin_ies.value)
self.assertNotIn('_UnderscoreOverrideGenericIE', plugins_ie.keys())
self.assertNotIn('_UnderscoreOverrideGenericIE', plugin_ies.value)
def test_postprocessor_classes(self):
plugins_pp = load_plugins('postprocessor', 'PP')
plugins_pp = load_plugins(POSTPROCESSOR_PLUGIN_SPEC)
self.assertIn('NormalPluginPP', plugins_pp.keys())
self.assertIn(f'{PACKAGE_NAME}.postprocessor.normal', sys.modules.keys())
self.assertIn('NormalPluginPP', plugin_pps.value)
def test_importing_zipped_module(self):
zip_path = TEST_DATA_DIR / 'zipped_plugins.zip'
@@ -58,10 +119,10 @@ class TestPlugins(unittest.TestCase):
package = importlib.import_module(f'{PACKAGE_NAME}.{plugin_type}')
self.assertIn(zip_path / PACKAGE_NAME / plugin_type, map(Path, package.__path__))
plugins_ie = load_plugins('extractor', 'IE')
plugins_ie = load_plugins(EXTRACTOR_PLUGIN_SPEC)
self.assertIn('ZippedPluginIE', plugins_ie.keys())
plugins_pp = load_plugins('postprocessor', 'PP')
plugins_pp = load_plugins(POSTPROCESSOR_PLUGIN_SPEC)
self.assertIn('ZippedPluginPP', plugins_pp.keys())
finally:
@@ -69,23 +130,116 @@ class TestPlugins(unittest.TestCase):
os.remove(zip_path)
importlib.invalidate_caches() # reset the import caches
def test_plugin_dirs(self):
# Internal plugin dirs hack for CLI --plugin-dirs
# To be replaced with proper system later
custom_plugin_dir = TEST_DATA_DIR / 'plugin_packages'
Config._plugin_dirs = [str(custom_plugin_dir)]
importlib.invalidate_caches() # reset the import caches
def test_reloading_plugins(self):
reload_plugins_path = TEST_DATA_DIR / 'reload_plugins'
load_plugins(EXTRACTOR_PLUGIN_SPEC)
load_plugins(POSTPROCESSOR_PLUGIN_SPEC)
# Remove default folder and add reload_plugin path
sys.path.remove(str(TEST_DATA_DIR))
sys.path.append(str(reload_plugins_path))
importlib.invalidate_caches()
try:
package = importlib.import_module(f'{PACKAGE_NAME}.extractor')
self.assertIn(custom_plugin_dir / 'testpackage' / PACKAGE_NAME / 'extractor', map(Path, package.__path__))
for plugin_type in ('extractor', 'postprocessor'):
package = importlib.import_module(f'{PACKAGE_NAME}.{plugin_type}')
self.assertIn(reload_plugins_path / PACKAGE_NAME / plugin_type, map(Path, package.__path__))
plugins_ie = load_plugins('extractor', 'IE')
self.assertIn('PackagePluginIE', plugins_ie.keys())
plugins_ie = load_plugins(EXTRACTOR_PLUGIN_SPEC)
self.assertIn('NormalPluginIE', plugins_ie.keys())
self.assertTrue(
plugins_ie['NormalPluginIE'].REPLACED,
msg='Reloading has not replaced original extractor plugin')
self.assertTrue(
extractors.value['NormalPluginIE'].REPLACED,
msg='Reloading has not replaced original extractor plugin globally')
plugins_pp = load_plugins(POSTPROCESSOR_PLUGIN_SPEC)
self.assertIn('NormalPluginPP', plugins_pp.keys())
self.assertTrue(plugins_pp['NormalPluginPP'].REPLACED,
msg='Reloading has not replaced original postprocessor plugin')
self.assertTrue(
postprocessors.value['NormalPluginPP'].REPLACED,
msg='Reloading has not replaced original postprocessor plugin globally')
finally:
Config._plugin_dirs = []
importlib.invalidate_caches() # reset the import caches
sys.path.remove(str(reload_plugins_path))
sys.path.append(str(TEST_DATA_DIR))
importlib.invalidate_caches()
def test_extractor_override_plugin(self):
load_plugins(EXTRACTOR_PLUGIN_SPEC)
from yt_dlp.extractor.generic import GenericIE
self.assertEqual(GenericIE.TEST_FIELD, 'override')
self.assertEqual(GenericIE.SECONDARY_TEST_FIELD, 'underscore-override')
self.assertEqual(GenericIE.IE_NAME, 'generic+override+underscore-override')
importlib.invalidate_caches()
# test that loading a second time doesn't wrap a second time
load_plugins(EXTRACTOR_PLUGIN_SPEC)
from yt_dlp.extractor.generic import GenericIE
self.assertEqual(GenericIE.IE_NAME, 'generic+override+underscore-override')
def test_load_all_plugin_types(self):
# no plugin specs registered
load_all_plugins()
self.assertNotIn(f'{PACKAGE_NAME}.extractor.normal', sys.modules.keys())
self.assertNotIn(f'{PACKAGE_NAME}.postprocessor.normal', sys.modules.keys())
register_plugin_spec(EXTRACTOR_PLUGIN_SPEC)
register_plugin_spec(POSTPROCESSOR_PLUGIN_SPEC)
load_all_plugins()
self.assertTrue(all_plugins_loaded.value)
self.assertIn(f'{PACKAGE_NAME}.extractor.normal', sys.modules.keys())
self.assertIn(f'{PACKAGE_NAME}.postprocessor.normal', sys.modules.keys())
def test_no_plugin_dirs(self):
register_plugin_spec(EXTRACTOR_PLUGIN_SPEC)
register_plugin_spec(POSTPROCESSOR_PLUGIN_SPEC)
plugin_dirs.value = []
load_all_plugins()
self.assertNotIn(f'{PACKAGE_NAME}.extractor.normal', sys.modules.keys())
self.assertNotIn(f'{PACKAGE_NAME}.postprocessor.normal', sys.modules.keys())
def test_set_plugin_dirs(self):
custom_plugin_dir = str(TEST_DATA_DIR / 'plugin_packages')
plugin_dirs.value = [custom_plugin_dir]
load_plugins(EXTRACTOR_PLUGIN_SPEC)
self.assertIn(f'{PACKAGE_NAME}.extractor.package', sys.modules.keys())
self.assertIn('PackagePluginIE', plugin_ies.value)
def test_invalid_plugin_dir(self):
plugin_dirs.value = ['invalid_dir']
with self.assertRaises(ValueError):
load_plugins(EXTRACTOR_PLUGIN_SPEC)
def test_append_plugin_dirs(self):
custom_plugin_dir = str(TEST_DATA_DIR / 'plugin_packages')
self.assertEqual(plugin_dirs.value, ['default'])
plugin_dirs.value.append(custom_plugin_dir)
self.assertEqual(plugin_dirs.value, ['default', custom_plugin_dir])
load_plugins(EXTRACTOR_PLUGIN_SPEC)
self.assertIn(f'{PACKAGE_NAME}.extractor.package', sys.modules.keys())
self.assertIn('PackagePluginIE', plugin_ies.value)
def test_get_plugin_spec(self):
register_plugin_spec(EXTRACTOR_PLUGIN_SPEC)
register_plugin_spec(POSTPROCESSOR_PLUGIN_SPEC)
self.assertEqual(plugin_specs.value.get('extractor'), EXTRACTOR_PLUGIN_SPEC)
self.assertEqual(plugin_specs.value.get('postprocessor'), POSTPROCESSOR_PLUGIN_SPEC)
self.assertIsNone(plugin_specs.value.get('invalid'))
if __name__ == '__main__':

View File

@@ -2,4 +2,5 @@ from yt_dlp.extractor.common import InfoExtractor
class PackagePluginIE(InfoExtractor):
_VALID_URL = 'package'
pass

View File

@@ -0,0 +1,10 @@
from yt_dlp.extractor.common import InfoExtractor
class NormalPluginIE(InfoExtractor):
_VALID_URL = 'normal'
REPLACED = True
class _IgnoreUnderscorePluginIE(InfoExtractor):
pass

View File

@@ -0,0 +1,5 @@
from yt_dlp.postprocessor.common import PostProcessor
class NormalPluginPP(PostProcessor):
REPLACED = True

View File

@@ -6,6 +6,7 @@ class IgnoreNotInAllPluginIE(InfoExtractor):
class InAllPluginIE(InfoExtractor):
_VALID_URL = 'inallpluginie'
pass

View File

@@ -2,8 +2,10 @@ from yt_dlp.extractor.common import InfoExtractor
class NormalPluginIE(InfoExtractor):
pass
_VALID_URL = 'normalpluginie'
REPLACED = False
class _IgnoreUnderscorePluginIE(InfoExtractor):
_VALID_URL = 'ignoreunderscorepluginie'
pass

View File

@@ -0,0 +1,5 @@
from yt_dlp.extractor.generic import GenericIE
class OverrideGenericIE(GenericIE, plugin_name='override'):
TEST_FIELD = 'override'

View File

@@ -0,0 +1,5 @@
from yt_dlp.extractor.generic import GenericIE
class _UnderscoreOverrideGenericIE(GenericIE, plugin_name='underscore-override'):
SECONDARY_TEST_FIELD = 'underscore-override'

View File

@@ -2,4 +2,4 @@ from yt_dlp.postprocessor.common import PostProcessor
class NormalPluginPP(PostProcessor):
pass
REPLACED = False

View File

@@ -2,4 +2,5 @@ from yt_dlp.extractor.common import InfoExtractor
class ZippedPluginIE(InfoExtractor):
_VALID_URL = 'zippedpluginie'
pass