mirror of
				https://github.com/yt-dlp/yt-dlp.git
				synced 2025-10-25 03:31:03 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			377 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			377 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import abc
 | |
| import base64
 | |
| import contextlib
 | |
| import functools
 | |
| import json
 | |
| import os
 | |
| import random
 | |
| import ssl
 | |
| import threading
 | |
| from http.server import BaseHTTPRequestHandler
 | |
| from socketserver import ThreadingTCPServer
 | |
| 
 | |
| import pytest
 | |
| 
 | |
| from test.helper import http_server_port, verify_address_availability
 | |
| from test.test_networking import TEST_DIR
 | |
| from test.test_socks import IPv6ThreadingTCPServer
 | |
| from yt_dlp.dependencies import urllib3
 | |
| from yt_dlp.networking import Request
 | |
| from yt_dlp.networking.exceptions import HTTPError, ProxyError, SSLError
 | |
| 
 | |
| 
 | |
class HTTPProxyAuthMixin:
    """Mixin providing HTTP Basic proxy authentication checks.

    The methods below use ``self.headers``, ``self.send_response``,
    ``self.send_header`` and ``self.end_headers``, so the class this is mixed
    into has to supply them.
    """

    def proxy_auth_error(self):
        """Reply with 407 and a Basic ``Proxy-Authenticate`` challenge.

        Always returns False so callers can ``return self.proxy_auth_error()``.
        """
        self.send_response(407)
        self.send_header('Proxy-Authenticate', 'Basic realm="test http proxy"')
        self.end_headers()
        return False

    def do_proxy_auth(self, username, password):
        """Check the request's ``Proxy-Authorization`` header.

        Returns True when no credentials are configured (both arguments are
        None) or when the supplied Basic credentials match. Otherwise a 407
        response is sent and False is returned.
        """
        auth_required = not (username is None and password is None)
        if not auth_required:
            return True

        header_value = self.headers.get('Proxy-Authorization', None)
        if header_value is None or not header_value.startswith('Basic '):
            return self.proxy_auth_error()

        encoded_credentials = header_value[len('Basic '):]

        try:
            decoded_credentials = base64.b64decode(encoded_credentials).decode()
            given_username, given_password = decoded_credentials.split(':', 1)
        except Exception:
            # Undecodable payload or missing ':' separator: treat as failed auth
            return self.proxy_auth_error()

        # A missing expected username/password is compared as the empty string
        expected_credentials = (username or '', password or '')
        if (given_username, given_password) != expected_credentials:
            return self.proxy_auth_error()
        return True
 | |
| 
 | |
| 
 | |
class HTTPProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
    """Plain HTTP proxy handler that answers ``/proxy_info`` GET requests itself.

    ``proxy_info``, when given and truthy, is returned verbatim as the JSON
    payload; otherwise a description of the current request is generated.
    ``request_handler`` is accepted for signature compatibility and is unused
    by this class.
    """

    def __init__(self, *args, proxy_info=None, username=None, password=None, request_handler=None, **kwargs):
        # Attributes are set before the base initialiser runs, as in the
        # original ordering, so they exist by the time do_GET is invoked.
        self.proxy_info = proxy_info
        self.username = username
        self.password = password
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Authenticate, then serve the proxy info JSON (or a 404) and close the request."""
        if not self.do_proxy_auth(self.username, self.password):
            self.server.close_request(self.request)
            return

        if not self.path.endswith('/proxy_info'):
            self.send_response(404)
            self.end_headers()
            self.server.close_request(self.request)
            return

        info = self.proxy_info
        if not info:
            info = {
                'client_address': self.client_address,
                'connect': False,
                'connect_host': None,
                'connect_port': None,
                'headers': dict(self.headers),
                'path': self.path,
                'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
            }
        payload = json.dumps(info)

        self.send_response(200)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload.encode())

        self.server.close_request(self.request)
 | |
| 
 | |
| 
 | |
if urllib3:
    import urllib3.util.ssltransport

    class SSLTransport(urllib3.util.ssltransport.SSLTransport):
        """
        Modified version of urllib3 SSLTransport to support server side SSL

        This allows us to chain multiple TLS connections.
        """

        def __init__(self, socket, ssl_context, server_hostname=None, suppress_ragged_eofs=True, server_side=False):
            # NOTE: the parent initialiser is not invoked here; the attributes
            # are set up by hand so that ``server_side`` can be forwarded to
            # wrap_bio (presumably the parent offers no such option - confirm
            # against the urllib3 version in use).
            self.socket = socket
            self.suppress_ragged_eofs = suppress_ragged_eofs

            read_bio = ssl.MemoryBIO()
            write_bio = ssl.MemoryBIO()
            self.incoming = read_bio
            self.outgoing = write_bio

            self.sslobj = ssl_context.wrap_bio(
                read_bio,
                write_bio,
                server_hostname=server_hostname,
                server_side=server_side,
            )
            # Run the TLS handshake through the inherited I/O loop
            self._ssl_io_loop(self.sslobj.do_handshake)

        @property
        def _io_refs(self):
            """Read-through to the wrapped socket's ``_io_refs``."""
            return self.socket._io_refs

        @_io_refs.setter
        def _io_refs(self, value):
            self.socket._io_refs = value

        def shutdown(self, *args, **kwargs):
            """Delegate ``shutdown`` to the wrapped socket."""
            self.socket.shutdown(*args, **kwargs)
