mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2025-12-14 04:05:16 +00:00
Merge remote-tracking branch 'origin' into yt-live-from-start-range
This commit is contained in:
21
test/conftest.py
Normal file
21
test/conftest.py
Normal file
@@ -0,0 +1,21 @@
|
||||
import functools
|
||||
import inspect
|
||||
|
||||
import pytest
|
||||
|
||||
from yt_dlp.networking import RequestHandler
|
||||
from yt_dlp.networking.common import _REQUEST_HANDLERS
|
||||
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def handler(request):
|
||||
RH_KEY = request.param
|
||||
if inspect.isclass(RH_KEY) and issubclass(RH_KEY, RequestHandler):
|
||||
handler = RH_KEY
|
||||
elif RH_KEY in _REQUEST_HANDLERS:
|
||||
handler = _REQUEST_HANDLERS[RH_KEY]
|
||||
else:
|
||||
pytest.skip(f'{RH_KEY} request handler is not available')
|
||||
|
||||
return functools.partial(handler, logger=FakeLogger)
|
||||
@@ -11,7 +11,7 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
import copy
|
||||
import json
|
||||
|
||||
from test.helper import FakeYDL, assertRegexpMatches
|
||||
from test.helper import FakeYDL, assertRegexpMatches, try_rm
|
||||
from yt_dlp import YoutubeDL
|
||||
from yt_dlp.compat import compat_os_name
|
||||
from yt_dlp.extractor import YoutubeIE
|
||||
@@ -24,6 +24,7 @@ from yt_dlp.utils import (
|
||||
int_or_none,
|
||||
match_filter_func,
|
||||
)
|
||||
from yt_dlp.utils.traversal import traverse_obj
|
||||
|
||||
TEST_URL = 'http://localhost/sample.mp4'
|
||||
|
||||
@@ -630,7 +631,6 @@ class TestYoutubeDL(unittest.TestCase):
|
||||
self.assertEqual(test_dict['playlist'], 'funny videos')
|
||||
|
||||
outtmpl_info = {
|
||||
'id': '1234',
|
||||
'id': '1234',
|
||||
'ext': 'mp4',
|
||||
'width': None,
|
||||
@@ -684,7 +684,8 @@ class TestYoutubeDL(unittest.TestCase):
|
||||
test('%(id)s.%(ext)s', '1234.mp4')
|
||||
test('%(duration_string)s', ('27:46:40', '27-46-40'))
|
||||
test('%(resolution)s', '1080p')
|
||||
test('%(playlist_index)s', '001')
|
||||
test('%(playlist_index|)s', '001')
|
||||
test('%(playlist_index&{}!)s', '1!')
|
||||
test('%(playlist_autonumber)s', '02')
|
||||
test('%(autonumber)s', '00001')
|
||||
test('%(autonumber+2)03d', '005', autonumber_start=3)
|
||||
@@ -783,9 +784,9 @@ class TestYoutubeDL(unittest.TestCase):
|
||||
test('%(title4)#S', 'foo_bar_test')
|
||||
test('%(title4).10S', ('foo "bar" ', 'foo "bar"' + ('#' if compat_os_name == 'nt' else ' ')))
|
||||
if compat_os_name == 'nt':
|
||||
test('%(title4)q', ('"foo \\"bar\\" test"', ""foo ⧹"bar⧹" test""))
|
||||
test('%(formats.:.id)#q', ('"id 1" "id 2" "id 3"', '"id 1" "id 2" "id 3"'))
|
||||
test('%(formats.0.id)#q', ('"id 1"', '"id 1"'))
|
||||
test('%(title4)q', ('"foo ""bar"" test"', None))
|
||||
test('%(formats.:.id)#q', ('"id 1" "id 2" "id 3"', None))
|
||||
test('%(formats.0.id)#q', ('"id 1"', None))
|
||||
else:
|
||||
test('%(title4)q', ('\'foo "bar" test\'', '\'foo "bar" test\''))
|
||||
test('%(formats.:.id)#q', "'id 1' 'id 2' 'id 3'")
|
||||
@@ -829,6 +830,7 @@ class TestYoutubeDL(unittest.TestCase):
|
||||
test('%(id&hi {:>10} {}|)s', 'hi 1234 1234')
|
||||
test(R'%(id&{0} {}|)s', 'NA')
|
||||
test(R'%(id&{0.1}|)s', 'NA')
|
||||
test('%(height&{:,d})S', '1,080')
|
||||
|
||||
# Laziness
|
||||
def gen():
|
||||
@@ -1213,6 +1215,129 @@ class TestYoutubeDL(unittest.TestCase):
|
||||
self.assertEqual(downloaded['extractor'], 'Video')
|
||||
self.assertEqual(downloaded['extractor_key'], 'Video')
|
||||
|
||||
def test_header_cookies(self):
|
||||
from http.cookiejar import Cookie
|
||||
|
||||
ydl = FakeYDL()
|
||||
ydl.report_warning = lambda *_, **__: None
|
||||
|
||||
def cookie(name, value, version=None, domain='', path='', secure=False, expires=None):
|
||||
return Cookie(
|
||||
version or 0, name, value, None, False,
|
||||
domain, bool(domain), bool(domain), path, bool(path),
|
||||
secure, expires, False, None, None, rest={})
|
||||
|
||||
_test_url = 'https://yt.dlp/test'
|
||||
|
||||
def test(encoded_cookies, cookies, *, headers=False, round_trip=None, error_re=None):
|
||||
def _test():
|
||||
ydl.cookiejar.clear()
|
||||
ydl._load_cookies(encoded_cookies, autoscope=headers)
|
||||
if headers:
|
||||
ydl._apply_header_cookies(_test_url)
|
||||
data = {'url': _test_url}
|
||||
ydl._calc_headers(data)
|
||||
self.assertCountEqual(
|
||||
map(vars, ydl.cookiejar), map(vars, cookies),
|
||||
'Extracted cookiejar.Cookie is not the same')
|
||||
if not headers:
|
||||
self.assertEqual(
|
||||
data.get('cookies'), round_trip or encoded_cookies,
|
||||
'Cookie is not the same as round trip')
|
||||
ydl.__dict__['_YoutubeDL__header_cookies'] = []
|
||||
|
||||
with self.subTest(msg=encoded_cookies):
|
||||
if not error_re:
|
||||
_test()
|
||||
return
|
||||
with self.assertRaisesRegex(Exception, error_re):
|
||||
_test()
|
||||
|
||||
test('test=value; Domain=.yt.dlp', [cookie('test', 'value', domain='.yt.dlp')])
|
||||
test('test=value', [cookie('test', 'value')], error_re=r'Unscoped cookies are not allowed')
|
||||
test('cookie1=value1; Domain=.yt.dlp; Path=/test; cookie2=value2; Domain=.yt.dlp; Path=/', [
|
||||
cookie('cookie1', 'value1', domain='.yt.dlp', path='/test'),
|
||||
cookie('cookie2', 'value2', domain='.yt.dlp', path='/')])
|
||||
test('test=value; Domain=.yt.dlp; Path=/test; Secure; Expires=9999999999', [
|
||||
cookie('test', 'value', domain='.yt.dlp', path='/test', secure=True, expires=9999999999)])
|
||||
test('test="value; "; path=/test; domain=.yt.dlp', [
|
||||
cookie('test', 'value; ', domain='.yt.dlp', path='/test')],
|
||||
round_trip='test="value\\073 "; Domain=.yt.dlp; Path=/test')
|
||||
test('name=; Domain=.yt.dlp', [cookie('name', '', domain='.yt.dlp')],
|
||||
round_trip='name=""; Domain=.yt.dlp')
|
||||
|
||||
test('test=value', [cookie('test', 'value', domain='.yt.dlp')], headers=True)
|
||||
test('cookie1=value; Domain=.yt.dlp; cookie2=value', [], headers=True, error_re=r'Invalid syntax')
|
||||
ydl.deprecated_feature = ydl.report_error
|
||||
test('test=value', [], headers=True, error_re=r'Passing cookies as a header is a potential security risk')
|
||||
|
||||
def test_infojson_cookies(self):
|
||||
TEST_FILE = 'test_infojson_cookies.info.json'
|
||||
TEST_URL = 'https://example.com/example.mp4'
|
||||
COOKIES = 'a=b; Domain=.example.com; c=d; Domain=.example.com'
|
||||
COOKIE_HEADER = {'Cookie': 'a=b; c=d'}
|
||||
|
||||
ydl = FakeYDL()
|
||||
ydl.process_info = lambda x: ydl._write_info_json('test', x, TEST_FILE)
|
||||
|
||||
def make_info(info_header_cookies=False, fmts_header_cookies=False, cookies_field=False):
|
||||
fmt = {'url': TEST_URL}
|
||||
if fmts_header_cookies:
|
||||
fmt['http_headers'] = COOKIE_HEADER
|
||||
if cookies_field:
|
||||
fmt['cookies'] = COOKIES
|
||||
return _make_result([fmt], http_headers=COOKIE_HEADER if info_header_cookies else None)
|
||||
|
||||
def test(initial_info, note):
|
||||
result = {}
|
||||
result['processed'] = ydl.process_ie_result(initial_info)
|
||||
self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
|
||||
msg=f'No cookies set in cookiejar after initial process when {note}')
|
||||
ydl.cookiejar.clear()
|
||||
with open(TEST_FILE) as infojson:
|
||||
result['loaded'] = ydl.sanitize_info(json.load(infojson), True)
|
||||
result['final'] = ydl.process_ie_result(result['loaded'].copy(), download=False)
|
||||
self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
|
||||
msg=f'No cookies set in cookiejar after final process when {note}')
|
||||
ydl.cookiejar.clear()
|
||||
for key in ('processed', 'loaded', 'final'):
|
||||
info = result[key]
|
||||
self.assertIsNone(
|
||||
traverse_obj(info, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False),
|
||||
msg=f'Cookie header not removed in {key} result when {note}')
|
||||
self.assertEqual(
|
||||
traverse_obj(info, ((None, ('formats', 0)), 'cookies'), get_all=False), COOKIES,
|
||||
msg=f'No cookies field found in {key} result when {note}')
|
||||
|
||||
test({'url': TEST_URL, 'http_headers': COOKIE_HEADER, 'id': '1', 'title': 'x'}, 'no formats field')
|
||||
test(make_info(info_header_cookies=True), 'info_dict header cokies')
|
||||
test(make_info(fmts_header_cookies=True), 'format header cookies')
|
||||
test(make_info(info_header_cookies=True, fmts_header_cookies=True), 'info_dict and format header cookies')
|
||||
test(make_info(info_header_cookies=True, fmts_header_cookies=True, cookies_field=True), 'all cookies fields')
|
||||
test(make_info(cookies_field=True), 'cookies format field')
|
||||
test({'url': TEST_URL, 'cookies': COOKIES, 'id': '1', 'title': 'x'}, 'info_dict cookies field only')
|
||||
|
||||
try_rm(TEST_FILE)
|
||||
|
||||
def test_add_headers_cookie(self):
|
||||
def check_for_cookie_header(result):
|
||||
return traverse_obj(result, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False)
|
||||
|
||||
ydl = FakeYDL({'http_headers': {'Cookie': 'a=b'}})
|
||||
ydl._apply_header_cookies(_make_result([])['webpage_url']) # Scope to input webpage URL: .example.com
|
||||
|
||||
fmt = {'url': 'https://example.com/video.mp4'}
|
||||
result = ydl.process_ie_result(_make_result([fmt]), download=False)
|
||||
self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies in result info_dict')
|
||||
self.assertEqual(result.get('cookies'), 'a=b; Domain=.example.com', msg='No cookies were set in cookies field')
|
||||
self.assertIn('a=b', ydl.cookiejar.get_cookie_header(fmt['url']), msg='No cookies were set in cookiejar')
|
||||
|
||||
fmt = {'url': 'https://wrong.com/video.mp4'}
|
||||
result = ydl.process_ie_result(_make_result([fmt]), download=False)
|
||||
self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies for wrong domain')
|
||||
self.assertFalse(result.get('cookies'), msg='Cookies set in cookies field for wrong domain')
|
||||
self.assertFalse(ydl.cookiejar.get_cookie_header(fmt['url']), msg='Cookies set in cookiejar for wrong domain')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -17,10 +17,10 @@ from yt_dlp.cookies import YoutubeDLCookieJar
|
||||
class TestYoutubeDLCookieJar(unittest.TestCase):
|
||||
def test_keep_session_cookies(self):
|
||||
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt')
|
||||
cookiejar.load(ignore_discard=True, ignore_expires=True)
|
||||
cookiejar.load()
|
||||
tf = tempfile.NamedTemporaryFile(delete=False)
|
||||
try:
|
||||
cookiejar.save(filename=tf.name, ignore_discard=True, ignore_expires=True)
|
||||
cookiejar.save(filename=tf.name)
|
||||
temp = tf.read().decode()
|
||||
self.assertTrue(re.search(
|
||||
r'www\.foobar\.foobar\s+FALSE\s+/\s+TRUE\s+0\s+YoutubeDLExpiresEmpty\s+YoutubeDLExpiresEmptyValue', temp))
|
||||
@@ -32,7 +32,7 @@ class TestYoutubeDLCookieJar(unittest.TestCase):
|
||||
|
||||
def test_strip_httponly_prefix(self):
|
||||
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/httponly_cookies.txt')
|
||||
cookiejar.load(ignore_discard=True, ignore_expires=True)
|
||||
cookiejar.load()
|
||||
|
||||
def assert_cookie_has_value(key):
|
||||
self.assertEqual(cookiejar._cookies['www.foobar.foobar']['/'][key].value, key + '_VALUE')
|
||||
@@ -42,17 +42,25 @@ class TestYoutubeDLCookieJar(unittest.TestCase):
|
||||
|
||||
def test_malformed_cookies(self):
|
||||
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/malformed_cookies.txt')
|
||||
cookiejar.load(ignore_discard=True, ignore_expires=True)
|
||||
cookiejar.load()
|
||||
# Cookies should be empty since all malformed cookie file entries
|
||||
# will be ignored
|
||||
self.assertFalse(cookiejar._cookies)
|
||||
|
||||
def test_get_cookie_header(self):
|
||||
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/httponly_cookies.txt')
|
||||
cookiejar.load(ignore_discard=True, ignore_expires=True)
|
||||
cookiejar.load()
|
||||
header = cookiejar.get_cookie_header('https://www.foobar.foobar')
|
||||
self.assertIn('HTTPONLY_COOKIE', header)
|
||||
|
||||
def test_get_cookies_for_url(self):
|
||||
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt')
|
||||
cookiejar.load()
|
||||
cookies = cookiejar.get_cookies_for_url('https://www.foobar.foobar/')
|
||||
self.assertEqual(len(cookies), 2)
|
||||
cookies = cookiejar.get_cookies_for_url('https://foobar.foobar/')
|
||||
self.assertFalse(cookies)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -9,15 +9,16 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
|
||||
import struct
|
||||
import urllib.parse
|
||||
|
||||
from yt_dlp import compat
|
||||
from yt_dlp.compat import urllib # isort: split
|
||||
from yt_dlp.compat import (
|
||||
compat_etree_fromstring,
|
||||
compat_expanduser,
|
||||
compat_urllib_parse_unquote,
|
||||
compat_urllib_parse_urlencode,
|
||||
)
|
||||
from yt_dlp.compat.urllib.request import getproxies
|
||||
|
||||
|
||||
class TestCompat(unittest.TestCase):
|
||||
@@ -28,8 +29,7 @@ class TestCompat(unittest.TestCase):
|
||||
with self.assertWarns(DeprecationWarning):
|
||||
compat.WINDOWS_VT_MODE
|
||||
|
||||
# TODO: Test submodule
|
||||
# compat.asyncio.events # Must not raise error
|
||||
self.assertEqual(urllib.request.getproxies, getproxies)
|
||||
|
||||
with self.assertWarns(DeprecationWarning):
|
||||
compat.compat_pycrypto_AES # Must not raise error
|
||||
|
||||
@@ -10,10 +10,7 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import collections
|
||||
import hashlib
|
||||
import http.client
|
||||
import json
|
||||
import socket
|
||||
import urllib.error
|
||||
|
||||
from test.helper import (
|
||||
assertGreaterEqual,
|
||||
@@ -29,10 +26,12 @@ from test.helper import (
|
||||
|
||||
import yt_dlp.YoutubeDL # isort: split
|
||||
from yt_dlp.extractor import get_info_extractor
|
||||
from yt_dlp.networking.exceptions import HTTPError, TransportError
|
||||
from yt_dlp.utils import (
|
||||
DownloadError,
|
||||
ExtractorError,
|
||||
UnavailableVideoError,
|
||||
YoutubeDLError,
|
||||
format_bytes,
|
||||
join_nonempty,
|
||||
)
|
||||
@@ -102,6 +101,8 @@ def generator(test_case, tname):
|
||||
print_skipping('IE marked as not _WORKING')
|
||||
|
||||
for tc in test_cases:
|
||||
if tc.get('expected_exception'):
|
||||
continue
|
||||
info_dict = tc.get('info_dict', {})
|
||||
params = tc.get('params', {})
|
||||
if not info_dict.get('id'):
|
||||
@@ -141,6 +142,17 @@ def generator(test_case, tname):
|
||||
|
||||
res_dict = None
|
||||
|
||||
def match_exception(err):
|
||||
expected_exception = test_case.get('expected_exception')
|
||||
if not expected_exception:
|
||||
return False
|
||||
if err.__class__.__name__ == expected_exception:
|
||||
return True
|
||||
for exc in err.exc_info:
|
||||
if exc.__class__.__name__ == expected_exception:
|
||||
return True
|
||||
return False
|
||||
|
||||
def try_rm_tcs_files(tcs=None):
|
||||
if tcs is None:
|
||||
tcs = test_cases
|
||||
@@ -162,8 +174,9 @@ def generator(test_case, tname):
|
||||
force_generic_extractor=params.get('force_generic_extractor', False))
|
||||
except (DownloadError, ExtractorError) as err:
|
||||
# Check if the exception is not a network related one
|
||||
if (err.exc_info[0] not in (urllib.error.URLError, socket.timeout, UnavailableVideoError, http.client.BadStatusLine)
|
||||
or (err.exc_info[0] == urllib.error.HTTPError and err.exc_info[1].code == 503)):
|
||||
if not isinstance(err.exc_info[1], (TransportError, UnavailableVideoError)) or (isinstance(err.exc_info[1], HTTPError) and err.exc_info[1].status == 503):
|
||||
if match_exception(err):
|
||||
return
|
||||
err.msg = f'{getattr(err, "msg", err)} ({tname})'
|
||||
raise
|
||||
|
||||
@@ -174,6 +187,10 @@ def generator(test_case, tname):
|
||||
print(f'Retrying: {try_num} failed tries\n\n##########\n\n')
|
||||
|
||||
try_num += 1
|
||||
except YoutubeDLError as err:
|
||||
if match_exception(err):
|
||||
return
|
||||
raise
|
||||
else:
|
||||
break
|
||||
|
||||
@@ -249,7 +266,7 @@ def generator(test_case, tname):
|
||||
# extractor returns full results even with extract_flat
|
||||
res_tcs = [{'info_dict': e} for e in res_dict['entries']]
|
||||
try_rm_tcs_files(res_tcs)
|
||||
|
||||
ydl.close()
|
||||
return test_template
|
||||
|
||||
|
||||
|
||||
139
test/test_downloader_external.py
Normal file
139
test/test_downloader_external.py
Normal file
@@ -0,0 +1,139 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Allow direct execution
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import http.cookiejar
|
||||
|
||||
from test.helper import FakeYDL
|
||||
from yt_dlp.downloader.external import (
|
||||
Aria2cFD,
|
||||
AxelFD,
|
||||
CurlFD,
|
||||
FFmpegFD,
|
||||
HttpieFD,
|
||||
WgetFD,
|
||||
)
|
||||
|
||||
TEST_COOKIE = {
|
||||
'version': 0,
|
||||
'name': 'test',
|
||||
'value': 'ytdlp',
|
||||
'port': None,
|
||||
'port_specified': False,
|
||||
'domain': '.example.com',
|
||||
'domain_specified': True,
|
||||
'domain_initial_dot': False,
|
||||
'path': '/',
|
||||
'path_specified': True,
|
||||
'secure': False,
|
||||
'expires': None,
|
||||
'discard': False,
|
||||
'comment': None,
|
||||
'comment_url': None,
|
||||
'rest': {},
|
||||
}
|
||||
|
||||
TEST_INFO = {'url': 'http://www.example.com/'}
|
||||
|
||||
|
||||
class TestHttpieFD(unittest.TestCase):
|
||||
def test_make_cmd(self):
|
||||
with FakeYDL() as ydl:
|
||||
downloader = HttpieFD(ydl, {})
|
||||
self.assertEqual(
|
||||
downloader._make_cmd('test', TEST_INFO),
|
||||
['http', '--download', '--output', 'test', 'http://www.example.com/'])
|
||||
|
||||
# Test cookie header is added
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
|
||||
self.assertEqual(
|
||||
downloader._make_cmd('test', TEST_INFO),
|
||||
['http', '--download', '--output', 'test', 'http://www.example.com/', 'Cookie:test=ytdlp'])
|
||||
|
||||
|
||||
class TestAxelFD(unittest.TestCase):
|
||||
def test_make_cmd(self):
|
||||
with FakeYDL() as ydl:
|
||||
downloader = AxelFD(ydl, {})
|
||||
self.assertEqual(
|
||||
downloader._make_cmd('test', TEST_INFO),
|
||||
['axel', '-o', 'test', '--', 'http://www.example.com/'])
|
||||
|
||||
# Test cookie header is added
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
|
||||
self.assertEqual(
|
||||
downloader._make_cmd('test', TEST_INFO),
|
||||
['axel', '-o', 'test', '-H', 'Cookie: test=ytdlp', '--max-redirect=0', '--', 'http://www.example.com/'])
|
||||
|
||||
|
||||
class TestWgetFD(unittest.TestCase):
|
||||
def test_make_cmd(self):
|
||||
with FakeYDL() as ydl:
|
||||
downloader = WgetFD(ydl, {})
|
||||
self.assertNotIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
|
||||
# Test cookiejar tempfile arg is added
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
|
||||
self.assertIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
|
||||
|
||||
|
||||
class TestCurlFD(unittest.TestCase):
|
||||
def test_make_cmd(self):
|
||||
with FakeYDL() as ydl:
|
||||
downloader = CurlFD(ydl, {})
|
||||
self.assertNotIn('--cookie', downloader._make_cmd('test', TEST_INFO))
|
||||
# Test cookie header is added
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
|
||||
self.assertIn('--cookie', downloader._make_cmd('test', TEST_INFO))
|
||||
self.assertIn('test=ytdlp', downloader._make_cmd('test', TEST_INFO))
|
||||
|
||||
|
||||
class TestAria2cFD(unittest.TestCase):
|
||||
def test_make_cmd(self):
|
||||
with FakeYDL() as ydl:
|
||||
downloader = Aria2cFD(ydl, {})
|
||||
downloader._make_cmd('test', TEST_INFO)
|
||||
self.assertFalse(hasattr(downloader, '_cookies_tempfile'))
|
||||
|
||||
# Test cookiejar tempfile arg is added
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
|
||||
cmd = downloader._make_cmd('test', TEST_INFO)
|
||||
self.assertIn(f'--load-cookies={downloader._cookies_tempfile}', cmd)
|
||||
|
||||
|
||||
@unittest.skipUnless(FFmpegFD.available(), 'ffmpeg not found')
|
||||
class TestFFmpegFD(unittest.TestCase):
|
||||
_args = []
|
||||
|
||||
def _test_cmd(self, args):
|
||||
self._args = args
|
||||
|
||||
def test_make_cmd(self):
|
||||
with FakeYDL() as ydl:
|
||||
downloader = FFmpegFD(ydl, {})
|
||||
downloader._debug_cmd = self._test_cmd
|
||||
|
||||
downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'})
|
||||
self.assertEqual(self._args, [
|
||||
'ffmpeg', '-y', '-hide_banner', '-i', 'http://www.example.com/',
|
||||
'-c', 'copy', '-f', 'mp4', 'file:test'])
|
||||
|
||||
# Test cookies arg is added
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(**TEST_COOKIE))
|
||||
downloader._call_downloader('test', {**TEST_INFO, 'ext': 'mp4'})
|
||||
self.assertEqual(self._args, [
|
||||
'ffmpeg', '-y', '-hide_banner', '-cookies', 'test=ytdlp; path=/; domain=.example.com;\r\n',
|
||||
'-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test'])
|
||||
|
||||
# Test with non-url input (ffmpeg reads from stdin '-' for websockets)
|
||||
downloader._call_downloader('test', {'url': 'x', 'ext': 'mp4'})
|
||||
self.assertEqual(self._args, [
|
||||
'ffmpeg', '-y', '-hide_banner', '-i', 'x', '-c', 'copy', '-f', 'mp4', 'file:test'])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -16,6 +16,7 @@ from test.helper import http_server_port, try_rm
|
||||
from yt_dlp import YoutubeDL
|
||||
from yt_dlp.downloader.http import HttpFD
|
||||
from yt_dlp.utils import encodeFilename
|
||||
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
|
||||
|
||||
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
@@ -67,17 +68,6 @@ class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
|
||||
assert False
|
||||
|
||||
|
||||
class FakeLogger:
|
||||
def debug(self, msg):
|
||||
pass
|
||||
|
||||
def warning(self, msg):
|
||||
pass
|
||||
|
||||
def error(self, msg):
|
||||
pass
|
||||
|
||||
|
||||
class TestHttpFD(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.httpd = http.server.HTTPServer(
|
||||
|
||||
@@ -45,6 +45,9 @@ class TestExecution(unittest.TestCase):
|
||||
self.assertTrue(os.path.exists(LAZY_EXTRACTORS))
|
||||
|
||||
_, stderr = self.run_yt_dlp(opts=('-s', 'test:'))
|
||||
# `MIN_RECOMMENDED` emits a deprecated feature warning for deprecated python versions
|
||||
if stderr and stderr.startswith('Deprecated Feature: Support for Python'):
|
||||
stderr = ''
|
||||
self.assertFalse(stderr)
|
||||
|
||||
subprocess.check_call([sys.executable, 'test/test_all_urls.py'], cwd=rootDir, stdout=subprocess.DEVNULL)
|
||||
|
||||
@@ -1,500 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Allow direct execution
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import gzip
|
||||
import http.cookiejar
|
||||
import http.server
|
||||
import io
|
||||
import pathlib
|
||||
import ssl
|
||||
import tempfile
|
||||
import threading
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
import zlib
|
||||
|
||||
from test.helper import http_server_port
|
||||
from yt_dlp import YoutubeDL
|
||||
from yt_dlp.dependencies import brotli
|
||||
from yt_dlp.utils import sanitized_Request, urlencode_postdata
|
||||
|
||||
from .helper import FakeYDL
|
||||
|
||||
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
||||
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
|
||||
protocol_version = 'HTTP/1.1'
|
||||
|
||||
def log_message(self, format, *args):
|
||||
pass
|
||||
|
||||
def _headers(self):
|
||||
payload = str(self.headers).encode('utf-8')
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'application/json')
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
|
||||
def _redirect(self):
|
||||
self.send_response(int(self.path[len('/redirect_'):]))
|
||||
self.send_header('Location', '/method')
|
||||
self.send_header('Content-Length', '0')
|
||||
self.end_headers()
|
||||
|
||||
def _method(self, method, payload=None):
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Length', str(len(payload or '')))
|
||||
self.send_header('Method', method)
|
||||
self.end_headers()
|
||||
if payload:
|
||||
self.wfile.write(payload)
|
||||
|
||||
def _status(self, status):
|
||||
payload = f'<html>{status} NOT FOUND</html>'.encode()
|
||||
self.send_response(int(status))
|
||||
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
|
||||
def _read_data(self):
|
||||
if 'Content-Length' in self.headers:
|
||||
return self.rfile.read(int(self.headers['Content-Length']))
|
||||
|
||||
def do_POST(self):
|
||||
data = self._read_data()
|
||||
if self.path.startswith('/redirect_'):
|
||||
self._redirect()
|
||||
elif self.path.startswith('/method'):
|
||||
self._method('POST', data)
|
||||
elif self.path.startswith('/headers'):
|
||||
self._headers()
|
||||
else:
|
||||
self._status(404)
|
||||
|
||||
def do_HEAD(self):
|
||||
if self.path.startswith('/redirect_'):
|
||||
self._redirect()
|
||||
elif self.path.startswith('/method'):
|
||||
self._method('HEAD')
|
||||
else:
|
||||
self._status(404)
|
||||
|
||||
def do_PUT(self):
|
||||
data = self._read_data()
|
||||
if self.path.startswith('/redirect_'):
|
||||
self._redirect()
|
||||
elif self.path.startswith('/method'):
|
||||
self._method('PUT', data)
|
||||
else:
|
||||
self._status(404)
|
||||
|
||||
def do_GET(self):
|
||||
if self.path == '/video.html':
|
||||
payload = b'<html><video src="/vid.mp4" /></html>'
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
||||
self.send_header('Content-Length', str(len(payload))) # required for persistent connections
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
elif self.path == '/vid.mp4':
|
||||
payload = b'\x00\x00\x00\x00\x20\x66\x74[video]'
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'video/mp4')
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
elif self.path == '/%E4%B8%AD%E6%96%87.html':
|
||||
payload = b'<html><video src="/vid.mp4" /></html>'
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
elif self.path == '/%c7%9f':
|
||||
payload = b'<html><video src="/vid.mp4" /></html>'
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
elif self.path.startswith('/redirect_'):
|
||||
self._redirect()
|
||||
elif self.path.startswith('/method'):
|
||||
self._method('GET')
|
||||
elif self.path.startswith('/headers'):
|
||||
self._headers()
|
||||
elif self.path == '/trailing_garbage':
|
||||
payload = b'<html><video src="/vid.mp4" /></html>'
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
||||
self.send_header('Content-Encoding', 'gzip')
|
||||
buf = io.BytesIO()
|
||||
with gzip.GzipFile(fileobj=buf, mode='wb') as f:
|
||||
f.write(payload)
|
||||
compressed = buf.getvalue() + b'trailing garbage'
|
||||
self.send_header('Content-Length', str(len(compressed)))
|
||||
self.end_headers()
|
||||
self.wfile.write(compressed)
|
||||
elif self.path == '/302-non-ascii-redirect':
|
||||
new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
|
||||
self.send_response(301)
|
||||
self.send_header('Location', new_url)
|
||||
self.send_header('Content-Length', '0')
|
||||
self.end_headers()
|
||||
elif self.path == '/content-encoding':
|
||||
encodings = self.headers.get('ytdl-encoding', '')
|
||||
payload = b'<html><video src="/vid.mp4" /></html>'
|
||||
for encoding in filter(None, (e.strip() for e in encodings.split(','))):
|
||||
if encoding == 'br' and brotli:
|
||||
payload = brotli.compress(payload)
|
||||
elif encoding == 'gzip':
|
||||
buf = io.BytesIO()
|
||||
with gzip.GzipFile(fileobj=buf, mode='wb') as f:
|
||||
f.write(payload)
|
||||
payload = buf.getvalue()
|
||||
elif encoding == 'deflate':
|
||||
payload = zlib.compress(payload)
|
||||
elif encoding == 'unsupported':
|
||||
payload = b'raw'
|
||||
break
|
||||
else:
|
||||
self._status(415)
|
||||
return
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Encoding', encodings)
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload)
|
||||
|
||||
else:
|
||||
self._status(404)
|
||||
|
||||
def send_header(self, keyword, value):
|
||||
"""
|
||||
Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
|
||||
This is against what is defined in RFC 3986, however we need to test we support this
|
||||
since some sites incorrectly do this.
|
||||
"""
|
||||
if keyword.lower() == 'connection':
|
||||
return super().send_header(keyword, value)
|
||||
|
||||
if not hasattr(self, '_headers_buffer'):
|
||||
self._headers_buffer = []
|
||||
|
||||
self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
|
||||
|
||||
|
||||
class FakeLogger:
|
||||
def debug(self, msg):
|
||||
pass
|
||||
|
||||
def warning(self, msg):
|
||||
pass
|
||||
|
||||
def error(self, msg):
|
||||
pass
|
||||
|
||||
|
||||
class TestHTTP(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# HTTP server
|
||||
self.http_httpd = http.server.ThreadingHTTPServer(
|
||||
('127.0.0.1', 0), HTTPTestRequestHandler)
|
||||
self.http_port = http_server_port(self.http_httpd)
|
||||
self.http_server_thread = threading.Thread(target=self.http_httpd.serve_forever)
|
||||
# FIXME: we should probably stop the http server thread after each test
|
||||
# See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
|
||||
self.http_server_thread.daemon = True
|
||||
self.http_server_thread.start()
|
||||
|
||||
# HTTPS server
|
||||
certfn = os.path.join(TEST_DIR, 'testcert.pem')
|
||||
self.https_httpd = http.server.ThreadingHTTPServer(
|
||||
('127.0.0.1', 0), HTTPTestRequestHandler)
|
||||
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
sslctx.load_cert_chain(certfn, None)
|
||||
self.https_httpd.socket = sslctx.wrap_socket(self.https_httpd.socket, server_side=True)
|
||||
self.https_port = http_server_port(self.https_httpd)
|
||||
self.https_server_thread = threading.Thread(target=self.https_httpd.serve_forever)
|
||||
self.https_server_thread.daemon = True
|
||||
self.https_server_thread.start()
|
||||
|
||||
def test_nocheckcertificate(self):
|
||||
with FakeYDL({'logger': FakeLogger()}) as ydl:
|
||||
with self.assertRaises(urllib.error.URLError):
|
||||
ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
|
||||
|
||||
with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
|
||||
r = ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
|
||||
self.assertEqual(r.status, 200)
|
||||
r.close()
|
||||
|
||||
def test_percent_encode(self):
|
||||
with FakeYDL() as ydl:
|
||||
# Unicode characters should be encoded with uppercase percent-encoding
|
||||
res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
|
||||
self.assertEqual(res.status, 200)
|
||||
res.close()
|
||||
# don't normalize existing percent encodings
|
||||
res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
|
||||
self.assertEqual(res.status, 200)
|
||||
res.close()
|
||||
|
||||
def test_unicode_path_redirection(self):
|
||||
with FakeYDL() as ydl:
|
||||
r = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
|
||||
self.assertEqual(r.url, f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
|
||||
r.close()
|
||||
|
||||
def test_redirect(self):
|
||||
with FakeYDL() as ydl:
|
||||
def do_req(redirect_status, method):
|
||||
data = b'testdata' if method in ('POST', 'PUT') else None
|
||||
res = ydl.urlopen(sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
|
||||
return res.read().decode('utf-8'), res.headers.get('method', '')
|
||||
|
||||
# A 303 must either use GET or HEAD for subsequent request
|
||||
self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
|
||||
self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))
|
||||
|
||||
self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))
|
||||
|
||||
# 301 and 302 turn POST only into a GET
|
||||
self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
|
||||
self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
|
||||
self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
|
||||
self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))
|
||||
|
||||
self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
|
||||
self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))
|
||||
|
||||
# 307 and 308 should not change method
|
||||
for m in ('POST', 'PUT'):
|
||||
self.assertEqual(do_req(307, m), ('testdata', m))
|
||||
self.assertEqual(do_req(308, m), ('testdata', m))
|
||||
|
||||
self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
|
||||
self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))
|
||||
|
||||
# These should not redirect and instead raise an HTTPError
|
||||
for code in (300, 304, 305, 306):
|
||||
with self.assertRaises(urllib.error.HTTPError):
|
||||
do_req(code, 'GET')
|
||||
|
||||
def test_content_type(self):
|
||||
# https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
|
||||
with FakeYDL({'nocheckcertificate': True}) as ydl:
|
||||
# method should be auto-detected as POST
|
||||
r = sanitized_Request(f'https://localhost:{self.https_port}/headers', data=urlencode_postdata({'test': 'test'}))
|
||||
|
||||
headers = ydl.urlopen(r).read().decode('utf-8')
|
||||
self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)
|
||||
|
||||
# test http
|
||||
r = sanitized_Request(f'http://localhost:{self.http_port}/headers', data=urlencode_postdata({'test': 'test'}))
|
||||
headers = ydl.urlopen(r).read().decode('utf-8')
|
||||
self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)
|
||||
|
||||
def test_cookiejar(self):
|
||||
with FakeYDL() as ydl:
|
||||
ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
|
||||
0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
|
||||
False, '/headers', True, False, None, False, None, None, {}))
|
||||
data = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
|
||||
self.assertIn(b'Cookie: test=ytdlp', data)
|
||||
|
||||
def test_no_compression_compat_header(self):
|
||||
with FakeYDL() as ydl:
|
||||
data = ydl.urlopen(
|
||||
sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/headers',
|
||||
headers={'Youtubedl-no-compression': True})).read()
|
||||
self.assertIn(b'Accept-Encoding: identity', data)
|
||||
self.assertNotIn(b'youtubedl-no-compression', data.lower())
|
||||
|
||||
def test_gzip_trailing_garbage(self):
|
||||
# https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
|
||||
# https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
|
||||
with FakeYDL() as ydl:
|
||||
data = ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode('utf-8')
|
||||
self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')
|
||||
|
||||
@unittest.skipUnless(brotli, 'brotli support is not installed')
|
||||
def test_brotli(self):
|
||||
with FakeYDL() as ydl:
|
||||
res = ydl.urlopen(
|
||||
sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
||||
headers={'ytdl-encoding': 'br'}))
|
||||
self.assertEqual(res.headers.get('Content-Encoding'), 'br')
|
||||
self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
|
||||
|
||||
def test_deflate(self):
|
||||
with FakeYDL() as ydl:
|
||||
res = ydl.urlopen(
|
||||
sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
||||
headers={'ytdl-encoding': 'deflate'}))
|
||||
self.assertEqual(res.headers.get('Content-Encoding'), 'deflate')
|
||||
self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
|
||||
|
||||
def test_gzip(self):
|
||||
with FakeYDL() as ydl:
|
||||
res = ydl.urlopen(
|
||||
sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
||||
headers={'ytdl-encoding': 'gzip'}))
|
||||
self.assertEqual(res.headers.get('Content-Encoding'), 'gzip')
|
||||
self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
|
||||
|
||||
def test_multiple_encodings(self):
|
||||
# https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
|
||||
with FakeYDL() as ydl:
|
||||
for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
|
||||
res = ydl.urlopen(
|
||||
sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
||||
headers={'ytdl-encoding': pair}))
|
||||
self.assertEqual(res.headers.get('Content-Encoding'), pair)
|
||||
self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
|
||||
|
||||
def test_unsupported_encoding(self):
|
||||
# it should return the raw content
|
||||
with FakeYDL() as ydl:
|
||||
res = ydl.urlopen(
|
||||
sanitized_Request(
|
||||
f'http://127.0.0.1:{self.http_port}/content-encoding',
|
||||
headers={'ytdl-encoding': 'unsupported'}))
|
||||
self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')
|
||||
self.assertEqual(res.read(), b'raw')
|
||||
|
||||
|
||||
class TestClientCert(unittest.TestCase):
|
||||
def setUp(self):
|
||||
certfn = os.path.join(TEST_DIR, 'testcert.pem')
|
||||
self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
|
||||
cacertfn = os.path.join(self.certdir, 'ca.crt')
|
||||
self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
|
||||
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
sslctx.verify_mode = ssl.CERT_REQUIRED
|
||||
sslctx.load_verify_locations(cafile=cacertfn)
|
||||
sslctx.load_cert_chain(certfn, None)
|
||||
self.httpd.socket = sslctx.wrap_socket(self.httpd.socket, server_side=True)
|
||||
self.port = http_server_port(self.httpd)
|
||||
self.server_thread = threading.Thread(target=self.httpd.serve_forever)
|
||||
self.server_thread.daemon = True
|
||||
self.server_thread.start()
|
||||
|
||||
def _run_test(self, **params):
|
||||
ydl = YoutubeDL({
|
||||
'logger': FakeLogger(),
|
||||
# Disable client-side validation of unacceptable self-signed testcert.pem
|
||||
# The test is of a check on the server side, so unaffected
|
||||
'nocheckcertificate': True,
|
||||
**params,
|
||||
})
|
||||
r = ydl.extract_info(f'https://127.0.0.1:{self.port}/video.html')
|
||||
self.assertEqual(r['url'], f'https://127.0.0.1:{self.port}/vid.mp4')
|
||||
|
||||
def test_certificate_combined_nopass(self):
|
||||
self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithkey.crt'))
|
||||
|
||||
def test_certificate_nocombined_nopass(self):
|
||||
self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
|
||||
client_certificate_key=os.path.join(self.certdir, 'client.key'))
|
||||
|
||||
def test_certificate_combined_pass(self):
|
||||
self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
|
||||
client_certificate_password='foobar')
|
||||
|
||||
def test_certificate_nocombined_pass(self):
|
||||
self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
|
||||
client_certificate_key=os.path.join(self.certdir, 'clientencrypted.key'),
|
||||
client_certificate_password='foobar')
|
||||
|
||||
|
||||
def _build_proxy_handler(name):
|
||||
class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
|
||||
proxy_name = name
|
||||
|
||||
def log_message(self, format, *args):
|
||||
pass
|
||||
|
||||
def do_GET(self):
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/plain; charset=utf-8')
|
||||
self.end_headers()
|
||||
self.wfile.write(f'{self.proxy_name}: {self.path}'.encode())
|
||||
return HTTPTestRequestHandler
|
||||
|
||||
|
||||
class TestProxy(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.proxy = http.server.HTTPServer(
|
||||
('127.0.0.1', 0), _build_proxy_handler('normal'))
|
||||
self.port = http_server_port(self.proxy)
|
||||
self.proxy_thread = threading.Thread(target=self.proxy.serve_forever)
|
||||
self.proxy_thread.daemon = True
|
||||
self.proxy_thread.start()
|
||||
|
||||
self.geo_proxy = http.server.HTTPServer(
|
||||
('127.0.0.1', 0), _build_proxy_handler('geo'))
|
||||
self.geo_port = http_server_port(self.geo_proxy)
|
||||
self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever)
|
||||
self.geo_proxy_thread.daemon = True
|
||||
self.geo_proxy_thread.start()
|
||||
|
||||
def test_proxy(self):
|
||||
geo_proxy = f'127.0.0.1:{self.geo_port}'
|
||||
ydl = YoutubeDL({
|
||||
'proxy': f'127.0.0.1:{self.port}',
|
||||
'geo_verification_proxy': geo_proxy,
|
||||
})
|
||||
url = 'http://foo.com/bar'
|
||||
response = ydl.urlopen(url).read().decode()
|
||||
self.assertEqual(response, f'normal: {url}')
|
||||
|
||||
req = urllib.request.Request(url)
|
||||
req.add_header('Ytdl-request-proxy', geo_proxy)
|
||||
response = ydl.urlopen(req).read().decode()
|
||||
self.assertEqual(response, f'geo: {url}')
|
||||
|
||||
def test_proxy_with_idn(self):
|
||||
ydl = YoutubeDL({
|
||||
'proxy': f'127.0.0.1:{self.port}',
|
||||
})
|
||||
url = 'http://中文.tw/'
|
||||
response = ydl.urlopen(url).read().decode()
|
||||
# b'xn--fiq228c' is '中文'.encode('idna')
|
||||
self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
|
||||
|
||||
|
||||
class TestFileURL(unittest.TestCase):
|
||||
# See https://github.com/ytdl-org/youtube-dl/issues/8227
|
||||
def test_file_urls(self):
|
||||
tf = tempfile.NamedTemporaryFile(delete=False)
|
||||
tf.write(b'foobar')
|
||||
tf.close()
|
||||
url = pathlib.Path(tf.name).as_uri()
|
||||
with FakeYDL() as ydl:
|
||||
self.assertRaisesRegex(
|
||||
urllib.error.URLError, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl.urlopen, url)
|
||||
with FakeYDL({'enable_file_urls': True}) as ydl:
|
||||
res = ydl.urlopen(url)
|
||||
self.assertEqual(res.read(), b'foobar')
|
||||
res.close()
|
||||
os.unlink(tf.name)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
1439
test/test_networking.py
Normal file
1439
test/test_networking.py
Normal file
File diff suppressed because it is too large
Load Diff
282
test/test_networking_utils.py
Normal file
282
test/test_networking_utils.py
Normal file
@@ -0,0 +1,282 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Allow direct execution
|
||||
import os
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import contextlib
|
||||
import io
|
||||
import platform
|
||||
import random
|
||||
import ssl
|
||||
import urllib.error
|
||||
import warnings
|
||||
|
||||
from yt_dlp.cookies import YoutubeDLCookieJar
|
||||
from yt_dlp.dependencies import certifi
|
||||
from yt_dlp.networking import Response
|
||||
from yt_dlp.networking._helper import (
|
||||
InstanceStoreMixin,
|
||||
add_accept_encoding_header,
|
||||
get_redirect_method,
|
||||
make_socks_proxy_opts,
|
||||
select_proxy,
|
||||
ssl_load_certs,
|
||||
)
|
||||
from yt_dlp.networking.exceptions import (
|
||||
HTTPError,
|
||||
IncompleteRead,
|
||||
_CompatHTTPError,
|
||||
)
|
||||
from yt_dlp.socks import ProxyType
|
||||
from yt_dlp.utils.networking import HTTPHeaderDict
|
||||
|
||||
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
||||
class TestNetworkingUtils:
|
||||
|
||||
def test_select_proxy(self):
|
||||
proxies = {
|
||||
'all': 'socks5://example.com',
|
||||
'http': 'http://example.com:1080',
|
||||
'no': 'bypass.example.com,yt-dl.org'
|
||||
}
|
||||
|
||||
assert select_proxy('https://example.com', proxies) == proxies['all']
|
||||
assert select_proxy('http://example.com', proxies) == proxies['http']
|
||||
assert select_proxy('http://bypass.example.com', proxies) is None
|
||||
assert select_proxy('https://yt-dl.org', proxies) is None
|
||||
|
||||
@pytest.mark.parametrize('socks_proxy,expected', [
|
||||
('socks5h://example.com', {
|
||||
'proxytype': ProxyType.SOCKS5,
|
||||
'addr': 'example.com',
|
||||
'port': 1080,
|
||||
'rdns': True,
|
||||
'username': None,
|
||||
'password': None
|
||||
}),
|
||||
('socks5://user:@example.com:5555', {
|
||||
'proxytype': ProxyType.SOCKS5,
|
||||
'addr': 'example.com',
|
||||
'port': 5555,
|
||||
'rdns': False,
|
||||
'username': 'user',
|
||||
'password': ''
|
||||
}),
|
||||
('socks4://u%40ser:pa%20ss@127.0.0.1:1080', {
|
||||
'proxytype': ProxyType.SOCKS4,
|
||||
'addr': '127.0.0.1',
|
||||
'port': 1080,
|
||||
'rdns': False,
|
||||
'username': 'u@ser',
|
||||
'password': 'pa ss'
|
||||
}),
|
||||
('socks4a://:pa%20ss@127.0.0.1', {
|
||||
'proxytype': ProxyType.SOCKS4A,
|
||||
'addr': '127.0.0.1',
|
||||
'port': 1080,
|
||||
'rdns': True,
|
||||
'username': '',
|
||||
'password': 'pa ss'
|
||||
})
|
||||
])
|
||||
def test_make_socks_proxy_opts(self, socks_proxy, expected):
|
||||
assert make_socks_proxy_opts(socks_proxy) == expected
|
||||
|
||||
def test_make_socks_proxy_unknown(self):
|
||||
with pytest.raises(ValueError, match='Unknown SOCKS proxy version: socks'):
|
||||
make_socks_proxy_opts('socks://127.0.0.1')
|
||||
|
||||
@pytest.mark.skipif(not certifi, reason='certifi is not installed')
|
||||
def test_load_certifi(self):
|
||||
context_certifi = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
context_certifi.load_verify_locations(cafile=certifi.where())
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ssl_load_certs(context, use_certifi=True)
|
||||
assert context.get_ca_certs() == context_certifi.get_ca_certs()
|
||||
|
||||
context_default = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
context_default.load_default_certs()
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ssl_load_certs(context, use_certifi=False)
|
||||
assert context.get_ca_certs() == context_default.get_ca_certs()
|
||||
|
||||
if context_default.get_ca_certs() == context_certifi.get_ca_certs():
|
||||
pytest.skip('System uses certifi as default. The test is not valid')
|
||||
|
||||
@pytest.mark.parametrize('method,status,expected', [
|
||||
('GET', 303, 'GET'),
|
||||
('HEAD', 303, 'HEAD'),
|
||||
('PUT', 303, 'GET'),
|
||||
('POST', 301, 'GET'),
|
||||
('HEAD', 301, 'HEAD'),
|
||||
('POST', 302, 'GET'),
|
||||
('HEAD', 302, 'HEAD'),
|
||||
('PUT', 302, 'PUT'),
|
||||
('POST', 308, 'POST'),
|
||||
('POST', 307, 'POST'),
|
||||
('HEAD', 308, 'HEAD'),
|
||||
('HEAD', 307, 'HEAD'),
|
||||
])
|
||||
def test_get_redirect_method(self, method, status, expected):
|
||||
assert get_redirect_method(method, status) == expected
|
||||
|
||||
@pytest.mark.parametrize('headers,supported_encodings,expected', [
|
||||
({'Accept-Encoding': 'br'}, ['gzip', 'br'], {'Accept-Encoding': 'br'}),
|
||||
({}, ['gzip', 'br'], {'Accept-Encoding': 'gzip, br'}),
|
||||
({'Content-type': 'application/json'}, [], {'Content-type': 'application/json', 'Accept-Encoding': 'identity'}),
|
||||
])
|
||||
def test_add_accept_encoding_header(self, headers, supported_encodings, expected):
|
||||
headers = HTTPHeaderDict(headers)
|
||||
add_accept_encoding_header(headers, supported_encodings)
|
||||
assert headers == HTTPHeaderDict(expected)
|
||||
|
||||
|
||||
class TestInstanceStoreMixin:
|
||||
|
||||
class FakeInstanceStoreMixin(InstanceStoreMixin):
|
||||
def _create_instance(self, **kwargs):
|
||||
return random.randint(0, 1000000)
|
||||
|
||||
def _close_instance(self, instance):
|
||||
pass
|
||||
|
||||
def test_mixin(self):
|
||||
mixin = self.FakeInstanceStoreMixin()
|
||||
assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) == mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
|
||||
|
||||
assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'e', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
|
||||
|
||||
assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}} != mixin._get_instance(d={'a': 1, 'b': 2, 'g': {'d', 4}}))
|
||||
|
||||
assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) == mixin._get_instance(d={'a': 1}, e=[1, 2, 3])
|
||||
|
||||
assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) != mixin._get_instance(d={'a': 1}, e=[1, 2, 3, 4])
|
||||
|
||||
cookiejar = YoutubeDLCookieJar()
|
||||
assert mixin._get_instance(b=[1, 2], c=cookiejar) == mixin._get_instance(b=[1, 2], c=cookiejar)
|
||||
|
||||
assert mixin._get_instance(b=[1, 2], c=cookiejar) != mixin._get_instance(b=[1, 2], c=YoutubeDLCookieJar())
|
||||
|
||||
# Different order
|
||||
assert mixin._get_instance(c=cookiejar, b=[1, 2]) == mixin._get_instance(b=[1, 2], c=cookiejar)
|
||||
|
||||
m = mixin._get_instance(t=1234)
|
||||
assert mixin._get_instance(t=1234) == m
|
||||
mixin._clear_instances()
|
||||
assert mixin._get_instance(t=1234) != m
|
||||
|
||||
|
||||
class TestNetworkingExceptions:
|
||||
|
||||
@staticmethod
|
||||
def create_response(status):
|
||||
return Response(fp=io.BytesIO(b'test'), url='http://example.com', headers={'tesT': 'test'}, status=status)
|
||||
|
||||
@pytest.mark.parametrize('http_error_class', [HTTPError, lambda r: _CompatHTTPError(HTTPError(r))])
|
||||
def test_http_error(self, http_error_class):
|
||||
|
||||
response = self.create_response(403)
|
||||
error = http_error_class(response)
|
||||
|
||||
assert error.status == 403
|
||||
assert str(error) == error.msg == 'HTTP Error 403: Forbidden'
|
||||
assert error.reason == response.reason
|
||||
assert error.response is response
|
||||
|
||||
data = error.response.read()
|
||||
assert data == b'test'
|
||||
assert repr(error) == '<HTTPError 403: Forbidden>'
|
||||
|
||||
@pytest.mark.parametrize('http_error_class', [HTTPError, lambda *args, **kwargs: _CompatHTTPError(HTTPError(*args, **kwargs))])
|
||||
def test_redirect_http_error(self, http_error_class):
|
||||
response = self.create_response(301)
|
||||
error = http_error_class(response, redirect_loop=True)
|
||||
assert str(error) == error.msg == 'HTTP Error 301: Moved Permanently (redirect loop detected)'
|
||||
assert error.reason == 'Moved Permanently'
|
||||
|
||||
def test_compat_http_error(self):
|
||||
response = self.create_response(403)
|
||||
error = _CompatHTTPError(HTTPError(response))
|
||||
assert isinstance(error, HTTPError)
|
||||
assert isinstance(error, urllib.error.HTTPError)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def raises_deprecation_warning():
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter('always')
|
||||
yield
|
||||
|
||||
if len(w) == 0:
|
||||
pytest.fail('Did not raise DeprecationWarning')
|
||||
if len(w) > 1:
|
||||
pytest.fail(f'Raised multiple warnings: {w}')
|
||||
|
||||
if not issubclass(w[-1].category, DeprecationWarning):
|
||||
pytest.fail(f'Expected DeprecationWarning, got {w[-1].category}')
|
||||
w.clear()
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.code == 403
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.getcode() == 403
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.hdrs is error.response.headers
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.info() is error.response.headers
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.headers is error.response.headers
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.filename == error.response.url
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.url == error.response.url
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert error.geturl() == error.response.url
|
||||
|
||||
# Passthrough file operations
|
||||
with raises_deprecation_warning():
|
||||
assert error.read() == b'test'
|
||||
|
||||
with raises_deprecation_warning():
|
||||
assert not error.closed
|
||||
|
||||
with raises_deprecation_warning():
|
||||
# Technically Response operations are also passed through, which should not be used.
|
||||
assert error.get_header('test') == 'test'
|
||||
|
||||
# Should not raise a warning
|
||||
error.close()
|
||||
|
||||
@pytest.mark.skipif(
|
||||
platform.python_implementation() == 'PyPy', reason='garbage collector works differently in pypy')
|
||||
def test_compat_http_error_autoclose(self):
|
||||
# Compat HTTPError should not autoclose response
|
||||
response = self.create_response(403)
|
||||
_CompatHTTPError(HTTPError(response))
|
||||
assert not response.closed
|
||||
|
||||
def test_incomplete_read_error(self):
|
||||
error = IncompleteRead(4, 3, cause='test')
|
||||
assert isinstance(error, IncompleteRead)
|
||||
assert repr(error) == '<IncompleteRead: 4 bytes read, 3 more expected>'
|
||||
assert str(error) == error.msg == '4 bytes read, 3 more expected'
|
||||
assert error.partial == 4
|
||||
assert error.expected == 3
|
||||
assert error.cause == 'test'
|
||||
|
||||
error = IncompleteRead(3)
|
||||
assert repr(error) == '<IncompleteRead: 3 bytes read>'
|
||||
assert str(error) == '3 bytes read'
|
||||
@@ -1,113 +1,450 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Allow direct execution
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import unittest
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
|
||||
import abc
|
||||
import contextlib
|
||||
import enum
|
||||
import functools
|
||||
import http.server
|
||||
import json
|
||||
import random
|
||||
import subprocess
|
||||
import urllib.request
|
||||
import socket
|
||||
import struct
|
||||
import time
|
||||
from socketserver import (
|
||||
BaseRequestHandler,
|
||||
StreamRequestHandler,
|
||||
ThreadingTCPServer,
|
||||
)
|
||||
|
||||
from test.helper import FakeYDL, get_params, is_download_test
|
||||
from test.helper import http_server_port
|
||||
from yt_dlp.networking import Request
|
||||
from yt_dlp.networking.exceptions import ProxyError, TransportError
|
||||
from yt_dlp.socks import (
|
||||
SOCKS4_REPLY_VERSION,
|
||||
SOCKS4_VERSION,
|
||||
SOCKS5_USER_AUTH_SUCCESS,
|
||||
SOCKS5_USER_AUTH_VERSION,
|
||||
SOCKS5_VERSION,
|
||||
Socks5AddressType,
|
||||
Socks5Auth,
|
||||
)
|
||||
|
||||
SOCKS5_USER_AUTH_FAILURE = 0x1
|
||||
|
||||
|
||||
@is_download_test
|
||||
class TestMultipleSocks(unittest.TestCase):
|
||||
@staticmethod
|
||||
def _check_params(attrs):
|
||||
params = get_params()
|
||||
for attr in attrs:
|
||||
if attr not in params:
|
||||
print('Missing %s. Skipping.' % attr)
|
||||
class Socks4CD(enum.IntEnum):
|
||||
REQUEST_GRANTED = 90
|
||||
REQUEST_REJECTED_OR_FAILED = 91
|
||||
REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 92
|
||||
REQUEST_REJECTED_DIFFERENT_USERID = 93
|
||||
|
||||
|
||||
class Socks5Reply(enum.IntEnum):
|
||||
SUCCEEDED = 0x0
|
||||
GENERAL_FAILURE = 0x1
|
||||
CONNECTION_NOT_ALLOWED = 0x2
|
||||
NETWORK_UNREACHABLE = 0x3
|
||||
HOST_UNREACHABLE = 0x4
|
||||
CONNECTION_REFUSED = 0x5
|
||||
TTL_EXPIRED = 0x6
|
||||
COMMAND_NOT_SUPPORTED = 0x7
|
||||
ADDRESS_TYPE_NOT_SUPPORTED = 0x8
|
||||
|
||||
|
||||
class SocksTestRequestHandler(BaseRequestHandler):
|
||||
|
||||
def __init__(self, *args, socks_info=None, **kwargs):
|
||||
self.socks_info = socks_info
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class SocksProxyHandler(BaseRequestHandler):
|
||||
def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
|
||||
self.socks_kwargs = socks_server_kwargs or {}
|
||||
self.request_handler_class = request_handler_class
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
|
||||
|
||||
# SOCKS5 protocol https://tools.ietf.org/html/rfc1928
|
||||
# SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
|
||||
|
||||
def handle(self):
|
||||
sleep = self.socks_kwargs.get('sleep')
|
||||
if sleep:
|
||||
time.sleep(sleep)
|
||||
version, nmethods = self.connection.recv(2)
|
||||
assert version == SOCKS5_VERSION
|
||||
methods = list(self.connection.recv(nmethods))
|
||||
|
||||
auth = self.socks_kwargs.get('auth')
|
||||
|
||||
if auth is not None and Socks5Auth.AUTH_USER_PASS not in methods:
|
||||
self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
|
||||
self.server.close_request(self.request)
|
||||
return
|
||||
|
||||
elif Socks5Auth.AUTH_USER_PASS in methods:
|
||||
self.connection.sendall(struct.pack("!BB", SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))
|
||||
|
||||
_, user_len = struct.unpack('!BB', self.connection.recv(2))
|
||||
username = self.connection.recv(user_len).decode()
|
||||
pass_len = ord(self.connection.recv(1))
|
||||
password = self.connection.recv(pass_len).decode()
|
||||
|
||||
if username == auth[0] and password == auth[1]:
|
||||
self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
|
||||
else:
|
||||
self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
|
||||
self.server.close_request(self.request)
|
||||
return
|
||||
return params
|
||||
|
||||
def test_proxy_http(self):
|
||||
params = self._check_params(['primary_proxy', 'primary_server_ip'])
|
||||
if params is None:
|
||||
return
|
||||
ydl = FakeYDL({
|
||||
'proxy': params['primary_proxy']
|
||||
})
|
||||
self.assertEqual(
|
||||
ydl.urlopen('http://yt-dl.org/ip').read().decode(),
|
||||
params['primary_server_ip'])
|
||||
|
||||
def test_proxy_https(self):
|
||||
params = self._check_params(['primary_proxy', 'primary_server_ip'])
|
||||
if params is None:
|
||||
return
|
||||
ydl = FakeYDL({
|
||||
'proxy': params['primary_proxy']
|
||||
})
|
||||
self.assertEqual(
|
||||
ydl.urlopen('https://yt-dl.org/ip').read().decode(),
|
||||
params['primary_server_ip'])
|
||||
|
||||
def test_secondary_proxy_http(self):
|
||||
params = self._check_params(['secondary_proxy', 'secondary_server_ip'])
|
||||
if params is None:
|
||||
return
|
||||
ydl = FakeYDL()
|
||||
req = urllib.request.Request('http://yt-dl.org/ip')
|
||||
req.add_header('Ytdl-request-proxy', params['secondary_proxy'])
|
||||
self.assertEqual(
|
||||
ydl.urlopen(req).read().decode(),
|
||||
params['secondary_server_ip'])
|
||||
|
||||
def test_secondary_proxy_https(self):
|
||||
params = self._check_params(['secondary_proxy', 'secondary_server_ip'])
|
||||
if params is None:
|
||||
return
|
||||
ydl = FakeYDL()
|
||||
req = urllib.request.Request('https://yt-dl.org/ip')
|
||||
req.add_header('Ytdl-request-proxy', params['secondary_proxy'])
|
||||
self.assertEqual(
|
||||
ydl.urlopen(req).read().decode(),
|
||||
params['secondary_server_ip'])
|
||||
|
||||
|
||||
@is_download_test
|
||||
class TestSocks(unittest.TestCase):
|
||||
_SKIP_SOCKS_TEST = True
|
||||
|
||||
def setUp(self):
|
||||
if self._SKIP_SOCKS_TEST:
|
||||
elif Socks5Auth.AUTH_NONE in methods:
|
||||
self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
|
||||
else:
|
||||
self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
|
||||
self.server.close_request(self.request)
|
||||
return
|
||||
|
||||
self.port = random.randint(20000, 30000)
|
||||
self.server_process = subprocess.Popen([
|
||||
'srelay', '-f', '-i', '127.0.0.1:%d' % self.port],
|
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
|
||||
socks_info = {
|
||||
'version': version,
|
||||
'auth_methods': methods,
|
||||
'command': command,
|
||||
'client_address': self.client_address,
|
||||
'ipv4_address': None,
|
||||
'domain_address': None,
|
||||
'ipv6_address': None,
|
||||
}
|
||||
if address_type == Socks5AddressType.ATYP_IPV4:
|
||||
socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
|
||||
elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
|
||||
socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
|
||||
elif address_type == Socks5AddressType.ATYP_IPV6:
|
||||
socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
|
||||
else:
|
||||
self.server.close_request(self.request)
|
||||
|
||||
def tearDown(self):
|
||||
if self._SKIP_SOCKS_TEST:
|
||||
socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]
|
||||
|
||||
# dummy response, the returned IP is just a placeholder
|
||||
self.connection.sendall(struct.pack(
|
||||
'!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))
|
||||
|
||||
self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
|
||||
|
||||
|
||||
class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
|
||||
|
||||
# SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
|
||||
# SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
|
||||
|
||||
def _read_until_null(self):
|
||||
return b''.join(iter(functools.partial(self.connection.recv, 1), b'\x00'))
|
||||
|
||||
def handle(self):
|
||||
sleep = self.socks_kwargs.get('sleep')
|
||||
if sleep:
|
||||
time.sleep(sleep)
|
||||
socks_info = {
|
||||
'version': SOCKS4_VERSION,
|
||||
'command': None,
|
||||
'client_address': self.client_address,
|
||||
'ipv4_address': None,
|
||||
'port': None,
|
||||
'domain_address': None,
|
||||
}
|
||||
version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
|
||||
socks_info['port'] = dest_port
|
||||
socks_info['command'] = command
|
||||
if version != SOCKS4_VERSION:
|
||||
self.server.close_request(self.request)
|
||||
return
|
||||
use_remote_dns = False
|
||||
if 0x0 < dest_ip <= 0xFF:
|
||||
use_remote_dns = True
|
||||
else:
|
||||
socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack("!I", dest_ip))
|
||||
|
||||
user_id = self._read_until_null().decode()
|
||||
if user_id != (self.socks_kwargs.get('user_id') or ''):
|
||||
self.connection.sendall(struct.pack(
|
||||
'!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
|
||||
self.server.close_request(self.request)
|
||||
return
|
||||
|
||||
self.server_process.terminate()
|
||||
self.server_process.communicate()
|
||||
if use_remote_dns:
|
||||
socks_info['domain_address'] = self._read_until_null().decode()
|
||||
|
||||
def _get_ip(self, protocol):
|
||||
if self._SKIP_SOCKS_TEST:
|
||||
return '127.0.0.1'
|
||||
# dummy response, the returned IP is just a placeholder
|
||||
self.connection.sendall(
|
||||
struct.pack(
|
||||
'!BBHI', SOCKS4_REPLY_VERSION,
|
||||
self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))
|
||||
|
||||
ydl = FakeYDL({
|
||||
'proxy': '%s://127.0.0.1:%d' % (protocol, self.port),
|
||||
})
|
||||
return ydl.urlopen('http://yt-dl.org/ip').read().decode()
|
||||
self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
|
||||
|
||||
def test_socks4(self):
|
||||
self.assertTrue(isinstance(self._get_ip('socks4'), str))
|
||||
|
||||
def test_socks4a(self):
|
||||
self.assertTrue(isinstance(self._get_ip('socks4a'), str))
|
||||
class IPv6ThreadingTCPServer(ThreadingTCPServer):
|
||||
address_family = socket.AF_INET6
|
||||
|
||||
def test_socks5(self):
|
||||
self.assertTrue(isinstance(self._get_ip('socks5'), str))
|
||||
|
||||
class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
|
||||
def do_GET(self):
|
||||
if self.path == '/socks_info':
|
||||
payload = json.dumps(self.socks_info.copy())
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'application/json; charset=utf-8')
|
||||
self.send_header('Content-Length', str(len(payload)))
|
||||
self.end_headers()
|
||||
self.wfile.write(payload.encode())
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
|
||||
server = server_thread = None
|
||||
try:
|
||||
bind_address = bind_ip or '127.0.0.1'
|
||||
server_type = ThreadingTCPServer if '.' in bind_address else IPv6ThreadingTCPServer
|
||||
server = server_type(
|
||||
(bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
|
||||
server_port = http_server_port(server)
|
||||
server_thread = threading.Thread(target=server.serve_forever)
|
||||
server_thread.daemon = True
|
||||
server_thread.start()
|
||||
if '.' not in bind_address:
|
||||
yield f'[{bind_address}]:{server_port}'
|
||||
else:
|
||||
yield f'{bind_address}:{server_port}'
|
||||
finally:
|
||||
server.shutdown()
|
||||
server.server_close()
|
||||
server_thread.join(2.0)
|
||||
|
||||
|
||||
class SocksProxyTestContext(abc.ABC):
|
||||
REQUEST_HANDLER_CLASS = None
|
||||
|
||||
def socks_server(self, server_class, *args, **kwargs):
|
||||
return socks_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)
|
||||
|
||||
@abc.abstractmethod
|
||||
def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
|
||||
"""return a dict of socks_info"""
|
||||
|
||||
|
||||
class HTTPSocksTestProxyContext(SocksProxyTestContext):
|
||||
REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler
|
||||
|
||||
def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
|
||||
request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs)
|
||||
handler.validate(request)
|
||||
return json.loads(handler.send(request).read().decode())
|
||||
|
||||
|
||||
CTX_MAP = {
|
||||
'http': HTTPSocksTestProxyContext,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def ctx(request):
|
||||
return CTX_MAP[request.param]()
|
||||
|
||||
|
||||
class TestSocks4Proxy:
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_socks4_no_auth(self, handler, ctx):
|
||||
with handler() as rh:
|
||||
with ctx.socks_server(Socks4ProxyHandler) as server_address:
|
||||
response = ctx.socks_info_request(
|
||||
rh, proxies={'all': f'socks4://{server_address}'})
|
||||
assert response['version'] == 4
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_socks4_auth(self, handler, ctx):
|
||||
with handler() as rh:
|
||||
with ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
|
||||
with pytest.raises(ProxyError):
|
||||
ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
|
||||
response = ctx.socks_info_request(
|
||||
rh, proxies={'all': f'socks4://user:@{server_address}'})
|
||||
assert response['version'] == 4
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_socks4a_ipv4_target(self, handler, ctx):
|
||||
with ctx.socks_server(Socks4ProxyHandler) as server_address:
|
||||
with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
|
||||
response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
|
||||
assert response['version'] == 4
|
||||
assert (response['ipv4_address'] == '127.0.0.1') != (response['domain_address'] == '127.0.0.1')
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_socks4a_domain_target(self, handler, ctx):
|
||||
with ctx.socks_server(Socks4ProxyHandler) as server_address:
|
||||
with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
|
||||
response = ctx.socks_info_request(rh, target_domain='localhost')
|
||||
assert response['version'] == 4
|
||||
assert response['ipv4_address'] is None
|
||||
assert response['domain_address'] == 'localhost'
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_ipv4_client_source_address(self, handler, ctx):
|
||||
with ctx.socks_server(Socks4ProxyHandler) as server_address:
|
||||
source_address = f'127.0.0.{random.randint(5, 255)}'
|
||||
with handler(proxies={'all': f'socks4://{server_address}'},
|
||||
source_address=source_address) as rh:
|
||||
response = ctx.socks_info_request(rh)
|
||||
assert response['client_address'][0] == source_address
|
||||
assert response['version'] == 4
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
@pytest.mark.parametrize('reply_code', [
|
||||
Socks4CD.REQUEST_REJECTED_OR_FAILED,
|
||||
Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
|
||||
Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
|
||||
])
|
||||
def test_socks4_errors(self, handler, ctx, reply_code):
|
||||
with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address:
|
||||
with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
|
||||
with pytest.raises(ProxyError):
|
||||
ctx.socks_info_request(rh)
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_ipv6_socks4_proxy(self, handler, ctx):
|
||||
with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
|
||||
with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
|
||||
response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
|
||||
assert response['client_address'][0] == '::1'
|
||||
assert response['ipv4_address'] == '127.0.0.1'
|
||||
assert response['version'] == 4
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_timeout(self, handler, ctx):
|
||||
with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address:
|
||||
with handler(proxies={'all': f'socks4://{server_address}'}, timeout=0.5) as rh:
|
||||
with pytest.raises(TransportError):
|
||||
ctx.socks_info_request(rh)
|
||||
|
||||
|
||||
class TestSocks5Proxy:
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_socks5_no_auth(self, handler, ctx):
|
||||
with ctx.socks_server(Socks5ProxyHandler) as server_address:
|
||||
with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
|
||||
response = ctx.socks_info_request(rh)
|
||||
assert response['auth_methods'] == [0x0]
|
||||
assert response['version'] == 5
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_socks5_user_pass(self, handler, ctx):
|
||||
with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address:
|
||||
with handler() as rh:
|
||||
with pytest.raises(ProxyError):
|
||||
ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})
|
||||
|
||||
response = ctx.socks_info_request(
|
||||
rh, proxies={'all': f'socks5://test:testpass@{server_address}'})
|
||||
|
||||
assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
|
||||
assert response['version'] == 5
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_socks5_ipv4_target(self, handler, ctx):
|
||||
with ctx.socks_server(Socks5ProxyHandler) as server_address:
|
||||
with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
|
||||
response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
|
||||
assert response['ipv4_address'] == '127.0.0.1'
|
||||
assert response['version'] == 5
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_socks5_domain_target(self, handler, ctx):
|
||||
with ctx.socks_server(Socks5ProxyHandler) as server_address:
|
||||
with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
|
||||
response = ctx.socks_info_request(rh, target_domain='localhost')
|
||||
assert (response['ipv4_address'] == '127.0.0.1') != (response['ipv6_address'] == '::1')
|
||||
assert response['version'] == 5
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_socks5h_domain_target(self, handler, ctx):
|
||||
with ctx.socks_server(Socks5ProxyHandler) as server_address:
|
||||
with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
|
||||
response = ctx.socks_info_request(rh, target_domain='localhost')
|
||||
assert response['ipv4_address'] is None
|
||||
assert response['domain_address'] == 'localhost'
|
||||
assert response['version'] == 5
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_socks5h_ip_target(self, handler, ctx):
|
||||
with ctx.socks_server(Socks5ProxyHandler) as server_address:
|
||||
with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
|
||||
response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
|
||||
assert response['ipv4_address'] == '127.0.0.1'
|
||||
assert response['domain_address'] is None
|
||||
assert response['version'] == 5
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_socks5_ipv6_destination(self, handler, ctx):
|
||||
with ctx.socks_server(Socks5ProxyHandler) as server_address:
|
||||
with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
|
||||
response = ctx.socks_info_request(rh, target_domain='[::1]')
|
||||
assert response['ipv6_address'] == '::1'
|
||||
assert response['version'] == 5
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_ipv6_socks5_proxy(self, handler, ctx):
|
||||
with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
|
||||
with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
|
||||
response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
|
||||
assert response['client_address'][0] == '::1'
|
||||
assert response['ipv4_address'] == '127.0.0.1'
|
||||
assert response['version'] == 5
|
||||
|
||||
# XXX: is there any feasible way of testing IPv6 source addresses?
|
||||
# Same would go for non-proxy source_address test...
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_ipv4_client_source_address(self, handler, ctx):
|
||||
with ctx.socks_server(Socks5ProxyHandler) as server_address:
|
||||
source_address = f'127.0.0.{random.randint(5, 255)}'
|
||||
with handler(proxies={'all': f'socks5://{server_address}'}, source_address=source_address) as rh:
|
||||
response = ctx.socks_info_request(rh)
|
||||
assert response['client_address'][0] == source_address
|
||||
assert response['version'] == 5
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
@pytest.mark.parametrize('reply_code', [
|
||||
Socks5Reply.GENERAL_FAILURE,
|
||||
Socks5Reply.CONNECTION_NOT_ALLOWED,
|
||||
Socks5Reply.NETWORK_UNREACHABLE,
|
||||
Socks5Reply.HOST_UNREACHABLE,
|
||||
Socks5Reply.CONNECTION_REFUSED,
|
||||
Socks5Reply.TTL_EXPIRED,
|
||||
Socks5Reply.COMMAND_NOT_SUPPORTED,
|
||||
Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
|
||||
])
|
||||
def test_socks5_errors(self, handler, ctx, reply_code):
|
||||
with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
|
||||
with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
|
||||
with pytest.raises(ProxyError):
|
||||
ctx.socks_info_request(rh)
|
||||
|
||||
@pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
|
||||
def test_timeout(self, handler, ctx):
|
||||
with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
|
||||
with handler(proxies={'all': f'socks5://{server_address}'}, timeout=1) as rh:
|
||||
with pytest.raises(TransportError):
|
||||
ctx.socks_info_request(rh)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -14,6 +14,7 @@ import contextlib
|
||||
import io
|
||||
import itertools
|
||||
import json
|
||||
import subprocess
|
||||
import xml.etree.ElementTree
|
||||
|
||||
from yt_dlp.compat import (
|
||||
@@ -28,6 +29,7 @@ from yt_dlp.utils import (
|
||||
InAdvancePagedList,
|
||||
LazyList,
|
||||
OnDemandPagedList,
|
||||
Popen,
|
||||
age_restricted,
|
||||
args_to_str,
|
||||
base_url,
|
||||
@@ -47,10 +49,9 @@ from yt_dlp.utils import (
|
||||
encode_base_n,
|
||||
encode_compat_str,
|
||||
encodeFilename,
|
||||
escape_rfc3986,
|
||||
escape_url,
|
||||
expand_path,
|
||||
extract_attributes,
|
||||
extract_basic_auth,
|
||||
find_xpath_attr,
|
||||
fix_xml_ampersands,
|
||||
float_or_none,
|
||||
@@ -103,7 +104,6 @@ from yt_dlp.utils import (
|
||||
sanitize_filename,
|
||||
sanitize_path,
|
||||
sanitize_url,
|
||||
sanitized_Request,
|
||||
shell_quote,
|
||||
smuggle_url,
|
||||
str_or_none,
|
||||
@@ -132,6 +132,12 @@ from yt_dlp.utils import (
|
||||
xpath_text,
|
||||
xpath_with_ns,
|
||||
)
|
||||
from yt_dlp.utils.networking import (
|
||||
HTTPHeaderDict,
|
||||
escape_rfc3986,
|
||||
normalize_url,
|
||||
remove_dot_segments,
|
||||
)
|
||||
|
||||
|
||||
class TestUtil(unittest.TestCase):
|
||||
@@ -258,15 +264,6 @@ class TestUtil(unittest.TestCase):
|
||||
self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar')
|
||||
self.assertEqual(sanitize_url('foo bar'), 'foo bar')
|
||||
|
||||
def test_extract_basic_auth(self):
|
||||
auth_header = lambda url: sanitized_Request(url).get_header('Authorization')
|
||||
self.assertFalse(auth_header('http://foo.bar'))
|
||||
self.assertFalse(auth_header('http://:foo.bar'))
|
||||
self.assertEqual(auth_header('http://@foo.bar'), 'Basic Og==')
|
||||
self.assertEqual(auth_header('http://:pass@foo.bar'), 'Basic OnBhc3M=')
|
||||
self.assertEqual(auth_header('http://user:@foo.bar'), 'Basic dXNlcjo=')
|
||||
self.assertEqual(auth_header('http://user:pass@foo.bar'), 'Basic dXNlcjpwYXNz')
|
||||
|
||||
def test_expand_path(self):
|
||||
def env(var):
|
||||
return f'%{var}%' if sys.platform == 'win32' else f'${var}'
|
||||
@@ -668,6 +665,8 @@ class TestUtil(unittest.TestCase):
|
||||
self.assertEqual(parse_duration('P0Y0M0DT0H4M20.880S'), 260.88)
|
||||
self.assertEqual(parse_duration('01:02:03:050'), 3723.05)
|
||||
self.assertEqual(parse_duration('103:050'), 103.05)
|
||||
self.assertEqual(parse_duration('1HR 3MIN'), 3780)
|
||||
self.assertEqual(parse_duration('2hrs 3mins'), 7380)
|
||||
|
||||
def test_fix_xml_ampersands(self):
|
||||
self.assertEqual(
|
||||
@@ -944,24 +943,45 @@ class TestUtil(unittest.TestCase):
|
||||
self.assertEqual(escape_rfc3986('foo bar'), 'foo%20bar')
|
||||
self.assertEqual(escape_rfc3986('foo%20bar'), 'foo%20bar')
|
||||
|
||||
def test_escape_url(self):
|
||||
def test_normalize_url(self):
|
||||
self.assertEqual(
|
||||
escape_url('http://wowza.imust.org/srv/vod/telemb/new/UPLOAD/UPLOAD/20224_IncendieHavré_FD.mp4'),
|
||||
normalize_url('http://wowza.imust.org/srv/vod/telemb/new/UPLOAD/UPLOAD/20224_IncendieHavré_FD.mp4'),
|
||||
'http://wowza.imust.org/srv/vod/telemb/new/UPLOAD/UPLOAD/20224_IncendieHavre%CC%81_FD.mp4'
|
||||
)
|
||||
self.assertEqual(
|
||||
escape_url('http://www.ardmediathek.de/tv/Sturm-der-Liebe/Folge-2036-Zu-Mann-und-Frau-erklärt/Das-Erste/Video?documentId=22673108&bcastId=5290'),
|
||||
normalize_url('http://www.ardmediathek.de/tv/Sturm-der-Liebe/Folge-2036-Zu-Mann-und-Frau-erklärt/Das-Erste/Video?documentId=22673108&bcastId=5290'),
|
||||
'http://www.ardmediathek.de/tv/Sturm-der-Liebe/Folge-2036-Zu-Mann-und-Frau-erkl%C3%A4rt/Das-Erste/Video?documentId=22673108&bcastId=5290'
|
||||
)
|
||||
self.assertEqual(
|
||||
escape_url('http://тест.рф/фрагмент'),
|
||||
normalize_url('http://тест.рф/фрагмент'),
|
||||
'http://xn--e1aybc.xn--p1ai/%D1%84%D1%80%D0%B0%D0%B3%D0%BC%D0%B5%D0%BD%D1%82'
|
||||
)
|
||||
self.assertEqual(
|
||||
escape_url('http://тест.рф/абв?абв=абв#абв'),
|
||||
normalize_url('http://тест.рф/абв?абв=абв#абв'),
|
||||
'http://xn--e1aybc.xn--p1ai/%D0%B0%D0%B1%D0%B2?%D0%B0%D0%B1%D0%B2=%D0%B0%D0%B1%D0%B2#%D0%B0%D0%B1%D0%B2'
|
||||
)
|
||||
self.assertEqual(escape_url('http://vimeo.com/56015672#at=0'), 'http://vimeo.com/56015672#at=0')
|
||||
self.assertEqual(normalize_url('http://vimeo.com/56015672#at=0'), 'http://vimeo.com/56015672#at=0')
|
||||
|
||||
self.assertEqual(normalize_url('http://www.example.com/../a/b/../c/./d.html'), 'http://www.example.com/a/c/d.html')
|
||||
|
||||
def test_remove_dot_segments(self):
|
||||
self.assertEqual(remove_dot_segments('/a/b/c/./../../g'), '/a/g')
|
||||
self.assertEqual(remove_dot_segments('mid/content=5/../6'), 'mid/6')
|
||||
self.assertEqual(remove_dot_segments('/ad/../cd'), '/cd')
|
||||
self.assertEqual(remove_dot_segments('/ad/../cd/'), '/cd/')
|
||||
self.assertEqual(remove_dot_segments('/..'), '/')
|
||||
self.assertEqual(remove_dot_segments('/./'), '/')
|
||||
self.assertEqual(remove_dot_segments('/./a'), '/a')
|
||||
self.assertEqual(remove_dot_segments('/abc/./.././d/././e/.././f/./../../ghi'), '/ghi')
|
||||
self.assertEqual(remove_dot_segments('/'), '/')
|
||||
self.assertEqual(remove_dot_segments('/t'), '/t')
|
||||
self.assertEqual(remove_dot_segments('t'), 't')
|
||||
self.assertEqual(remove_dot_segments(''), '')
|
||||
self.assertEqual(remove_dot_segments('/../a/b/c'), '/a/b/c')
|
||||
self.assertEqual(remove_dot_segments('../a'), 'a')
|
||||
self.assertEqual(remove_dot_segments('./a'), 'a')
|
||||
self.assertEqual(remove_dot_segments('.'), '')
|
||||
self.assertEqual(remove_dot_segments('////'), '////')
|
||||
|
||||
def test_js_to_json_vars_strings(self):
|
||||
self.assertDictEqual(
|
||||
@@ -1194,6 +1214,9 @@ class TestUtil(unittest.TestCase):
|
||||
on = js_to_json('\'"\\""\'')
|
||||
self.assertEqual(json.loads(on), '"""', msg='Unnecessary quote escape should be escaped')
|
||||
|
||||
on = js_to_json('[new Date("spam"), \'("eggs")\']')
|
||||
self.assertEqual(json.loads(on), ['spam', '("eggs")'], msg='Date regex should match a single string')
|
||||
|
||||
def test_js_to_json_malformed(self):
|
||||
self.assertEqual(js_to_json('42a1'), '42"a1"')
|
||||
self.assertEqual(js_to_json('42a-1'), '42"a"-1')
|
||||
@@ -1205,6 +1228,14 @@ class TestUtil(unittest.TestCase):
|
||||
self.assertEqual(js_to_json('`${name}"${name}"`', {'name': '5'}), '"5\\"5\\""')
|
||||
self.assertEqual(js_to_json('`${name}`', {}), '"name"')
|
||||
|
||||
def test_js_to_json_common_constructors(self):
|
||||
self.assertEqual(json.loads(js_to_json('new Map([["a", 5]])')), {'a': 5})
|
||||
self.assertEqual(json.loads(js_to_json('Array(5, 10)')), [5, 10])
|
||||
self.assertEqual(json.loads(js_to_json('new Array(15,5)')), [15, 5])
|
||||
self.assertEqual(json.loads(js_to_json('new Map([Array(5, 10),new Array(15,5)])')), {'5': 10, '15': 5})
|
||||
self.assertEqual(json.loads(js_to_json('new Date("123")')), "123")
|
||||
self.assertEqual(json.loads(js_to_json('new Date(\'2023-10-19\')')), "2023-10-19")
|
||||
|
||||
def test_extract_attributes(self):
|
||||
self.assertEqual(extract_attributes('<e x="y">'), {'x': 'y'})
|
||||
self.assertEqual(extract_attributes("<e x='y'>"), {'x': 'y'})
|
||||
@@ -1840,6 +1871,8 @@ Line 1
|
||||
def test_clean_podcast_url(self):
|
||||
self.assertEqual(clean_podcast_url('https://www.podtrac.com/pts/redirect.mp3/chtbl.com/track/5899E/traffic.megaphone.fm/HSW7835899191.mp3'), 'https://traffic.megaphone.fm/HSW7835899191.mp3')
|
||||
self.assertEqual(clean_podcast_url('https://play.podtrac.com/npr-344098539/edge1.pod.npr.org/anon.npr-podcasts/podcast/npr/waitwait/2020/10/20201003_waitwait_wwdtmpodcast201003-015621a5-f035-4eca-a9a1-7c118d90bc3c.mp3'), 'https://edge1.pod.npr.org/anon.npr-podcasts/podcast/npr/waitwait/2020/10/20201003_waitwait_wwdtmpodcast201003-015621a5-f035-4eca-a9a1-7c118d90bc3c.mp3')
|
||||
self.assertEqual(clean_podcast_url('https://pdst.fm/e/2.gum.fm/chtbl.com/track/chrt.fm/track/34D33/pscrb.fm/rss/p/traffic.megaphone.fm/ITLLC7765286967.mp3?updated=1687282661'), 'https://traffic.megaphone.fm/ITLLC7765286967.mp3?updated=1687282661')
|
||||
self.assertEqual(clean_podcast_url('https://pdst.fm/e/https://mgln.ai/e/441/www.buzzsprout.com/1121972/13019085-ep-252-the-deep-life-stack.mp3'), 'https://www.buzzsprout.com/1121972/13019085-ep-252-the-deep-life-stack.mp3')
|
||||
|
||||
def test_LazyList(self):
|
||||
it = list(range(10))
|
||||
@@ -2327,6 +2360,61 @@ Line 1
|
||||
self.assertEqual(traverse_obj(mobj, lambda k, _: k in (0, 'group')), ['0123', '3'],
|
||||
msg='function on a `re.Match` should give group name as well')
|
||||
|
||||
def test_http_header_dict(self):
|
||||
headers = HTTPHeaderDict()
|
||||
headers['ytdl-test'] = b'0'
|
||||
self.assertEqual(list(headers.items()), [('Ytdl-Test', '0')])
|
||||
headers['ytdl-test'] = 1
|
||||
self.assertEqual(list(headers.items()), [('Ytdl-Test', '1')])
|
||||
headers['Ytdl-test'] = '2'
|
||||
self.assertEqual(list(headers.items()), [('Ytdl-Test', '2')])
|
||||
self.assertTrue('ytDl-Test' in headers)
|
||||
self.assertEqual(str(headers), str(dict(headers)))
|
||||
self.assertEqual(repr(headers), str(dict(headers)))
|
||||
|
||||
headers.update({'X-dlp': 'data'})
|
||||
self.assertEqual(set(headers.items()), {('Ytdl-Test', '2'), ('X-Dlp', 'data')})
|
||||
self.assertEqual(dict(headers), {'Ytdl-Test': '2', 'X-Dlp': 'data'})
|
||||
self.assertEqual(len(headers), 2)
|
||||
self.assertEqual(headers.copy(), headers)
|
||||
headers2 = HTTPHeaderDict({'X-dlp': 'data3'}, **headers, **{'X-dlp': 'data2'})
|
||||
self.assertEqual(set(headers2.items()), {('Ytdl-Test', '2'), ('X-Dlp', 'data2')})
|
||||
self.assertEqual(len(headers2), 2)
|
||||
headers2.clear()
|
||||
self.assertEqual(len(headers2), 0)
|
||||
|
||||
# ensure we prefer latter headers
|
||||
headers3 = HTTPHeaderDict({'Ytdl-TeSt': 1}, {'Ytdl-test': 2})
|
||||
self.assertEqual(set(headers3.items()), {('Ytdl-Test', '2')})
|
||||
del headers3['ytdl-tesT']
|
||||
self.assertEqual(dict(headers3), {})
|
||||
|
||||
headers4 = HTTPHeaderDict({'ytdl-test': 'data;'})
|
||||
self.assertEqual(set(headers4.items()), {('Ytdl-Test', 'data;')})
|
||||
|
||||
def test_extract_basic_auth(self):
|
||||
assert extract_basic_auth('http://:foo.bar') == ('http://:foo.bar', None)
|
||||
assert extract_basic_auth('http://foo.bar') == ('http://foo.bar', None)
|
||||
assert extract_basic_auth('http://@foo.bar') == ('http://foo.bar', 'Basic Og==')
|
||||
assert extract_basic_auth('http://:pass@foo.bar') == ('http://foo.bar', 'Basic OnBhc3M=')
|
||||
assert extract_basic_auth('http://user:@foo.bar') == ('http://foo.bar', 'Basic dXNlcjo=')
|
||||
assert extract_basic_auth('http://user:pass@foo.bar') == ('http://foo.bar', 'Basic dXNlcjpwYXNz')
|
||||
|
||||
@unittest.skipUnless(compat_os_name == 'nt', 'Only relevant on Windows')
|
||||
def test_Popen_windows_escaping(self):
|
||||
def run_shell(args):
|
||||
stdout, stderr, error = Popen.run(
|
||||
args, text=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
assert not stderr
|
||||
assert not error
|
||||
return stdout
|
||||
|
||||
# Test escaping
|
||||
assert run_shell(['echo', 'test"&']) == '"test""&"\n'
|
||||
# Test if delayed expansion is disabled
|
||||
assert run_shell(['echo', '^!']) == '"^!"\n'
|
||||
assert run_shell('echo "^!"') == '"^!"\n'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user