mirror of
				https://github.com/yt-dlp/yt-dlp.git
				synced 2025-11-04 00:25:15 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			241 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			241 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""No longer used and new code should not use. Exists only for API compat."""
 | 
						|
import platform
 | 
						|
import struct
 | 
						|
import sys
 | 
						|
import urllib.error
 | 
						|
import urllib.parse
 | 
						|
import urllib.request
 | 
						|
import zlib
 | 
						|
 | 
						|
from ._utils import Popen, decode_base_n, preferredencoding
 | 
						|
from .traversal import traverse_obj
 | 
						|
from ..dependencies import certifi, websockets
 | 
						|
from ..networking._helper import make_ssl_context
 | 
						|
from ..networking._urllib import HTTPHandler
 | 
						|
 | 
						|
# isort: split
 | 
						|
from .networking import random_user_agent, std_headers  # noqa: F401
 | 
						|
from ..cookies import YoutubeDLCookieJar  # noqa: F401
 | 
						|
from ..networking._urllib import PUTRequest  # noqa: F401
 | 
						|
from ..networking._urllib import SUPPORTED_ENCODINGS, HEADRequest  # noqa: F401
 | 
						|
from ..networking._urllib import ProxyHandler as PerRequestProxyHandler  # noqa: F401
 | 
						|
from ..networking._urllib import RedirectHandler as YoutubeDLRedirectHandler  # noqa: F401
 | 
						|
from ..networking._urllib import (  # noqa: F401
 | 
						|
    make_socks_conn_class,
 | 
						|
    update_Request,
 | 
						|
)
 | 
						|
from ..networking.exceptions import HTTPError, network_exceptions  # noqa: F401
 | 
						|
 | 
						|
has_certifi = bool(certifi)
 | 
						|
has_websockets = bool(websockets)
 | 
						|
 | 
						|
 | 
						|
def load_plugins(name, suffix, namespace):
 | 
						|
    from ..plugins import load_plugins
 | 
						|
    ret = load_plugins(name, suffix)
 | 
						|
    namespace.update(ret)
 | 
						|
    return ret
 | 
						|
 | 
						|
 | 
						|
def traverse_dict(dictn, keys, casesense=True):
 | 
						|
    return traverse_obj(dictn, keys, casesense=casesense, is_user_input=True, traverse_string=True)
 | 
						|
 | 
						|
 | 
						|
def decode_base(value, digits):
 | 
						|
    return decode_base_n(value, table=digits)
 | 
						|
 | 
						|
 | 
						|
def platform_name():
 | 
						|
    """ Returns the platform name as a str """
 | 
						|
    return platform.platform()
 | 
						|
 | 
						|
 | 
						|
def get_subprocess_encoding():
 | 
						|
    if sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
 | 
						|
        # For subprocess calls, encode with locale encoding
 | 
						|
        # Refer to http://stackoverflow.com/a/9951851/35070
 | 
						|
        encoding = preferredencoding()
 | 
						|
    else:
 | 
						|
        encoding = sys.getfilesystemencoding()
 | 
						|
    if encoding is None:
 | 
						|
        encoding = 'utf-8'
 | 
						|
    return encoding
 | 
						|
 | 
						|
 | 
						|
# UNUSED
 | 
						|
# Based on png2str() written by @gdkchan and improved by @yokrysty
 | 
						|
# Originally posted at https://github.com/ytdl-org/youtube-dl/issues/9706
 | 
						|
def decode_png(png_data):
 | 
						|
    # Reference: https://www.w3.org/TR/PNG/
 | 
						|
    header = png_data[8:]
 | 
						|
 | 
						|
    if png_data[:8] != b'\x89PNG\x0d\x0a\x1a\x0a' or header[4:8] != b'IHDR':
 | 
						|
        raise OSError('Not a valid PNG file.')
 | 
						|
 | 
						|
    int_map = {1: '>B', 2: '>H', 4: '>I'}
 | 
						|
    unpack_integer = lambda x: struct.unpack(int_map[len(x)], x)[0]
 | 
						|
 | 
						|
    chunks = []
 | 
						|
 | 
						|
    while header:
 | 
						|
        length = unpack_integer(header[:4])
 | 
						|
        header = header[4:]
 | 
						|
 | 
						|
        chunk_type = header[:4]
 | 
						|
        header = header[4:]
 | 
						|
 | 
						|
        chunk_data = header[:length]
 | 
						|
        header = header[length:]
 | 
						|
 | 
						|
        header = header[4:]  # Skip CRC
 | 
						|
 | 
						|
        chunks.append({
 | 
						|
            'type': chunk_type,
 | 
						|
            'length': length,
 | 
						|
            'data': chunk_data
 | 
						|
        })
 | 
						|
 | 
						|
    ihdr = chunks[0]['data']
 | 
						|
 | 
						|
    width = unpack_integer(ihdr[:4])
 | 
						|
    height = unpack_integer(ihdr[4:8])
 | 
						|
 | 
						|
    idat = b''
 | 
						|
 | 
						|
    for chunk in chunks:
 | 
						|
        if chunk['type'] == b'IDAT':
 | 
						|
            idat += chunk['data']
 | 
						|
 | 
						|
    if not idat:
 | 
						|
        raise OSError('Unable to read PNG data.')
 | 
						|
 | 
						|
    decompressed_data = bytearray(zlib.decompress(idat))
 | 
						|
 | 
						|
    stride = width * 3
 | 
						|
    pixels = []
 | 
						|
 | 
						|
    def _get_pixel(idx):
 | 
						|
        x = idx % stride
 | 
						|
        y = idx // stride
 | 
						|
        return pixels[y][x]
 | 
						|
 | 
						|
    for y in range(height):
 | 
						|
        basePos = y * (1 + stride)
 | 
						|
        filter_type = decompressed_data[basePos]
 | 
						|
 | 
						|
        current_row = []
 | 
						|
 | 
						|
        pixels.append(current_row)
 | 
						|
 | 
						|
        for x in range(stride):
 | 
						|
            color = decompressed_data[1 + basePos + x]
 | 
						|
            basex = y * stride + x
 | 
						|
            left = 0
 | 
						|
            up = 0
 | 
						|
 | 
						|
            if x > 2:
 | 
						|
                left = _get_pixel(basex - 3)
 | 
						|
            if y > 0:
 | 
						|
                up = _get_pixel(basex - stride)
 | 
						|
 | 
						|
            if filter_type == 1:  # Sub
 | 
						|
                color = (color + left) & 0xff
 | 
						|
            elif filter_type == 2:  # Up
 | 
						|
                color = (color + up) & 0xff
 | 
						|
            elif filter_type == 3:  # Average
 | 
						|
                color = (color + ((left + up) >> 1)) & 0xff
 | 
						|
            elif filter_type == 4:  # Paeth
 | 
						|
                a = left
 | 
						|
                b = up
 | 
						|
                c = 0
 | 
						|
 | 
						|
                if x > 2 and y > 0:
 | 
						|
                    c = _get_pixel(basex - stride - 3)
 | 
						|
 | 
						|
                p = a + b - c
 | 
						|
 | 
						|
                pa = abs(p - a)
 | 
						|
                pb = abs(p - b)
 | 
						|
                pc = abs(p - c)
 | 
						|
 | 
						|
                if pa <= pb and pa <= pc:
 | 
						|
                    color = (color + a) & 0xff
 | 
						|
                elif pb <= pc:
 | 
						|
                    color = (color + b) & 0xff
 | 
						|
                else:
 | 
						|
                    color = (color + c) & 0xff
 | 
						|
 | 
						|
            current_row.append(color)
 | 
						|
 | 
						|
    return width, height, pixels
 | 
						|
 | 
						|
 | 
						|
def register_socks_protocols():
 | 
						|
    # "Register" SOCKS protocols
 | 
						|
    # In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904
 | 
						|
    # URLs with protocols not in urlparse.uses_netloc are not handled correctly
 | 
						|
    for scheme in ('socks', 'socks4', 'socks4a', 'socks5'):
 | 
						|
        if scheme not in urllib.parse.uses_netloc:
 | 
						|
            urllib.parse.uses_netloc.append(scheme)
 | 
						|
 | 
						|
 | 
						|
def handle_youtubedl_headers(headers):
 | 
						|
    filtered_headers = headers
 | 
						|
 | 
						|
    if 'Youtubedl-no-compression' in filtered_headers:
 | 
						|
        filtered_headers = {k: v for k, v in filtered_headers.items() if k.lower() != 'accept-encoding'}
 | 
						|
        del filtered_headers['Youtubedl-no-compression']
 | 
						|
 | 
						|
    return filtered_headers
 | 
						|
 | 
						|
 | 
						|
def request_to_url(req):
 | 
						|
    if isinstance(req, urllib.request.Request):
 | 
						|
        return req.get_full_url()
 | 
						|
    else:
 | 
						|
        return req
 | 
						|
 | 
						|
 | 
						|
def sanitized_Request(url, *args, **kwargs):
 | 
						|
    from ..utils import escape_url, extract_basic_auth, sanitize_url
 | 
						|
    url, auth_header = extract_basic_auth(escape_url(sanitize_url(url)))
 | 
						|
    if auth_header is not None:
 | 
						|
        headers = args[1] if len(args) >= 2 else kwargs.setdefault('headers', {})
 | 
						|
        headers['Authorization'] = auth_header
 | 
						|
    return urllib.request.Request(url, *args, **kwargs)
 | 
						|
 | 
						|
 | 
						|
class YoutubeDLHandler(HTTPHandler):
 | 
						|
    def __init__(self, params, *args, **kwargs):
 | 
						|
        self._params = params
 | 
						|
        super().__init__(*args, **kwargs)
 | 
						|
 | 
						|
 | 
						|
YoutubeDLHTTPSHandler = YoutubeDLHandler
 | 
						|
 | 
						|
 | 
						|
class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
 | 
						|
    def __init__(self, cookiejar=None):
 | 
						|
        urllib.request.HTTPCookieProcessor.__init__(self, cookiejar)
 | 
						|
 | 
						|
    def http_response(self, request, response):
 | 
						|
        return urllib.request.HTTPCookieProcessor.http_response(self, request, response)
 | 
						|
 | 
						|
    https_request = urllib.request.HTTPCookieProcessor.http_request
 | 
						|
    https_response = http_response
 | 
						|
 | 
						|
 | 
						|
def make_HTTPS_handler(params, **kwargs):
 | 
						|
    return YoutubeDLHTTPSHandler(params, context=make_ssl_context(
 | 
						|
        verify=not params.get('nocheckcertificate'),
 | 
						|
        client_certificate=params.get('client_certificate'),
 | 
						|
        client_certificate_key=params.get('client_certificate_key'),
 | 
						|
        client_certificate_password=params.get('client_certificate_password'),
 | 
						|
        legacy_support=params.get('legacyserverconnect'),
 | 
						|
        use_certifi='no-certifi' not in params.get('compat_opts', []),
 | 
						|
    ), **kwargs)
 | 
						|
 | 
						|
 | 
						|
def process_communicate_or_kill(p, *args, **kwargs):
 | 
						|
    return Popen.communicate_or_kill(p, *args, **kwargs)
 |