1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-08-13 16:08:29 +00:00

HlsDL: ability to process the image cloaking fragments

This commit is contained in:
mozartilize 2025-08-09 16:54:36 +07:00
parent e8d49b1c7f
commit 8f2bfe8880
6 changed files with 65 additions and 2 deletions

View File

@ -447,6 +447,10 @@ class YoutubeDL:
Actual sleep time will be a random float from range
[sleep_interval; max_sleep_interval].
sleep_interval_subtitles: Number of seconds to sleep before each subtitle download
fragment_image_cloaking:
If set, strip the image cloaking signature (up to its
length) from the beginning of each fragment.
Either 'auto' or a comma-separated list of mime:bytes_length pairs.
listformats: Print an overview of available video formats and exit.
list_thumbnails: Print a table of all thumbnails and exit.
match_filter: A function that gets called for every video with the signature
@ -810,6 +814,17 @@ def check_deprecated(param, option, suggestion):
'Set the LC_ALL environment variable to fix this.')
self.params['restrictfilenames'] = True
# Normalize the ``fragment_image_cloaking`` option into a {mime: byte_length} dict.
# Only a string is parsed: this code writes a dict back into ``self.params``, so a
# value that is already a dict (normalization already ran on the same params
# mapping, or it was supplied as a dict) must be left untouched instead of
# crashing on ``.split``.
cloaking_spec = self.params.get('fragment_image_cloaking')
if isinstance(cloaking_spec, str):
    strip_lengths = {}
    # 'auto' maps to an empty dict, i.e. no user-supplied overrides
    if cloaking_spec.strip() != 'auto':
        for entry in cloaking_spec.split(','):
            mime, sep, length = (part.strip() for part in entry.partition(':'))
            try:
                byte_length = int(length)
            except ValueError:
                byte_length = -1
            # Reject missing MIME/length, non-numeric and negative lengths
            # (a negative value would silently slice from the end of the fragment)
            if not (mime and sep) or byte_length < 0:
                raise ValueError(
                    f'Invalid fragment_image_cloaking entry {entry!r}; '
                    'expected MIME:BYTES_LENGTH')
            strip_lengths[mime] = byte_length
    self.params['fragment_image_cloaking'] = strip_lengths
self._parse_outtmpl()
# Creating format selector here allows us to catch syntax errors before the extraction

View File

@ -952,6 +952,7 @@ def parse_options(argv=None):
'sleep_interval': opts.sleep_interval,
'max_sleep_interval': opts.max_sleep_interval,
'sleep_interval_subtitles': opts.sleep_interval_subtitles,
'fragment_image_cloaking': opts.fragment_image_cloaking,
'external_downloader': opts.external_downloader,
'download_ranges': opts.download_ranges,
'force_keyframes_at_cuts': opts.force_keyframes_at_cuts,

View File

@ -127,6 +127,7 @@ def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_dat
if fragment_info_dict.get('filetime'):
ctx['fragment_filetime'] = fragment_info_dict.get('filetime')
ctx['fragment_filename_sanitized'] = fragment_filename
ctx['fragment_content_type'] = fragment_info_dict.get('fragment_content_type')
return True
def _read_fragment(self, ctx):
@ -488,14 +489,25 @@ def append_fragment(frag_content, frag_index, ctx):
def _download_fragment(fragment):
ctx_copy = ctx.copy()
download_fragment(fragment, ctx_copy)
return fragment, fragment['frag_index'], ctx_copy.get('fragment_filename_sanitized')
return (
fragment,
fragment['frag_index'],
ctx_copy.get('fragment_filename_sanitized'),
ctx_copy.get('fragment_content_type'),
)
with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
try:
for fragment, frag_index, frag_filename in pool.map(_download_fragment, fragments):
for (
fragment,
frag_index,
frag_filename,
fragment_content_type,
) in pool.map(_download_fragment, fragments):
ctx.update({
'fragment_filename_sanitized': frag_filename,
'fragment_index': frag_index,
'fragment_content_type': fragment_content_type,
})
if not append_fragment(decrypt_fragment(fragment, self._read_fragment(ctx)), frag_index, ctx):
return False

View File

