mirror of
				https://github.com/yt-dlp/yt-dlp.git
				synced 2025-10-30 22:25:19 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			168 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			168 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import annotations
 | |
| 
 | |
| import array
 | |
| import base64
 | |
| import datetime as dt
 | |
| import math
 | |
| import re
 | |
| 
 | |
| from .._utils import parse_iso8601
 | |
| 
 | |
| TYPE_CHECKING = False
 | |
| if TYPE_CHECKING:
 | |
|     import collections.abc
 | |
|     import typing
 | |
| 
 | |
|     T = typing.TypeVar('T')
 | |
| 
 | |
| 
 | |
| _ARRAY_TYPE_LOOKUP = {
 | |
|     'Int8Array': 'b',
 | |
|     'Uint8Array': 'B',
 | |
|     'Uint8ClampedArray': 'B',
 | |
|     'Int16Array': 'h',
 | |
|     'Uint16Array': 'H',
 | |
|     'Int32Array': 'i',
 | |
|     'Uint32Array': 'I',
 | |
|     'Float32Array': 'f',
 | |
|     'Float64Array': 'd',
 | |
|     'BigInt64Array': 'l',
 | |
|     'BigUint64Array': 'L',
 | |
|     'ArrayBuffer': 'B',
 | |
| }
 | |
| 
 | |
| 
 | |
| def parse_iter(parsed: typing.Any, /, *, revivers: dict[str, collections.abc.Callable[[list], typing.Any]] | None = None):
 | |
|     # based on https://github.com/Rich-Harris/devalue/blob/f3fd2aa93d79f21746555671f955a897335edb1b/src/parse.js
 | |
|     resolved = {
 | |
|         -1: None,
 | |
|         -2: None,
 | |
|         -3: math.nan,
 | |
|         -4: math.inf,
 | |
|         -5: -math.inf,
 | |
|         -6: -0.0,
 | |
|     }
 | |
| 
 | |
|     if isinstance(parsed, int) and not isinstance(parsed, bool):
 | |
|         if parsed not in resolved or parsed == -2:
 | |
|             raise ValueError('invalid integer input')
 | |
|         return resolved[parsed]
 | |
|     elif not isinstance(parsed, list):
 | |
|         raise ValueError('expected int or list as input')
 | |
|     elif not parsed:
 | |
|         raise ValueError('expected a non-empty list as input')
 | |
| 
 | |
|     if revivers is None:
 | |
|         revivers = {}
 | |
|     return_value = [None]
 | |
|     stack: list[tuple] = [(return_value, 0, 0)]
 | |
| 
 | |
|     while stack:
 | |
|         target, index, source = stack.pop()
 | |
|         if isinstance(source, tuple):
 | |
|             name, source, reviver = source
 | |
|             try:
 | |
|                 resolved[source] = target[index] = reviver(target[index])
 | |
|             except Exception as error:
 | |
|                 yield TypeError(f'failed to parse {source} as {name!r}: {error}')
 | |
|                 resolved[source] = target[index] = None
 | |
|             continue
 | |
| 
 | |
|         if source in resolved:
 | |
|             target[index] = resolved[source]
 | |
|             continue
 | |
| 
 | |
|         # guard against Python negative indexing
 | |
|         if source < 0:
 | |
|             yield IndexError(f'invalid index: {source!r}')
 | |
|             continue
 | |
| 
 | |
|         try:
 | |
|             value = parsed[source]
 | |
|         except IndexError as error:
 | |
|             yield error
 | |
|             continue
 | |
| 
 | |
|         if isinstance(value, list):
 | |
|             if value and isinstance(value[0], str):
 | |
|                 # TODO: implement zips `strict=True`
 | |
|                 if reviver := revivers.get(value[0]):
 | |
|                     if value[1] == source:
 | |
|                         # XXX: avoid infinite loop
 | |
|                         yield IndexError(f'{value[0]!r} cannot point to itself (index: {source})')
 | |
|                         continue
 | |
|                     # inverse order: resolve index, revive value
 | |
|                     stack.append((target, index, (value[0], value[1], reviver)))
 | |
|                     stack.append((target, index, value[1]))
 | |
|                     continue
 | |
| 
 | |
|                 elif value[0] == 'Date':
 | |
|                     try:
 | |
|                         result = dt.datetime.fromtimestamp(parse_iso8601(value[1]), tz=dt.timezone.utc)
 | |
|                     except Exception:
 | |
|                         yield ValueError(f'invalid date: {value[1]!r}')
 | |
|                         result = None
 | |
| 
 | |
|                 elif value[0] == 'Set':
 | |
|                     result = [None] * (len(value) - 1)
 | |
|                     for offset, new_source in enumerate(value[1:]):
 | |
|                         stack.append((result, offset, new_source))
 | |
| 
 | |
|                 elif value[0] == 'Map':
 | |
|                     result = []
 | |
|                     for key, new_source in zip(*(iter(value[1:]),) * 2, strict=True):
 | |
|                         pair = [None, None]
 | |
|                         stack.append((pair, 0, key))
 | |
|                         stack.append((pair, 1, new_source))
 | |
|                         result.append(pair)
 | |
| 
 | |
|                 elif value[0] == 'RegExp':
 | |
|                     # XXX: use jsinterp to translate regex flags
 | |
|                     #      currently ignores `value[2]`
 | |
|                     result = re.compile(value[1])
 | |
| 
 | |
|                 elif value[0] == 'Object':
 | |
|                     result = value[1]
 | |
| 
 | |
|                 elif value[0] == 'BigInt':
 | |
|                     result = int(value[1])
 | |
| 
 | |
|                 elif value[0] == 'null':
 | |
|                     result = {}
 | |
|                     for key, new_source in zip(*(iter(value[1:]),) * 2, strict=True):
 | |
|                         stack.append((result, key, new_source))
 | |
| 
 | |
|                 elif value[0] in _ARRAY_TYPE_LOOKUP:
 | |
|                     typecode = _ARRAY_TYPE_LOOKUP[value[0]]
 | |
|                     data = base64.b64decode(value[1])
 | |
|                     result = array.array(typecode, data).tolist()
 | |
| 
 | |
|                 else:
 | |
|                     yield TypeError(f'invalid type at {source}: {value[0]!r}')
 | |
|                     result = None
 | |
|             else:
 | |
|                 result = len(value) * [None]
 | |
|                 for offset, new_source in enumerate(value):
 | |
|                     stack.append((result, offset, new_source))
 | |
| 
 | |
|         elif isinstance(value, dict):
 | |
|             result = {}
 | |
|             for key, new_source in value.items():
 | |
|                 stack.append((result, key, new_source))
 | |
| 
 | |
|         else:
 | |
|             result = value
 | |
| 
 | |
|         target[index] = resolved[source] = result
 | |
| 
 | |
|     return return_value[0]
 | |
| 
 | |
| 
 | |
| def parse(parsed: typing.Any, /, *, revivers: dict[str, collections.abc.Callable[[typing.Any], typing.Any]] | None = None):
 | |
|     generator = parse_iter(parsed, revivers=revivers)
 | |
|     while True:
 | |
|         try:
 | |
|             raise generator.send(None)
 | |
|         except StopIteration as error:
 | |
|             return error.value
 | 
