mirror of
				https://github.com/yt-dlp/yt-dlp.git
				synced 2025-10-30 22:25:19 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			116 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			116 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| # coding: utf-8
 | |
| from __future__ import unicode_literals
 | |
| 
 | |
| # Allow direct execution
 | |
| import os
 | |
| import re
 | |
| import sys
 | |
| import unittest
 | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 | |
| 
 | |
| from test.helper import http_server_port, try_rm
 | |
| from youtube_dl import YoutubeDL
 | |
| from youtube_dl.compat import compat_http_server
 | |
| from youtube_dl.downloader.http import HttpFD
 | |
| from youtube_dl.utils import encodeFilename
 | |
| import threading
 | |
| 
 | |
| TEST_DIR = os.path.dirname(os.path.abspath(__file__))
 | |
| 
 | |
| 
 | |
| TEST_SIZE = 10 * 1024
 | |
| 
 | |
| 
 | |
| class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
 | |
|     def log_message(self, format, *args):
 | |
|         pass
 | |
| 
 | |
|     def send_content_range(self, total=None):
 | |
|         range_header = self.headers.get('Range')
 | |
|         start = end = None
 | |
|         if range_header:
 | |
|             mobj = re.search(r'^bytes=(\d+)-(\d+)', range_header)
 | |
|             if mobj:
 | |
|                 start = int(mobj.group(1))
 | |
|                 end = int(mobj.group(2))
 | |
|         valid_range = start is not None and end is not None
 | |
|         if valid_range:
 | |
|             content_range = 'bytes %d-%d' % (start, end)
 | |
|             if total:
 | |
|                 content_range += '/%d' % total
 | |
|             self.send_header('Content-Range', content_range)
 | |
|         return (end - start + 1) if valid_range else total
 | |
| 
 | |
|     def serve(self, range=True, content_length=True):
 | |
|         self.send_response(200)
 | |
|         self.send_header('Content-Type', 'video/mp4')
 | |
|         size = TEST_SIZE
 | |
|         if range:
 | |
|             size = self.send_content_range(TEST_SIZE)
 | |
|         if content_length:
 | |
|             self.send_header('Content-Length', size)
 | |
|         self.end_headers()
 | |
|         self.wfile.write(b'#' * size)
 | |
| 
 | |
|     def do_GET(self):
 | |
|         if self.path == '/regular':
 | |
|             self.serve()
 | |
|         elif self.path == '/no-content-length':
 | |
|             self.serve(content_length=False)
 | |
|         elif self.path == '/no-range':
 | |
|             self.serve(range=False)
 | |
|         elif self.path == '/no-range-no-content-length':
 | |
|             self.serve(range=False, content_length=False)
 | |
|         else:
 | |
|             assert False
 | |
| 
 | |
| 
 | |
| class FakeLogger(object):
 | |
|     def debug(self, msg):
 | |
|         pass
 | |
| 
 | |
|     def warning(self, msg):
 | |
|         pass
 | |
| 
 | |
|     def error(self, msg):
 | |
|         pass
 | |
| 
 | |
| 
 | |
| class TestHttpFD(unittest.TestCase):
 | |
|     def setUp(self):
 | |
|         self.httpd = compat_http_server.HTTPServer(
 | |
|             ('127.0.0.1', 0), HTTPTestRequestHandler)
 | |
|         self.port = http_server_port(self.httpd)
 | |
|         self.server_thread = threading.Thread(target=self.httpd.serve_forever)
 | |
|         self.server_thread.daemon = True
 | |
|         self.server_thread.start()
 | |
| 
 | |
|     def download(self, params, ep):
 | |
|         params['logger'] = FakeLogger()
 | |
|         ydl = YoutubeDL(params)
 | |
|         downloader = HttpFD(ydl, params)
 | |
|         filename = 'testfile.mp4'
 | |
|         try_rm(encodeFilename(filename))
 | |
|         self.assertTrue(downloader.real_download(filename, {
 | |
|             'url': 'http://127.0.0.1:%d/%s' % (self.port, ep),
 | |
|         }))
 | |
|         self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE)
 | |
|         try_rm(encodeFilename(filename))
 | |
| 
 | |
|     def download_all(self, params):
 | |
|         for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):
 | |
|             self.download(params, ep)
 | |
| 
 | |
|     def test_regular(self):
 | |
|         self.download_all({})
 | |
| 
 | |
|     def test_chunked(self):
 | |
|         self.download_all({
 | |
|             'http_chunk_size': 1000,
 | |
|         })
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     unittest.main()
 | 