else:
    # Without urllib3 the class is unavailable
    SSLTransport = None
 | |
| 
 | |
| 
 | |
class HTTPSProxyHandler(HTTPProxyHandler):
    """``HTTPProxyHandler`` that speaks TLS on the incoming connection.

    If the incoming request is already an ``ssl.SSLSocket`` it is wrapped with
    ``SSLTransport`` (TLS inside TLS); otherwise the plain socket is wrapped
    directly by the SSL context.
    """

    def __init__(self, request, *args, **kwargs):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)

        already_tls = isinstance(request, ssl.SSLSocket)
        tls_request = (
            SSLTransport(request, ssl_context=context, server_side=True)
            if already_tls
            else context.wrap_socket(request, server_side=True)
        )
        super().__init__(tls_request, *args, **kwargs)
 | |
| 
 | |
| 
 | |
class HTTPConnectProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
    """Proxy handler for the CONNECT method.

    After optional Basic proxy authentication it replies ``200`` and then hands
    the same connection to ``request_handler``, passing along a ``proxy_info``
    dict describing the CONNECT request.
    """

    protocol_version = 'HTTP/1.1'
    default_request_version = 'HTTP/1.1'

    def __init__(self, *args, username=None, password=None, request_handler=None, **kwargs):
        # Attributes must exist before the base initialiser runs the request
        self.username = username
        self.password = password
        self.request_handler = request_handler
        super().__init__(*args, **kwargs)

    def do_CONNECT(self):
        """Authenticate, acknowledge the tunnel and delegate to ``request_handler``."""
        if not self.do_proxy_auth(self.username, self.password):
            self.server.close_request(self.request)
            return
        self.send_response(200)
        self.end_headers()

        # The CONNECT target is "host:port". Split on the LAST colon so that a
        # bracketed IPv6 literal such as "[::1]:443" is parsed correctly; for a
        # regular "host:port" target the result is the same as before.
        # The host is reported as written in the request (brackets included).
        connect_host, _, connect_port = self.path.rpartition(':')
        proxy_info = {
            'client_address': self.client_address,
            'connect': True,
            'connect_host': connect_host,
            'connect_port': int(connect_port),
            'headers': dict(self.headers),
            'path': self.path,
            'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
        }
        self.request_handler(self.request, self.client_address, self.server, proxy_info=proxy_info)
        self.server.close_request(self.request)
 | |
| 
 | |
| 
 | |
class HTTPSConnectProxyHandler(HTTPConnectProxyHandler):
    """CONNECT proxy handler whose own listening side is wrapped in TLS."""

    def __init__(self, request, *args, **kwargs):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
        tls_request = context.wrap_socket(request, server_side=True)
        # Keep a handle on the TLS-wrapped socket for do_CONNECT; this has to be
        # assigned before delegating to the parent initialiser.
        self._original_request = tls_request
        super().__init__(tls_request, *args, **kwargs)

    def do_CONNECT(self):
        """Run the parent CONNECT handling, then close the stored TLS socket."""
        super().do_CONNECT()
        self.server.close_request(self._original_request)
 | |
| 
 | |
| 
 | |
