mirror of
				https://github.com/yt-dlp/yt-dlp.git
				synced 2025-11-04 08:35:12 +00:00 
			
		
		
		
	Authored by: seproDev Reviewed-by: bashonly <88596187+bashonly@users.noreply.github.com> Reviewed-by: Simon Sawicki <contact@grub4k.xyz>
		
			
				
	
	
		
			399 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			399 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
 | 
						|
A partial parser for WebVTT segments. Interprets enough of the WebVTT stream
 | 
						|
to be able to assemble a single stand-alone subtitle file, suitably adjusting
 | 
						|
timestamps on the way, while everything else is passed through unmodified.
 | 
						|
 | 
						|
Regular expressions based on the W3C WebVTT specification
 | 
						|
<https://www.w3.org/TR/webvtt1/>. The X-TIMESTAMP-MAP extension is described
 | 
						|
in RFC 8216 §3.5 <https://tools.ietf.org/html/rfc8216#section-3.5>.
 | 
						|
"""
 | 
						|
 | 
						|
import io
 | 
						|
import re
 | 
						|
 | 
						|
from .utils import int_or_none, timetuple_from_msec
 | 
						|
 | 
						|
 | 
						|
class _MatchParser:
    """
    Keeps a cursor into a string and offers helpers for moving that cursor
    forward whenever a syntax element has been recognised.
    """

    def __init__(self, string):
        self._data = string
        self._pos = 0

    def match(self, r):
        """
        Test `r` against the data at the current position without moving.

        A plain string yields its length when the data continues with it and
        None otherwise; a compiled pattern yields the result of matching it
        at the current position. Any other type raises ValueError.
        """
        if isinstance(r, str):
            return len(r) if self._data.startswith(r, self._pos) else None
        if isinstance(r, re.Pattern):
            return r.match(self._data, self._pos)
        raise ValueError(r)

    def advance(self, by):
        """
        Move the cursor forward by the extent of `by` and hand `by` back.

        `by` may be None (no movement), a re.Match (length of the whole
        match), a string (its length) or an int (that many characters).
        Any other type raises ValueError.
        """
        if by is None:
            step = 0
        elif isinstance(by, re.Match):
            step = by.end() - by.start()
        elif isinstance(by, str):
            step = len(by)
        elif isinstance(by, int):
            step = by
        else:
            raise ValueError(by)
        self._pos += step
        return by

    def consume(self, r):
        """
        Match `r` at the cursor and, on success, step over what was matched.
        Returns whatever match() returned.
        """
        found = self.match(r)
        return self.advance(found)

    def child(self):
        """
        Create a child parser starting at this parser's current position.
        """
        return _MatchChildParser(self)
 | 
						|
 | 
						|
 | 
						|
class _MatchChildParser(_MatchParser):
    """
    A parser state that walks over the same data as its parent while
    keeping a position of its own. Handy for speculatively advancing
    through syntax elements that may have to be abandoned later: the
    parent only moves once commit() is called.
    """

    def __init__(self, parent):
        super().__init__(parent._data)
        # Start where the parent currently is.
        self._pos = parent._pos
        self.__parent = parent

    def commit(self):
        """
        Move the parent's position up to this child's position and
        return the parent.
        """
        parent = self.__parent
        parent._pos = self._pos
        return parent
 | 
						|
 | 
						|
 | 
						|
class ParseError(Exception):
    """
    Raised when the input cannot be parsed at the parser's current position.
    The message carries the position and up to 100 characters of the data
    that follows it.
    """

    def __init__(self, parser):
        pos = parser._pos
        context = parser._data[pos:pos + 100]
        super().__init__(f'Parse error at position {pos} (near {context!r})')
 | 
						|
 | 
						|
 | 
						|
# The specification <https://www.w3.org/TR/webvtt1/#webvtt-timestamp>
# prescribes that hours must be *2 or more* digits, but timestamps with a
# single digit for the hour part have been seen in the wild.
# See https://github.com/yt-dlp/yt-dlp/issues/921
#
# Groups: (1) optional hours, (2) minutes, (3) seconds, (4) optional
# milliseconds.
_REGEX_TS = re.compile(r'''(?x)
    (?:([0-9]{1,}):)?
    ([0-9]{2}):
    ([0-9]{2})\.
    ([0-9]{3})?
''')
# Matches only at the very end of the input.
_REGEX_EOF = re.compile(r'\Z')
# A single line terminator, or the end of the input.
_REGEX_NL = re.compile(r'(?:\r\n|[\r\n]|$)')
# One or more consecutive line terminators.
_REGEX_BLANK = re.compile(r'(?:\r\n|[\r\n])+')
# Any run (possibly empty) of spaces and tabs.
_REGEX_OPTIONAL_WHITESPACE = re.compile(r'[ \t]*')
 | 
						|
 | 
						|
 | 
						|
def _parse_ts(ts):
    """
    Turn a parsed WebVTT timestamp (the re.Match produced by _REGEX_TS)
    into an MPEG PES timestamp, i.e. a tick count at 90 kHz resolution.
    """
    # Milliseconds contributed by one unit of each captured group, in group
    # order: hours, minutes, seconds, milliseconds.
    multipliers = (3600_000, 60_000, 1000, 1)
    msec = 0
    for part, mult in zip(ts.groups(), multipliers):
        # Groups that did not participate in the match count as zero.
        msec += int(part or 0) * mult
    # 90 ticks per millisecond.
    return 90 * msec
 | 
						|
 | 
						|
 | 
						|
def _format_ts(ts):
    """
    Render an MPEG PES timestamp as a WebVTT timestamp string.

    Precision finer than one millisecond is lost.
    """
    # Adding half a millisecond's worth of ticks (45 of 90) before the floor
    # division rounds to the nearest millisecond.
    msec = int((ts + 45) // 90)
    return '%02u:%02u:%02u.%03u' % timetuple_from_msec(msec)
 | 
						|
 | 
						|
 | 
						|
class Block:
    """
    Base class for WebVTT blocks.

    Subclasses relying on the default parse() provide a compiled pattern
    in the class attribute `_REGEX`.
    """

    def __init__(self, **kwargs):
        # Every keyword argument becomes an instance attribute.
        for name, value in kwargs.items():
            setattr(self, name, value)

    @classmethod
    def parse(cls, parser):
        """
        Try to match `cls._REGEX` at the parser's position. On success the
        parser is advanced past the match and a block holding the matched
        text as `raw` is returned; otherwise None is returned and the parser
        is left alone.
        """
        found = parser.match(cls._REGEX)
        if found:
            parser.advance(found)
            return cls(raw=found.group(0))
        return None

    def write_into(self, stream):
        """
        Write the block's raw text to `stream`.
        """
        stream.write(self.raw)
 | 
						|
 | 
						|
 | 
						|
class HeaderBlock(Block):
    """
    Marker base class for WebVTT blocks that are only allowed in the header
    part of the file, that is, ahead of any cue block.
    """
 | 
						|
 | 
						|
 | 
						|
class Magic(HeaderBlock):
    """
    The "WEBVTT" signature line together with the header lines directly
    following it: the X-TIMESTAMP-MAP extension and legacy metadata headers.

    Instances carry the attributes `extra` (text after "WEBVTT" on the
    signature line, or None), `local` and `mpegts` (the two components of
    X-TIMESTAMP-MAP, each None when absent) and `meta` (the concatenated raw
    metadata header lines, possibly empty).
    """

    # An optional byte order mark, "WEBVTT", optional trailing text
    # (captured as group 1) and a line terminator.
    _REGEX = re.compile(r'\ufeff?WEBVTT([ \t][^\r\n]*)?(?:\r\n|[\r\n])')

    # XXX: The X-TIMESTAMP-MAP extension is described in RFC 8216 §3.5
    # <https://tools.ietf.org/html/rfc8216#section-3.5>, but the RFC
    # doesn't specify the exact grammar nor where in the WebVTT
    # syntax it should be placed; the below has been devised based
    # on usage in the wild
    #
    # And strictly speaking, the presence of this extension violates
    # the W3C WebVTT spec. Oh well.

    _REGEX_TSMAP = re.compile(r'X-TIMESTAMP-MAP=')
    _REGEX_TSMAP_LOCAL = re.compile(r'LOCAL:')
    _REGEX_TSMAP_MPEGTS = re.compile(r'MPEGTS:([0-9]+)')
    _REGEX_TSMAP_SEP = re.compile(r'[ \t]*,[ \t]*')

    # This was removed from the spec in the 2017 revision;
    # the last spec draft to describe this syntax element is
    # <https://www.w3.org/TR/2015/WD-webvtt1-20151208/#webvtt-metadata-header>.
    # Nevertheless, YouTube keeps serving those
    _REGEX_META = re.compile(r'(?:(?!-->)[^\r\n])+:(?:(?!-->)[^\r\n])+(?:\r\n|[\r\n])')

    @classmethod
    def __parse_tsmap(cls, parser):
        """
        Parse the remainder of an X-TIMESTAMP-MAP line; the caller has
        already consumed the "X-TIMESTAMP-MAP=" prefix.

        Returns a `(local, mpegts)` tuple. `local` is the LOCAL timestamp
        converted by _parse_ts, `mpegts` the MPEGTS value; a component that
        does not occur on the line is returned as None.

        Raises ParseError when the line does not follow the expected
        comma-separated LOCAL:/MPEGTS: layout.
        """
        parser = parser.child()

        # Either component may be missing from a malformed header. Start
        # from None so that a missing one is reported as None instead of
        # blowing up with UnboundLocalError at the return statement.
        local, mpegts = None, None

        while True:
            if parser.consume(cls._REGEX_TSMAP_LOCAL):
                m = parser.consume(_REGEX_TS)
                if m is None:
                    raise ParseError(parser)
                local = _parse_ts(m)
            else:
                m = parser.consume(cls._REGEX_TSMAP_MPEGTS)
                if not m:
                    raise ParseError(parser)
                mpegts = int_or_none(m.group(1))
                if mpegts is None:
                    raise ParseError(parser)

            # A separator means another component follows; a line
            # terminator (or end of input) ends the header line.
            if parser.consume(cls._REGEX_TSMAP_SEP):
                continue
            if parser.consume(_REGEX_NL):
                break
            raise ParseError(parser)

        parser.commit()
        return local, mpegts

    @classmethod
    def parse(cls, parser):
        """
        Parse the signature line and the header lines after it, up to and
        including the terminating blank line (or end of input).

        Returns a Magic instance and advances `parser` past the header.
        Raises ParseError when the signature is missing or a header line
        is neither an X-TIMESTAMP-MAP line nor a metadata header.
        """
        parser = parser.child()

        m = parser.consume(cls._REGEX)
        if not m:
            raise ParseError(parser)

        extra = m.group(1)
        local, mpegts, meta = None, None, ''
        # Header lines continue until an empty line or the end of input.
        while not parser.consume(_REGEX_NL):
            if parser.consume(cls._REGEX_TSMAP):
                local, mpegts = cls.__parse_tsmap(parser)
                continue
            m = parser.consume(cls._REGEX_META)
            if m:
                meta += m.group(0)
                continue
            raise ParseError(parser)
        parser.commit()
        return cls(extra=extra, mpegts=mpegts, local=local, meta=meta)

    def write_into(self, stream):
        """
        Write the header to `stream`: the "WEBVTT" line (without any byte
        order mark), an X-TIMESTAMP-MAP line when `local` or `mpegts` is
        truthy, the metadata lines, and a terminating blank line.
        """
        stream.write('WEBVTT')
        if self.extra is not None:
            stream.write(self.extra)
        stream.write('\n')
        if self.local or self.mpegts:
            # A missing component is written out as zero.
            stream.write('X-TIMESTAMP-MAP=LOCAL:')
            stream.write(_format_ts(self.local if self.local is not None else 0))
            stream.write(',MPEGTS:')
            stream.write(str(self.mpegts if self.mpegts is not None else 0))
            stream.write('\n')
        if self.meta:
            stream.write(self.meta)
        stream.write('\n')
 | 
						|
 | 
						|
 | 
						|
class StyleBlock(HeaderBlock):
    """
    A STYLE block: a "STYLE" line, any number of non-empty lines that do not
    contain "-->", and a terminating empty line. Kept as raw text.
    """

    _REGEX = re.compile(r'''(?x)
        STYLE[\ \t]*(?:\r\n|[\r\n])
        ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
        (?:\r\n|[\r\n])
    ''')
 | 
						|
 | 
						|
 | 
						|
class RegionBlock(HeaderBlock):
    """
    A REGION block: "REGION", any number of non-empty lines that do not
    contain "-->", and a terminating empty line. Kept as raw text.
    """

    # The line terminator after the "REGION" header line has to be accepted
    # explicitly: without it a block whose settings start on the next line
    # would match only "REGION" plus one line terminator, leaving the
    # settings lines behind to fail cue parsing. It is optional so that
    # input carrying the settings on the "REGION" line itself, which the
    # pattern has always accepted, keeps matching.
    _REGEX = re.compile(r'''(?x)
        REGION[\ \t]*(?:\r\n|[\r\n])?
        ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
        (?:\r\n|[\r\n])
    ''')
 | 
						|
 | 
						|
 | 
						|
class CommentBlock(Block):
    """
    A comment block: "NOTE" followed by a space, tab or line terminator,
    any number of non-empty lines that do not contain "-->", and a
    terminating empty line. Kept as raw text.
    """

    _REGEX = re.compile(r'''(?x)
        NOTE(?:\r\n|[\ \t\r\n])
        ((?:(?!-->)[^\r\n])+(?:\r\n|[\r\n]))*
        (?:\r\n|[\r\n])
    ''')
 | 
						|
 | 
						|
 | 
						|
class CueBlock(Block):
    """
    A cue block. The payload is not interpreted.

    Instances carry `id` (the cue identifier or None), `start` and `end`
    (timestamps as produced by _parse_ts), `settings` (the cue settings
    text or None) and `text` (the raw payload lines).
    """

    # An identifier line: anything without "-->" (group 1), then a line
    # terminator.
    _REGEX_ID = re.compile(r'((?:(?!-->)[^\r\n])+)(?:\r\n|[\r\n])')
    _REGEX_ARROW = re.compile(r'[ \t]+-->[ \t]+')
    # Whitespace followed by the settings text (group 1).
    _REGEX_SETTINGS = re.compile(r'[ \t]+((?:(?!-->)[^\r\n])+)')
    # One non-empty payload line, including its line terminator if present.
    _REGEX_PAYLOAD = re.compile(r'[^\r\n]+(?:\r\n|[\r\n])?')

    @classmethod
    def parse(cls, parser):
        """
        Try to parse a cue block at the parser's position.

        Returns a CueBlock and advances `parser` past it, or returns None
        and leaves `parser` untouched when no well-formed cue starts here.
        """
        # Work on a child so that a failed attempt does not move the caller.
        parser = parser.child()

        id_ = None
        m = parser.consume(cls._REGEX_ID)
        if m:
            id_ = m.group(1)

        # The timing line: start, arrow, end, optional settings.
        m0 = parser.consume(_REGEX_TS)
        if not m0:
            return None
        if not parser.consume(cls._REGEX_ARROW):
            return None
        m1 = parser.consume(_REGEX_TS)
        if not m1:
            return None
        m2 = parser.consume(cls._REGEX_SETTINGS)
        parser.consume(_REGEX_OPTIONAL_WHITESPACE)
        if not parser.consume(_REGEX_NL):
            return None

        start = _parse_ts(m0)
        end = _parse_ts(m1)
        settings = m2.group(1) if m2 is not None else None

        # The payload runs until the first empty line or the end of input;
        # the payload pattern cannot match an empty line.
        lines = []
        while True:
            m = parser.consume(cls._REGEX_PAYLOAD)
            if not m:
                break
            lines.append(m.group(0))

        parser.commit()
        return cls(
            id=id_,
            start=start, end=end, settings=settings,
            text=''.join(lines),
        )

    def write_into(self, stream):
        """
        Write the cue to `stream`: the identifier line (if any), the timing
        line with optional settings, the payload and a trailing newline.
        """
        if self.id is not None:
            stream.write(self.id)
            stream.write('\n')
        stream.write(_format_ts(self.start))
        stream.write(' --> ')
        stream.write(_format_ts(self.end))
        if self.settings is not None:
            stream.write(' ')
            stream.write(self.settings)
        stream.write('\n')
        stream.write(self.text)
        stream.write('\n')

    @property
    def as_json(self):
        """
        The cue's fields as a plain dict; the inverse of from_json().
        """
        return {
            'id': self.id,
            'start': self.start,
            'end': self.end,
            'text': self.text,
            'settings': self.settings,
        }

    def __eq__(self, other):
        """
        Two cues are equal when their as_json dicts are equal.

        For an operand that has no `as_json`, NotImplemented is returned so
        that Python falls back to its default comparison instead of this
        method raising AttributeError.
        """
        other_json = getattr(other, 'as_json', None)
        if other_json is None:
            return NotImplemented
        return self.as_json == other_json

    @classmethod
    def from_json(cls, json):
        """
        Build a cue from a dict shaped like the one as_json returns.
        Raises KeyError when a field is missing.
        """
        return cls(
            id=json['id'],
            start=json['start'],
            end=json['end'],
            text=json['text'],
            settings=json['settings'],
        )

    def hinges(self, other):
        """
        Tell whether `other` continues this cue: same text, same settings,
        and `other` starts exactly where this cue ends (with neither cue
        ending before it starts).
        """
        if self.text != other.text:
            return False
        if self.settings != other.settings:
            return False
        return self.start <= self.end == other.start <= other.end
 | 
						|
 | 
						|
 | 
						|
def parse_fragment(frag_content):
    """
    Generator over the (partially) parsed WebVTT blocks contained in
    `frag_content`, a bytes object holding the raw contents of a WebVTT
    file.

    The Magic header comes first, followed by header blocks and then by
    comment and cue blocks. ParseError is raised when the input cannot be
    parsed.
    """

    parser = _MatchParser(frag_content.decode())

    yield Magic.parse(parser)

    # Header section: region, style and comment blocks, tried in that order.
    # The first position at which none of them matches ends the section.
    while not parser.match(_REGEX_EOF):
        if parser.consume(_REGEX_BLANK):
            continue

        for block_type in (RegionBlock, StyleBlock, CommentBlock):
            block = block_type.parse(parser)
            if block:
                yield block  # XXX: or skip comment blocks
                break
        else:
            break

    # Cue section: only comments and cues may appear from here on.
    while not parser.match(_REGEX_EOF):
        if parser.consume(_REGEX_BLANK):
            continue

        block = CommentBlock.parse(parser) or CueBlock.parse(parser)
        if not block:
            raise ParseError(parser)
        yield block  # XXX: or skip comment blocks
 |