#!/usr/bin/env python3
# test/test_devalue.py

# Allow direct execution
import os
import sys

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))


import datetime as dt
import math
import re
import unittest

from yt_dlp.utils.web import devalue


# Inputs whose parsed result is compared with `assertEqual`
TEST_CASES_EQUALS = [{
    'name': 'int',
    'unparsed': [-42],
    'parsed': -42,
}, {
    'name': 'str',
    'unparsed': ['woo!!!'],
    'parsed': 'woo!!!',
}, {
    'name': 'Number',
    'unparsed': [['Object', 42]],
    'parsed': 42,
}, {
    'name': 'String',
    'unparsed': [['Object', 'yar']],
    'parsed': 'yar',
}, {
    'name': 'negative zero',
    'unparsed': -6,
    'parsed': -0.0,
}, {
    'name': 'RegExp',
    'unparsed': [['RegExp', 'regexp', 'gim']],  # XXX: flags are ignored
    'parsed': re.compile('regexp'),
}, {
    'name': 'Date',
    'unparsed': [['Date', '2001-09-09T01:46:40.000Z']],
    'parsed': dt.datetime.fromtimestamp(1000000000, tz=dt.timezone.utc),
}, {
    'name': 'Array',
    'unparsed': [[1, 2, 3], 'a', 'b', 'c'],
    'parsed': ['a', 'b', 'c'],
}, {
    'name': 'Array (empty)',
    'unparsed': [[]],
    'parsed': [],
}, {
    'name': 'Array (sparse)',
    'unparsed': [[-2, 1, -2], 'b'],
    'parsed': [None, 'b', None],
}, {
    'name': 'Object',
    'unparsed': [{'foo': 1, 'x-y': 2}, 'bar', 'z'],
    'parsed': {'foo': 'bar', 'x-y': 'z'},
}, {
    'name': 'Set',
    'unparsed': [['Set', 1, 2, 3], 1, 2, 3],
    'parsed': [1, 2, 3],
}, {
    'name': 'Map',
    'unparsed': [['Map', 1, 2], 'a', 'b'],
    'parsed': [['a', 'b']],
}, {
    'name': 'BigInt',
    'unparsed': [['BigInt', '1']],
    'parsed': 1,
}, {
    'name': 'Uint8Array',
    'unparsed': [['Uint8Array', 'AQID']],
    'parsed': [1, 2, 3],
}, {
    'name': 'ArrayBuffer',
    'unparsed': [['ArrayBuffer', 'AQID']],
    'parsed': [1, 2, 3],
}, {
    'name': 'str (repetition)',
    'unparsed': [[1, 1], 'a string'],
    'parsed': ['a string', 'a string'],
}, {
    'name': 'None (repetition)',
    'unparsed': [[1, 1], None],
    'parsed': [None, None],
}, {
    'name': 'dict (repetition)',
    'unparsed': [[1, 1], {}],
    'parsed': [{}, {}],
}, {
    'name': 'Object without prototype',
    'unparsed': [['null']],
    'parsed': {},
}, {
    'name': 'cross-realm POJO',
    'unparsed': [{}],
    'parsed': {},
}, {
    'name': 'Infinity',
    'unparsed': -4,
    'parsed': math.inf,
}, {
    'name': 'negative Infinity',
    'unparsed': -5,
    'parsed': -math.inf,
}]

# Inputs whose parsed result is compared by identity with `assertIs`
TEST_CASES_IS = [{
    'name': 'bool',
    'unparsed': [True],
    'parsed': True,
}, {
    'name': 'Boolean',
    'unparsed': [['Object', False]],
    'parsed': False,
}, {
    'name': 'undefined',
    'unparsed': -1,
    'parsed': None,
}, {
    'name': 'null',
    'unparsed': [None],
    'parsed': None,
}, {
    'name': 'NaN',
    'unparsed': -3,
    'parsed': math.nan,
}]

# Top-level inputs that must be rejected by `devalue.parse`
TEST_CASES_INVALID = [{
    'name': 'empty string',
    'unparsed': '',
}, {
    'name': 'hole',
    'unparsed': '-2',
}, {
    'name': 'string',
    'unparsed': 'hello',
}, {
    'name': 'number',
    'unparsed': 42,
}, {
    'name': 'boolean',
    'unparsed': True,
}, {
    'name': 'null',
    'unparsed': None,
}, {
    'name': 'object',
    'unparsed': {},
}, {
    'name': 'empty array',
    'unparsed': [],
}]