@contextlib.contextmanager
def proxy_server(proxy_server_class, request_handler, bind_ip=None, **proxy_server_kwargs):
    """Run a threaded proxy server for the duration of the ``with`` block.

    @param proxy_server_class:  handler class instantiated for each connection
    @param request_handler:     passed to the handler as ``request_handler=``
    @param bind_ip:             address to bind to (default ``127.0.0.1``); an
                                address without a ``.`` is treated as IPv6
    @param proxy_server_kwargs: extra keyword arguments for the handler class
    @yields ``host:port`` of the server (``[host]:port`` for IPv6)

    Cleanup only touches what was actually set up, so a failure while creating
    the server propagates unchanged instead of being masked by an
    AttributeError on ``None``.
    """
    server = server_thread = None
    try:
        bind_address = bind_ip or '127.0.0.1'
        is_ipv4 = '.' in bind_address
        server_type = ThreadingTCPServer if is_ipv4 else IPv6ThreadingTCPServer
        server = server_type(
            (bind_address, 0),
            functools.partial(proxy_server_class, request_handler=request_handler, **proxy_server_kwargs))
        server_port = http_server_port(server)
        thread = threading.Thread(target=server.serve_forever)
        thread.daemon = True
        thread.start()
        # Only recorded once the thread is really running (see cleanup below)
        server_thread = thread
        if is_ipv4:
            yield f'{bind_address}:{server_port}'
        else:
            yield f'[{bind_address}]:{server_port}'
    finally:
        if server is not None:
            # shutdown() waits for serve_forever() to exit and would block
            # forever if the serving thread was never started.
            if server_thread is not None:
                server.shutdown()
            server.server_close()
        if server_thread is not None:
            server_thread.join(2.0)
 | |
| 
 | |
| 
 | |
class HTTPProxyTestContext(abc.ABC):
    """Abstract description of one request scheme under test.

    Subclasses set ``REQUEST_HANDLER_CLASS`` (handler given to the proxy
    server as ``request_handler``) and ``REQUEST_PROTO`` (key used in the
    ``proxies`` mapping), and implement ``proxy_info_request``.
    """

    REQUEST_PROTO = None
    REQUEST_HANDLER_CLASS = None

    def http_server(self, server_class, *args, **kwargs):
        """Return a ``proxy_server`` context manager using this context's request handler."""
        request_handler = self.REQUEST_HANDLER_CLASS
        return proxy_server(server_class, request_handler, *args, **kwargs)

    @abc.abstractmethod
    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
        """return a dict of proxy_info"""
 | |
| 
 | |
| 
 | |
class HTTPProxyHTTPTestContext(HTTPProxyTestContext):
    """Context for plain ``http://`` requests sent through the proxy."""

    # Standard HTTP Proxy for http requests
    REQUEST_HANDLER_CLASS = HTTPProxyHandler
    REQUEST_PROTO = 'http'

    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Request ``/proxy_info`` over http via ``handler`` and return the decoded JSON."""
        host = target_domain or '127.0.0.1'
        port = target_port or '40000'
        request = Request(f'http://{host}:{port}/proxy_info', **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        return json.loads(response.read().decode())
 | |
| 
 | |
| 
 | |
class HTTPProxyHTTPSTestContext(HTTPProxyTestContext):
    """Context for ``https://`` requests sent through the proxy."""

    # HTTP Connect proxy, for https requests
    REQUEST_HANDLER_CLASS = HTTPSProxyHandler
    REQUEST_PROTO = 'https'

    def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
        """Request ``/proxy_info`` over https via ``handler`` and return the decoded JSON."""
        host = target_domain or '127.0.0.1'
        port = target_port or '40000'
        request = Request(f'https://{host}:{port}/proxy_info', **req_kwargs)
        handler.validate(request)
        response = handler.send(request)
        return json.loads(response.read().decode())
 | |
| 
 | |
| 
 | |
# Maps the ``ctx`` fixture parameter (the request scheme) to its context class
CTX_MAP = {
    context_class.REQUEST_PROTO: context_class
    for context_class in (HTTPProxyHTTPTestContext, HTTPProxyHTTPSTestContext)
}
 | |
| 
 | |
| 
 | |
