mirror of
				https://github.com/yt-dlp/yt-dlp.git
				synced 2025-10-31 06:35:12 +00:00 
			
		
		
		
	[downloader/aria2c] Native progress for aria2c via RPC (#3724)
Authored by: Lesmiscore, pukkandan Closes #2038
This commit is contained in:
		| @@ -1,9 +1,11 @@ | ||||
| import enum | ||||
| import json | ||||
| import os.path | ||||
| import re | ||||
| import subprocess | ||||
| import sys | ||||
| import time | ||||
| import uuid | ||||
| 
 | ||||
| from .fragment import FragmentFD | ||||
| from ..compat import functools | ||||
| @@ -20,8 +22,10 @@ from ..utils import ( | ||||
|     determine_ext, | ||||
|     encodeArgument, | ||||
|     encodeFilename, | ||||
|     find_available_port, | ||||
|     handle_youtubedl_headers, | ||||
|     remove_end, | ||||
|     sanitized_Request, | ||||
|     traverse_obj, | ||||
| ) | ||||
| 
 | ||||
| @@ -60,7 +64,6 @@ class ExternalFD(FragmentFD): | ||||
|             } | ||||
|             if filename != '-': | ||||
|                 fsize = os.path.getsize(encodeFilename(tmpfilename)) | ||||
|                 self.to_screen(f'\r[{self.get_basename()}] Downloaded {fsize} bytes') | ||||
|                 self.try_rename(tmpfilename, filename) | ||||
|                 status.update({ | ||||
|                     'downloaded_bytes': fsize, | ||||
| @@ -129,8 +132,7 @@ class ExternalFD(FragmentFD): | ||||
|         self._debug_cmd(cmd) | ||||
| 
 | ||||
|         if 'fragments' not in info_dict: | ||||
|             _, stderr, returncode = Popen.run( | ||||
|                 cmd, text=True, stderr=subprocess.PIPE if self._CAPTURE_STDERR else None) | ||||
|             _, stderr, returncode = self._call_process(cmd, info_dict) | ||||
|             if returncode and stderr: | ||||
|                 self.to_stderr(stderr) | ||||
|             return returncode | ||||
| @@ -140,7 +142,7 @@ class ExternalFD(FragmentFD): | ||||
|         retry_manager = RetryManager(self.params.get('fragment_retries'), self.report_retry, | ||||
|                                      frag_index=None, fatal=not skip_unavailable_fragments) | ||||
|         for retry in retry_manager: | ||||
|             _, stderr, returncode = Popen.run(cmd, text=True, stderr=subprocess.PIPE) | ||||
|             _, stderr, returncode = self._call_process(cmd, info_dict) | ||||
|             if not returncode: | ||||
|                 break | ||||
|             # TODO: Decide whether to retry based on error code | ||||
| @@ -172,6 +174,9 @@ class ExternalFD(FragmentFD): | ||||
|         self.try_remove(encodeFilename('%s.frag.urls' % tmpfilename)) | ||||
|         return 0 | ||||
| 
 | ||||
|     def _call_process(self, cmd, info_dict): | ||||
|         return Popen.run(cmd, text=True, stderr=subprocess.PIPE) | ||||
| 
 | ||||
| 
 | ||||
| class CurlFD(ExternalFD): | ||||
|     AVAILABLE_OPT = '-V' | ||||
| @@ -256,6 +261,14 @@ class Aria2cFD(ExternalFD): | ||||
|     def _aria2c_filename(fn): | ||||
|         return fn if os.path.isabs(fn) else f'.{os.path.sep}{fn}' | ||||
| 
 | ||||
|     def _call_downloader(self, tmpfilename, info_dict): | ||||
|         if 'no-external-downloader-progress' not in self.params.get('compat_opts', []): | ||||
|             info_dict['__rpc'] = { | ||||
|                 'port': find_available_port() or 19190, | ||||
|                 'secret': str(uuid.uuid4()), | ||||
|             } | ||||
|         return super()._call_downloader(tmpfilename, info_dict) | ||||
| 
 | ||||
|     def _make_cmd(self, tmpfilename, info_dict): | ||||
|         cmd = [self.exe, '-c', | ||||
|                '--console-log-level=warn', '--summary-interval=0', '--download-result=hide', | ||||
| @@ -276,6 +289,12 @@ class Aria2cFD(ExternalFD): | ||||
|         cmd += self._bool_option('--show-console-readout', 'noprogress', 'false', 'true', '=') | ||||
|         cmd += self._configuration_args() | ||||
| 
 | ||||
|         if '__rpc' in info_dict: | ||||
|             cmd += [ | ||||
|                 '--enable-rpc', | ||||
|                 f'--rpc-listen-port={info_dict["__rpc"]["port"]}', | ||||
|                 f'--rpc-secret={info_dict["__rpc"]["secret"]}'] | ||||
| 
 | ||||
|         # aria2c strips out spaces from the beginning/end of filenames and paths. | ||||
|         # We work around this issue by adding a "./" to the beginning of the | ||||
|         # filename and relative path, and adding a "/" at the end of the path. | ||||
| @@ -304,6 +323,88 @@ class Aria2cFD(ExternalFD): | ||||
|             cmd += ['--', info_dict['url']] | ||||
|         return cmd | ||||
| 
 | ||||
|     def aria2c_rpc(self, rpc_port, rpc_secret, method, params=()): | ||||
|         # Does not actually need to be UUID, just unique | ||||
|         sanitycheck = str(uuid.uuid4()) | ||||
|         d = json.dumps({ | ||||
|             'jsonrpc': '2.0', | ||||
|             'id': sanitycheck, | ||||
|             'method': method, | ||||
|             'params': [f'token:{rpc_secret}', *params], | ||||
|         }).encode('utf-8') | ||||
|         request = sanitized_Request( | ||||
|             f'http://localhost:{rpc_port}/jsonrpc', | ||||
|             data=d, headers={ | ||||
|                 'Content-Type': 'application/json', | ||||
|                 'Content-Length': f'{len(d)}', | ||||
|                 'Ytdl-request-proxy': '__noproxy__', | ||||
|             }) | ||||
|         with self.ydl.urlopen(request) as r: | ||||
|             resp = json.load(r) | ||||
|         assert resp.get('id') == sanitycheck, 'Something went wrong with RPC server' | ||||
|         return resp['result'] | ||||
| 
 | ||||
|     def _call_process(self, cmd, info_dict): | ||||
|         if '__rpc' not in info_dict: | ||||
|             return super()._call_process(cmd, info_dict) | ||||
| 
 | ||||
|         send_rpc = functools.partial(self.aria2c_rpc, info_dict['__rpc']['port'], info_dict['__rpc']['secret']) | ||||
|         started = time.time() | ||||
| 
 | ||||
|         fragmented = 'fragments' in info_dict | ||||
|         frag_count = len(info_dict['fragments']) if fragmented else 1 | ||||
|         status = { | ||||
|             'filename': info_dict.get('_filename'), | ||||
|             'status': 'downloading', | ||||
|             'elapsed': 0, | ||||
|             'downloaded_bytes': 0, | ||||
|             'fragment_count': frag_count if fragmented else None, | ||||
|             'fragment_index': 0 if fragmented else None, | ||||
|         } | ||||
|         self._hook_progress(status, info_dict) | ||||
| 
 | ||||
|         def get_stat(key, *obj, average=False): | ||||
|             val = tuple(filter(None, map(float, traverse_obj(obj, (..., ..., key))))) or [0] | ||||
|             return sum(val) / (len(val) if average else 1) | ||||
| 
 | ||||
|         with Popen(cmd, text=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) as p: | ||||
|             # Add a small sleep so that RPC client can receive response, | ||||
|             # or the connection stalls infinitely | ||||
|             time.sleep(0.2) | ||||
|             retval = p.poll() | ||||
|             while retval is None: | ||||
|                 # We don't use tellStatus as we won't know the GID without reading stdout | ||||
|                 # Ref: https://aria2.github.io/manual/en/html/aria2c.html#aria2.tellActive | ||||
|                 active = send_rpc('aria2.tellActive') | ||||
|                 completed = send_rpc('aria2.tellStopped', [0, frag_count]) | ||||
| 
 | ||||
|                 downloaded = get_stat('totalLength', completed) + get_stat('completedLength', active) | ||||
|                 speed = get_stat('downloadSpeed', active) | ||||
|                 total = frag_count * get_stat('totalLength', active, completed, average=True) | ||||
|                 if total < downloaded: | ||||
|                     total = None | ||||
| 
 | ||||
|                 status.update({ | ||||
|                     'downloaded_bytes': int(downloaded), | ||||
|                     'speed': speed, | ||||
|                     'total_bytes': None if fragmented else total, | ||||
|                     'total_bytes_estimate': total, | ||||
|                     'eta': (total - downloaded) / (speed or 1), | ||||
|                     'fragment_index': min(frag_count, len(completed) + 1) if fragmented else None, | ||||
|                     'elapsed': time.time() - started | ||||
|                 }) | ||||
|                 self._hook_progress(status, info_dict) | ||||
| 
 | ||||
|                 if not active and len(completed) >= frag_count: | ||||
|                     send_rpc('aria2.shutdown') | ||||
|                     retval = p.wait() | ||||
|                     break | ||||
| 
 | ||||
|                 time.sleep(0.1) | ||||
|                 retval = p.poll() | ||||
| 
 | ||||
|             return '', p.stderr.read(), retval | ||||
| 
 | ||||
| 
 | ||||
| class HttpieFD(ExternalFD): | ||||
|     AVAILABLE_OPT = '--version' | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Lesmiscore
					Lesmiscore