From e3f0d8b731b40176bcc632bf92cfe5149402b202 Mon Sep 17 00:00:00 2001 From: bashonly <88596187+bashonly@users.noreply.github.com> Date: Sun, 25 Jan 2026 17:16:01 -0600 Subject: [PATCH] [ie/tiktok] Solve JS challenges with native Python implementation (#15672) Closes #15418 Authored by: bashonly, DTrombett Co-authored-by: DTrombett --- yt_dlp/extractor/tiktok.py | 109 ++++++++++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 25 deletions(-) diff --git a/yt_dlp/extractor/tiktok.py b/yt_dlp/extractor/tiktok.py index 02ec2b2f45..2729061ca5 100644 --- a/yt_dlp/extractor/tiktok.py +++ b/yt_dlp/extractor/tiktok.py @@ -1,4 +1,6 @@ +import base64 import functools +import hashlib import itertools import json import random @@ -15,6 +17,7 @@ from ..utils import ( UnsupportedError, UserNotLive, determine_ext, + extract_attributes, filter_dict, format_field, int_or_none, @@ -25,13 +28,13 @@ from ..utils import ( qualities, srt_subtitles_timecode, str_or_none, - traverse_obj, truncate_string, try_call, try_get, url_or_none, urlencode_postdata, ) +from ..utils.traversal import find_element, require, traverse_obj class TikTokBaseIE(InfoExtractor): @@ -217,38 +220,94 @@ class TikTokBaseIE(InfoExtractor): raise ExtractorError('Unable to extract aweme detail info', video_id=aweme_id) return self._parse_aweme_video_app(aweme_detail) + def _solve_challenge_and_set_cookie(self, webpage): + challenge_data = traverse_obj(webpage, ( + {find_element(id='cs', html=True)}, {extract_attributes}, 'class', + filter, {lambda x: f'{x}==='}, {base64.b64decode}, {json.loads})) + + if not challenge_data: + if 'Please wait...' in webpage: + raise ExtractorError('Unable to extract challenge data') + raise ExtractorError('Unexpected response from webpage request') + + self.to_screen('Solving JS challenge using native Python implementation') + + expected_digest = traverse_obj(challenge_data, ( + 'v', 'c', {str}, {base64.b64decode}, + {require('challenge expected digest')})) + + base_hash = traverse_obj(challenge_data, ( + 'v', 'a', {str}, {base64.b64decode}, + {hashlib.sha256}, {require('challenge base hash')})) + + for i in range(1_000_001): + number = str(i).encode() + test_hash = base_hash.copy() + test_hash.update(number) + if test_hash.digest() == expected_digest: + challenge_data['d'] = base64.b64encode(number).decode() + break + else: + raise ExtractorError('Unable to solve JS challenge') + + cookie_value = base64.b64encode( + json.dumps(challenge_data, separators=(',', ':')).encode()).decode() + + # At time of writing, the cookie name was _wafchallengeid + cookie_name = traverse_obj(webpage, ( + {find_element(id='wci', html=True)}, {extract_attributes}, + 'class', {require('challenge cookie name')})) + + # Actual JS sets Max-Age=1, but we need to adjust for --sleep-requests and Python slowness + expire_time = int(time.time()) + (self.get_param('sleep_interval_requests') or 0) + 2 + self._set_cookie('.tiktok.com', cookie_name, cookie_value, expire_time=expire_time) + def _extract_web_data_and_status(self, url, video_id, fatal=True): video_data, status = {}, -1 - res = self._download_webpage_handle(url, video_id, fatal=fatal, impersonate=True) - if res is False: + def get_webpage(note='Downloading webpage'): + res = self._download_webpage_handle(url, video_id, note, fatal=fatal, impersonate=True) + if res is False: + return False + + webpage, urlh = res + if urllib.parse.urlparse(urlh.url).path == '/login': + message = 'TikTok is requiring login for access to this content' + if fatal: + self.raise_login_required(message) + self.report_warning(f'{message}. {self._login_hint()}', video_id=video_id) + return False + + return webpage + + webpage = get_webpage() + if webpage is False: return video_data, status - webpage, urlh = res - if urllib.parse.urlparse(urlh.url).path == '/login': - message = 'TikTok is requiring login for access to this content' + universal_data = self._get_universal_data(webpage, video_id) + if not universal_data: + try: + self._solve_challenge_and_set_cookie(webpage) + except ExtractorError as e: + if fatal: + raise + self.report_warning(e.orig_msg, video_id=video_id) + return video_data, status + + webpage = get_webpage(note='Downloading webpage with challenge cookie') + if webpage is False: + return video_data, status + universal_data = self._get_universal_data(webpage, video_id) + + if not universal_data: + message = 'Unable to extract universal data for rehydration' if fatal: - self.raise_login_required(message) - self.report_warning(f'{message}. {self._login_hint()}') + raise ExtractorError(message) + self.report_warning(message, video_id=video_id) return video_data, status - if universal_data := self._get_universal_data(webpage, video_id): - self.write_debug('Found universal data for rehydration') - status = traverse_obj(universal_data, ('webapp.video-detail', 'statusCode', {int})) or 0 - video_data = traverse_obj(universal_data, ('webapp.video-detail', 'itemInfo', 'itemStruct', {dict})) - - elif sigi_data := self._get_sigi_state(webpage, video_id): - self.write_debug('Found sigi state data') - status = traverse_obj(sigi_data, ('VideoPage', 'statusCode', {int})) or 0 - video_data = traverse_obj(sigi_data, ('ItemModule', video_id, {dict})) - - elif next_data := self._search_nextjs_data(webpage, video_id, default={}): - self.write_debug('Found next.js data') - status = traverse_obj(next_data, ('props', 'pageProps', 'statusCode', {int})) or 0 - video_data = traverse_obj(next_data, ('props', 'pageProps', 'itemInfo', 'itemStruct', {dict})) - - elif fatal: - raise ExtractorError('Unable to extract webpage video data') + status = traverse_obj(universal_data, ('webapp.video-detail', 'statusCode', {int})) or 0 + video_data = traverse_obj(universal_data, ('webapp.video-detail', 'itemInfo', 'itemStruct', {dict})) if not traverse_obj(video_data, ('video', {dict})) and traverse_obj(video_data, ('isContentClassified', {bool})): message = 'This post may not be comfortable for some audiences. Log in for access'