class TestDevalue(unittest.TestCase):
    """Table-driven tests for `yt_dlp.utils.web.devalue.parse`."""

    def test_devalue_parse_equals(self):
        for tc in TEST_CASES_EQUALS:
            with self.subTest(tc['name']):
                self.assertEqual(devalue.parse(tc['unparsed']), tc['parsed'], tc['name'])

    def test_devalue_parse_is(self):
        for tc in TEST_CASES_IS:
            with self.subTest(tc['name']):
                self.assertIs(devalue.parse(tc['unparsed']), tc['parsed'], tc['name'])

    def test_devalue_parse_invalid(self):
        # Only rejection itself is asserted; the concrete exception class
        # is intentionally not pinned down by this table.
        for tc in TEST_CASES_INVALID:
            with self.subTest(tc['name']):
                with self.assertRaises((ValueError, LookupError), msg=tc['name']):
                    devalue.parse(tc['unparsed'])

    def test_devalue_parse_cyclical(self):
        name = 'Map (cyclical)'
        result = devalue.parse([['Map', 1, 0], 'self'])
        self.assertEqual(result[0][0], 'self', name)
        self.assertIs(result, result[0][1], name)

        name = 'Set (cyclical)'
        result = devalue.parse([['Set', 0, 1], 42])
        self.assertEqual(result[1], 42, name)
        self.assertIs(result, result[0], name)

        result = devalue.parse([[0]])
        self.assertIs(result, result[0], 'Array (cyclical)')

        name = 'Object (cyclical)'
        result = devalue.parse([{'self': 0}])
        self.assertIs(result, result['self'], name)

        name = 'Object with null prototype (cyclical)'
        result = devalue.parse([['null', 'self', 0]])
        self.assertIs(result, result['self'], name)

        name = 'Objects (cyclical)'
        result = devalue.parse([[1, 2], {'second': 2}, {'first': 1}])
        self.assertIs(result[0], result[1]['first'], name)
        self.assertIs(result[1], result[0]['second'], name)

    def test_devalue_parse_revivers(self):
        # A reviver receives the already-parsed payload of its wrapper entry
        result = devalue.parse([['Custom', 1], 'value'], revivers={'Custom': str.upper})
        self.assertEqual(result, 'VALUE', 'custom reviver')

        # Without a matching reviver the unknown type tag is an error
        with self.assertRaises(TypeError, msg='unknown type'):
            devalue.parse([['Custom', 1], 'value'])


if __name__ == '__main__':
    unittest.main()
# yt_dlp/utils/web/devalue.py
import array
import base64
import datetime as dt
import math
import re

TYPE_CHECKING = False
if TYPE_CHECKING:
    import collections.abc
    import typing

    T = typing.TypeVar('T')


_ARRAY_TYPE_LOOKUP = {
    'Int8Array': 'b',
    'Uint8Array': 'B',
    'Uint8ClampedArray': 'B',
    'Int16Array': 'h',
    'Uint16Array': 'H',
    'Int32Array': 'i',
    'Uint32Array': 'I',
    'Float32Array': 'f',
    'Float64Array': 'd',
    # 'q'/'Q' are 8 bytes wide everywhere; 'l'/'L' are only 4 bytes on some platforms
    'BigInt64Array': 'q',
    'BigUint64Array': 'Q',
    'ArrayBuffer': 'B',
}

# Built-in type tags whose data is the single literal stored in `value[1]`
_PAYLOAD_TYPES = frozenset({'Date', 'RegExp', 'Object', 'BigInt', *_ARRAY_TYPE_LOOKUP})


def _revive_builtin(name, payload):
    """Convert the literal payload of a built-in `[name, payload]` entry; raises if malformed."""
    if name == 'Date':
        # `datetime.fromisoformat` only accepts a trailing `Z` since Python 3.11
        if isinstance(payload, str) and payload.endswith('Z'):
            payload = f'{payload[:-1]}+00:00'
        return dt.datetime.fromisoformat(payload)
    if name == 'RegExp':
        # XXX: use jsinterp to translate regex flags
        #      currently ignores the flags stored after the pattern
        return re.compile(payload)
    if name == 'Object':
        return payload
    if name == 'BigInt':
        return int(payload)
    return array.array(_ARRAY_TYPE_LOOKUP[name], base64.b64decode(payload)).tolist()


