mirror of
				https://github.com/yt-dlp/yt-dlp.git
				synced 2025-10-25 11:40:59 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			399 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			399 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """
 | ||
| A partial parser for WebVTT segments. Interprets enough of the WebVTT stream
 | ||
| to be able to assemble a single stand-alone subtitle file, suitably adjusting
 | ||
| timestamps on the way, while everything else is passed through unmodified.
 | ||
| 
 | ||
| Regular expressions based on the W3C WebVTT specification
 | ||
| <https://www.w3.org/TR/webvtt1/>. The X-TIMESTAMP-MAP extension is described
 | ||
| in RFC 8216 §3.5 <https://tools.ietf.org/html/rfc8216#section-3.5>.
 | ||
| """
 | ||
| 
 | ||
| import io
 | ||
| import re
 | ||
| 
 | ||
| from .utils import int_or_none, timetuple_from_msec
 | ||
| 
 | ||
| 
 | ||
| class _MatchParser:
 | ||
|     """
 | ||
|     An object that maintains the current parsing position and allows
 | ||
|     conveniently advancing it as syntax elements are successfully parsed.
 | ||
|     """
 | ||
| 
 | ||
|     def __init__(self, string):
 | ||
|         self._data = string
 | ||
|         self._pos = 0
 | ||
| 
 | ||
|     def match(self, r):
 | ||
|         if isinstance(r, re.Pattern):
 | ||
|             return r.match(self._data, self._pos)
 | ||
|         if isinstance(r, str):
 | ||
|             if self._data.startswith(r, self._pos):
 | ||
|                 return len(r)
 | ||
|             return None
 | ||
|         raise ValueError(r)
 | ||
| 
 | ||
|     def advance(self, by):
 | ||
|         if by is None:
 | ||
|             amt = 0
 | ||
|         elif isinstance(by, re.Match):
 | ||
|             amt = len(by.group(0))
 | ||
|         elif isinstance(by, str):
 | ||
|             amt = len(by)
 | ||
|         elif isinstance(by, int):
 | ||
|             amt = by
 | ||
|         else:
 | ||
|             raise ValueError(by)
 | ||
|         self._pos += amt
 | ||
|         return by
 | ||
| 
 | ||
|     def consume(self, r):
 | ||
|         return self.advance(self.match(r))
 | ||
| 
 | ||
|     def child(self):
 | ||
|         return _MatchChildParser(self)
 | ||
| 
 | ||
| 
 | ||
| class _MatchChildParser(_MatchParser):
 | ||
|     """
 | ||
|     A child parser state, which advances through the same data as
 | ||
|     its parent, but has an independent position. This is useful when
 | ||
|     advancing through syntax elements we might later want to backtrack
 | ||
|     from.
 | ||
|     """
 | ||
| 
 | ||
|     def __init__(self, parent):
 | ||
|         super().__init__(parent._data)
 | ||
|         self.__parent = parent
 | ||
|         self._pos = parent._pos
 | ||
| 
 | ||
|     def commit(self):
 | ||
|         """
 | ||
|         Advance the parent state to the current position of this child state.
 | ||
|         """
 | ||
|         self.__parent._pos = self._pos
 | ||
|         return self.__parent
 | ||
| 
 | ||
| 
 | ||
| class ParseError(Exception):
 | ||
|     def __init__(self, parser):
 | ||
|         super().__init__("Parse error at position %u (near %r)" % (
 | ||
|             parser._pos, parser._data[parser._pos:parser._pos + 20]
 | ||
|         ))
 | ||
| 
 | ||
| 
 | ||
| # While the specification <https://www.w3.org/TR/webvtt1/#webvtt-timestamp>
 | ||
| # prescribes that hours must be *2 or more* digits, timestamps with a single
 | ||
| # digit for the hour part has been seen in the wild.
 | ||
| # See https://github.com/yt-dlp/yt-dlp/issues/921
 | ||
| _REGEX_TS = re.compile(r'''(?x)
 | ||
|     (?:([0-9]{1,}):)?
 | ||
|     ([0-9]{2}):
 | ||
|     ([0-9]{2})\.
 | ||
|     ([0-9]{3})?
 | ||
| ''')
 | ||
| _REGEX_EOF = re.compile(r'\Z')
 | ||
| _REGEX_NL = re.compile(r'(?:\r\n|[\r\n])')
 | ||
| _REGEX_BLANK = re.compile(r'(?:\r\n|[\r\n])+')
 | ||
| 
 | ||
