mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2026-02-16 21:45:44 +00:00
[rh:websockets] Migrate websockets to networking framework (#7720)
* Adds a basic WebSocket framework * Introduces new minimum `websockets` version of 12.0 * Deprecates `WebSocketsWrapper` Fixes https://github.com/yt-dlp/yt-dlp/issues/8439 Authored by: coletdjnz
This commit is contained in:
@@ -4052,6 +4052,7 @@ class YoutubeDL:
|
||||
return self._request_director.send(req)
|
||||
except NoSupportingHandlers as e:
|
||||
for ue in e.unsupported_errors:
|
||||
# FIXME: This depends on the order of errors.
|
||||
if not (ue.handler and ue.msg):
|
||||
continue
|
||||
if ue.handler.RH_KEY == 'Urllib' and 'unsupported url scheme: "file"' in ue.msg.lower():
|
||||
@@ -4061,6 +4062,15 @@ class YoutubeDL:
|
||||
if 'unsupported proxy type: "https"' in ue.msg.lower():
|
||||
raise RequestError(
|
||||
'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests')
|
||||
|
||||
elif (
|
||||
re.match(r'unsupported url scheme: "wss?"', ue.msg.lower())
|
||||
and 'websockets' not in self._request_director.handlers
|
||||
):
|
||||
raise RequestError(
|
||||
'This request requires WebSocket support. '
|
||||
'Ensure one of the following dependencies are installed: websockets',
|
||||
cause=ue) from ue
|
||||
raise
|
||||
except SSLError as e:
|
||||
if 'UNSAFE_LEGACY_RENEGOTIATION_DISABLED' in str(e):
|
||||
|
||||
@@ -6,7 +6,7 @@ from . import get_suitable_downloader
|
||||
from .common import FileDownloader
|
||||
from .external import FFmpegFD
|
||||
from ..networking import Request
|
||||
from ..utils import DownloadError, WebSocketsWrapper, str_or_none, try_get
|
||||
from ..utils import DownloadError, str_or_none, try_get
|
||||
|
||||
|
||||
class NiconicoDmcFD(FileDownloader):
|
||||
@@ -64,7 +64,6 @@ class NiconicoLiveFD(FileDownloader):
|
||||
ws_url = info_dict['url']
|
||||
ws_extractor = info_dict['ws']
|
||||
ws_origin_host = info_dict['origin']
|
||||
cookies = info_dict.get('cookies')
|
||||
live_quality = info_dict.get('live_quality', 'high')
|
||||
live_latency = info_dict.get('live_latency', 'high')
|
||||
dl = FFmpegFD(self.ydl, self.params or {})
|
||||
@@ -76,12 +75,7 @@ class NiconicoLiveFD(FileDownloader):
|
||||
|
||||
def communicate_ws(reconnect):
|
||||
if reconnect:
|
||||
ws = WebSocketsWrapper(ws_url, {
|
||||
'Cookies': str_or_none(cookies) or '',
|
||||
'Origin': f'https://{ws_origin_host}',
|
||||
'Accept': '*/*',
|
||||
'User-Agent': self.params['http_headers']['User-Agent'],
|
||||
})
|
||||
ws = self.ydl.urlopen(Request(ws_url, headers={'Origin': f'https://{ws_origin_host}'}))
|
||||
if self.ydl.params.get('verbose', False):
|
||||
self.to_screen('[debug] Sending startWatching request')
|
||||
ws.send(json.dumps({
|
||||
|
||||
@@ -2,11 +2,9 @@ import re
|
||||
|
||||
from .common import InfoExtractor
|
||||
from ..compat import compat_parse_qs
|
||||
from ..dependencies import websockets
|
||||
from ..networking import Request
|
||||
from ..utils import (
|
||||
ExtractorError,
|
||||
WebSocketsWrapper,
|
||||
js_to_json,
|
||||
traverse_obj,
|
||||
update_url_query,
|
||||
@@ -167,8 +165,6 @@ class FC2LiveIE(InfoExtractor):
|
||||
}]
|
||||
|
||||
def _real_extract(self, url):
|
||||
if not websockets:
|
||||
raise ExtractorError('websockets library is not available. Please install it.', expected=True)
|
||||
video_id = self._match_id(url)
|
||||
webpage = self._download_webpage('https://live.fc2.com/%s/' % video_id, video_id)
|
||||
|
||||
@@ -199,13 +195,9 @@ class FC2LiveIE(InfoExtractor):
|
||||
ws_url = update_url_query(control_server['url'], {'control_token': control_server['control_token']})
|
||||
playlist_data = None
|
||||
|
||||
self.to_screen('%s: Fetching HLS playlist info via WebSocket' % video_id)
|
||||
ws = WebSocketsWrapper(ws_url, {
|
||||
'Cookie': str(self._get_cookies('https://live.fc2.com/'))[12:],
|
||||
ws = self._request_webpage(Request(ws_url, headers={
|
||||
'Origin': 'https://live.fc2.com',
|
||||
'Accept': '*/*',
|
||||
'User-Agent': self.get_param('http_headers')['User-Agent'],
|
||||
})
|
||||
}), video_id, note='Fetching HLS playlist info via WebSocket')
|
||||
|
||||
self.write_debug('Sending HLS server request')
|
||||
|
||||
|
||||
@@ -8,12 +8,11 @@ import time
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from .common import InfoExtractor, SearchInfoExtractor
|
||||
from ..dependencies import websockets
|
||||
from ..networking import Request
|
||||
from ..networking.exceptions import HTTPError
|
||||
from ..utils import (
|
||||
ExtractorError,
|
||||
OnDemandPagedList,
|
||||
WebSocketsWrapper,
|
||||
bug_reports_message,
|
||||
clean_html,
|
||||
float_or_none,
|
||||
@@ -934,8 +933,6 @@ class NiconicoLiveIE(InfoExtractor):
|
||||
_KNOWN_LATENCY = ('high', 'low')
|
||||
|
||||
def _real_extract(self, url):
|
||||
if not websockets:
|
||||
raise ExtractorError('websockets library is not available. Please install it.', expected=True)
|
||||
video_id = self._match_id(url)
|
||||
webpage, urlh = self._download_webpage_handle(f'https://live.nicovideo.jp/watch/{video_id}', video_id)
|
||||
|
||||
@@ -950,17 +947,13 @@ class NiconicoLiveIE(InfoExtractor):
|
||||
})
|
||||
|
||||
hostname = remove_start(urlparse(urlh.url).hostname, 'sp.')
|
||||
cookies = try_get(urlh.url, self._downloader._calc_cookies)
|
||||
latency = try_get(self._configuration_arg('latency'), lambda x: x[0])
|
||||
if latency not in self._KNOWN_LATENCY:
|
||||
latency = 'high'
|
||||
|
||||
ws = WebSocketsWrapper(ws_url, {
|
||||
'Cookies': str_or_none(cookies) or '',
|
||||
'Origin': f'https://{hostname}',
|
||||
'Accept': '*/*',
|
||||
'User-Agent': self.get_param('http_headers')['User-Agent'],
|
||||
})
|
||||
ws = self._request_webpage(
|
||||
Request(ws_url, headers={'Origin': f'https://{hostname}'}),
|
||||
video_id=video_id, note='Connecting to WebSocket server')
|
||||
|
||||
self.write_debug('[debug] Sending HLS server request')
|
||||
ws.send(json.dumps({
|
||||
@@ -1034,7 +1027,6 @@ class NiconicoLiveIE(InfoExtractor):
|
||||
'protocol': 'niconico_live',
|
||||
'ws': ws,
|
||||
'video_id': video_id,
|
||||
'cookies': cookies,
|
||||
'live_latency': latency,
|
||||
'origin': hostname,
|
||||
})
|
||||
|
||||
@@ -21,3 +21,11 @@ except ImportError:
|
||||
pass
|
||||
except Exception as e:
|
||||
warnings.warn(f'Failed to import "requests" request handler: {e}' + bug_reports_message())
|
||||
|
||||
try:
|
||||
from . import _websockets
|
||||
except ImportError:
|
||||
pass
|
||||
except Exception as e:
|
||||
warnings.warn(f'Failed to import "websockets" request handler: {e}' + bug_reports_message())
|
||||
|
||||
|
||||
159
yt_dlp/networking/_websockets.py
Normal file
159
yt_dlp/networking/_websockets.py
Normal file
@@ -0,0 +1,159 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import logging
|
||||
import ssl
|
||||
import sys
|
||||
|
||||
from ._helper import create_connection, select_proxy, make_socks_proxy_opts, create_socks_proxy_socket
|
||||
from .common import Response, register_rh, Features
|
||||
from .exceptions import (
|
||||
CertificateVerifyError,
|
||||
HTTPError,
|
||||
RequestError,
|
||||
SSLError,
|
||||
TransportError, ProxyError,
|
||||
)
|
||||
from .websocket import WebSocketRequestHandler, WebSocketResponse
|
||||
from ..compat import functools
|
||||
from ..dependencies import websockets
|
||||
from ..utils import int_or_none
|
||||
from ..socks import ProxyError as SocksProxyError
|
||||
|
||||
if not websockets:
|
||||
raise ImportError('websockets is not installed')
|
||||
|
||||
import websockets.version
|
||||
|
||||
websockets_version = tuple(map(int_or_none, websockets.version.version.split('.')))
|
||||
if websockets_version < (12, 0):
|
||||
raise ImportError('Only websockets>=12.0 is supported')
|
||||
|
||||
import websockets.sync.client
|
||||
from websockets.uri import parse_uri
|
||||
|
||||
|
||||
class WebsocketsResponseAdapter(WebSocketResponse):
    """Present a `websockets` sync client connection as a WebSocketResponse.

    The handshake response carried by the connection supplies the body,
    headers, status and reason; `send`/`recv` are forwarded to the connection
    with library errors translated into networking exceptions.
    """

    def __init__(self, wsw: websockets.sync.client.ClientConnection, url):
        handshake = wsw.response
        super().__init__(
            # The handshake body may be empty/None; fall back to empty bytes
            fp=io.BytesIO(handshake.body or b''),
            url=url,
            headers=handshake.headers,
            status=handshake.status_code,
            reason=handshake.reason_phrase,
        )
        self.wsw = wsw

    def close(self):
        """Close the underlying connection first, then the response itself."""
        self.wsw.close()
        super().close()

    def send(self, message):
        """Forward `message` to the connection.

        Raises TransportError, ProxyError or RequestError (for a TypeError
        raised by the library) in place of the underlying exception.
        """
        # https://websockets.readthedocs.io/en/stable/reference/sync/client.html#websockets.sync.client.ClientConnection.send
        try:
            outcome = self.wsw.send(message)
        except (websockets.exceptions.WebSocketException, RuntimeError, TimeoutError) as err:
            raise TransportError(cause=err) from err
        except SocksProxyError as err:
            raise ProxyError(cause=err) from err
        except TypeError as err:
            raise RequestError(cause=err) from err
        return outcome

    def recv(self):
        """Return the next message from the connection.

        Raises ProxyError or TransportError in place of the underlying exception.
        """
        # https://websockets.readthedocs.io/en/stable/reference/sync/client.html#websockets.sync.client.ClientConnection.recv
        try:
            incoming = self.wsw.recv()
        except SocksProxyError as err:
            raise ProxyError(cause=err) from err
        except (websockets.exceptions.WebSocketException, RuntimeError, TimeoutError) as err:
            raise TransportError(cause=err) from err
        return incoming
|
||||
|
||||
|
||||
@register_rh
class WebsocketsRH(WebSocketRequestHandler):
    """
    Websockets request handler
    https://websockets.readthedocs.io
    https://github.com/python-websockets/websockets

    Supports ws:// and wss:// URLs, optionally through a SOCKS proxy.
    """
    _SUPPORTED_URL_SCHEMES = ('wss', 'ws')
    _SUPPORTED_PROXY_SCHEMES = ('socks4', 'socks4a', 'socks5', 'socks5h')
    _SUPPORTED_FEATURES = (Features.ALL_PROXY, Features.NO_PROXY)
    RH_NAME = 'websockets'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The "websockets.*" loggers are process-global. Remember the handlers
        # this instance attaches so close() can detach them again; otherwise
        # every new handler instance would add one more handler (duplicated
        # output, and the handlers would never be released).
        self.__logging_handlers = {}
        for name in ('websockets.client', 'websockets.server'):
            logger = logging.getLogger(name)
            handler = logging.StreamHandler(stream=sys.stdout)
            handler.setFormatter(logging.Formatter(f'{self.RH_NAME}: %(message)s'))
            self.__logging_handlers[name] = handler
            logger.addHandler(handler)
            if self.verbose:
                logger.setLevel(logging.DEBUG)

    def close(self):
        """Detach the logging handlers that __init__ attached to the websockets loggers."""
        # NOTE(review): relies on the base request handler providing close()
        # and on the owner of this handler calling it - confirm against .common
        for name, handler in self.__logging_handlers.items():
            logging.getLogger(name).removeHandler(handler)
        self.__logging_handlers.clear()
        super().close()

    def _check_extensions(self, extensions):
        super()._check_extensions(extensions)
        # Extensions this handler understands; _send() reads both of them
        extensions.pop('timeout', None)
        extensions.pop('cookiejar', None)

    def _send(self, request):
        """Open a WebSocket connection for `request`.

        @returns: a WebsocketsResponseAdapter wrapping the open connection.
        @raises ProxyError, RequestError, CertificateVerifyError, SSLError,
                HTTPError (handshake rejected with an invalid status), TransportError
        """
        timeout = float(request.extensions.get('timeout') or self.timeout)
        headers = self._merge_headers(request.headers)
        # An explicit Cookie header wins; otherwise derive one from the cookiejar
        if 'cookie' not in headers:
            cookiejar = request.extensions.get('cookiejar') or self.cookiejar
            cookie_header = cookiejar.get_cookie_header(request.url)
            if cookie_header:
                headers['cookie'] = cookie_header

        wsuri = parse_uri(request.url)
        create_conn_kwargs = {
            'source_address': (self.source_address, 0) if self.source_address else None,
            'timeout': timeout
        }
        proxy = select_proxy(request.url, request.proxies or self.proxies or {})
        try:
            if proxy:
                # Connect to the proxy; the socket factory is given the real
                # destination (host, port) together with the proxy options
                socks_proxy_options = make_socks_proxy_opts(proxy)
                sock = create_connection(
                    address=(socks_proxy_options['addr'], socks_proxy_options['port']),
                    _create_socket_func=functools.partial(
                        create_socks_proxy_socket, (wsuri.host, wsuri.port), socks_proxy_options),
                    **create_conn_kwargs
                )
            else:
                sock = create_connection(
                    address=(wsuri.host, wsuri.port),
                    **create_conn_kwargs
                )
            conn = websockets.sync.client.connect(
                sock=sock,
                uri=request.url,
                additional_headers=headers,
                open_timeout=timeout,
                user_agent_header=None,
                ssl_context=self._make_sslcontext() if wsuri.secure else None,
                close_timeout=0,  # not ideal, but prevents yt-dlp hanging
            )
            return WebsocketsResponseAdapter(conn, url=request.url)

        # Exceptions as per https://websockets.readthedocs.io/en/stable/reference/sync/client.html
        except SocksProxyError as e:
            raise ProxyError(cause=e) from e
        except websockets.exceptions.InvalidURI as e:
            raise RequestError(cause=e) from e
        except ssl.SSLCertVerificationError as e:
            # Must precede ssl.SSLError, which it subclasses
            raise CertificateVerifyError(cause=e) from e
        except ssl.SSLError as e:
            raise SSLError(cause=e) from e
        except websockets.exceptions.InvalidStatus as e:
            # Surface the rejected handshake as an HTTP error carrying its response
            raise HTTPError(
                Response(
                    fp=io.BytesIO(e.response.body),
                    url=request.url,
                    headers=e.response.headers,
                    status=e.response.status_code,
                    reason=e.response.reason_phrase),
            ) from e
        except (OSError, TimeoutError, websockets.exceptions.WebSocketException) as e:
            raise TransportError(cause=e) from e
|
||||
23
yt_dlp/networking/websocket.py
Normal file
23
yt_dlp/networking/websocket.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
|
||||
from .common import Response, RequestHandler
|
||||
|
||||
|
||||
class WebSocketResponse(Response):
    """Response that additionally exposes message-level `send`/`recv`.

    Both methods are placeholders here and raise NotImplementedError.
    """

    def send(self, message: bytes | str):
        """
        Send a message to the server.

        @param message: The message to send. A string (str) is sent as a text frame, bytes is sent as a binary frame.
        """
        raise NotImplementedError()

    def recv(self):
        """Receive a message; not implemented on this base class."""
        raise NotImplementedError()
|
||||
|
||||
|
||||
class WebSocketRequestHandler(RequestHandler, abc.ABC):
    """Abstract base for request handlers that speak WebSocket; adds no members of its own."""
|
||||
@@ -1,4 +1,6 @@
|
||||
"""No longer used and new code should not use. Exists only for API compat."""
|
||||
import asyncio
|
||||
import atexit
|
||||
import platform
|
||||
import struct
|
||||
import sys
|
||||
@@ -32,6 +34,77 @@ has_certifi = bool(certifi)
|
||||
has_websockets = bool(websockets)
|
||||
|
||||
|
||||
class WebSocketsWrapper:
    """Wraps websockets module to use in non-async scopes

    Every call is driven to completion on a private event loop owned by the
    instance. The connection is also torn down at interpreter exit.
    """
    pool = None

    def __init__(self, url, headers=None, connect=True, **ws_kwargs):
        self.loop = asyncio.new_event_loop()
        # XXX: "loop" is deprecated
        self.conn = websockets.connect(
            url, extra_headers=headers, ping_interval=None,
            close_timeout=float('inf'), loop=self.loop, ping_timeout=float('inf'), **ws_kwargs)
        if connect:
            self.__enter__()
        atexit.register(self.__exit__, None, None, None)

    def __enter__(self):
        if not self.pool:
            self.pool = self.run_with_loop(self.conn.__aenter__(), self.loop)
        return self

    def send(self, *args):
        self.run_with_loop(self.pool.send(*args), self.loop)

    def recv(self, *args):
        return self.run_with_loop(self.pool.recv(*args), self.loop)

    def __exit__(self, type, value, traceback):
        # __exit__ is also registered with atexit, so it may run a second time
        # after an explicit exit; a closed loop means there is nothing left to do
        if self.loop.is_closed():
            return None
        try:
            return self.run_with_loop(self.conn.__aexit__(type, value, traceback), self.loop)
        finally:
            # Leftover tasks must be cancelled while the loop can still run;
            # only then is it closed (same order as asyncio.run)
            try:
                self._cancel_all_tasks(self.loop)
            finally:
                self.loop.close()

    # taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
    # for contributors: if another asyncio-based library ever needs to be run from
    # non-async code, move these functions out of this class
    @staticmethod
    def run_with_loop(main, loop):
        """Run coroutine `main` to completion on `loop` and return its result.

        Raises ValueError if `main` is not a coroutine object.
        """
        if not asyncio.iscoroutine(main):
            raise ValueError(f'a coroutine was expected, got {main!r}')

        try:
            return loop.run_until_complete(main)
        finally:
            loop.run_until_complete(loop.shutdown_asyncgens())
            if hasattr(loop, 'shutdown_default_executor'):
                loop.run_until_complete(loop.shutdown_default_executor())

    @staticmethod
    def _cancel_all_tasks(loop):
        """Cancel every unfinished task of `loop` and wait for them to finish."""
        to_cancel = asyncio.all_tasks(loop)

        if not to_cancel:
            return

        for task in to_cancel:
            task.cancel()

        # gather() lost its "loop" parameter in Python 3.10; when given tasks it
        # uses the loop those tasks belong to, so the argument is not needed
        loop.run_until_complete(
            asyncio.gather(*to_cancel, return_exceptions=True))

        for task in to_cancel:
            if task.cancelled():
                continue
            if task.exception() is not None:
                loop.call_exception_handler({
                    'message': 'unhandled exception during asyncio.run() shutdown',
                    'exception': task.exception(),
                    'task': task,
                })
|
||||
|
||||
|
||||
def load_plugins(name, suffix, namespace):
|
||||
from ..plugins import load_plugins
|
||||
ret = load_plugins(name, suffix)
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import asyncio
|
||||
import atexit
|
||||
import base64
|
||||
import binascii
|
||||
import calendar
|
||||
@@ -54,7 +52,7 @@ from ..compat import (
|
||||
compat_os_name,
|
||||
compat_shlex_quote,
|
||||
)
|
||||
from ..dependencies import websockets, xattr
|
||||
from ..dependencies import xattr
|
||||
|
||||
__name__ = __name__.rsplit('.', 1)[0] # Pretend to be the parent module
|
||||
|
||||
@@ -4923,77 +4921,6 @@ class Config:
|
||||
return self.parser.parse_args(self.all_args)
|
||||
|
||||
|
||||
class WebSocketsWrapper:
    """Wraps websockets module to use in non-async scopes

    Every call is driven to completion on a private event loop owned by the
    instance. The connection is also torn down at interpreter exit.
    """
    pool = None

    def __init__(self, url, headers=None, connect=True):
        self.loop = asyncio.new_event_loop()
        # XXX: "loop" is deprecated
        self.conn = websockets.connect(
            url, extra_headers=headers, ping_interval=None,
            close_timeout=float('inf'), loop=self.loop, ping_timeout=float('inf'))
        if connect:
            self.__enter__()
        atexit.register(self.__exit__, None, None, None)

    def __enter__(self):
        if not self.pool:
            self.pool = self.run_with_loop(self.conn.__aenter__(), self.loop)
        return self

    def send(self, *args):
        self.run_with_loop(self.pool.send(*args), self.loop)

    def recv(self, *args):
        return self.run_with_loop(self.pool.recv(*args), self.loop)

    def __exit__(self, type, value, traceback):
        # __exit__ is also registered with atexit, so it may run a second time
        # after an explicit exit; a closed loop means there is nothing left to do
        if self.loop.is_closed():
            return None
        try:
            return self.run_with_loop(self.conn.__aexit__(type, value, traceback), self.loop)
        finally:
            # Leftover tasks must be cancelled while the loop can still run;
            # only then is it closed (same order as asyncio.run)
            try:
                self._cancel_all_tasks(self.loop)
            finally:
                self.loop.close()

    # taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications
    # for contributors: if another asyncio-based library ever needs to be run from
    # non-async code, move these functions out of this class
    @staticmethod
    def run_with_loop(main, loop):
        """Run coroutine `main` to completion on `loop` and return its result.

        Raises ValueError if `main` is not a coroutine object.
        """
        if not asyncio.iscoroutine(main):
            raise ValueError(f'a coroutine was expected, got {main!r}')

        try:
            return loop.run_until_complete(main)
        finally:
            loop.run_until_complete(loop.shutdown_asyncgens())
            if hasattr(loop, 'shutdown_default_executor'):
                loop.run_until_complete(loop.shutdown_default_executor())

    @staticmethod
    def _cancel_all_tasks(loop):
        """Cancel every unfinished task of `loop` and wait for them to finish."""
        to_cancel = asyncio.all_tasks(loop)

        if not to_cancel:
            return

        for task in to_cancel:
            task.cancel()

        # gather() lost its "loop" parameter in Python 3.10; when given tasks it
        # uses the loop those tasks belong to, so the argument is not needed
        loop.run_until_complete(
            asyncio.gather(*to_cancel, return_exceptions=True))

        for task in to_cancel:
            if task.cancelled():
                continue
            if task.exception() is not None:
                loop.call_exception_handler({
                    'message': 'unhandled exception during asyncio.run() shutdown',
                    'exception': task.exception(),
                    'task': task,
                })
|
||||
|
||||
|
||||
def merge_headers(*dicts):
    """Merge dicts of http headers case insensitively, prioritizing the latter ones"""
    merged = {}
    for headers in dicts:
        # dict.items(...) keeps the original's requirement that each argument is a dict
        for name, value in dict.items(headers):
            # Title-casing the name is what makes differently-cased keys collide
            merged[name.title()] = value
    return merged
|
||||
|
||||
Reference in New Issue
Block a user