mirror of
				https://github.com/yt-dlp/yt-dlp.git
				synced 2025-10-26 12:10:59 +00:00 
			
		
		
		
	 adbc4ec4bb
			
		
	
	
		adbc4ec4bb
		
			
		
	
	
	
	
		
			
			* Add option `--live-from-start` to enable downloading live videos from start
			* Add key `is_from_start` in formats to identify formats (of live videos) that download from start
			* [dash] Create protocol `http_dash_segments_generator` that allows a function to be passed instead of fragments
			* [fragment] Allow multiple live dash formats to download simultaneously
			* [youtube] Implement fragment re-fetching for the live dash formats
			* [youtube] Re-extract dash manifest every 5 hours (manifest expires in 6hrs)
			* [postprocessor/ffmpeg] Add `FFmpegFixupDuplicateMoovPP` to fixup duplicated moov atoms

			Known issue: Ctrl+C doesn't work on Windows when downloading multiple formats

			Closes #1521

			Authored by: nao20010128nao, pukkandan
		
			
				
	
	
		
			182 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			182 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import functools
 | |
| from threading import Lock
 | |
| from .utils import supports_terminal_sequences, write_string
 | |
| 
 | |
| 
 | |
# Terminal control sequences used for cursor movement and resetting formatting
CONTROL_SEQUENCES = {
    'DOWN': '\n',
    'UP': '\033[A',
    'ERASE_LINE': '\033[K',
    'RESET': '\033[0m',
}


# Colour name -> final digit of the colour code.
# format_text() prefixes it with 3/9 (foreground) or 4/10 (background)
_COLORS = {
    'BLACK': '0',
    'RED': '1',
    'GREEN': '2',
    'YELLOW': '3',
    'BLUE': '4',
    'PURPLE': '5',
    'CYAN': '6',
    'WHITE': '7',
}


# Style name -> attribute code placed before the foreground colour code
_TEXT_STYLES = {
    'NORMAL': '0',
    'BOLD': '1',
    'UNDERLINED': '4',
}


def format_text(text, f):
    '''
    Wrap text in the escape sequences described by a format string

    @param text String to be formatted
    @param f    String representation of formatting to apply in the form:
                [style] [light] font_color [on [light] bg_color]
                Eg: "red", "bold green on light blue"
                Matching is case-insensitive. An empty string applies no formatting
    @returns    text surrounded by the foreground/background sequences and
                CONTROL_SEQUENCES["RESET"], or text unchanged if f is empty
    @raises     SyntaxError if f does not follow the form above
    '''
    f = f.upper()
    tokens = f.strip().split()

    bg_color = ''
    if 'ON' in tokens:
        # Remember the background part now, before tokens are consumed, so the
        # error below can always quote it. Slicing the raw string on " ON "
        # raised IndexError when the spec began with "ON" or used tabs
        bg_spec = ' '.join(tokens[tokens.index('ON') + 1:])

        if tokens[-1] == 'ON':
            raise SyntaxError(f'Empty background format specified in {f!r}')
        if tokens[-1] not in _COLORS:
            raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
        bg_color = f'4{_COLORS[tokens.pop()]}'
        if tokens[-1] == 'LIGHT':
            # NOTE(review): the leading "0;" is emitted after the foreground
            # sequence (see the return below); as "0" is the reset code, this
            # looks like it discards the foreground style - confirm intent
            bg_color = f'0;10{bg_color[1:]}'
            tokens.pop()
        # Only "[light] color" may follow "on"; anything else is rejected
        if tokens[-1] != 'ON':
            raise SyntaxError(f'Invalid format {bg_spec!r} in {f!r}')
        bg_color = f'\033[{bg_color}m'
        tokens.pop()

    if not tokens:
        fg_color = ''
    elif tokens[-1] not in _COLORS:
        raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
    else:
        # Tokens are consumed right-to-left: color, then "light", then style
        fg_color = f'3{_COLORS[tokens.pop()]}'
        if tokens and tokens[-1] == 'LIGHT':
            fg_color = f'9{fg_color[1:]}'
            tokens.pop()
        fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
        fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
        if tokens:
            raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')

    if fg_color or bg_color:
        return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
    else:
        return text
 | |
| 
 | |
| 
 | |
