1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-07-07 05:48:31 +00:00

use jwt_is_expired in CDA extractor

This commit is contained in:
Michaël De Boey 2025-03-17 00:13:34 +01:00
parent baba0a95ed
commit ed9ec2c67e
No known key found for this signature in database

View File

@ -17,11 +17,11 @@
int_or_none, int_or_none,
merge_dicts, merge_dicts,
multipart_encode, multipart_encode,
parse_duration, parse_duration,jwt_is_expired,
traverse_obj, traverse_obj,
try_call, try_call,
try_get, try_get,
urljoin, urljoin, jwt_encode_hs256, jwt_decode_hs256,
) )
@ -151,8 +151,8 @@ def _perform_login(self, username, password):
self._API_HEADERS['User-Agent'] = f'pl.cda 1.0 (version {app_version}; Android {android_version}; {phone_model})' self._API_HEADERS['User-Agent'] = f'pl.cda 1.0 (version {app_version}; Android {android_version}; {phone_model})'
cached_bearer = self.cache.load(self._BEARER_CACHE, username) or {} cached_bearer = self.cache.load(self._BEARER_CACHE, username) or {}
if cached_bearer.get('valid_until', 0) > dt.datetime.now().timestamp() + 5: if not jwt_is_expired(cached_bearer, 5, 'valid_until'):
self._API_HEADERS['Authorization'] = f'Bearer {cached_bearer["token"]}' self._API_HEADERS['Authorization'] = f'Bearer {jwt_decode_hs256(cached_bearer)["token"]}'
return return
password_hash = base64.urlsafe_b64encode(hmac.new( password_hash = base64.urlsafe_b64encode(hmac.new(
@ -169,10 +169,10 @@ def _perform_login(self, username, password):
'login': username, 'login': username,
'password': password_hash, 'password': password_hash,
}) })
self.cache.store(self._BEARER_CACHE, username, { self.cache.store(self._BEARER_CACHE, username, jwt_encode_hs256({
'token': token_res['access_token'], 'token': token_res['access_token'],
'valid_until': token_res['expires_in'] + dt.datetime.now().timestamp(), 'valid_until': token_res['expires_in'] + dt.datetime.now().timestamp(),
}) }, 'cda.pl'))
self._API_HEADERS['Authorization'] = f'Bearer {token_res["access_token"]}' self._API_HEADERS['Authorization'] = f'Bearer {token_res["access_token"]}'
def _real_extract(self, url): def _real_extract(self, url):