diff --git a/devscripts/bash-completion.in b/devscripts/bash-completion.in
index 21f52798ed..bb66c20956 100644
--- a/devscripts/bash-completion.in
+++ b/devscripts/bash-completion.in
@@ -10,9 +10,13 @@ __yt_dlp()
     diropts="--cache-dir"
 
     if [[ ${prev} =~ ${fileopts} ]]; then
+        local IFS=$'\n'
+        type compopt &>/dev/null && compopt -o filenames
         COMPREPLY=( $(compgen -f -- ${cur}) )
         return 0
     elif [[ ${prev} =~ ${diropts} ]]; then
+        local IFS=$'\n'
+        type compopt &>/dev/null && compopt -o dirnames
         COMPREPLY=( $(compgen -d -- ${cur}) )
         return 0
     fi
diff --git a/pyproject.toml b/pyproject.toml
index 3775251e10..41d5ec3b0f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -75,7 +75,7 @@ dev = [
 ]
 static-analysis = [
     "autopep8~=2.0",
-    "ruff~=0.11.0",
+    "ruff~=0.12.0",
 ]
 test = [
     "pytest~=8.1",
@@ -210,10 +210,12 @@ ignore = [
     "TD001",                  # invalid-todo-tag
     "TD002",                  # missing-todo-author
     "TD003",                  # missing-todo-link
+    "PLC0415",                # import-outside-top-level
     "PLE0604",                # invalid-all-object (false positives)
     "PLE0643",                # potential-index-error (false positives)
     "PLW0603",                # global-statement
     "PLW1510",                # subprocess-run-without-check
+    "PLW1641",                # eq-without-hash
     "PLW2901",                # redefined-loop-name
     "RUF001",                 # ambiguous-unicode-character-string
     "RUF012",                 # mutable-class-default
diff --git a/test/test_InfoExtractor.py b/test/test_InfoExtractor.py
index e6c8d574e0..c9f70431f7 100644
--- a/test/test_InfoExtractor.py
+++ b/test/test_InfoExtractor.py
@@ -36,6 +36,18 @@ def do_GET(self):
             self.send_header('Content-Type', 'text/html; charset=utf-8')
             self.end_headers()
             self.wfile.write(TEAPOT_RESPONSE_BODY.encode())
+        elif self.path == '/fake.m3u8':
+            self.send_response(200)
+            self.send_header('Content-Length', '1024')
+            self.end_headers()
+            self.wfile.write(1024 * b'\x00')
+        elif self.path == '/bipbop.m3u8':
+            with open('test/testdata/m3u8/bipbop_16x9.m3u8', 'rb') as f:
+                data = f.read()
+            self.send_response(200)
+            self.send_header('Content-Length', str(len(data)))
+            self.end_headers()
+            self.wfile.write(data)
         else:
             assert False
 
@@ -2079,5 +2091,45 @@ def test_search_nuxt_json(self):
                 self.ie._search_nuxt_json(HTML_TMPL.format(data), None, default=DEFAULT), DEFAULT)
 
 
+class TestInfoExtractorNetwork(unittest.TestCase):
+    def setUp(self, /):
+        self.httpd = http.server.HTTPServer(
+            ('127.0.0.1', 0), InfoExtractorTestRequestHandler)
+        self.port = http_server_port(self.httpd)
+
+        self.server_thread = threading.Thread(target=self.httpd.serve_forever)
+        self.server_thread.daemon = True
+        self.server_thread.start()
+
+        self.called = False
+
+        def require_warning(*args, **kwargs):
+            self.called = True
+
+        self.ydl = FakeYDL()
+        self.ydl.report_warning = require_warning
+        self.ie = DummyIE(self.ydl)
+
+    def tearDown(self, /):
+        self.ydl.close()
+        self.httpd.shutdown()
+        self.httpd.server_close()
+        self.server_thread.join(1)
+
+    def test_extract_m3u8_formats(self):
+        formats, subtitles = self.ie._extract_m3u8_formats_and_subtitles(
+            f'http://127.0.0.1:{self.port}/bipbop.m3u8', None, fatal=False)
+        self.assertFalse(self.called)
+        self.assertTrue(formats)
+        self.assertTrue(subtitles)
+
+    def test_extract_m3u8_formats_warning(self):
+        formats, subtitles = self.ie._extract_m3u8_formats_and_subtitles(
+            f'http://127.0.0.1:{self.port}/fake.m3u8', None, fatal=False)
+        self.assertTrue(self.called, 'Warning was not issued for binary m3u8 file')
+        self.assertFalse(formats)
+        self.assertFalse(subtitles)
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/test/test_download.py b/test/test_download.py
index 3f36869d9d..c7842735c2 100755
--- a/test/test_download.py
+++ b/test/test_download.py
@@ -14,6 +14,7 @@
 from test.helper import (
     assertGreaterEqual,
+    assertLessEqual,
     expect_info_dict,
     expect_warnings,
     get_params,
@@ -121,10 +122,13 @@ def print_skipping(reason):
         params = get_params(test_case.get('params', {}))
         params['outtmpl'] = tname + '_' + params['outtmpl']
         if is_playlist and 'playlist' not in test_case:
-            params.setdefault('extract_flat', 'in_playlist')
-            params.setdefault('playlistend', test_case.get(
-                'playlist_mincount', test_case.get('playlist_count', -2) + 1))
+            params.setdefault('playlistend', max(
+                test_case.get('playlist_mincount', -1),
+                test_case.get('playlist_count', -2) + 1,
+                test_case.get('playlist_maxcount', -2) + 1))
             params.setdefault('skip_download', True)
+        if 'playlist_duration_sum' not in test_case:
+            params.setdefault('extract_flat', 'in_playlist')
 
         ydl = YoutubeDL(params, auto_init=False)
         ydl.add_default_info_extractors()
@@ -159,6 +163,7 @@ def try_rm_tcs_files(tcs=None):
                 try_rm(os.path.splitext(tc_filename)[0] + '.info.json')
         try_rm_tcs_files()
         try:
+            test_url = test_case['url']
            try_num = 1
            while True:
                try:
@@ -166,7 +171,7 @@ def try_rm_tcs_files(tcs=None):
                     # for outside error handling, and returns the exit code
                     # instead of the result dict.
                     res_dict = ydl.extract_info(
-                        test_case['url'],
+                        test_url,
                         force_generic_extractor=params.get('force_generic_extractor', False))
                 except (DownloadError, ExtractorError) as err:
                     # Check if the exception is not a network related one
@@ -194,23 +199,23 @@ def try_rm_tcs_files(tcs=None):
                 self.assertTrue('entries' in res_dict)
                 expect_info_dict(self, res_dict, test_case.get('info_dict', {}))
 
+            num_entries = len(res_dict.get('entries', []))
             if 'playlist_mincount' in test_case:
+                mincount = test_case['playlist_mincount']
                 assertGreaterEqual(
-                    self,
-                    len(res_dict['entries']),
-                    test_case['playlist_mincount'],
-                    'Expected at least %d in playlist %s, but got only %d' % (
-                        test_case['playlist_mincount'], test_case['url'],
-                        len(res_dict['entries'])))
+                    self, num_entries, mincount,
+                    f'Expected at least {mincount} entries in playlist {test_url}, but got only {num_entries}')
             if 'playlist_count' in test_case:
+                count = test_case['playlist_count']
+                got = num_entries if num_entries <= count else 'more'
                 self.assertEqual(
-                    len(res_dict['entries']),
-                    test_case['playlist_count'],
-                    'Expected %d entries in playlist %s, but got %d.' % (
-                        test_case['playlist_count'],
-                        test_case['url'],
-                        len(res_dict['entries']),
-                    ))
+                    num_entries, count,
+                    f'Expected exactly {count} entries in playlist {test_url}, but got {got}')
+            if 'playlist_maxcount' in test_case:
+                maxcount = test_case['playlist_maxcount']
+                assertLessEqual(
+                    self, num_entries, maxcount,
+                    f'Expected at most {maxcount} entries in playlist {test_url}, but got more')
             if 'playlist_duration_sum' in test_case:
                 got_duration = sum(e['duration'] for e in res_dict['entries'])
                 self.assertEqual(
diff --git a/test/test_jsinterp.py b/test/test_jsinterp.py
index 4268e890b8..43b1d0fdee 100644
--- a/test/test_jsinterp.py
+++ b/test/test_jsinterp.py
@@ -490,6 +490,57 @@ def test_increment_decrement(self):
         self._test('function f() { var a = "test--"; return a; }', 'test--')
         self._test('function f() { var b = 1; var a = "b--"; return a; }', 'b--')
 
+    def test_nested_function_scoping(self):
+        self._test(R'''
+            function f() {
+                var g = function() {
+                    var P = 2;
+                    return P;
+                };
+                var P = 1;
+                g();
+                return P;
+            }
+        ''', 1)
+        self._test(R'''
+            function f() {
+                var x = function() {
+                    for (var w = 1, M = []; w < 2; w++) switch (w) {
+                        case 1:
+                            M.push("a");
+                        case 2:
+                            M.push("b");
+                    }
+                    return M
+                };
+                var w = "c";
+                var M = "d";
+                var y = x();
+                y.push(w);
+                y.push(M);
+                return y;
+            }
+        ''', ['a', 'b', 'c', 'd'])
+        self._test(R'''
+            function f() {
+                var P, Q;
+                var z = 100;
+                var g = function() {
+                    var P, Q; P = 2; Q = 15;
+                    z = 0;
+                    return P+Q;
+                };
+                P = 1; Q = 10;
+                var x = g(), y = 3;
+                return P+Q+x+y+z;
+            }
+        ''', 31)
+
+    def test_undefined_varnames(self):
+        jsi = JSInterpreter('function f(){ var a; return [a, b]; }')
+        self._test(jsi, [JS_Undefined, JS_Undefined])
+        self.assertEqual(jsi._undefined_varnames, {'b'})
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/test/test_networking.py b/test/test_networking.py
index 2f441fced2..afdd0c7aa7 100644
--- a/test/test_networking.py
+++ b/test/test_networking.py
@@ -22,7 +22,6 @@
 import tempfile
 import threading
 import time
-import urllib.error
 import urllib.request
 import warnings
 import zlib
@@ -223,10 +222,7 @@ def do_GET(self):
                 if encoding == 'br' and brotli:
                     payload = brotli.compress(payload)
                 elif encoding == 'gzip':
-                    buf = io.BytesIO()
-                    with gzip.GzipFile(fileobj=buf, mode='wb') as f:
-                        f.write(payload)
-                    payload = buf.getvalue()
+                    payload = gzip.compress(payload, mtime=0)
                 elif encoding == 'deflate':
                     payload = zlib.compress(payload)
                 elif encoding == 'unsupported':
@@ -729,6 +725,17 @@ def test_keep_header_casing(self, handler):
         assert 'X-test-heaDer: test' in res
 
+    def test_partial_read_then_full_read(self, handler):
+        with handler() as rh:
+            for encoding in ('', 'gzip', 'deflate'):
+                res = validate_and_send(rh, Request(
+                    f'http://127.0.0.1:{self.http_port}/content-encoding',
+                    headers={'ytdl-encoding': encoding}))
+                assert res.headers.get('Content-Encoding') == encoding
+                assert res.read(6) == b'<html>'
+                assert res.read(0) == b''
+                assert res.read() == b'