| 
 | ||
| def _parse_ts(ts):
 | ||
|     """
 | ||
|     Convert a parsed WebVTT timestamp (a re.Match obtained from _REGEX_TS)
 | ||
|     into an MPEG PES timestamp: a tick counter at 90 kHz resolution.
 | ||
|     """
 | ||
|     return 90 * sum(
 | ||
|         int(part or 0) * mult for part, mult in zip(ts.groups(), (3600_000, 60_000, 1000, 1)))
 | ||
| 
 | ||
| 
 | ||
| def _format_ts(ts):
 | ||
|     """
 | ||
|     Convert an MPEG PES timestamp into a WebVTT timestamp.
 | ||
|     This will lose sub-millisecond precision.
 | ||
|     """
 | ||
|     return '%02u:%02u:%02u.%03u' % timetuple_from_msec(int((ts + 45) // 90))
 | ||
| 
 | ||
| 
 | ||
| class Block:
 | ||
|     """
 | ||
|     An abstract WebVTT block.
 | ||
|     """
 | ||
| 
 | ||
|     def __init__(self, **kwargs):
 | ||
|         for key, val in kwargs.items():
 | ||
|             setattr(self, key, val)
 | ||
| 
 | ||
|     @classmethod
 | ||
|     def parse(cls, parser):
 | ||
|         m = parser.match(cls._REGEX)
 | ||
|         if not m:
 | ||
|             return None
 | ||
|         parser.advance(m)
 | ||
|         return cls(raw=m.group(0))
 | ||
| 
 | ||
|     def write_into(self, stream):
 | ||
|         stream.write(self.raw)
 | ||
| 
 | ||
| 
 | ||
| class HeaderBlock(Block):
 | ||
|     """
 | ||
|     A WebVTT block that may only appear in the header part of the file,
 | ||
|     i.e. before any cue blocks.
 | ||
|     """
 | ||
| 
 | ||
|     pass
 | ||
| 
 | ||
| 
 | ||
| class Magic(HeaderBlock):
 | ||
|     _REGEX = re.compile(r'\ufeff?WEBVTT([ \t][^\r\n]*)?(?:\r\n|[\r\n])')
 | ||
| 
 | ||
|     # XXX: The X-TIMESTAMP-MAP extension is described in RFC 8216 §3.5
 | ||
|     # <https://tools.ietf.org/html/rfc8216#section-3.5>, but the RFC
 | ||
|     # doesn’t specify the exact grammar nor where in the WebVTT
 | ||
|     # syntax it should be placed; the below has been devised based
 | ||
|     # on usage in the wild
 | ||
|     #
 | ||
|     # And strictly speaking, the presence of this extension violates
 | ||
|     # the W3C WebVTT spec. Oh well.
 | ||
| 
 | ||
|     _REGEX_TSMAP = re.compile(r'X-TIMESTAMP-MAP=')
 | ||
|     _REGEX_TSMAP_LOCAL = re.compile(r'LOCAL:')
 | ||
|     _REGEX_TSMAP_MPEGTS = re.compile(r'MPEGTS:([0-9]+)')
 | ||
|     _REGEX_TSMAP_SEP = re.compile(r'[ \t]*,[ \t]*')
 | ||
| 
 | ||
|     # This was removed from the spec in the 2017 revision;
 | ||
|     # the last spec draft to describe this syntax element is
 | ||
|     # <https://www.w3.org/TR/2015/WD-webvtt1-20151208/#webvtt-metadata-header>.
 | ||
|     # Nevertheless, YouTube keeps serving those
 | ||
|     _REGEX_META = re.compile(r'(?:(?!-->)[^\r\n])+:(?:(?!-->)[^\r\n])+(?:\r\n|[\r\n])')
 | ||
| 
 | ||
|     @classmethod
 | ||
|     def __parse_tsmap(cls, parser):
 | ||
|         parser = parser.child()
 | ||
| 
 | ||
|         while True:
 | ||
|             m = parser.consume(cls._REGEX_TSMAP_LOCAL)
 | ||
|             if m:
 | ||
|                 m = parser.consume(_REGEX_TS)
 | ||
|                 if m is None:
 | ||
|                     raise ParseError(parser)
 | ||
|                 local = _parse_ts(m)
 | ||
|                 if local is None:
 | ||
|                     raise ParseError(parser)
 | ||
|             else:
 | ||
|                 m = parser.consume(cls._REGEX_TSMAP_MPEGTS)
 | ||
|                 if m:
 | ||
|                     mpegts = int_or_none(m.group(1))
 | ||
|                     if mpegts is None:
 | ||
|                         raise ParseError(parser)
 | ||
|                 else:
 | ||
|                     raise ParseError(parser)
 | ||
|             if parser.consume(cls._REGEX_TSMAP_SEP):
 | ||
|                 continue
 | ||
|             if parser.consume(_REGEX_NL):
 | ||
|                 break
 | ||
|             raise ParseError(parser)
 | ||
| 
 | ||
|         parser.commit()
 | ||
|         return local, mpegts
 | ||
| 
 | ||
|     @classmethod
 | ||
|     def parse(cls, parser):
 | ||
|         parser = parser.child()
 | ||
| 
 | ||
|         m = parser.consume(cls._REGEX)
 | ||
|         if not m:
 | ||
|             raise ParseError(parser)
 | ||
| 
 | ||
|         extra = m.group(1)
 | ||
|         local, mpegts, meta = None, None, ''
 | ||
|         while not parser.consume(_REGEX_NL):
 | ||
|             if parser.consume(cls._REGEX_TSMAP):
 | ||
|                 local, mpegts = cls.__parse_tsmap(parser)
 | ||
|                 continue
 | ||
|             m = parser.consume(cls._REGEX_META)
 | ||
|             if m:
 | ||
|                 meta += m.group(0)
 | ||
|                 continue
 | ||
|             raise ParseError(parser)
 | ||
|         parser.commit()
 | ||
|         return cls(extra=extra, mpegts=mpegts, local=local, meta=meta)
 | ||
| 
 | ||
|     def write_into(self, stream):
 | ||
|         stream.write('WEBVTT')
 | ||
|         if self.extra is not None:
 | ||
|             stream.write(self.extra)
 | ||
|         stream.write('\n')
 | ||
|         if self.local or self.mpegts:
 | ||
|             stream.write('X-TIMESTAMP-MAP=LOCAL:')
 | ||
|             stream.write(_format_ts(self.local if self.local is not None else 0))
 | ||
|             stream.write(',MPEGTS:')
 | ||
|             stream.write(str(self.mpegts if self.mpegts is not None else 0))
 | ||
|             stream.write('\n')
 | ||
|         if self.meta:
 | ||
|             stream.write(self.meta)
 | ||
|         stream.write('\n')
 | ||
| 
 | ||
| 
 | ||
| class StyleBlock(HeaderBlock):
 | ||
|     _REGEX = re.compile(r'''(?x)
 | ||
|         STYLE[\ \t]*(?:\r\n|[\r\n])
 | ||
|         ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
 | ||
|         (?:\r\n|[\r\n])
 | ||
|     ''')
 | ||
| 
 | ||
| 
 | ||
| class RegionBlock(HeaderBlock):
 | ||
|     _REGEX = re.compile(r'''(?x)
 | ||
|         REGION[\ \t]*
 | ||
|         ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
 | ||
|         (?:\r\n|[\r\n])
 | ||
|     ''')
 | ||
| 
 | ||
| 
 | ||
| class CommentBlock(Block):
 | ||
|     _REGEX = re.compile(r'''(?x)
 | ||
|         NOTE(?:\r\n|[\ \t\r\n])
 | ||
|         ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
 | ||
|         (?:\r\n|[\r\n])
 | ||
|     ''')
 | ||
| 
 | ||
| 
 | ||
| class CueBlock(Block):
 | ||
|     """
 | ||
|     A cue block. The payload is not interpreted.
 | ||
|     """
 | ||
| 
 | ||
|     _REGEX_ID = re.compile(r'((?:(?!-->)[^\r\n])+)(?:\r\n|[\r\n])')
 | ||
|     _REGEX_ARROW = re.compile(r'[ \t]+-->[ \t]+')
 | ||
|     _REGEX_SETTINGS = re.compile(r'[ \t]+((?:(?!-->)[^\r\n])+)')
 | ||
|     _REGEX_PAYLOAD = re.compile(r'[^\r\n]+(?:\r\n|[\r\n])?')
 | ||
| 
 | ||
|     @classmethod
 | ||
|     def parse(cls, parser):
 | ||
|         parser = parser.child()
 | ||
| 
 | ||
|         id = None
 | ||
|         m = parser.consume(cls._REGEX_ID)
 | ||
|         if m:
 | ||
|             id = m.group(1)
 | ||
| 
 | ||
|         m0 = parser.consume(_REGEX_TS)
 | ||
|         if not m0:
 | ||
|             return None
 | ||
|         if not parser.consume(cls._REGEX_ARROW):
 | ||
|             return None
 | ||
|         m1 = parser.consume(_REGEX_TS)
 | ||
|         if not m1:
 | ||
|             return None
 | ||
|         m2 = parser.consume(cls._REGEX_SETTINGS)
 | ||
|         if not parser.consume(_REGEX_NL):
 | ||
|             return None
 | ||
| 
 | ||
|         start = _parse_ts(m0)
 | ||
|         end = _parse_ts(m1)
 | ||
|         settings = m2.group(1) if m2 is not None else None
 | ||
| 
 | ||
|         text = io.StringIO()
 | ||
|         while True:
 | ||
|             m = parser.consume(cls._REGEX_PAYLOAD)
 | ||
|             if not m:
 | ||
|                 break
 | ||
|             text.write(m.group(0))
 | ||
| 
 | ||
|         parser.commit()
 | ||
|         return cls(
 | ||
|             id=id,
 | ||
|             start=start, end=end, settings=settings,
 | ||
|             text=text.getvalue()
 | ||
|         )
 | ||
| 
 | ||
|     def write_into(self, stream):
 | ||
|         if self.id is not None:
 | ||
|             stream.write(self.id)
 | ||
|             stream.write('\n')
 | ||
|         stream.write(_format_ts(self.start))
 | ||
|         stream.write(' --> ')
 | ||
|         stream.write(_format_ts(self.end))
 | ||
|         if self.settings is not None:
 | ||
|             stream.write(' ')
 | ||
|             stream.write(self.settings)
 | ||
|         stream.write('\n')
 | ||
|         stream.write(self.text)
 | ||
|         stream.write('\n')
 | ||
| 
 | ||
|     @property
 | ||
|     def as_json(self):
 | ||
|         return {
 | ||
|             'id': self.id,
 | ||
|             'start': self.start,
 | ||
|             'end': self.end,
 | ||
|             'text': self.text,
 | ||
|             'settings': self.settings,
 | ||
|         }
 | ||
| 
 | ||
|     def __eq__(self, other):
 | ||
|         return self.as_json == other.as_json
 | ||
| 
 | ||
|     @classmethod
 | ||
|     def from_json(cls, json):
 | ||
|         return cls(
 | ||
|             id=json['id'],
 | ||
|             start=json['start'],
 | ||
|             end=json['end'],
 | ||
|             text=json['text'],
 | ||
|             settings=json['settings']
 | ||
|         )
 | ||
| 
 | ||
|     def hinges(self, other):
 | ||
|         if self.text != other.text:
 | ||
|             return False
 | ||
|         if self.settings != other.settings:
 | ||
|             return False
 | ||
|         return self.start <= self.end == other.start <= other.end
 | ||
| 
 | ||
| 
 | ||
| def parse_fragment(frag_content):
 | ||
|     """
 | ||
|     A generator that yields (partially) parsed WebVTT blocks when given
 | ||
|     a bytes object containing the raw contents of a WebVTT file.
 | ||
|     """
 | ||
| 
 | ||
|     parser = _MatchParser(frag_content.decode())
 | ||
| 
 | ||
|     yield Magic.parse(parser)
 | ||
| 
 | ||
|     while not parser.match(_REGEX_EOF):
 | ||
|         if parser.consume(_REGEX_BLANK):
 | ||
|             continue
 | ||
| 
 | ||
|         block = RegionBlock.parse(parser)
 | ||
|         if block:
 | ||
|             yield block
 | ||
|             continue
 | ||
|         block = StyleBlock.parse(parser)
 | ||
|         if block:
 | ||
|             yield block
 | ||
|             continue
 | ||
|         block = CommentBlock.parse(parser)
 | ||
|         if block:
 | ||
|             yield block  # XXX: or skip
 | ||
|             continue
 | ||
| 
 | ||
|         break
 | ||
| 
 | ||
|     while not parser.match(_REGEX_EOF):
 | ||
|         if parser.consume(_REGEX_BLANK):
 | ||
|             continue
 | ||
| 
 | ||
|         block = CommentBlock.parse(parser)
 | ||
|         if block:
 | ||
|             yield block  # XXX: or skip
 | ||
|             continue
 | ||
|         block = CueBlock.parse(parser)
 | ||
|         if block:
 | ||
|             yield block
 | ||
|             continue
 | ||
| 
 | ||
|         raise ParseError(parser)
 | 