@pytest.fixture(scope='module')
def ctx(request):
    """Instantiate the test context registered in ``CTX_MAP`` for ``request.param``."""
    context_class = CTX_MAP[request.param]
    return context_class()
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize(
    'handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
@pytest.mark.parametrize('ctx', ['http'], indirect=True)  # pure http proxy can only support http
class TestHTTPProxy:
    """http requests routed through a non-CONNECT proxy (reached via http or https)."""

    def test_http_no_auth(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler) as proxy_address:
            proxies = {ctx.REQUEST_PROTO: f'http://{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == proxy_address
                assert info['connect'] is False
                assert 'Proxy-Authorization' not in info['headers']

    def test_http_auth(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler, username='test', password='test') as proxy_address:
            proxies = {ctx.REQUEST_PROTO: f'http://test:test@{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == proxy_address
                assert 'Proxy-Authorization' in info['headers']

    def test_http_bad_auth(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler, username='test', password='test') as proxy_address:
            proxies = {ctx.REQUEST_PROTO: f'http://test:bad@{proxy_address}'}
            with handler(proxies=proxies) as rh:
                with pytest.raises(HTTPError) as exc_info:
                    ctx.proxy_info_request(rh)
                response = exc_info.value.response
                assert response.status == 407
                response.close()

    def test_http_source_address(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler) as proxy_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {ctx.REQUEST_PROTO: f'http://{proxy_address}'}
            with handler(proxies=proxies, source_address=source_address) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == proxy_address
                assert info['client_address'][0] == source_address

    @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
    def test_https(self, handler, ctx):
        with ctx.http_server(HTTPSProxyHandler) as proxy_address:
            proxies = {ctx.REQUEST_PROTO: f'https://{proxy_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == proxy_address
                assert info['connect'] is False
                assert 'Proxy-Authorization' not in info['headers']

    @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
    def test_https_verify_failed(self, handler, ctx):
        with ctx.http_server(HTTPSProxyHandler) as proxy_address:
            proxies = {ctx.REQUEST_PROTO: f'https://{proxy_address}'}
            with handler(verify=True, proxies=proxies) as rh:
                # Accept SSLError as may not be feasible to tell if it is proxy or request error.
                # note: if request proto also does ssl verification, this may also be the error of the request.
                # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
                with pytest.raises((ProxyError, SSLError)):
                    ctx.proxy_info_request(rh)

    def test_http_with_idn(self, handler, ctx):
        with ctx.http_server(HTTPProxyHandler) as proxy_address:
            proxies = {ctx.REQUEST_PROTO: f'http://{proxy_address}'}
            with handler(proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh, target_domain='中文.tw')
                assert info['proxy'] == proxy_address
                # The IDN host is expected in its punycode form
                assert info['path'].startswith('http://xn--fiq228c.tw')
                host_header = info['headers']['Host']
                assert host_header.split(':', 1)[0] == 'xn--fiq228c.tw'
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize(
    'handler,ctx', [
        ('Requests', 'https'),
        ('CurlCFFI', 'https'),
    ], indirect=True)
class TestHTTPConnectProxy:
    """https requests tunnelled through a CONNECT proxy (reached via http or https)."""

    def test_http_connect_no_auth(self, handler, ctx):
        with ctx.http_server(HTTPConnectProxyHandler) as proxy_address:
            proxies = {ctx.REQUEST_PROTO: f'http://{proxy_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == proxy_address
                assert info['connect'] is True
                assert 'Proxy-Authorization' not in info['headers']

    def test_http_connect_auth(self, handler, ctx):
        with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as proxy_address:
            proxies = {ctx.REQUEST_PROTO: f'http://test:test@{proxy_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == proxy_address
                assert 'Proxy-Authorization' in info['headers']

    def test_http_connect_bad_auth(self, handler, ctx):
        with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as proxy_address:
            proxies = {ctx.REQUEST_PROTO: f'http://test:bad@{proxy_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                with pytest.raises(ProxyError):
                    ctx.proxy_info_request(rh)

    def test_http_connect_source_address(self, handler, ctx):
        with ctx.http_server(HTTPConnectProxyHandler) as proxy_address:
            source_address = f'127.0.0.{random.randint(5, 255)}'
            verify_address_availability(source_address)
            proxies = {ctx.REQUEST_PROTO: f'http://{proxy_address}'}
            with handler(proxies=proxies, source_address=source_address, verify=False) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == proxy_address
                assert info['client_address'][0] == source_address

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_proxy(self, handler, ctx):
        with ctx.http_server(HTTPSConnectProxyHandler) as proxy_address:
            proxies = {ctx.REQUEST_PROTO: f'https://{proxy_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == proxy_address
                assert info['connect'] is True
                assert 'Proxy-Authorization' not in info['headers']

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_verify_failed(self, handler, ctx):
        with ctx.http_server(HTTPSConnectProxyHandler) as proxy_address:
            proxies = {ctx.REQUEST_PROTO: f'https://{proxy_address}'}
            with handler(verify=True, proxies=proxies) as rh:
                # Accept SSLError as may not be feasible to tell if it is proxy or request error.
                # note: if request proto also does ssl verification, this may also be the error of the request.
                # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
                with pytest.raises((ProxyError, SSLError)):
                    ctx.proxy_info_request(rh)

    @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
    def test_https_connect_proxy_auth(self, handler, ctx):
        with ctx.http_server(HTTPSConnectProxyHandler, username='test', password='test') as proxy_address:
            proxies = {ctx.REQUEST_PROTO: f'https://test:test@{proxy_address}'}
            with handler(verify=False, proxies=proxies) as rh:
                info = ctx.proxy_info_request(rh)
                assert info['proxy'] == proxy_address
                assert 'Proxy-Authorization' in info['headers']
 | 
