From d20f58d721fe45fe873e3389a0d17a72352aecec Mon Sep 17 00:00:00 2001 From: bashonly <88596187+bashonly@users.noreply.github.com> Date: Sun, 18 Jan 2026 10:34:13 -0600 Subject: [PATCH] [ie/youtube] Solve n challenges for manifest formats (#15602) * Solve n challenges in HLS/DASH manifest URL path parameters * Collect all challenges in advance to solve in bulk once per video * Improve & always use the load/store helper methods for player cache Closes #15569, Closes #15586, Closes #15587, Closes #15600 Authored by: bashonly --- yt_dlp/extractor/youtube/_video.py | 301 +++++++++++++++-------------- 1 file changed, 159 insertions(+), 142 deletions(-) diff --git a/yt_dlp/extractor/youtube/_video.py b/yt_dlp/extractor/youtube/_video.py index 5c0f5a6c40..b7525e95a2 100644 --- a/yt_dlp/extractor/youtube/_video.py +++ b/yt_dlp/extractor/youtube/_video.py @@ -10,7 +10,6 @@ import re import sys import threading import time -import traceback import urllib.parse from ._base import ( @@ -63,6 +62,7 @@ from ...utils import ( unescapeHTML, unified_strdate, unsmuggle_url, + update_url, update_url_query, url_or_none, urljoin, @@ -2193,64 +2193,32 @@ class YoutubeIE(YoutubeBaseInfoExtractor): self._code_cache[player_js_key] = code return self._code_cache.get(player_js_key) - def _sig_spec_cache_id(self, player_url, spec_id): - return join_nonempty(self._player_js_cache_key(player_url), str(spec_id)) + def _load_player_data_from_cache(self, name, player_url, *cache_keys, use_disk_cache=False): + cache_id = (f'youtube-{name}', self._player_js_cache_key(player_url), *map(str_or_none, cache_keys)) + if cache_id in self._player_cache: + return self._player_cache[cache_id] - def _load_sig_spec_from_cache(self, spec_cache_id): - # This is almost identical to _load_player_data_from_cache - # I hate it - if spec_cache_id in self._player_cache: - return self._player_cache[spec_cache_id] - spec = self.cache.load('youtube-sigfuncs', spec_cache_id, min_ver='2025.07.21') - if spec: - self._player_cache[spec_cache_id] = spec - return spec + if not use_disk_cache: + return None - def _store_sig_spec_to_cache(self, spec_cache_id, spec): - if spec_cache_id not in self._player_cache: - self._player_cache[spec_cache_id] = spec - self.cache.store('youtube-sigfuncs', spec_cache_id, spec) - - def _load_player_data_from_cache(self, name, player_url): - cache_id = (f'youtube-{name}', self._player_js_cache_key(player_url)) - - if data := self._player_cache.get(cache_id): - return data - - data = self.cache.load(*cache_id, min_ver='2025.07.21') + data = self.cache.load(cache_id[0], join_nonempty(*cache_id[1:]), min_ver='2025.07.21') if data: self._player_cache[cache_id] = data return data - def _cached(self, func, *cache_id): - def inner(*args, **kwargs): - if cache_id not in self._player_cache: - try: - self._player_cache[cache_id] = func(*args, **kwargs) - except ExtractorError as e: - self._player_cache[cache_id] = e - except Exception as e: - self._player_cache[cache_id] = ExtractorError(traceback.format_exc(), cause=e) - - ret = self._player_cache[cache_id] - if isinstance(ret, Exception): - raise ret - return ret - return inner - - def _store_player_data_to_cache(self, name, player_url, data): - cache_id = (f'youtube-{name}', self._player_js_cache_key(player_url)) + def _store_player_data_to_cache(self, data, name, player_url, *cache_keys, use_disk_cache=False): + cache_id = (f'youtube-{name}', self._player_js_cache_key(player_url), *map(str_or_none, cache_keys)) if cache_id not in self._player_cache: - self.cache.store(*cache_id, data) self._player_cache[cache_id] = data + if use_disk_cache: + self.cache.store(cache_id[0], join_nonempty(*cache_id[1:]), data) def _extract_signature_timestamp(self, video_id, player_url, ytcfg=None, fatal=False): """ Extract signatureTimestamp (sts) Required to tell API what sig/player version is in use. """ - CACHE_ENABLED = False # TODO: enable when preprocessed player JS cache is solved/enabled player_sts_override = self._get_player_js_version()[0] if player_sts_override: @@ -2267,15 +2235,17 @@ class YoutubeIE(YoutubeBaseInfoExtractor): self.report_warning(error_msg) return None - if CACHE_ENABLED and (sts := self._load_player_data_from_cache('sts', player_url)): + # TODO: Pass `use_disk_cache=True` when preprocessed player JS cache is solved + if sts := self._load_player_data_from_cache('sts', player_url): return sts if code := self._load_player(video_id, player_url, fatal=fatal): sts = int_or_none(self._search_regex( r'(?:signatureTimestamp|sts)\s*:\s*(?P[0-9]{5})', code, 'JS player signature timestamp', group='sts', fatal=fatal)) - if CACHE_ENABLED and sts: - self._store_player_data_to_cache('sts', player_url, sts) + if sts: + # TODO: Pass `use_disk_cache=True` when preprocessed player JS cache is solved + self._store_player_data_to_cache(sts, 'sts', player_url) return sts @@ -3233,6 +3203,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor): 'audio_quality_ultralow', 'audio_quality_low', 'audio_quality_medium', 'audio_quality_high', # Audio only formats 'small', 'medium', 'large', 'hd720', 'hd1080', 'hd1440', 'hd2160', 'hd2880', 'highres', ]) + skip_player_js = 'js' in self._configuration_arg('player_skip') format_types = self._configuration_arg('formats') all_formats = 'duplicate' in format_types if self._configuration_arg('include_duplicate_formats'): @@ -3278,6 +3249,98 @@ class YoutubeIE(YoutubeBaseInfoExtractor): return language_code, DEFAULT_LANG_VALUE return language_code, -1 + def get_manifest_n_challenge(manifest_url): + if not url_or_none(manifest_url): + return None + # Same pattern that the player JS uses to read/replace the n challenge value + return self._search_regex( + r'/n/([^/]+)/', urllib.parse.urlparse(manifest_url).path, + 'n challenge', default=None) + + n_challenges = set() + s_challenges = set() + + def solve_js_challenges(): + # Solve all n/sig challenges in bulk and store the results in self._player_cache + challenge_requests = [] + if n_challenges: + challenge_requests.append(JsChallengeRequest( + type=JsChallengeType.N, + video_id=video_id, + input=NChallengeInput(challenges=list(n_challenges), player_url=player_url))) + if s_challenges: + cached_sigfuncs = set() + for spec_id in s_challenges: + if self._load_player_data_from_cache('sigfuncs', player_url, spec_id, use_disk_cache=True): + cached_sigfuncs.add(spec_id) + s_challenges.difference_update(cached_sigfuncs) + + challenge_requests.append(JsChallengeRequest( + type=JsChallengeType.SIG, + video_id=video_id, + input=SigChallengeInput( + challenges=[''.join(map(chr, range(spec_id))) for spec_id in s_challenges], + player_url=player_url))) + + if challenge_requests: + for _challenge_request, challenge_response in self._jsc_director.bulk_solve(challenge_requests): + if challenge_response.type == JsChallengeType.SIG: + for challenge, result in challenge_response.output.results.items(): + spec_id = len(challenge) + self._store_player_data_to_cache( + [ord(c) for c in result], 'sigfuncs', + player_url, spec_id, use_disk_cache=True) + if spec_id in s_challenges: + s_challenges.remove(spec_id) + + elif challenge_response.type == JsChallengeType.N: + for challenge, result in challenge_response.output.results.items(): + self._store_player_data_to_cache(result, 'n', player_url, challenge) + if challenge in n_challenges: + n_challenges.remove(challenge) + + # Raise warning if any challenge requests remain + # Depending on type of challenge request + help_message = ( + 'Ensure you have a supported JavaScript runtime and ' + 'challenge solver script distribution installed. ' + 'Review any warnings presented before this message. ' + f'For more details, refer to {_EJS_WIKI_URL}') + if s_challenges: + self.report_warning( + f'Signature solving failed: Some formats may be missing. {help_message}', + video_id=video_id, only_once=True) + if n_challenges: + self.report_warning( + f'n challenge solving failed: Some formats may be missing. {help_message}', + video_id=video_id, only_once=True) + + # Clear challenge sets so that any subsequent call of this function is a no-op + s_challenges.clear() + n_challenges.clear() + + # 1st pass to collect all n/sig challenges so they can later be solved at once in bulk + for streaming_data in traverse_obj(player_responses, (..., 'streamingData', {dict})): + # HTTPS formats + for fmt_stream in traverse_obj(streaming_data, (('formats', 'adaptiveFormats'), ..., {dict})): + fmt_url = fmt_stream.get('url') + s_challenge = None + if not fmt_url: + sc = urllib.parse.parse_qs(fmt_stream.get('signatureCipher')) + fmt_url = traverse_obj(sc, ('url', 0, {url_or_none})) + s_challenge = traverse_obj(sc, ('s', 0)) + + if s_challenge: + s_challenges.add(len(s_challenge)) + + if n_challenge := traverse_obj(fmt_url, ({parse_qs}, 'n', 0)): + n_challenges.add(n_challenge) + + # Manifest formats + n_challenges.update(traverse_obj( + streaming_data, (('hlsManifestUrl', 'dashManifestUrl'), {get_manifest_n_challenge}))) + + # Final pass to extract formats and solve n/sig challenges as needed for pr in player_responses: streaming_data = traverse_obj(pr, 'streamingData') if not streaming_data: @@ -3385,7 +3448,6 @@ class YoutubeIE(YoutubeBaseInfoExtractor): def process_https_formats(): proto = 'https' https_fmts = [] - skip_player_js = 'js' in self._configuration_arg('player_skip') for fmt_stream in streaming_formats: if fmt_stream.get('targetDurationSec'): @@ -3422,8 +3484,8 @@ class YoutubeIE(YoutubeBaseInfoExtractor): # See: https://github.com/yt-dlp/yt-dlp/issues/14883 get_language_code_and_preference(fmt_stream) sc = urllib.parse.parse_qs(fmt_stream.get('signatureCipher')) - fmt_url = url_or_none(try_get(sc, lambda x: x['url'][0])) - encrypted_sig = try_get(sc, lambda x: x['s'][0]) + fmt_url = traverse_obj(sc, ('url', 0, {url_or_none})) + encrypted_sig = traverse_obj(sc, ('s', 0)) if not all((sc, fmt_url, skip_player_js or player_url, encrypted_sig)): msg = f'Some {client_name} client https formats have been skipped as they are missing a URL. ' if client_name in ('web', 'web_safari'): @@ -3444,19 +3506,17 @@ class YoutubeIE(YoutubeBaseInfoExtractor): continue # signature - # Attempt to load sig spec from cache if encrypted_sig: if skip_player_js: continue - spec_cache_id = self._sig_spec_cache_id(player_url, len(encrypted_sig)) - spec = self._load_sig_spec_from_cache(spec_cache_id) - if spec: - self.write_debug(f'Using cached signature function {spec_cache_id}', only_once=True) - fmt_url += '&{}={}'.format(traverse_obj(sc, ('sp', -1)) or 'signature', - solve_sig(encrypted_sig, spec)) - else: - fmt['_jsc_s_challenge'] = encrypted_sig - fmt['_jsc_s_sc'] = sc + solve_js_challenges() + spec = self._load_player_data_from_cache( + 'sigfuncs', player_url, len(encrypted_sig), use_disk_cache=True) + if not spec: + continue + fmt_url += '&{}={}'.format( + traverse_obj(sc, ('sp', -1)) or 'signature', + solve_sig(encrypted_sig, spec)) # n challenge query = parse_qs(fmt_url) @@ -3464,10 +3524,11 @@ class YoutubeIE(YoutubeBaseInfoExtractor): if skip_player_js: continue n_challenge = query['n'][0] - if n_challenge in self._player_cache: - fmt_url = update_url_query(fmt_url, {'n': self._player_cache[n_challenge]}) - else: - fmt['_jsc_n_challenge'] = n_challenge + solve_js_challenges() + n_result = self._load_player_data_from_cache('n', player_url, n_challenge) + if not n_result: + continue + fmt_url = update_url_query(fmt_url, {'n': n_result}) if po_token: fmt_url = update_url_query(fmt_url, {'pot': po_token}) @@ -3484,80 +3545,6 @@ class YoutubeIE(YoutubeBaseInfoExtractor): https_fmts.append(fmt) - # Bulk process sig/n handling - # Retrieve all JSC Sig and n requests for this player response in one go - n_challenges = {} - s_challenges = {} - for fmt in https_fmts: - # This will de-duplicate requests - n_challenge = fmt.pop('_jsc_n_challenge', None) - if n_challenge is not None: - n_challenges.setdefault(n_challenge, []).append(fmt) - - s_challenge = fmt.pop('_jsc_s_challenge', None) - if s_challenge is not None: - s_challenges.setdefault(len(s_challenge), {}).setdefault(s_challenge, []).append(fmt) - - challenge_requests = [] - if n_challenges: - challenge_requests.append(JsChallengeRequest( - type=JsChallengeType.N, - video_id=video_id, - input=NChallengeInput(challenges=list(n_challenges.keys()), player_url=player_url))) - if s_challenges: - challenge_requests.append(JsChallengeRequest( - type=JsChallengeType.SIG, - video_id=video_id, - input=SigChallengeInput(challenges=[''.join(map(chr, range(spec_id))) for spec_id in s_challenges], player_url=player_url))) - - if challenge_requests: - for _challenge_request, challenge_response in self._jsc_director.bulk_solve(challenge_requests): - if challenge_response.type == JsChallengeType.SIG: - for challenge, result in challenge_response.output.results.items(): - spec_id = len(challenge) - spec = [ord(c) for c in result] - self._store_sig_spec_to_cache(self._sig_spec_cache_id(player_url, spec_id), spec) - s_challenge_data = s_challenges.pop(spec_id, {}) - if not s_challenge_data: - continue - for s_challenge, fmts in s_challenge_data.items(): - solved_challenge = solve_sig(s_challenge, spec) - for fmt in fmts: - sc = fmt.pop('_jsc_s_sc') - fmt['url'] += '&{}={}'.format( - traverse_obj(sc, ('sp', -1)) or 'signature', - solved_challenge) - - elif challenge_response.type == JsChallengeType.N: - for challenge, result in challenge_response.output.results.items(): - fmts = n_challenges.pop(challenge, []) - for fmt in fmts: - self._player_cache[challenge] = result - fmt['url'] = update_url_query(fmt['url'], {'n': result}) - - # Raise warning if any challenge requests remain - # Depending on type of challenge request - - help_message = ( - 'Ensure you have a supported JavaScript runtime and ' - 'challenge solver script distribution installed. ' - 'Review any warnings presented before this message. ' - f'For more details, refer to {_EJS_WIKI_URL}') - - if s_challenges: - self.report_warning( - f'Signature solving failed: Some formats may be missing. {help_message}', - video_id=video_id, only_once=True) - if n_challenges: - self.report_warning( - f'n challenge solving failed: Some formats may be missing. {help_message}', - video_id=video_id, only_once=True) - - for cfmts in list(s_challenges.values()) + list(n_challenges.values()): - for fmt in cfmts: - if fmt in https_fmts: - https_fmts.remove(fmt) - for fmt in https_fmts: if (all_formats or 'dashy' in format_types) and fmt['filesize']: yield { @@ -3640,17 +3627,34 @@ class YoutubeIE(YoutubeBaseInfoExtractor): hls_manifest_url = 'hls' not in skip_manifests and streaming_data.get('hlsManifestUrl') if hls_manifest_url: + manifest_path = urllib.parse.urlparse(hls_manifest_url).path + if m := re.fullmatch(r'(?P.+)(?P/(?:file|playlist)/index\.m3u8)', manifest_path): + manifest_path, manifest_suffix = m.group('path', 'suffix') + else: + manifest_suffix = '' + + solved_n = False + n_challenge = get_manifest_n_challenge(hls_manifest_url) + if n_challenge and not skip_player_js: + solve_js_challenges() + n_result = self._load_player_data_from_cache('n', player_url, n_challenge) + if n_result: + manifest_path = manifest_path.replace(f'/n/{n_challenge}', f'/n/{n_result}') + solved_n = n_result in manifest_path + pot_policy: GvsPoTokenPolicy = self._get_default_ytcfg( client_name)['GVS_PO_TOKEN_POLICY'][StreamingProtocol.HLS] require_po_token = gvs_pot_required(pot_policy, is_premium_subscriber, player_token_provided) po_token = gvs_pots.get(client_name, fetch_po_token_func(required=require_po_token or pot_policy.recommended)) if po_token: - hls_manifest_url = hls_manifest_url.rstrip('/') + f'/pot/{po_token}' + manifest_path = manifest_path.rstrip('/') + f'/pot/{po_token}' if client_name not in gvs_pots: gvs_pots[client_name] = po_token + if require_po_token and not po_token and 'missing_pot' not in self._configuration_arg('formats'): self._report_pot_format_skipped(video_id, client_name, 'hls') - else: + elif solved_n or not n_challenge: + hls_manifest_url = update_url(hls_manifest_url, path=f'{manifest_path}{manifest_suffix}') fmts, subs = self._extract_m3u8_formats_and_subtitles( hls_manifest_url, video_id, 'mp4', fatal=False, live=live_status == 'is_live') for sub in traverse_obj(subs, (..., ..., {dict})): @@ -3665,17 +3669,30 @@ class YoutubeIE(YoutubeBaseInfoExtractor): dash_manifest_url = 'dash' not in skip_manifests and streaming_data.get('dashManifestUrl') if dash_manifest_url: + manifest_path = urllib.parse.urlparse(dash_manifest_url).path + + solved_n = False + n_challenge = get_manifest_n_challenge(dash_manifest_url) + if n_challenge and not skip_player_js: + solve_js_challenges() + n_result = self._load_player_data_from_cache('n', player_url, n_challenge) + if n_result: + manifest_path = manifest_path.replace(f'/n/{n_challenge}', f'/n/{n_result}') + solved_n = n_result in manifest_path + pot_policy: GvsPoTokenPolicy = self._get_default_ytcfg( client_name)['GVS_PO_TOKEN_POLICY'][StreamingProtocol.DASH] require_po_token = gvs_pot_required(pot_policy, is_premium_subscriber, player_token_provided) po_token = gvs_pots.get(client_name, fetch_po_token_func(required=require_po_token or pot_policy.recommended)) if po_token: - dash_manifest_url = dash_manifest_url.rstrip('/') + f'/pot/{po_token}' + manifest_path = manifest_path.rstrip('/') + f'/pot/{po_token}' if client_name not in gvs_pots: gvs_pots[client_name] = po_token + if require_po_token and not po_token and 'missing_pot' not in self._configuration_arg('formats'): self._report_pot_format_skipped(video_id, client_name, 'dash') - else: + elif solved_n or not n_challenge: + dash_manifest_url = update_url(dash_manifest_url, path=manifest_path) formats, subs = self._extract_mpd_formats_and_subtitles(dash_manifest_url, video_id, fatal=False) for sub in traverse_obj(subs, (..., ..., {dict})): # TODO: If DASH video requires a PO Token, do the subs also require pot?