1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-07-10 07:18:33 +00:00

move _jwt_is_expired to utils

This commit is contained in:
Michaël De Boey 2025-03-16 23:37:39 +01:00
parent 98a68be9b8
commit 1b1117a9c5
No known key found for this signature in database
12 changed files with 32 additions and 22 deletions

View File

@ -11,6 +11,7 @@
float_or_none,
int_or_none,
js_to_json,
jwt_is_expired,
mimetype2ext,
orderedSet,
parse_age_limit,
@ -653,7 +654,7 @@ def _perform_login(self, username, password):
raise
def _fetch_access_token(self):
if self._jwt_is_expired(self._access_token):
if jwt_is_expired(self._access_token):
try:
self._call_oauth_api({
'grant_type': 'refresh_token',
@ -671,7 +672,7 @@ def _fetch_claims_token(self):
if not self._get_login_info()[0]:
return None
if not self._claims_token or self._jwt_is_expired(self._claims_token):
if not self._claims_token or jwt_is_expired(self._claims_token):
self._claims_token = self._download_json(
'https://services.radio-canada.ca/ott/subscription/v2/gem/Subscriber/profile',
None, 'Downloading claims token', query={'device': 'web'},

View File

@ -69,7 +69,6 @@
int_or_none,
join_nonempty,
js_to_json,
jwt_decode_hs256,
mimetype2ext,
netrc_from_content,
orderedSet,
@ -995,10 +994,6 @@ def _guess_encoding_from_content(content_type, webpage_bytes):
return encoding
@staticmethod
def _jwt_is_expired(token, buffer=300):
return jwt_decode_hs256(token)['exp'] - time.time() < buffer
def __check_blocked(self, content):
first_block = content[:512]
if ('<title>Access to this site is blocked</title>' in content

View File

@ -3,6 +3,7 @@
from ..utils import (
ExtractorError,
jwt_decode_hs256,
jwt_is_expired,
parse_codecs,
try_get,
url_or_none,
@ -86,7 +87,7 @@ class DigitalConcertHallIE(InfoExtractor):
@property
def _access_token_is_expired(self):
return self._jwt_is_expired(self._access_token, 30)
return jwt_is_expired(self._access_token, 30)
def _set_access_token(self, value):
self._access_token = value

View File

@ -8,6 +8,7 @@
ExtractorError,
OnDemandPagedList,
int_or_none,
jwt_is_expired,
mimetype2ext,
qualities,
traverse_obj,
@ -22,7 +23,7 @@ class IwaraBaseIE(InfoExtractor):
def _is_token_expired(self, token, token_type):
# User token TTL == ~3 weeks, Media token TTL == ~1 hour
if self._jwt_is_expired(token, 120):
if jwt_is_expired(token, 120):
self.to_screen(f'{token_type} token has expired')
return True

View File

@ -11,6 +11,7 @@
float_or_none,
int_or_none,
jwt_decode_hs256,
jwt_is_expired,
parse_age_limit,
try_call,
url_or_none,
@ -105,8 +106,9 @@ def _call_login_api(self, endpoint, guest_token, data, note):
'os': ('os', {str}),
})}, data=data)
def _is_token_expired(self, token):
return self._jwt_is_expired(token, 180)
@staticmethod
def _is_token_expired(token):
return jwt_is_expired(token, 180)
def _perform_login(self, username, password):
if self._ACCESS_TOKEN and not self._is_token_expired(self._ACCESS_TOKEN):

View File

@ -9,6 +9,7 @@
determine_ext,
int_or_none,
join_nonempty,
jwt_is_expired,
parse_duration,
parse_iso8601,
try_get,
@ -351,7 +352,7 @@ class MLBTVIE(InfoExtractor):
@property
def _api_headers(self):
if self._jwt_is_expired(self._access_token, 120):
if jwt_is_expired(self._access_token, 120):
self.write_debug('Access token has expired; re-logging in')
self._perform_login(*self._get_login_info())
return {'Authorization': f'Bearer {self._access_token}'}

View File

@ -4,6 +4,7 @@
from ..utils import (
ExtractorError,
int_or_none,
jwt_is_expired,
str_or_none,
traverse_obj,
try_call,
@ -114,7 +115,7 @@ def _real_initialize(self):
self.raise_login_required()
def _get_auth(self):
if self._jwt_is_expired(self._access_token, 120):
if jwt_is_expired(self._access_token, 120):
if not self._refresh_token:
raise ExtractorError(
'Cannot refresh access token, login with yt-dlp or refresh cookies in browser')

View File

@ -1,6 +1,7 @@
from .wrestleuniverse import WrestleUniverseBaseIE
from ..utils import (
int_or_none,
jwt_is_expired,
traverse_obj,
url_or_none,
)
@ -20,7 +21,7 @@ class StacommuBaseIE(WrestleUniverseBaseIE):
@WrestleUniverseBaseIE._TOKEN.getter
def _TOKEN(self):
if self._REAL_TOKEN and self._jwt_is_expired(self._REAL_TOKEN):
if self._REAL_TOKEN and jwt_is_expired(self._REAL_TOKEN):
self._refresh_token()
return self._REAL_TOKEN

View File

@ -14,6 +14,7 @@
get_element_html_by_class,
int_or_none,
jwt_encode_hs256,
jwt_is_expired,
make_archive_id,
merge_dicts,
parse_age_limit,
@ -272,15 +273,15 @@ def _fetch_tokens(self):
access_token = self._get_vrt_cookie(self._ACCESS_TOKEN_COOKIE_NAME)
video_token = self._get_vrt_cookie(self._VIDEO_TOKEN_COOKIE_NAME)
if (access_token and not self._jwt_is_expired(access_token)
and video_token and not self._jwt_is_expired(video_token)):
if (access_token and not jwt_is_expired(access_token)
and video_token and not jwt_is_expired(video_token)):
return access_token, video_token
if has_credentials:
access_token, video_token = self.cache.load(self._NETRC_MACHINE, 'token_data', default=(None, None))
if (access_token and not self._jwt_is_expired(access_token)
and video_token and not self._jwt_is_expired(video_token)):
if (access_token and not jwt_is_expired(access_token)
and video_token and not jwt_is_expired(video_token)):
self.write_debug('Restored tokens from cache')
self._set_cookie(self._TOKEN_COOKIE_DOMAIN, self._ACCESS_TOKEN_COOKIE_NAME, access_token)
self._set_cookie(self._TOKEN_COOKIE_DOMAIN, self._VIDEO_TOKEN_COOKIE_NAME, video_token)
@ -317,12 +318,12 @@ def _get_vrt_cookie(self, cookie_name):
def _perform_login(self, username, password):
refresh_token = self._get_vrt_cookie(self._REFRESH_TOKEN_COOKIE_NAME)
if refresh_token and not self._jwt_is_expired(refresh_token):
if refresh_token and not jwt_is_expired(refresh_token):
self.write_debug('Using refresh token from logged-in cookies; skipping login with credentials')
return
refresh_token = self.cache.load(self._NETRC_MACHINE, 'refresh_token', default=None)
if refresh_token and not self._jwt_is_expired(refresh_token):
if refresh_token and not jwt_is_expired(refresh_token):
self.write_debug('Restored refresh token from cache')
self._set_cookie(self._TOKEN_COOKIE_DOMAIN, self._REFRESH_TOKEN_COOKIE_NAME, refresh_token, path='/vrtmax/sso')
return

View File

@ -8,6 +8,7 @@
from ..utils import (
ExtractorError,
int_or_none,
jwt_is_expired,
traverse_obj,
try_call,
url_basename,
@ -43,7 +44,7 @@ def _TOKEN(self):
self.raise_login_required()
self._TOKEN = token
if not self._REAL_TOKEN or self._jwt_is_expired(self._REAL_TOKEN):
if not self._REAL_TOKEN or jwt_is_expired(self._REAL_TOKEN):
if not self._REFRESH_TOKEN:
raise ExtractorError(
'Expired token. Refresh your cookies in browser and try again', expected=True)

View File

@ -6,6 +6,7 @@
ExtractorError,
int_or_none,
jwt_decode_hs256,
jwt_is_expired,
parse_age_limit,
str_or_none,
try_call,
@ -123,7 +124,7 @@ def _perform_login(self, username, password):
else:
raise ExtractorError(self._LOGIN_HINT, expected=True)
if self._jwt_is_expired(self._USER_TOKEN):
if jwt_is_expired(self._USER_TOKEN):
raise ExtractorError('User token has expired', expected=True)
self._USER_COUNTRY = jwt_decode_hs256(self._USER_TOKEN).get('current_country')

View File

@ -4763,6 +4763,10 @@ def jwt_decode_hs256(jwt):
return json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
def jwt_is_expired(token, buffer=300):
return jwt_decode_hs256(token)['exp'] - time.time() < buffer
WINDOWS_VT_MODE = False if os.name == 'nt' else None