mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2026-01-26 16:51:24 +00:00
[ie/tiktok] Solve JS challenges with native Python implementation (#15672)
Closes #15418 Authored by: bashonly, DTrombett Co-authored-by: DTrombett <d@trombett.org>
This commit is contained in:
@@ -1,4 +1,6 @@
|
|||||||
|
import base64
|
||||||
import functools
|
import functools
|
||||||
|
import hashlib
|
||||||
import itertools
|
import itertools
|
||||||
import json
|
import json
|
||||||
import random
|
import random
|
||||||
@@ -15,6 +17,7 @@ from ..utils import (
|
|||||||
UnsupportedError,
|
UnsupportedError,
|
||||||
UserNotLive,
|
UserNotLive,
|
||||||
determine_ext,
|
determine_ext,
|
||||||
|
extract_attributes,
|
||||||
filter_dict,
|
filter_dict,
|
||||||
format_field,
|
format_field,
|
||||||
int_or_none,
|
int_or_none,
|
||||||
@@ -25,13 +28,13 @@ from ..utils import (
|
|||||||
qualities,
|
qualities,
|
||||||
srt_subtitles_timecode,
|
srt_subtitles_timecode,
|
||||||
str_or_none,
|
str_or_none,
|
||||||
traverse_obj,
|
|
||||||
truncate_string,
|
truncate_string,
|
||||||
try_call,
|
try_call,
|
||||||
try_get,
|
try_get,
|
||||||
url_or_none,
|
url_or_none,
|
||||||
urlencode_postdata,
|
urlencode_postdata,
|
||||||
)
|
)
|
||||||
|
from ..utils.traversal import find_element, require, traverse_obj
|
||||||
|
|
||||||
|
|
||||||
class TikTokBaseIE(InfoExtractor):
|
class TikTokBaseIE(InfoExtractor):
|
||||||
@@ -217,38 +220,94 @@ class TikTokBaseIE(InfoExtractor):
|
|||||||
raise ExtractorError('Unable to extract aweme detail info', video_id=aweme_id)
|
raise ExtractorError('Unable to extract aweme detail info', video_id=aweme_id)
|
||||||
return self._parse_aweme_video_app(aweme_detail)
|
return self._parse_aweme_video_app(aweme_detail)
|
||||||
|
|
||||||
|
def _solve_challenge_and_set_cookie(self, webpage):
|
||||||
|
challenge_data = traverse_obj(webpage, (
|
||||||
|
{find_element(id='cs', html=True)}, {extract_attributes}, 'class',
|
||||||
|
filter, {lambda x: f'{x}==='}, {base64.b64decode}, {json.loads}))
|
||||||
|
|
||||||
|
if not challenge_data:
|
||||||
|
if 'Please wait...' in webpage:
|
||||||
|
raise ExtractorError('Unable to extract challenge data')
|
||||||
|
raise ExtractorError('Unexpected response from webpage request')
|
||||||
|
|
||||||
|
self.to_screen('Solving JS challenge using native Python implementation')
|
||||||
|
|
||||||
|
expected_digest = traverse_obj(challenge_data, (
|
||||||
|
'v', 'c', {str}, {base64.b64decode},
|
||||||
|
{require('challenge expected digest')}))
|
||||||
|
|
||||||
|
base_hash = traverse_obj(challenge_data, (
|
||||||
|
'v', 'a', {str}, {base64.b64decode},
|
||||||
|
{hashlib.sha256}, {require('challenge base hash')}))
|
||||||
|
|
||||||
|
for i in range(1_000_001):
|
||||||
|
number = str(i).encode()
|
||||||
|
test_hash = base_hash.copy()
|
||||||
|
test_hash.update(number)
|
||||||
|
if test_hash.digest() == expected_digest:
|
||||||
|
challenge_data['d'] = base64.b64encode(number).decode()
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise ExtractorError('Unable to solve JS challenge')
|
||||||
|
|
||||||
|
cookie_value = base64.b64encode(
|
||||||
|
json.dumps(challenge_data, separators=(',', ':')).encode()).decode()
|
||||||
|
|
||||||
|
# At time of writing, the cookie name was _wafchallengeid
|
||||||
|
cookie_name = traverse_obj(webpage, (
|
||||||
|
{find_element(id='wci', html=True)}, {extract_attributes},
|
||||||
|
'class', {require('challenge cookie name')}))
|
||||||
|
|
||||||
|
# Actual JS sets Max-Age=1, but we need to adjust for --sleep-requests and Python slowness
|
||||||
|
expire_time = int(time.time()) + (self.get_param('sleep_interval_requests') or 0) + 2
|
||||||
|
self._set_cookie('.tiktok.com', cookie_name, cookie_value, expire_time=expire_time)
|
||||||
|
|
||||||
def _extract_web_data_and_status(self, url, video_id, fatal=True):
|
def _extract_web_data_and_status(self, url, video_id, fatal=True):
|
||||||
video_data, status = {}, -1
|
video_data, status = {}, -1
|
||||||
|
|
||||||
res = self._download_webpage_handle(url, video_id, fatal=fatal, impersonate=True)
|
def get_webpage(note='Downloading webpage'):
|
||||||
if res is False:
|
res = self._download_webpage_handle(url, video_id, note, fatal=fatal, impersonate=True)
|
||||||
|
if res is False:
|
||||||
|
return False
|
||||||
|
|
||||||
|
webpage, urlh = res
|
||||||
|
if urllib.parse.urlparse(urlh.url).path == '/login':
|
||||||
|
message = 'TikTok is requiring login for access to this content'
|
||||||
|
if fatal:
|
||||||
|
self.raise_login_required(message)
|
||||||
|
self.report_warning(f'{message}. {self._login_hint()}', video_id=video_id)
|
||||||
|
return False
|
||||||
|
|
||||||
|
return webpage
|
||||||
|
|
||||||
|
webpage = get_webpage()
|
||||||
|
if webpage is False:
|
||||||
return video_data, status
|
return video_data, status
|
||||||
|
|
||||||
webpage, urlh = res
|
universal_data = self._get_universal_data(webpage, video_id)
|
||||||
if urllib.parse.urlparse(urlh.url).path == '/login':
|
if not universal_data:
|
||||||
message = 'TikTok is requiring login for access to this content'
|
try:
|
||||||
|
self._solve_challenge_and_set_cookie(webpage)
|
||||||
|
except ExtractorError as e:
|
||||||
|
if fatal:
|
||||||
|
raise
|
||||||
|
self.report_warning(e.orig_msg, video_id=video_id)
|
||||||
|
return video_data, status
|
||||||
|
|
||||||
|
webpage = get_webpage(note='Downloading webpage with challenge cookie')
|
||||||
|
if webpage is False:
|
||||||
|
return video_data, status
|
||||||
|
universal_data = self._get_universal_data(webpage, video_id)
|
||||||
|
|
||||||
|
if not universal_data:
|
||||||
|
message = 'Unable to extract universal data for rehydration'
|
||||||
if fatal:
|
if fatal:
|
||||||
self.raise_login_required(message)
|
raise ExtractorError(message)
|
||||||
self.report_warning(f'{message}. {self._login_hint()}')
|
self.report_warning(message, video_id=video_id)
|
||||||
return video_data, status
|
return video_data, status
|
||||||
|
|
||||||
if universal_data := self._get_universal_data(webpage, video_id):
|
status = traverse_obj(universal_data, ('webapp.video-detail', 'statusCode', {int})) or 0
|
||||||
self.write_debug('Found universal data for rehydration')
|
video_data = traverse_obj(universal_data, ('webapp.video-detail', 'itemInfo', 'itemStruct', {dict}))
|
||||||
status = traverse_obj(universal_data, ('webapp.video-detail', 'statusCode', {int})) or 0
|
|
||||||
video_data = traverse_obj(universal_data, ('webapp.video-detail', 'itemInfo', 'itemStruct', {dict}))
|
|
||||||
|
|
||||||
elif sigi_data := self._get_sigi_state(webpage, video_id):
|
|
||||||
self.write_debug('Found sigi state data')
|
|
||||||
status = traverse_obj(sigi_data, ('VideoPage', 'statusCode', {int})) or 0
|
|
||||||
video_data = traverse_obj(sigi_data, ('ItemModule', video_id, {dict}))
|
|
||||||
|
|
||||||
elif next_data := self._search_nextjs_data(webpage, video_id, default={}):
|
|
||||||
self.write_debug('Found next.js data')
|
|
||||||
status = traverse_obj(next_data, ('props', 'pageProps', 'statusCode', {int})) or 0
|
|
||||||
video_data = traverse_obj(next_data, ('props', 'pageProps', 'itemInfo', 'itemStruct', {dict}))
|
|
||||||
|
|
||||||
elif fatal:
|
|
||||||
raise ExtractorError('Unable to extract webpage video data')
|
|
||||||
|
|
||||||
if not traverse_obj(video_data, ('video', {dict})) and traverse_obj(video_data, ('isContentClassified', {bool})):
|
if not traverse_obj(video_data, ('video', {dict})) and traverse_obj(video_data, ('isContentClassified', {bool})):
|
||||||
message = 'This post may not be comfortable for some audiences. Log in for access'
|
message = 'This post may not be comfortable for some audiences. Log in for access'
|
||||||
|
|||||||
Reference in New Issue
Block a user