mirror of
				https://github.com/yt-dlp/yt-dlp.git
				synced 2025-11-04 00:25:15 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			174 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			174 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import contextlib
 | 
						|
import importlib
 | 
						|
import importlib.abc
 | 
						|
import importlib.machinery
 | 
						|
import importlib.util
 | 
						|
import inspect
 | 
						|
import itertools
 | 
						|
import pkgutil
 | 
						|
import sys
 | 
						|
import traceback
 | 
						|
import zipimport
 | 
						|
from pathlib import Path
 | 
						|
from zipfile import ZipFile
 | 
						|
 | 
						|
from .compat import functools  # isort: split
 | 
						|
from .utils import (
 | 
						|
    get_executable_path,
 | 
						|
    get_system_config_dirs,
 | 
						|
    get_user_config_dirs,
 | 
						|
    orderedSet,
 | 
						|
    write_string,
 | 
						|
)
 | 
						|
 | 
						|
PACKAGE_NAME = 'yt_dlp_plugins'
 | 
						|
COMPAT_PACKAGE_NAME = 'ytdlp_plugins'
 | 
						|
 | 
						|
 | 
						|
class PluginLoader(importlib.abc.Loader):
 | 
						|
    """Dummy loader for virtual namespace packages"""
 | 
						|
 | 
						|
    def exec_module(self, module):
 | 
						|
        return None
 | 
						|
 | 
						|
 | 
						|
@functools.cache
 | 
						|
def dirs_in_zip(archive):
 | 
						|
    try:
 | 
						|
        with ZipFile(archive) as zip_:
 | 
						|
            return set(itertools.chain.from_iterable(
 | 
						|
                Path(file).parents for file in zip_.namelist()))
 | 
						|
    except FileNotFoundError:
 | 
						|
        pass
 | 
						|
    except Exception as e:
 | 
						|
        write_string(f'WARNING: Could not read zip file {archive}: {e}\n')
 | 
						|
    return set()
 | 
						|
 | 
						|
 | 
						|