def parse_iter(parsed: 'typing.Any', /, *, revivers: 'dict[str, collections.abc.Callable[[typing.Any], typing.Any]] | None' = None):
    """
    Unflatten devalue-style data, reporting recoverable problems lazily

    This is a generator: every recoverable problem (bad date, unknown type,
    invalid index, failing reviver, ...) is yielded as an exception instance
    and the affected value becomes `None`. The parsed result is the
    generator's return value (`StopIteration.value`).

    @param parsed       Either one of the special negative integers or
                        a non-empty list of flattened values
    @param revivers     Mapping of type tag to a callable which receives the
                        parsed payload of a `[tag, index]` entry.
                        Revivers take precedence over the built-in tags
    @raises ValueError  On first advance, if the top level input is not
                        a supported integer or a non-empty list
    """
    resolved = {
        -1: None,
        -2: None,
        -3: math.nan,
        -4: math.inf,
        -5: -math.inf,
        -6: -0.0,
    }

    # `bool` is a subclass of `int` but never valid input
    if isinstance(parsed, int) and not isinstance(parsed, bool):
        # a hole (-2) is only meaningful as an array element
        if parsed == -2 or parsed not in resolved:
            raise ValueError('invalid integer input')
        return resolved[parsed]
    elif not isinstance(parsed, list):
        raise ValueError('expected int or list as input')
    elif not parsed:
        raise ValueError('expected a non-empty list as input')

    if revivers is None:
        revivers = {}
    # indices whose reviver is scheduled but has not run yet
    pending_revivals = set()
    return_value = [None]
    # each item is `(container, key, source)`; `source` is an index into
    # `parsed` or a `(name, index, reviver)` tuple marking a revive step
    stack = [(return_value, 0, 0)]

    while stack:
        target, index, source = stack.pop()
        if isinstance(source, tuple):
            name, source, reviver = source
            pending_revivals.discard(source)
            try:
                result = reviver(target[index])
            except Exception as error:
                yield TypeError(f'Failed to parse {source} as {name!r}: {error}')
                result = None
            # cache under the wrapper's own index so that repeated
            # references share the one revived object
            resolved[source] = target[index] = result
            continue

        if isinstance(source, bool) or not isinstance(source, int):
            yield TypeError(f'invalid index: {source!r}')
            target[index] = None
            continue

        if source in resolved:
            target[index] = resolved[source]
            continue

        # also guards against Python negative indexing
        if not 0 <= source < len(parsed):
            yield IndexError(f'invalid index: {source!r}')
            target[index] = None
            continue

        value = parsed[source]
        if isinstance(value, dict):
            # pre-populate to keep the key order of the input,
            # since the stack fills the values in reverse
            result = dict.fromkeys(value)
            for key, new_source in value.items():
                stack.append((result, key, new_source))

        elif not isinstance(value, list):
            result = value

        elif not (value and isinstance(value[0], str)):
            result = len(value) * [None]
            for offset, new_source in enumerate(value):
                stack.append((result, offset, new_source))

        else:
            name = value[0]
            reviver = revivers.get(name)
            if reviver or name in _PAYLOAD_TYPES:
                if len(value) < 2:
                    yield ValueError(f'missing value at {source}: {name!r}')
                    result = None

                elif not reviver:
                    try:
                        result = _revive_builtin(name, value[1])
                    except Exception as error:
                        if name == 'Date':
                            yield ValueError(f'invalid date: {value[1]!r}')
                        else:
                            yield ValueError(f'invalid {name} at {source}: {error}')
                        result = None

                elif source in pending_revivals:
                    # the payload refers back to this entry; reviving it
                    # again would never terminate
                    yield ValueError(f'circular reference at {source}: {name!r}')
                    target[index] = None
                    continue

                else:
                    pending_revivals.add(source)
                    # inverse order: resolve index, revive value
                    stack.append((target, index, (name, source, reviver)))
                    stack.append((target, index, value[1]))
                    continue

            elif name == 'Set':
                result = [None] * (len(value) - 1)
                for offset, new_source in enumerate(value[1:]):
                    stack.append((result, offset, new_source))

            elif name == 'Map':
                # TODO: implement zips `strict=True`
                result = []
                for key, new_source in zip(value[1::2], value[2::2]):
                    pair = [None, None]
                    stack.append((pair, 0, key))
                    stack.append((pair, 1, new_source))
                    result.append(pair)

            elif name == 'null':
                # TODO: implement zips `strict=True`
                pairs = list(zip(value[1::2], value[2::2]))
                result = dict.fromkeys(key for key, _ in pairs)
                for key, new_source in pairs:
                    stack.append((result, key, new_source))

            else:
                yield TypeError(f'invalid type at {source}: {name!r}')
                result = None

        # registered before any child is popped, which lets cyclic references resolve
        target[index] = resolved[source] = result

    return return_value[0]


def parse(parsed: 'typing.Any', /, *, revivers: 'dict[str, collections.abc.Callable[[typing.Any], typing.Any]] | None' = None):
    """
    Unflatten devalue-style data strictly

    Same as `parse_iter`, but the first problem reported is raised
    instead of being skipped.

    @returns    The parsed value
    """
    generator = parse_iter(parsed, revivers=revivers)
    try:
        error = next(generator)
    except StopIteration as stop:
        return stop.value
    raise error