@ -15,9 +15,17 @@
traverse_obj,
update_url_query,
urljoin,
YoutubeDLError,
)
from ..utils._utils import _request_dump_filename
# Default number of leading bytes to drop from a fragment, keyed by the
# fragment's Content-Type. Used when no explicit length is configured for a type.
IMAGE_CLOAKING_HEADER_LENGTHS = {
    'image/png': 8,   # the PNG file signature is 8 bytes long
    'image/bmp': 2,   # presumably the 2-byte "BM" magic -- TODO confirm
    'image/jpg': 4,   # non-standard alias some servers send; same length as image/jpeg
    'image/jpeg': 4,  # presumably SOI plus the following marker -- TODO confirm
}
class HlsFD(FragmentFD):
"""
@ -407,3 +415,22 @@ def fin_fragments():
ctx, fragments, info_dict, pack_func=pack_fragment, finish_func=fin_fragments)
else:
return self.download_and_append_fragments(ctx, fragments, info_dict)
def _image_cloaking_stripper(self, ctx, frag_content, fragment_image_cloaking):
    """Return frag_content without its leading image cloaking signature.

    The number of bytes to drop is looked up by the fragment's content type
    (ctx['fragment_content_type']), first in the user-supplied
    fragment_image_cloaking mapping and then in IMAGE_CLOAKING_HEADER_LENGTHS.

    @param ctx                      Download context dict
    @param frag_content             Raw fragment content (sliceable)
    @param fragment_image_cloaking  Mapping of content type -> bytes to strip
    @raises YoutubeDLError          If no non-zero length is known for the content type
    """
    content_type = ctx.get('fragment_content_type')

    # Try the header value verbatim first, then a normalized media type:
    # a Content-Type header may carry parameters ("image/png; charset=binary")
    # and is case-insensitive, neither of which should defeat the lookup
    candidates = [content_type]
    if isinstance(content_type, str):
        candidates.append(content_type.partition(';')[0].strip().lower())

    for candidate in candidates:
        strip_length = (
            fragment_image_cloaking.get(candidate)
            or IMAGE_CLOAKING_HEADER_LENGTHS.get(candidate)
        )
        if strip_length:
            return frag_content[strip_length:]

    raise YoutubeDLError(f'Unknown length to strip for fragment type of {content_type}')
def _append_fragment(self, ctx, frag_content):
    """Append a fragment, stripping image cloaking first when the option is set.

    When the 'fragment_image_cloaking' param is None (or absent) the content is
    passed to the parent implementation unchanged.
    """
    cloaking = self.ydl.params.get('fragment_image_cloaking')
    if cloaking is not None:
        frag_content = self._image_cloaking_stripper(ctx, frag_content, cloaking)
    super()._append_fragment(ctx, frag_content)

View File

@ -208,6 +208,9 @@ def download():
# doing auto decompression. (See: https://github.com/yt-dlp/yt-dlp/pull/6176)
data_len = None
# The content type might be not video due to image cloaking.
info_dict['fragment_content_type'] = ctx.data.headers.get('Content-Type')
# Range HTTP header may be ignored/unsupported by a webserver
# (e.g. extractor/scivee.py, extractor/bambuser.py).
# However, for a test we still would like to download just a piece of a file.

View File

@ -1192,6 +1192,11 @@ def _preset_alias_callback(option, opt_str, value, parser):
'--sleep-subtitles', metavar='SECONDS',
dest='sleep_interval_subtitles', default=0, type=int,
help='Number of seconds to sleep before each subtitle download')
# Opt-in workaround for fragments disguised as images: the value is either
# "auto" or explicit per-MIME-type byte lengths to strip
workarounds.add_option(
    '--fragment-image-cloaking', metavar='MIME:BYTES_LENGTH[,...]',
    dest='fragment_image_cloaking', type=str,
    help=('Strip the image cloaking signature from the beginning of each fragment. '
          'Takes a comma-separated list of MIME:BYTES_LENGTH pairs giving the number of bytes '
          'to strip per fragment content type. '
          'Set to "auto" to use predefined lengths based on the fragment content type.'))
verbosity = optparse.OptionGroup(parser, 'Verbosity and Simulation Options')
verbosity.add_option(