From acb19e95eeab0e7f27a939525475df815148178c Mon Sep 17 00:00:00 2001
From: coletdjnz
Date: Sun, 10 Aug 2025 16:43:09 +1200
Subject: [PATCH] Add UMPEncoder

---
Changes after review:
- write_varint() now rejects values above 0xFFFFFFFF with ValueError instead
  of letting int.to_bytes() raise OverflowError; covered by a new test.
- Added docstrings to UMPEncoder and write_varint().
Context-line indentation was restored from a whitespace-collapsed copy and
the blob ids on the "index" lines were not regenerated.

 test/test_sabr/test_ump.py                 | 70 +++++++++++++++++++------
 yt_dlp/extractor/youtube/_streaming/ump.py | 67 ++++++++++++++++++++++++
 2 files changed, 121 insertions(+), 16 deletions(-)

diff --git a/test/test_sabr/test_ump.py b/test/test_sabr/test_ump.py
index 38514c216b..1de1065f1b 100644
--- a/test/test_sabr/test_ump.py
+++ b/test/test_sabr/test_ump.py
@@ -1,7 +1,15 @@
 import io
 import pytest
 
-from yt_dlp.extractor.youtube._streaming.ump import varint_size, read_varint, UMPDecoder, UMPPartId
+from yt_dlp.extractor.youtube._streaming.ump import (
+    varint_size,
+    read_varint,
+    UMPDecoder,
+    UMPPartId,
+    write_varint,
+    UMPEncoder,
+    UMPPart,
+)
 
 
 @pytest.mark.parametrize('data, expected', [
@@ -21,28 +29,40 @@ def test_varint_size(data, expected):
 
 
 @pytest.mark.parametrize('data, expected', [
-    # 1 byte long varint
     (b'\x01', 1),
-    (b'\x4F', 79),
-    # 2 byte long varint
-    (b'\x80\x01', 64),
-    (b'\x8A\x7F', 8138),
-    (b'\xBF\x7F', 8191),
-    # 3 byte long varint
-    (b'\xC0\x80\x01', 12288),
-    (b'\xDF\x7F\xFF', 2093055),
-    # 4 byte long varint
-    (b'\xE0\x80\x80\x01', 1574912),
-    (b'\xEF\x7F\xFF\xFF', 268433407),
-    # 5 byte long varint
-    (b'\xF0\x80\x80\x80\x01', 25198720),
-    (b'\xFF\x7F\xFF\xFF\xFF', 4294967167),
+    (b'\xad\x05', 365),
+    (b'\xd5\x22\x05', 42069),
+    (b'\xe0\x68\x89\x09', 10000000),
+    (b'\xf0\xff\xc9\x9a\x3b', 999999999),
+    (b'\xf0\xff\xff\xff\xff', 4294967295),
 ],
 )
 def test_readvarint(data, expected):
     assert read_varint(io.BytesIO(data)) == expected
 
 
+@pytest.mark.parametrize('value, expected_bytes', [
+    (1, b'\x01'),
+    (365, b'\xad\x05'),
+    (42069, b'\xd5\x22\x05'),
+    (10000000, b'\xe0\x68\x89\x09'),
+    (999999999, b'\xf0\xff\xc9\x9a\x3b'),
+    (4294967295, b'\xf0\xff\xff\xff\xff'),
+])
+def test_writevarint(value, expected_bytes):
+    fp = io.BytesIO()
+    write_varint(fp, value)
+    assert fp.getvalue() == expected_bytes
+
+
+@pytest.mark.parametrize('value', [-1, 4294967296])
+def test_writevarint_out_of_range(value):
+    fp = io.BytesIO()
+    with pytest.raises(ValueError):
+        write_varint(fp, value)
+    assert fp.getvalue() == b''
+
+
 class TestUMPDecoder:
     EXAMPLE_PART_DATA = [
         {
@@ -100,3 +120,21 @@ def test_unexpected_eof(self):
                 part.data.read()
 
         assert mock_file.closed
+
+
+class TestUMPEncoder:
+    def test_write_part(self):
+        fp = io.BytesIO()
+        encoder = UMPEncoder(fp)
+        part = UMPPart(
+            part_id=UMPPartId.MEDIA_HEADER,
+            size=127,
+            data=io.BytesIO(b'\x01' * 127),
+        )
+
+        encoder.write_part(part)
+
+        part_type = b'\x14'  # MEDIA_HEADER part type
+        part_size = b'\x7F'  # Part size of 127
+        expected_data = part_type + part_size + b'\x01' * 127
+        assert fp.getvalue() == expected_data
diff --git a/yt_dlp/extractor/youtube/_streaming/ump.py b/yt_dlp/extractor/youtube/_streaming/ump.py
index a980c6eeaa..0b3cb9c8b1 100644
--- a/yt_dlp/extractor/youtube/_streaming/ump.py
+++ b/yt_dlp/extractor/youtube/_streaming/ump.py
@@ -83,6 +83,29 @@ def iter_parts(self):
             yield UMPPart(UMPPartId(part_type), part_size, io.BytesIO(part_data))
 
 
+class UMPEncoder:
+    """Serialise UMP parts onto a writable binary stream."""
+
+    def __init__(self, fp: io.BufferedIOBase):
+        # Binary stream that every encoded part is appended to.
+        self.fp = fp
+
+    def write_part(self, part: UMPPart) -> None:
+        """Write one part as varint(part type) + varint(size) + payload.
+
+        part.size is written as given and the payload is whatever
+        part.data.read() returns; the two are not cross-checked here.
+
+        Raises ValueError if part.part_id is not a UMPPartId.
+        """
+        if not isinstance(part.part_id, UMPPartId):
+            raise ValueError('part_id must be an instance of UMPPartId')
+
+        write_varint(self.fp, part.part_id.value)
+        write_varint(self.fp, part.size)
+        self.fp.write(part.data.read())
+
+
 def read_varint(fp: io.BufferedIOBase) -> int:
     # https://web.archive.org/web/20250430054327/https://github.com/gsuberland/UMP_Format/blob/main/UMP_Format.md
     # https://web.archive.org/web/20250429151021/https://github.com/davidzeng0/innertube/blob/main/googlevideo/ump.md
@@ -114,3 +137,47 @@ def read_varint(fp: io.BufferedIOBase) -> int:
 
 def varint_size(byte: int) -> int:
     return 1 if byte < 128 else 2 if byte < 192 else 3 if byte < 224 else 4 if byte < 240 else 5
+
+
+def write_varint(fp: io.BufferedIOBase, value: int) -> None:
+    """Write value to fp as a UMP variable-length integer (1 to 5 bytes).
+
+    The leading bits of the first byte select the encoded length, mirroring
+    varint_size(). The 1-4 byte forms keep the lowest value bits in the
+    first byte and the rest in the following bytes; the 5 byte form is 0xF0
+    followed by the value as 4 little-endian bytes.
+
+    Raises ValueError if value is negative or does not fit in 32 bits.
+    """
+    # ref: https://github.com/LuanRT/googlevideo/blob/main/src/core/UmpWriter.ts
+    if value < 0:
+        raise ValueError('Value must be a non-negative integer')
+    if value > 0xFFFFFFFF:
+        # The 5 byte form carries exactly 32 bits; fail before anything is written
+        raise ValueError('Value must fit in 32 bits')
+
+    if value < 128:
+        fp.write(bytes([value]))
+    elif value < 16384:
+        fp.write(bytes([
+            (value & 0x3F) | 0x80,
+            value >> 6,
+        ]))
+    elif value < 2097152:
+        fp.write(bytes([
+            (value & 0x1F) | 0xC0,
+            (value >> 5) & 0xFF,
+            value >> 13,
+        ]))
+    elif value < 268435456:
+        fp.write(bytes([
+            (value & 0x0F) | 0xE0,
+            (value >> 4) & 0xFF,
+            (value >> 12) & 0xFF,
+            value >> 20,
+        ]))
+    else:
+        data = bytearray(5)
+        data[0] = 0xF0
+        data[1:5] = value.to_bytes(4, 'little')
+        fp.write(data)