class MultilinePrinterBase:
    '''
    Common base of the status printers

    Stores the output stream and the index of the last line, and provides
    no-op print_at_line()/end() hooks for subclasses to override.
    Usable as a context manager; end() is called on exit
    '''

    def __init__(self, stream=None, lines=1):
        '''
        @param stream   Object the output is written to
        @param lines    Number of status lines; the last valid index is lines - 1
        '''
        self.stream = stream
        self.maximum = lines - 1
        self._HAVE_FULLCAP = supports_terminal_sequences(stream)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Exception details are ignored; the printer is always finalized
        self.end()

    def print_at_line(self, text, pos):
        '''Show text on line number pos. Does nothing in the base class'''

    def end(self):
        '''Finish printing. Does nothing in the base class'''

    def _add_line_number(self, text, line):
        '''Prefix text with its 1-based line number when there is more than one line'''
        return f'{line + 1}: {text}' if self.maximum else text

    def write(self, *text):
        '''Concatenate the given strings and send them to the stream'''
        joined = ''.join(text)
        write_string(joined, self.stream)
 | |
| 
 | |
| 
 | |
class QuietMultilinePrinter(MultilinePrinterBase):
    '''Printer that inherits the no-op print_at_line() and end() of the base class'''
 | |
| 
 | |
| 
 | |
class MultilineLogger(MultilinePrinterBase):
    '''Printer that passes every status line to the debug() method of its stream'''

    def write(self, *text):
        # self.stream is the logger object here, not an actual stream
        message = ''.join(text)
        self.stream.debug(message)

    def print_at_line(self, text, pos):
        '''Send text, numbered via _add_line_number(), to the logger'''
        numbered = self._add_line_number(text, pos)
        self.write(numbered)
 | |
| 
 | |
| 
 | |
class BreaklineStatusPrinter(MultilinePrinterBase):
    '''Printer that writes every status update followed by a line break'''

    def print_at_line(self, text, pos):
        '''Write text, numbered via _add_line_number(), and terminate the line'''
        labelled = self._add_line_number(text, pos)
        self.write(labelled, '\n')
 | |
| 
 | |
| 
 | |
class MultilinePrinter(MultilinePrinterBase):
    '''
    Printer that keeps several status lines updated in place

    When the stream supports terminal sequences (self._HAVE_FULLCAP), the
    cursor is moved between lines with CONTROL_SEQUENCES. Otherwise "\\r" is
    used to rewrite the current line and "\\n" to start a new one
    '''

    def __init__(self, stream=None, lines=1, preserve_output=True):
        '''
        @param stream           Object the output is written to
        @param lines            Number of status lines
        @param preserve_output  Whether end() keeps the printed lines (True)
                                or clears them (False)
        '''
        super().__init__(stream, lines)
        self.preserve_output = preserve_output
        # Line the cursor was last left on, and length of the text last
        # written in fallback mode (used to blank out leftovers)
        self._lastline = self._lastlength = 0
        self._movelock = Lock()

    def lock(func):
        # Decorator for the methods below: runs func while holding self._movelock
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            with self._movelock:
                return func(self, *args, **kwargs)
        return wrapper

    def _move_cursor(self, dest):
        '''
        Yield the strings that move the cursor to the start of line dest

        self._lastline is updated only once the generator is exhausted
        '''
        current = min(self._lastline, self.maximum)
        yield '\r'
        distance = dest - current
        if distance < 0:
            yield CONTROL_SEQUENCES['UP'] * -distance
        elif distance > 0:
            yield CONTROL_SEQUENCES['DOWN'] * distance
        self._lastline = dest

    @lock
    def print_at_line(self, text, pos):
        '''Show text on line number pos, replacing what that line displayed'''
        if self._HAVE_FULLCAP:
            self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
            return

        text = self._add_line_number(text, pos)
        textlen = len(text)
        if self._lastline == pos:
            # move cursor at the start of progress when writing to same line
            prefix = '\r'
            # pad with spaces to overwrite the tail of a longer previous text
            if self._lastlength > textlen:
                text += ' ' * (self._lastlength - textlen)
        else:
            # otherwise, break the line
            prefix = '\n'
        # length without the padding
        self._lastlength = textlen
        self.write(prefix, text)
        self._lastline = pos

    @lock
    def end(self):
        '''Finish printing, either keeping or clearing the status lines'''
        # move cursor to the end of the last line, and write line break
        # so that other to_screen calls can precede
        text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
        if self.preserve_output:
            self.write(*text, '\n')
            return

        if self._HAVE_FULLCAP:
            # erase the last line, then move up erasing each line above it
            self.write(
                *text, CONTROL_SEQUENCES['ERASE_LINE'],
                f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
        else:
            # Return to the start of the line before blanking it; writing the
            # spaces from the current position would only append after the text.
            # The trailing "\r" leaves the cursor at the start of the cleared line
            self.write('\r', ' ' * self._lastlength, '\r')
 |