
Merge branch 'yt-dlp:master' into issue12014

Alan Xiao, 2025-04-28 16:15:07 -04:00, committed by GitHub
commit 4d9ee9a552
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
12 changed files with 123 additions and 328 deletions


@@ -38,3 +38,5 @@ jobs:
run: ruff check --output-format github .
- name: Run autopep8
run: autopep8 --diff .
- name: Check file mode
run: git ls-files --format="%(objectmode) %(path)" yt_dlp/ | ( ! grep -v "^100644" )
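
Note on the new step: `grep -v "^100644"` prints any tracked file whose mode is not a plain non-executable file, and the surrounding `( ! ... )` inverts the exit status, so the job fails exactly when such a file exists. A rough Python sketch of the same check, assuming git ≥ 2.38 for `--format` (the helper name is illustrative):

import subprocess

def nonregular_files(path='yt_dlp/'):
    # list tracked files whose git object mode is not 100644 (regular file)
    out = subprocess.run(
        ['git', 'ls-files', '--format=%(objectmode) %(path)', path],
        capture_output=True, text=True, check=True).stdout
    return [line for line in out.splitlines() if not line.startswith('100644 ')]

# CI fails when this list is non-empty, e.g. for a file left executable
print(nonregular_files() or 'all file modes OK')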


@@ -1799,9 +1799,6 @@ #### generic
#### vikichannel
* `video_types`: Types of videos to download - one or more of `episodes`, `movies`, `clips`, `trailers`
#### niconico
* `segment_duration`: Segment duration in milliseconds for HLS-DMC formats. Use it at your own risk since this feature **may result in your account termination.**
#### youtubewebarchive
* `check_all`: Try to check more at the cost of more requests. One or more of `thumbnails`, `captures`


@@ -30,7 +30,7 @@ def get_suitable_downloader(info_dict, params={}, default=NO_DEFAULT, protocol=N
from .http import HttpFD
from .ism import IsmFD
from .mhtml import MhtmlFD
from .niconico import NiconicoDmcFD, NiconicoLiveFD
from .niconico import NiconicoLiveFD
from .rtmp import RtmpFD
from .rtsp import RtspFD
from .websocket import WebSocketFragmentFD
@@ -50,7 +50,6 @@ def get_suitable_downloader(info_dict, params={}, default=NO_DEFAULT, protocol=N
'http_dash_segments_generator': DashSegmentsFD,
'ism': IsmFD,
'mhtml': MhtmlFD,
'niconico_dmc': NiconicoDmcFD,
'niconico_live': NiconicoLiveFD,
'fc2_live': FC2LiveFD,
'websocket_frag': WebSocketFragmentFD,
@@ -67,7 +66,6 @@ def shorten_protocol_name(proto, simplify=False):
'rtmp_ffmpeg': 'rtmpF',
'http_dash_segments': 'dash',
'http_dash_segments_generator': 'dashG',
'niconico_dmc': 'dmc',
'websocket_frag': 'WSfrag',
}
if simplify:


@@ -2,60 +2,12 @@
import threading
import time
from . import get_suitable_downloader
from .common import FileDownloader
from .external import FFmpegFD
from ..networking import Request
from ..utils import DownloadError, str_or_none, try_get
class NiconicoDmcFD(FileDownloader):
""" Downloading niconico douga from DMC with heartbeat """
def real_download(self, filename, info_dict):
from ..extractor.niconico import NiconicoIE
self.to_screen(f'[{self.FD_NAME}] Downloading from DMC')
ie = NiconicoIE(self.ydl)
info_dict, heartbeat_info_dict = ie._get_heartbeat_info(info_dict)
fd = get_suitable_downloader(info_dict, params=self.params)(self.ydl, self.params)
success = download_complete = False
timer = [None]
heartbeat_lock = threading.Lock()
heartbeat_url = heartbeat_info_dict['url']
heartbeat_data = heartbeat_info_dict['data'].encode()
heartbeat_interval = heartbeat_info_dict.get('interval', 30)
request = Request(heartbeat_url, heartbeat_data)
def heartbeat():
try:
self.ydl.urlopen(request).read()
except Exception:
self.to_screen(f'[{self.FD_NAME}] Heartbeat failed')
with heartbeat_lock:
if not download_complete:
timer[0] = threading.Timer(heartbeat_interval, heartbeat)
timer[0].start()
heartbeat_info_dict['ping']()
self.to_screen('[%s] Heartbeat with %d second interval ...' % (self.FD_NAME, heartbeat_interval))
try:
heartbeat()
if type(fd).__name__ == 'HlsFD':
info_dict.update(ie._extract_m3u8_formats(info_dict['url'], info_dict['id'])[0])
success = fd.real_download(filename, info_dict)
finally:
if heartbeat_lock:
with heartbeat_lock:
timer[0].cancel()
download_complete = True
return success
class NiconicoLiveFD(FileDownloader):
""" Downloads niconico live without being stopped """


@@ -9,6 +9,7 @@
ExtractorError,
classproperty,
float_or_none,
parse_qs,
traverse_obj,
url_or_none,
)
@@ -91,11 +92,15 @@ def _usp_signing_secret(self):
# Rotates every so often, but hardcode a fallback in case of JS change/breakage before rotation
return self._search_regex(
r'\bUSP_SIGNING_SECRET\s*=\s*(["\'])(?P<secret>(?:(?!\1).)+)', player_js,
'usp signing secret', group='secret', fatal=False) or 'odnInCGqhvtyRTtIiddxtuRtawYYICZP'
'usp signing secret', group='secret', fatal=False) or 'hGDtqMKYVeFdofrAfFmBcrsakaZELajI'
def _real_extract(self, url):
user_id, video_id = self._match_valid_url(url).group('user_id', 'id')
query = {'contentId': f'{user_id}-vod-{video_id}', 'provider': 'universe'}
query = {
'contentId': f'{user_id}-vod-{video_id}',
'provider': 'universe',
**traverse_obj(url, ({parse_qs}, 'uss_token', {'signedKey': -1})),
}
info = self._download_json(self._API_INFO_URL, video_id, query=query, fatal=False)
access = self._download_json(
'https://playback.dacast.com/content/access', video_id,
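
Note: the rewritten query forwards an optional `uss_token` from the watch-page URL as `signedKey`. In traverse_obj terms, `{parse_qs}` transforms the URL into its query dict, `'uss_token'` selects the value list, and `{'signedKey': -1}` maps the last entry to an output key (dropped entirely when absent). A rough stdlib-only equivalent (the helper name is illustrative):

from urllib.parse import parse_qs, urlparse

def signed_key_param(url):
    # take the last uss_token query value, if any, as signedKey
    tokens = parse_qs(urlparse(url).query).get('uss_token')
    return {'signedKey': tokens[-1]} if tokens else {}

query = {
    'contentId': 'someuser-vod-123',  # placeholder IDs
    'provider': 'universe',
    **signed_key_param('https://example.com/watch?uss_token=abc'),
}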


@@ -365,13 +365,15 @@ def _real_initialize(self):
'All videos are only available to registered users', method='password')
def _set_device_id(self, username):
if not self._device_id:
self._device_id = self.cache.load(
self._NETRC_MACHINE, 'device_ids', default={}).get(username)
if self._device_id:
return
device_id_cache = self.cache.load(self._NETRC_MACHINE, 'device_ids', default={})
self._device_id = device_id_cache.get(username)
if self._device_id:
return
self._device_id = str(uuid.uuid4())
self.cache.store(self._NETRC_MACHINE, 'device_ids', {username: self._device_id})
device_id_cache[username] = self._device_id
self.cache.store(self._NETRC_MACHINE, 'device_ids', device_id_cache)
def _perform_login(self, username, password):
try:
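
Note: the point of this rewrite is that the old code stored `{username: device_id}` verbatim, clobbering every other user's entry in the cache file, whereas the fix loads the existing mapping, updates one key, and writes the whole dict back. A toy illustration with plain dicts standing in for the cache:

cache = {'alice': 'device-a'}  # existing cache contents

# old behaviour: store a fresh single-entry dict, losing alice's ID
cache_old = {'bob': 'device-b'}

# fixed behaviour: merge into the loaded mapping before storing
cache_new = dict(cache)
cache_new['bob'] = 'device-b'

assert cache_new == {'alice': 'device-a', 'bob': 'device-b'}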


@@ -16,7 +16,6 @@
determine_ext,
float_or_none,
int_or_none,
join_nonempty,
parse_duration,
parse_iso8601,
parse_qs,
@@ -181,13 +180,6 @@ class NiconicoIE(InfoExtractor):
_VALID_URL = r'https?://(?:(?:www\.|secure\.|sp\.)?nicovideo\.jp/watch|nico\.ms)/(?P<id>(?:[a-z]{2})?[0-9]+)'
_NETRC_MACHINE = 'niconico'
_API_HEADERS = {
'X-Frontend-ID': '6',
'X-Frontend-Version': '0',
'X-Niconico-Language': 'en-us',
'Referer': 'https://www.nicovideo.jp/',
'Origin': 'https://www.nicovideo.jp',
}
def _perform_login(self, username, password):
login_ok = True
@@ -229,181 +221,6 @@ def _perform_login(self, username, password):
self.report_warning('Unable to log in: bad username or password')
return login_ok
def _get_heartbeat_info(self, info_dict):
video_id, video_src_id, audio_src_id = info_dict['url'].split(':')[1].split('/')
dmc_protocol = info_dict['expected_protocol']
api_data = (
info_dict.get('_api_data')
or self._parse_json(
self._html_search_regex(
'data-api-data="([^"]+)"',
self._download_webpage('https://www.nicovideo.jp/watch/' + video_id, video_id),
'API data', default='{}'),
video_id))
session_api_data = try_get(api_data, lambda x: x['media']['delivery']['movie']['session'])
session_api_endpoint = try_get(session_api_data, lambda x: x['urls'][0])
def ping():
tracking_id = traverse_obj(api_data, ('media', 'delivery', 'trackingId'))
if tracking_id:
tracking_url = update_url_query('https://nvapi.nicovideo.jp/v1/2ab0cbaa/watch', {'t': tracking_id})
watch_request_response = self._download_json(
tracking_url, video_id,
note='Acquiring permission for downloading video', fatal=False,
headers=self._API_HEADERS)
if traverse_obj(watch_request_response, ('meta', 'status')) != 200:
self.report_warning('Failed to acquire permission for playing video. Video download may fail.')
yesno = lambda x: 'yes' if x else 'no'
if dmc_protocol == 'http':
protocol = 'http'
protocol_parameters = {
'http_output_download_parameters': {
'use_ssl': yesno(session_api_data['urls'][0]['isSsl']),
'use_well_known_port': yesno(session_api_data['urls'][0]['isWellKnownPort']),
},
}
elif dmc_protocol == 'hls':
protocol = 'm3u8'
segment_duration = try_get(self._configuration_arg('segment_duration'), lambda x: int(x[0])) or 6000
parsed_token = self._parse_json(session_api_data['token'], video_id)
encryption = traverse_obj(api_data, ('media', 'delivery', 'encryption'))
protocol_parameters = {
'hls_parameters': {
'segment_duration': segment_duration,
'transfer_preset': '',
'use_ssl': yesno(session_api_data['urls'][0]['isSsl']),
'use_well_known_port': yesno(session_api_data['urls'][0]['isWellKnownPort']),
},
}
if 'hls_encryption' in parsed_token and encryption:
protocol_parameters['hls_parameters']['encryption'] = {
parsed_token['hls_encryption']: {
'encrypted_key': encryption['encryptedKey'],
'key_uri': encryption['keyUri'],
},
}
else:
protocol = 'm3u8_native'
else:
raise ExtractorError(f'Unsupported DMC protocol: {dmc_protocol}')
session_response = self._download_json(
session_api_endpoint['url'], video_id,
query={'_format': 'json'},
headers={'Content-Type': 'application/json'},
note='Downloading JSON metadata for {}'.format(info_dict['format_id']),
data=json.dumps({
'session': {
'client_info': {
'player_id': session_api_data.get('playerId'),
},
'content_auth': {
'auth_type': try_get(session_api_data, lambda x: x['authTypes'][session_api_data['protocols'][0]]),
'content_key_timeout': session_api_data.get('contentKeyTimeout'),
'service_id': 'nicovideo',
'service_user_id': session_api_data.get('serviceUserId'),
},
'content_id': session_api_data.get('contentId'),
'content_src_id_sets': [{
'content_src_ids': [{
'src_id_to_mux': {
'audio_src_ids': [audio_src_id],
'video_src_ids': [video_src_id],
},
}],
}],
'content_type': 'movie',
'content_uri': '',
'keep_method': {
'heartbeat': {
'lifetime': session_api_data.get('heartbeatLifetime'),
},
},
'priority': session_api_data['priority'],
'protocol': {
'name': 'http',
'parameters': {
'http_parameters': {
'parameters': protocol_parameters,
},
},
},
'recipe_id': session_api_data.get('recipeId'),
'session_operation_auth': {
'session_operation_auth_by_signature': {
'signature': session_api_data.get('signature'),
'token': session_api_data.get('token'),
},
},
'timing_constraint': 'unlimited',
},
}).encode())
info_dict['url'] = session_response['data']['session']['content_uri']
info_dict['protocol'] = protocol
# get heartbeat info
heartbeat_info_dict = {
'url': session_api_endpoint['url'] + '/' + session_response['data']['session']['id'] + '?_format=json&_method=PUT',
'data': json.dumps(session_response['data']),
# interval, convert milliseconds to seconds, then halve to make a buffer.
'interval': float_or_none(session_api_data.get('heartbeatLifetime'), scale=3000),
'ping': ping,
}
return info_dict, heartbeat_info_dict
def _extract_format_for_quality(self, video_id, audio_quality, video_quality, dmc_protocol):
if not audio_quality.get('isAvailable') or not video_quality.get('isAvailable'):
return None
format_id = '-'.join(
[remove_start(s['id'], 'archive_') for s in (video_quality, audio_quality)] + [dmc_protocol])
vid_qual_label = traverse_obj(video_quality, ('metadata', 'label'))
return {
'url': 'niconico_dmc:{}/{}/{}'.format(video_id, video_quality['id'], audio_quality['id']),
'format_id': format_id,
'format_note': join_nonempty('DMC', vid_qual_label, dmc_protocol.upper(), delim=' '),
'ext': 'mp4', # Session API are used in HTML5, which always serves mp4
'acodec': 'aac',
'vcodec': 'h264',
**traverse_obj(audio_quality, ('metadata', {
'abr': ('bitrate', {float_or_none(scale=1000)}),
'asr': ('samplingRate', {int_or_none}),
})),
**traverse_obj(video_quality, ('metadata', {
'vbr': ('bitrate', {float_or_none(scale=1000)}),
'height': ('resolution', 'height', {int_or_none}),
'width': ('resolution', 'width', {int_or_none}),
})),
'quality': -2 if 'low' in video_quality['id'] else None,
'protocol': 'niconico_dmc',
'expected_protocol': dmc_protocol, # XXX: This is not a documented field
'http_headers': {
'Origin': 'https://www.nicovideo.jp',
'Referer': 'https://www.nicovideo.jp/watch/' + video_id,
},
}
def _yield_dmc_formats(self, api_data, video_id):
dmc_data = traverse_obj(api_data, ('media', 'delivery', 'movie'))
audios = traverse_obj(dmc_data, ('audios', ..., {dict}))
videos = traverse_obj(dmc_data, ('videos', ..., {dict}))
protocols = traverse_obj(dmc_data, ('session', 'protocols', ..., {str}))
if not all((audios, videos, protocols)):
return
for audio_quality, video_quality, protocol in itertools.product(audios, videos, protocols):
if fmt := self._extract_format_for_quality(video_id, audio_quality, video_quality, protocol):
yield fmt
def _yield_dms_formats(self, api_data, video_id):
fmt_filter = lambda _, v: v['isAvailable'] and v['id']
videos = traverse_obj(api_data, ('media', 'domand', 'videos', fmt_filter))
@@ -485,8 +302,8 @@ def _real_extract(self, url):
'needs_premium': ('isPremium', {bool}),
'needs_subscription': ('isAdmission', {bool}),
})) or {'needs_auth': True}))
formats = [*self._yield_dmc_formats(api_data, video_id),
*self._yield_dms_formats(api_data, video_id)]
formats = list(self._yield_dms_formats(api_data, video_id))
if not formats:
fail_msg = clean_html(self._html_search_regex(
r'<p[^>]+\bclass="fail-message"[^>]*>(?P<msg>.+?)</p>',


@@ -321,6 +321,27 @@ class RaiPlayIE(RaiBaseIE):
'timestamp': 1348495020,
'upload_date': '20120924',
},
}, {
# checking program_info gives false positive for DRM
'url': 'https://www.raiplay.it/video/2022/10/Ad-ogni-costo---Un-giorno-in-Pretura---Puntata-del-15102022-1dfd1295-ea38-4bac-b51e-f87e2881693b.html',
'md5': '572c6f711b7c5f2d670ba419b4ae3b08',
'info_dict': {
'id': '1dfd1295-ea38-4bac-b51e-f87e2881693b',
'ext': 'mp4',
'title': 'Ad ogni costo - Un giorno in Pretura - Puntata del 15/10/2022',
'alt_title': 'St 2022/23 - Un giorno in pretura - Ad ogni costo',
'description': 'md5:4046d97b2687f74f06a8b8270ba5599f',
'uploader': 'Rai 3',
'duration': 3773.0,
'thumbnail': 'https://www.raiplay.it/dl/img/2022/10/12/1665586539957_2048x2048.png',
'creators': ['Rai 3'],
'series': 'Un giorno in pretura',
'season': '2022/23',
'episode': 'Ad ogni costo',
'timestamp': 1665507240,
'upload_date': '20221011',
'release_year': 2025,
},
}, {
'url': 'http://www.raiplay.it/video/2016/11/gazebotraindesi-efebe701-969c-4593-92f3-285f0d1ce750.html?',
'only_matching': True,
@@ -340,8 +361,7 @@ def _real_extract(self, url):
media = self._download_json(
f'{base}.json', video_id, 'Downloading video JSON')
if not self.get_param('allow_unplayable_formats'):
if traverse_obj(media, (('program_info', None), 'rights_management', 'rights', 'drm')):
if traverse_obj(media, ('rights_management', 'rights', 'drm')):
self.report_drm(video_id)
video = media['video']
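
Note: the narrowed path matters because a tuple of keys in traverse_obj branches, so the old `(('program_info', None), ...)` prefix looked for a DRM flag both under program_info and at the top level, letting program-level DRM metadata flag otherwise playable videos. A small illustration with made-up payloads (assumes yt-dlp is installed and its traverse_obj branching semantics):

from yt_dlp.utils import traverse_obj

media = {
    # program-level metadata that used to trigger the false positive
    'program_info': {'rights_management': {'rights': {'drm': {'system': 'example'}}}},
    # the video itself carries no DRM entry
    'rights_management': {'rights': {}},
}

# old check branched into program_info as well as the top level -> truthy
assert traverse_obj(media, (('program_info', None), 'rights_management', 'rights', 'drm'))
# new check consults only the video's own rights -> falsy, no report_drm()
assert not traverse_obj(media, ('rights_management', 'rights', 'drm'))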


@@ -1221,20 +1221,10 @@ class TwitterIE(TwitterBaseIE):
}]
_MEDIA_ID_RE = re.compile(r'_video/(\d+)/')
@property
def _GRAPHQL_ENDPOINT(self):
if self.is_logged_in:
return 'zZXycP0V6H7m-2r0mOnFcA/TweetDetail'
return '2ICDjqPd81tulZcYrtpTuQ/TweetResultByRestId'
_GRAPHQL_ENDPOINT = '2ICDjqPd81tulZcYrtpTuQ/TweetResultByRestId'
def _graphql_to_legacy(self, data, twid):
result = traverse_obj(data, (
'threaded_conversation_with_injections_v2', 'instructions', 0, 'entries',
lambda _, v: v['entryId'] == f'tweet-{twid}', 'content', 'itemContent',
'tweet_results', 'result', ('tweet', None), {dict},
), default={}, get_all=False) if self.is_logged_in else traverse_obj(
data, ('tweetResult', 'result', {dict}), default={})
result = traverse_obj(data, ('tweetResult', 'result', {dict})) or {}
typename = result.get('__typename')
if typename not in ('Tweet', 'TweetWithVisibilityResults', 'TweetTombstone', 'TweetUnavailable', None):
@@ -1278,37 +1268,6 @@ def _graphql_to_legacy(self, data, twid):
def _build_graphql_query(self, media_id):
return {
'variables': {
'focalTweetId': media_id,
'includePromotedContent': True,
'with_rux_injections': False,
'withBirdwatchNotes': True,
'withCommunity': True,
'withDownvotePerspective': False,
'withQuickPromoteEligibilityTweetFields': True,
'withReactionsMetadata': False,
'withReactionsPerspective': False,
'withSuperFollowsTweetFields': True,
'withSuperFollowsUserFields': True,
'withV2Timeline': True,
'withVoice': True,
},
'features': {
'graphql_is_translatable_rweb_tweet_is_translatable_enabled': False,
'interactive_text_enabled': True,
'responsive_web_edit_tweet_api_enabled': True,
'responsive_web_enhance_cards_enabled': True,
'responsive_web_graphql_timeline_navigation_enabled': False,
'responsive_web_text_conversations_enabled': False,
'responsive_web_uc_gql_enabled': True,
'standardized_nudges_misinfo': True,
'tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled': False,
'tweetypie_unmention_optimization_enabled': True,
'unified_cards_ad_metadata_container_dynamic_card_content_query_enabled': True,
'verified_phone_label_enabled': False,
'vibe_api_enabled': True,
},
} if self.is_logged_in else {
'variables': {
'tweetId': media_id,
'withCommunity': False,
@@ -1717,21 +1676,22 @@ class TwitterSpacesIE(TwitterBaseIE):
_VALID_URL = TwitterBaseIE._BASE_REGEX + r'i/spaces/(?P<id>[0-9a-zA-Z]{13})'
_TESTS = [{
'url': 'https://twitter.com/i/spaces/1RDxlgyvNXzJL',
'url': 'https://twitter.com/i/spaces/1OwxWwQOPlNxQ',
'info_dict': {
'id': '1RDxlgyvNXzJL',
'id': '1OwxWwQOPlNxQ',
'ext': 'm4a',
'title': 'King Carlo e la mossa Kansas City per fare il Grande Centro',
'description': 'Twitter Space participated by annarita digiorgio, Signor Ernesto, Raffaello Colosimo, Simone M. Sepe',
'uploader': r're:Lucio Di Gaetano.*?',
'uploader_id': 'luciodigaetano',
'title': 'Everybody in: @mtbarra & @elonmusk discuss the future of EV charging',
'description': 'Twitter Space participated by Elon Musk',
'live_status': 'was_live',
'timestamp': 1659877956,
'upload_date': '20220807',
'release_timestamp': 1659904215,
'release_date': '20220807',
'release_date': '20230608',
'release_timestamp': 1686256230,
'thumbnail': r're:https?://pbs\.twimg\.com/profile_images/.+',
'timestamp': 1686254250,
'upload_date': '20230608',
'uploader': 'Mary Barra',
'uploader_id': 'mtbarra',
},
'skip': 'No longer available',
'params': {'skip_download': 'm3u8'},
}, {
# post_live/TimedOut but downloadable
'url': 'https://twitter.com/i/spaces/1vAxRAVQWONJl',
@@ -1743,9 +1703,10 @@ class TwitterSpacesIE(TwitterBaseIE):
'uploader': 'Google Cloud',
'uploader_id': 'googlecloud',
'live_status': 'post_live',
'thumbnail': r're:https?://pbs\.twimg\.com/profile_images/.+',
'timestamp': 1681409554,
'upload_date': '20230413',
'release_timestamp': 1681839000,
'release_timestamp': 1681839082,
'release_date': '20230418',
'protocol': 'm3u8', # ffmpeg is forced
'container': 'm4a_dash', # audio-only format fixup is applied
@@ -1762,6 +1723,9 @@
'uploader': '息根とめる',
'uploader_id': 'tomeru_ikinone',
'live_status': 'was_live',
'release_date': '20230601',
'release_timestamp': 1685617200,
'thumbnail': r're:https?://pbs\.twimg\.com/profile_images/.+',
'timestamp': 1685617198,
'upload_date': '20230601',
'protocol': 'm3u8', # ffmpeg is forced
@@ -1779,9 +1743,10 @@
'uploader': 'Candace Owens',
'uploader_id': 'RealCandaceO',
'live_status': 'was_live',
'thumbnail': r're:https?://pbs\.twimg\.com/profile_images/.+',
'timestamp': 1723931351,
'upload_date': '20240817',
'release_timestamp': 1723932000,
'release_timestamp': 1723932056,
'release_date': '20240817',
'protocol': 'm3u8_native', # not ffmpeg, detected as video space
},
@@ -1861,18 +1826,21 @@ def _real_extract(self, url):
return {
'id': space_id,
'title': metadata.get('title'),
'description': f'Twitter Space participated by {participants}',
'uploader': traverse_obj(
metadata, ('creator_results', 'result', 'legacy', 'name')),
'uploader_id': traverse_obj(
metadata, ('creator_results', 'result', 'legacy', 'screen_name')),
'live_status': live_status,
'release_timestamp': try_call(
lambda: int_or_none(metadata['scheduled_start'], scale=1000)),
'timestamp': int_or_none(metadata.get('created_at'), scale=1000),
'formats': formats,
'http_headers': headers,
'live_status': live_status,
**traverse_obj(metadata, {
'title': ('title', {str}),
# started_at is None when stream is_upcoming so fallback to scheduled_start for --wait-for-video
'release_timestamp': (('started_at', 'scheduled_start'), {int_or_none(scale=1000)}, any),
'timestamp': ('created_at', {int_or_none(scale=1000)}),
}),
**traverse_obj(metadata, ('creator_results', 'result', 'legacy', {
'uploader': ('name', {str}),
'uploader_id': ('screen_name', {str_or_none}),
'thumbnail': ('profile_image_url_https', {lambda x: x.replace('_normal', '_400x400')}, {url_or_none}),
})),
}
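
Note: in the rewritten mapping, the Space timestamps arrive as epoch milliseconds, so `{int_or_none(scale=1000)}` scales them to seconds, and the `(('started_at', 'scheduled_start'), ..., any)` path takes the first field that yields a value, so upcoming Spaces (where started_at is None) still get a release time for --wait-for-video. A plain-Python sketch of that fallback (field names as in the diff; the helper itself is illustrative):

def release_timestamp_seconds(metadata):
    # prefer started_at; fall back to scheduled_start for upcoming streams
    # (both fields are epoch milliseconds, hence the // 1000)
    for key in ('started_at', 'scheduled_start'):
        value = metadata.get(key)
        if value is not None:
            return int(value) // 1000
    return None

assert release_timestamp_seconds({'scheduled_start': 1686256230000}) == 1686256230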

yt_dlp/extractor/vk.py Executable file → Normal file

@@ -417,6 +417,8 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
_NETRC_MACHINE = 'youtube'
_COOKIE_HOWTO_WIKI_URL = 'https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies'
def ucid_or_none(self, ucid):
return self._search_regex(rf'^({self._YT_CHANNEL_UCID_RE})$', ucid, 'UC-id', default=None)
@@ -451,17 +453,15 @@ def _preferred_lang(self):
return preferred_lang
def _initialize_consent(self):
cookies = self._get_cookies('https://www.youtube.com/')
if cookies.get('__Secure-3PSID'):
if self._has_auth_cookies:
return
socs = cookies.get('SOCS')
socs = self._youtube_cookies.get('SOCS')
if socs and not socs.value.startswith('CAA'): # not consented
return
self._set_cookie('.youtube.com', 'SOCS', 'CAI', secure=True) # accept all (required for mixes)
def _initialize_pref(self):
cookies = self._get_cookies('https://www.youtube.com/')
pref_cookie = cookies.get('PREF')
pref_cookie = self._youtube_cookies.get('PREF')
pref = {}
if pref_cookie:
try:
@@ -472,8 +472,9 @@ def _initialize_pref(self):
self._set_cookie('.youtube.com', name='PREF', value=urllib.parse.urlencode(pref))
def _initialize_cookie_auth(self):
yt_sapisid, yt_1psapisid, yt_3psapisid = self._get_sid_cookies()
if yt_sapisid or yt_1psapisid or yt_3psapisid:
self._passed_auth_cookies = False
if self._has_auth_cookies:
self._passed_auth_cookies = True
self.write_debug('Found YouTube account cookies')
def _real_initialize(self):
@@ -492,8 +493,7 @@ def _perform_login(self, username, password):
@property
def _youtube_login_hint(self):
return (f'{self._login_hint(method="cookies")}. Also see '
'https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies '
return (f'{self._login_hint(method="cookies")}. Also see {self._COOKIE_HOWTO_WIKI_URL} '
'for tips on effectively exporting YouTube cookies')
def _check_login_required(self):
@@ -553,12 +553,16 @@ def _make_sid_authorization(scheme, sid, origin, additional_parts):
return f'{scheme} {"_".join(parts)}'
@property
def _youtube_cookies(self):
return self._get_cookies('https://www.youtube.com')
def _get_sid_cookies(self):
"""
Get SAPISID, 1PSAPISID, 3PSAPISID cookie values
@returns sapisid, 1psapisid, 3psapisid
"""
yt_cookies = self._get_cookies('https://www.youtube.com')
yt_cookies = self._youtube_cookies
yt_sapisid = try_call(lambda: yt_cookies['SAPISID'].value)
yt_3papisid = try_call(lambda: yt_cookies['__Secure-3PAPISID'].value)
yt_1papisid = try_call(lambda: yt_cookies['__Secure-1PAPISID'].value)
@@ -595,6 +599,31 @@ def _get_sid_authorization_header(self, origin='https://www.youtube.com', user_s
return ' '.join(authorizations)
@property
def is_authenticated(self):
return self._has_auth_cookies
@property
def _has_auth_cookies(self):
yt_sapisid, yt_1psapisid, yt_3psapisid = self._get_sid_cookies()
# YouTube doesn't appear to clear 3PSAPISID when rotating cookies (as of 2025-04-26)
# But LOGIN_INFO is cleared and should exist if logged in
has_login_info = 'LOGIN_INFO' in self._youtube_cookies
return bool(has_login_info and (yt_sapisid or yt_1psapisid or yt_3psapisid))
def _request_webpage(self, *args, **kwargs):
response = super()._request_webpage(*args, **kwargs)
# Check that we are still logged-in and cookies have not rotated after every request
if getattr(self, '_passed_auth_cookies', None) and not self._has_auth_cookies:
self.report_warning(
'The provided YouTube account cookies are no longer valid. '
'They have likely been rotated in the browser as a security measure. '
f'For tips on how to effectively export YouTube cookies, refer to {self._COOKIE_HOWTO_WIKI_URL} .',
only_once=False)
return response
def _call_api(self, ep, query, video_id, fatal=True, headers=None,
note='Downloading API JSON', errnote='Unable to download API page',
context=None, api_key=None, api_hostname=None, default_client='web'):
@@ -695,10 +724,6 @@ def _extract_visitor_data(self, *args):
args, [('VISITOR_DATA', ('INNERTUBE_CONTEXT', 'client', 'visitorData'), ('responseContext', 'visitorData'))],
expected_type=str)
@functools.cached_property
def is_authenticated(self):
return bool(self._get_sid_authorization_header())
def extract_ytcfg(self, video_id, webpage):
if not webpage:
return {}


@@ -1982,7 +1982,9 @@ def _download_player_url(self, video_id, fatal=False):
def _player_js_cache_key(self, player_url):
player_id = self._extract_player_info(player_url)
player_path = remove_start(urllib.parse.urlparse(player_url).path, f'/s/player/{player_id}/')
variant = self._INVERSE_PLAYER_JS_VARIANT_MAP.get(player_path)
variant = self._INVERSE_PLAYER_JS_VARIANT_MAP.get(player_path) or next((
v for k, v in self._INVERSE_PLAYER_JS_VARIANT_MAP.items()
if re.fullmatch(re.escape(k).replace('en_US', r'[a-zA-Z0-9_]+'), player_path)), None)
if not variant:
self.write_debug(
f'Unable to determine player JS variant\n'
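
Note: the added fallback handles player paths whose locale segment differs from the en_US baked into the variant map: each known path is regex-escaped, its en_US component is widened into a character-class wildcard, and a full match is attempted. A self-contained sketch of that matching step (the map entry is a placeholder):

import re

INVERSE_MAP = {'player_ias.vflset/en_US/base.js': 'main'}  # placeholder entry

def variant_for(player_path):
    # exact lookup first, then retry with the locale segment wildcarded
    if player_path in INVERSE_MAP:
        return INVERSE_MAP[player_path]
    return next((
        variant for known, variant in INVERSE_MAP.items()
        if re.fullmatch(re.escape(known).replace('en_US', r'[a-zA-Z0-9_]+'), player_path)),
        None)

assert variant_for('player_ias.vflset/ja_JP/base.js') == 'main'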
@@ -3648,6 +3650,13 @@ def feed_entry(name):
reason = f'{remove_end(reason.strip(), ".")}. {self._youtube_login_hint}'
elif get_first(playability_statuses, ('errorScreen', 'playerCaptchaViewModel', {dict})):
reason += '. YouTube is requiring a captcha challenge before playback'
elif "This content isn't available, try again later" in reason:
reason = (
f'{remove_end(reason.strip(), ".")}. {"Your account" if self.is_authenticated else "The current session"} '
f'has been rate-limited by YouTube for up to an hour. It is recommended to use `-t sleep` to add a delay '
f'between video requests to avoid exceeding the rate limit. For more information, refer to '
f'https://github.com/yt-dlp/yt-dlp/wiki/Extractors#this-content-isnt-available-try-again-later'
)
self.raise_no_formats(reason, expected=True)
keywords = get_first(video_details, 'keywords', expected_type=list) or []