1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-06-27 17:08:32 +00:00

[utils] web/devalue: Implement base module

This commit is contained in:
Simon Sawicki 2025-06-11 03:32:21 +02:00
parent a32dc1bcbe
commit 482713f787
No known key found for this signature in database
3 changed files with 351 additions and 0 deletions

206
test/test_devalue.py Normal file
View File

@ -0,0 +1,206 @@
#!/usr/bin/env python3
# Allow direct execution
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import datetime as dt
import math
import re
import unittest
from yt_dlp.utils.web import devalue
TEST_CASES_EQUALS = [{
'name': 'int',
'unparsed': [-42],
'parsed': -42,
}, {
'name': 'str',
'unparsed': ['woo!!!'],
'parsed': 'woo!!!',
}, {
'name': 'Number',
'unparsed': [['Object', 42]],
'parsed': 42,
}, {
'name': 'String',
'unparsed': [['Object', 'yar']],
'parsed': 'yar',
}, {
'name': 'negative zero',
'unparsed': -6,
'parsed': -0.0,
}, {
'name': 'RegExp',
'unparsed': [['RegExp', 'regexp', 'gim']], # XXX: flags are ignored
'parsed': re.compile('regexp'),
}, {
'name': 'Date',
'unparsed': [['Date', '2001-09-09T01:46:40.000Z']],
'parsed': dt.datetime.fromtimestamp(1000000000, tz=dt.timezone.utc),
}, {
'name': 'Array',
'unparsed': [[1, 2, 3], 'a', 'b', 'c'],
'parsed': ['a', 'b', 'c'],
}, {
'name': 'Array (empty)',
'unparsed': [[]],
'parsed': [],
}, {
'name': 'Array (sparse)',
'unparsed': [[-2, 1, -2], 'b'],
'parsed': [None, 'b', None],
}, {
'name': 'Object',
'unparsed': [{'foo': 1, 'x-y': 2}, 'bar', 'z'],
'parsed': {'foo': 'bar', 'x-y': 'z'},
}, {
'name': 'Set',
'unparsed': [['Set', 1, 2, 3], 1, 2, 3],
'parsed': [1, 2, 3],
}, {
'name': 'Map',
'unparsed': [['Map', 1, 2], 'a', 'b'],
'parsed': [['a', 'b']],
}, {
'name': 'BigInt',
'unparsed': [['BigInt', '1']],
'parsed': 1,
}, {
'name': 'Uint8Array',
'unparsed': [['Uint8Array', 'AQID']],
'parsed': [1, 2, 3],
}, {
'name': 'ArrayBuffer',
'unparsed': [['ArrayBuffer', 'AQID']],
'parsed': [1, 2, 3],
}, {
'name': 'str (repetition)',
'unparsed': [[1, 1], 'a string'],
'parsed': ['a string', 'a string'],
}, {
'name': 'None (repetition)',
'unparsed': [[1, 1], None],
'parsed': [None, None],
}, {
'name': 'dict (repetition)',
'unparsed': [[1, 1], {}],
'parsed': [{}, {}],
}, {
'name': 'Object without prototype',
'unparsed': [['null']],
'parsed': {},
}, {
'name': 'cross-realm POJO',
'unparsed': [{}],
'parsed': {},
}, {
'name': 'Infinity',
'unparsed': -4,
'parsed': math.inf,
}, {
'name': 'negative Infinity',
'unparsed': -5,
'parsed': -math.inf,
}]
TEST_CASES_IS = [{
'name': 'bool',
'unparsed': [True],
'parsed': True,
}, {
'name': 'Boolean',
'unparsed': [['Object', False]],
'parsed': False,
}, {
'name': 'undefined',
'unparsed': -1,
'parsed': None,
}, {
'name': 'null',
'unparsed': [None],
'parsed': None,
}, {
'name': 'NaN',
'unparsed': -3,
'parsed': math.nan,
}]
TEST_CASES_INVALID = [{
'name': 'empty string',
'unparsed': '',
}, {
'name': 'hole',
'unparsed': '-2',
}, {
'name': 'string',
'unparsed': 'hello',
}, {
'name': 'number',
'unparsed': 42,
}, {
'name': 'boolean',
'unparsed': True,
}, {
'name': 'null',
'unparsed': None,
}, {
'name': 'object',
'unparsed': {},
}, {
'name': 'empty array',
'unparsed': [],
}]
class TestDevalue(unittest.TestCase):
def test_devalue_parse_equals(self):
for tc in TEST_CASES_EQUALS:
self.assertEqual(devalue.parse(tc['unparsed']), tc['parsed'], tc['name'])
def test_devalue_parse_is(self):
for tc in TEST_CASES_IS:
self.assertIs(devalue.parse(tc['unparsed']), tc['parsed'], tc['name'])
def test_devalue_parse_invalid(self):
pass
# for tc in TEST_CASES_INVALID:
# pass # XXX: Not sure how to write this without seeing the impl
def test_devalue_parse_cyclical(self):
name = 'Map (cyclical)'
result = devalue.parse([['Map', 1, 0], 'self'])
self.assertEqual(result[0][0], 'self', name)
self.assertIs(result, result[0][1], name)
name = 'Set (cyclical)'
result = devalue.parse([['Set', 0, 1], 42])
self.assertEqual(result[1], 42, name)
self.assertIs(result, result[0], name)
result = devalue.parse([[0]])
self.assertIs(result, result[0], 'Array (cyclical)')
name = 'Object (cyclical)'
result = devalue.parse([{'self': 0}])
self.assertIs(result, result['self'], name)
name = 'Object with null prototype (cyclical)'
result = devalue.parse([['null', 'self', 0]])
self.assertIs(result, result['self'], name)
name = 'Objects (cyclical)'
result = devalue.parse([[1, 2], {'second': 2}, {'first': 1}])
self.assertIs(result[0], result[1]['first'], name)
self.assertIs(result[1], result[0]['second'], name)
# XXX: add custom revivers test / or will this be tested by the InfoExtractor._search_nuxt_json tests?
if __name__ == '__main__':
unittest.main()