class PluginFinder(importlib.abc.MetaPathFinder):
 | 
						|
    """
 | 
						|
    This class provides one or multiple namespace packages.
 | 
						|
    It searches in sys.path and yt-dlp config folders for
 | 
						|
    the existing subdirectories from which the modules can be imported
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, *packages):
 | 
						|
        self._zip_content_cache = {}
 | 
						|
        self.packages = set(itertools.chain.from_iterable(
 | 
						|
            itertools.accumulate(name.split('.'), lambda a, b: '.'.join((a, b)))
 | 
						|
            for name in packages))
 | 
						|
 | 
						|
    def search_locations(self, fullname):
 | 
						|
        candidate_locations = []
 | 
						|
 | 
						|
        def _get_package_paths(*root_paths, containing_folder='plugins'):
 | 
						|
            for config_dir in orderedSet(map(Path, root_paths), lazy=True):
 | 
						|
                with contextlib.suppress(OSError):
 | 
						|
                    yield from (config_dir / containing_folder).iterdir()
 | 
						|
 | 
						|
        # Load from yt-dlp config folders
 | 
						|
        candidate_locations.extend(_get_package_paths(
 | 
						|
            *get_user_config_dirs('yt-dlp'),
 | 
						|
            *get_system_config_dirs('yt-dlp'),
 | 
						|
            containing_folder='plugins'))
 | 
						|
 | 
						|
        # Load from yt-dlp-plugins folders
 | 
						|
        candidate_locations.extend(_get_package_paths(
 | 
						|
            get_executable_path(),
 | 
						|
            *get_user_config_dirs(''),
 | 
						|
            *get_system_config_dirs(''),
 | 
						|
            containing_folder='yt-dlp-plugins'))
 | 
						|
 | 
						|
        candidate_locations.extend(map(Path, sys.path))  # PYTHONPATH
 | 
						|
        with contextlib.suppress(ValueError):  # Added when running __main__.py directly
 | 
						|
            candidate_locations.remove(Path(__file__).parent)
 | 
						|
 | 
						|
        parts = Path(*fullname.split('.'))
 | 
						|
        for path in orderedSet(candidate_locations, lazy=True):
 | 
						|
            candidate = path / parts
 | 
						|
            if candidate.is_dir():
 | 
						|
                yield candidate
 | 
						|
            elif path.suffix in ('.zip', '.egg', '.whl') and path.is_file():
 | 
						|
                if parts in dirs_in_zip(path):
 | 
						|
                    yield candidate
 | 
						|
 | 
						|
    def find_spec(self, fullname, path=None, target=None):
 | 
						|
        if fullname not in self.packages:
 | 
						|
            return None
 | 
						|
 | 
						|
        search_locations = list(map(str, self.search_locations(fullname)))
 | 
						|
        if not search_locations:
 | 
						|
            return None
 | 
						|
 | 
						|
        spec = importlib.machinery.ModuleSpec(fullname, PluginLoader(), is_package=True)
 | 
						|
        spec.submodule_search_locations = search_locations
 | 
						|
        return spec
 | 
						|
 | 
						|
    def invalidate_caches(self):
 | 
						|
        dirs_in_zip.cache_clear()
 | 
						|
        for package in self.packages:
 | 
						|
            if package in sys.modules:
 | 
						|
                del sys.modules[package]
 | 
						|
 | 
						|
 | 
						|
def directories():
 | 
						|
    spec = importlib.util.find_spec(PACKAGE_NAME)
 | 
						|
    return spec.submodule_search_locations if spec else []
 | 
						|
 | 
						|
 | 
						|
def iter_modules(subpackage):
 | 
						|
    fullname = f'{PACKAGE_NAME}.{subpackage}'
 | 
						|
    with contextlib.suppress(ModuleNotFoundError):
 | 
						|
        pkg = importlib.import_module(fullname)
 | 
						|
        yield from pkgutil.iter_modules(path=pkg.__path__, prefix=f'{fullname}.')
 | 
						|
 | 
						|
 | 
						|
def load_module(module, module_name, suffix):
 | 
						|
    return inspect.getmembers(module, lambda obj: (
 | 
						|
        inspect.isclass(obj)
 | 
						|
        and obj.__name__.endswith(suffix)
 | 
						|
        and obj.__module__.startswith(module_name)
 | 
						|
        and not obj.__name__.startswith('_')
 | 
						|
        and obj.__name__ in getattr(module, '__all__', [obj.__name__])))
 | 
						|
 | 
						|
 | 
						|
def load_plugins(name, suffix):
 | 
						|
    classes = {}
 | 
						|
 | 
						|
    for finder, module_name, _ in iter_modules(name):
 | 
						|
        if any(x.startswith('_') for x in module_name.split('.')):
 | 
						|
            continue
 | 
						|
        try:
 | 
						|
            if sys.version_info < (3, 10) and isinstance(finder, zipimport.zipimporter):
 | 
						|
                # zipimporter.load_module() is deprecated in 3.10 and removed in 3.12
 | 
						|
                # The exec_module branch below is the replacement for >= 3.10
 | 
						|
                # See: https://docs.python.org/3/library/zipimport.html#zipimport.zipimporter.exec_module
 | 
						|
                module = finder.load_module(module_name)
 | 
						|
            else:
 | 
						|
                spec = finder.find_spec(module_name)
 | 
						|
                module = importlib.util.module_from_spec(spec)
 | 
						|
                sys.modules[module_name] = module
 | 
						|
                spec.loader.exec_module(module)
 | 
						|
        except Exception:
 | 
						|
            write_string(f'Error while importing module {module_name!r}\n{traceback.format_exc(limit=-1)}')
 | 
						|
            continue
 | 
						|
        classes.update(load_module(module, module_name, suffix))
 | 
						|
 | 
						|
    # Compat: old plugin system using __init__.py
 | 
						|
    # Note: plugins imported this way do not show up in directories()
 | 
						|
    # nor are considered part of the yt_dlp_plugins namespace package
 | 
						|
    with contextlib.suppress(FileNotFoundError):
 | 
						|
        spec = importlib.util.spec_from_file_location(
 | 
						|
            name, Path(get_executable_path(), COMPAT_PACKAGE_NAME, name, '__init__.py'))
 | 
						|
        plugins = importlib.util.module_from_spec(spec)
 | 
						|
        sys.modules[spec.name] = plugins
 | 
						|
        spec.loader.exec_module(plugins)
 | 
						|
        classes.update(load_module(plugins, spec.name, suffix))
 | 
						|
 | 
						|
    return classes
 | 
						|
 | 
						|
 | 
						|
sys.meta_path.insert(0, PluginFinder(f'{PACKAGE_NAME}.extractor', f'{PACKAGE_NAME}.postprocessor'))
 | 
						|
 | 
						|
__all__ = ['directories', 'load_plugins', 'PACKAGE_NAME', 'COMPAT_PACKAGE_NAME']
 |