1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-07-18 11:18:30 +00:00

extract hash from js

This commit is contained in:
c-basalt 2025-07-02 13:38:42 -04:00
parent c316416b97
commit 512b0e676a

View File

@ -1,8 +1,11 @@
import json import json
import re
from .common import InfoExtractor from .common import InfoExtractor
from ..utils import ( from ..utils import (
ExtractorError,
int_or_none, int_or_none,
js_to_json,
make_archive_id, make_archive_id,
parse_iso8601, parse_iso8601,
str_or_none, str_or_none,
@ -13,9 +16,101 @@
class KhanAcademyBaseIE(InfoExtractor): class KhanAcademyBaseIE(InfoExtractor):
_RUNTIME_JS_URL = None
_MAIN_JS_URL = None
_VALID_URL_TEMPL = r'https?://(?:www\.)?khanacademy\.org/(?P<id>(?:[^/]+/){%s}%s[^?#/&]+)' _VALID_URL_TEMPL = r'https?://(?:www\.)?khanacademy\.org/(?P<id>(?:[^/]+/){%s}%s[^?#/&]+)'
# NOTE(review): the first row below carries two statements on one line -- the
# `_PUBLISHED_CONTENT_VERSION` assignment and the `def _parse_js_urls` header.
# It looks like a side-by-side diff row (old column, then new column) rather
# than runnable Python -- TODO confirm against the real file.
#
# _parse_js_urls(webpage): look up the <script> tags of the `runtime` and
# `khanacademy` bundles in the page HTML and keep their URLs on the instance;
# `_search_query_js` and `_get_query_hash` read them afterwards.
_PUBLISHED_CONTENT_VERSION = 'dc34750f0572c80f5effe7134082fe351143c1e4' def _parse_js_urls(self, webpage):
# helper: return the src URL of the bundle called `name`, whose file name has
# the form `<name>.<hex digits>.js` under cdn.kastatic.org/khanacademy/
search = lambda name: self._search_regex(
rf'<script src="(https://cdn\.kastatic\.org/khanacademy/{name}\.[0-9a-f]+\.js)">', webpage, name)
self._RUNTIME_JS_URL = search('runtime')
self._MAIN_JS_URL = search('khanacademy')
def _search_query_js(self, query_name):
# runtime.js contains hash version for each js file, which is needed for building js src url
runtime_js = self._download_webpage(self._RUNTIME_JS_URL, None, 'Downloading runtime.js')
js_hashes = self._search_json(
r'return\s*""\+e\+"\."\+\(', runtime_js, 'js resources', None, end_pattern=r'\)\[e\]\+"\.js"',
transform_source=lambda s: re.sub(r'([\da-f]+):', r'"\1":', s))
# traverse all lazy-loaded js to find query-containing js file
main_js = self._download_webpage(self._MAIN_JS_URL, None, 'Downloading khanacademy.js')
for lazy_load in re.finditer(r'lazy\(function\(\)\{return Promise\.all\(\[(.+?)\]\)\.then', main_js):
for js_name in re.finditer(r'X.e\("([0-9a-f]+)"\)', lazy_load[1]):
if not (js_hash := js_hashes.get(js_name[1])):
self.report_warning(f'{js_name[1]} has no hash record for it, skip')
continue
url = f'https://cdn.kastatic.org/khanacademy/{js_name[1]}.{js_hash}.js'
js_src = self._download_webpage(url, None, f'Downloading {js_name[1]}.js')
if f'query {query_name}' in js_src:
return js_src
raise ExtractorError('Failed to find query js')
def _parse_query_src(self, src):
# extract gql strings for each object
queries = {match['obj_id']: json.loads(js_to_json(match['body'])) for match in re.finditer(
r'function (?P<obj_id>_templateObject\d*)\(\)\{var n=\(0,r\._\)\((?P<body>\[.+?\])\);return', src)}
# extract variable name to object query map at end: `x=o()(_templateObject00(), m, n, k)`
return {
match['name']: {
'sort': match['sort'] is not None,
'query': queries[match['obj_id']][0],
'embeds': match['embeds'].strip(',').split(',') if match['embeds'] else [],
} for match in re.finditer(
r'(?:var |,)(?P<name>[A-Za-z$_]+)=(?P<sort>\(0,s\.Fv\)\()?'
r'o\(\)\((?P<obj_id>_templateObject\d*)\(\)(?P<embeds>(?:,[A-Za-z$_]+)*)\)', src)}
def _sanitize_query(self, query: str):
outlines = []
indent = 0
for line in query.splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
if line == '}':
if indent > 2 or outlines[0].startswith('fragment'):
outlines.append(f'{" " * indent}__typename')
indent -= 2
outlines.append(f'{" " * indent}{line}')
if line[-1] == '{':
indent += 2
return '\n'.join(outlines)
def _compose_query(self, queries, key):
def _get_fragments(key):
fragments = [self._sanitize_query(queries[key]['query'])]
for key in queries[key]['embeds']:
fragments.extend(_get_fragments(key))
return fragments
# recursively find all fragments then sort them
queries = _get_fragments(key)
if not (query := next((q for q in queries if q.startswith('query ')), None)):
raise ExtractorError(f'Failed to get query for {key}')
fragments = sorted(set(q for q in queries if q.startswith('fragment ')))
return '\n\n'.join([query, *fragments])
def _string_hash(self, input):
hash = 5381
for char in input[::-1]:
hash = ((hash * 33) ^ ord(char)) & 0xFFFFFFFF
return hash
def _get_query_hash(self, query_name):
if cache := self.cache.load('khanacademy', f'{query_name}-hash'):
# change in hash of runtime.js may indicate change of website version
if cache['runtime_js'] == self._RUNTIME_JS_URL:
return cache['hash']
queries = self._parse_query_src(self._search_query_js(query_name))
for key, query_obj in queries.items():
if f'query {query_name}' in query_obj['query']:
query_hash = self._string_hash(self._compose_query(queries, key))
self.cache.store('khanacademy', f'{query_name}-hash', {
'hash': query_hash, 'runtime_js': self._RUNTIME_JS_URL})
return query_hash
raise ExtractorError(f'Failed to find query object for {query_name}')
def _parse_video(self, video): def _parse_video(self, video):
return { return {
@ -34,19 +129,22 @@ def _parse_video(self, video):
# _real_extract(url): take the display id from the URL, download the page,
# record the script URLs via `_parse_js_urls`, read the `__KA_DATA__` JSON from
# the page, then request the ContentForPath GraphQL endpoint and hand
# `data.contentRoute.listedPathData` to `_parse_component_props`.
#
# NOTE(review): most rows below repeat their content twice, which looks like the
# old and new columns of a side-by-side diff printed on one line -- TODO confirm.
# The 'pcv' and 'hash' rows hold two different values (presumably old, then new).
def _real_extract(self, url): def _real_extract(self, url):
display_id = self._match_id(url) display_id = self._match_id(url)
webpage = self._download_webpage(url, display_id)
self._parse_js_urls(webpage)
# page state object; its 'KA-published-content-version' entry is used as `pcv` below
ka_data = self._search_json(r'__KA_DATA__ \s*=', webpage, 'initial state', display_id)
content = self._download_json( content = self._download_json(
'https://www.khanacademy.org/api/internal/graphql/ContentForPath', display_id, 'https://www.khanacademy.org/api/internal/graphql/ContentForPath', display_id,
query={ query={
'fastly_cacheable': 'persist_until_publish', 'fastly_cacheable': 'persist_until_publish',
'pcv': self._PUBLISHED_CONTENT_VERSION, 'pcv': ka_data['KA-published-content-version'],
'hash': '3712657851', 'hash': self._get_query_hash('ContentForPath'),
'variables': json.dumps({ 'variables': json.dumps({
'path': display_id, 'path': display_id,
'countryCode': 'US', 'countryCode': 'US',
# NOTE(review): the next two rows and the 'app' row appear only once; it is not
# possible to tell from here which column they belong to -- verify before use
'kaLocale': 'en',
'clientPublishedContentVersion': self._PUBLISHED_CONTENT_VERSION,
}), }),
'lang': 'en', 'lang': 'en',
'app': 'khanacademy',
})['data']['contentRoute']['listedPathData'] })['data']['contentRoute']['listedPathData']
return self._parse_component_props(content, display_id) return self._parse_component_props(content, display_id)