View File

145
yt_dlp/utils/web/devalue.py Normal file
View File

@ -0,0 +1,145 @@
from __future__ import annotations
import array
import base64
import datetime as dt
import math
import re
TYPE_CHECKING = False
if TYPE_CHECKING:
import collections.abc
import typing
T = typing.TypeVar('T')
_ARRAY_TYPE_LOOKUP = {
'Int8Array': 'b',
'Uint8Array': 'B',
'Uint8ClampedArray': 'B',
'Int16Array': 'h',
'Uint16Array': 'H',
'Int32Array': 'i',
'Uint32Array': 'I',
'Float32Array': 'f',
'Float64Array': 'd',
'BigInt64Array': 'l',
'BigUint64Array': 'L',
'ArrayBuffer': 'B',
}
def parse_iter(parsed: typing.Any, /, *, revivers: dict[str, collections.abc.Callable[[list], typing.Any]] | None = None):
resolved = {
-1: None,
-2: None,
-3: math.nan,
-4: math.inf,
-5: -math.inf,
-6: -0.0,
}
if isinstance(parsed, int):
return resolved[parsed]
elif not isinstance(parsed, list):
raise ValueError('expected int or list as input')
if revivers is None:
revivers = {}
return_value = [None]
stack: list[tuple] = [(return_value, 0, 0)]
while stack:
target, index, source = stack.pop()
if isinstance(source, tuple):
name, source, reviver = source
try:
resolved[source] = target[index] = reviver(target[index])
except Exception as error:
yield TypeError(f'Failed to parse {source} as {name!r}: {error}')
resolved[source] = target[index] = None
continue
if source in resolved:
target[index] = resolved[source]
continue
value = parsed[source]
if isinstance(value, list):
if value and isinstance(value[0], str):
# TODO: implement zips `strict=True`
if reviver := revivers.get(value[0]):
# inverse order: resolve index, revive value
stack.append((target, index, (value[0], value[1], reviver)))
stack.append((target, index, value[1]))
continue
elif value[0] == 'Date':
try:
result = dt.datetime.fromisoformat(value[1])
except Exception:
yield ValueError(f'invalid date: {value[1]!r}')
result = None
elif value[0] == 'Set':
result = [None] * (len(value) - 1)
for offset, new_source in enumerate(value[1:]):
stack.append((result, offset, new_source))
elif value[0] == 'Map':
result = []
for key, new_source in zip(*(iter(value[1:]),) * 2):
pair = [None, None]
stack.append((pair, 0, key))
stack.append((pair, 1, new_source))
result.append(pair)
elif value[0] == 'RegExp':
# XXX: use jsinterp to translate regex flags
# currently ignores `value[2]`
result = re.compile(value[1])
elif value[0] == 'Object':
result = value[1]
elif value[0] == 'BigInt':
result = int(value[1])
elif value[0] == 'null':
result = {}
for key, new_source in zip(*(iter(value[1:]),) * 2):
stack.append((result, key, new_source))
elif value[0] in _ARRAY_TYPE_LOOKUP:
typecode = _ARRAY_TYPE_LOOKUP[value[0]]
data = base64.b64decode(value[1])
result = array.array(typecode, data).tolist()
else:
yield TypeError(f'invalid type at {source}: {value[0]!r}')
result = None
else:
result = len(value) * [None]
for offset, new_source in enumerate(value):
stack.append((result, offset, new_source))
elif isinstance(value, dict):
result = {}
for key, new_source in value.items():
stack.append((result, key, new_source))
else:
result = value
target[index] = resolved[source] = result
return return_value[0]
def parse(parsed: typing.Any, /, *, revivers: dict[str, collections.abc.Callable[[typing.Any], typing.Any]] | None = None):
generator = parse_iter(parsed, revivers=revivers)
while True:
try:
raise generator.send(None)
except StopIteration as error:
return error.value