Mirror of https://github.com/yt-dlp/yt-dlp.git (synced 2026-02-22 08:26:00 +00:00)

Merge branch 'master' into jsi

Commit by c-basalt on 2025-05-24 08:29:56 -04:00
53 changed files with 5188 additions and 245 deletions

yt_dlp/extractor/_extractors.py

@@ -2147,6 +2147,7 @@ from .toggle import (
from .toggo import ToggoIE
from .tonline import TOnlineIE
from .toongoggles import ToonGogglesIE
from .toutiao import ToutiaoIE
from .toutv import TouTvIE
from .toypics import (
ToypicsIE,
@@ -2369,6 +2370,7 @@ from .vimeo import (
VHXEmbedIE,
VimeoAlbumIE,
VimeoChannelIE,
VimeoEventIE,
VimeoGroupsIE,
VimeoIE,
VimeoLikesIE,

yt_dlp/extractor/lrt.py

@@ -2,7 +2,6 @@ from .common import InfoExtractor
from ..utils import (
clean_html,
merge_dicts,
str_or_none,
traverse_obj,
unified_timestamp,
url_or_none,
@@ -138,13 +137,15 @@ class LRTRadioIE(LRTBaseIE):
'https://www.lrt.lt/radioteka/api/media', video_id,
query={'url': f'/mediateka/irasas/{video_id}/{path}'})
return traverse_obj(media, {
'id': ('id', {int}, {str_or_none}),
'title': ('title', {str}),
'tags': ('tags', ..., 'name', {str}),
'categories': ('playlist_item', 'category', {str}, filter, all, filter),
'description': ('content', {clean_html}, {str}),
'timestamp': ('date', {lambda x: x.replace('.', '/')}, {unified_timestamp}),
'thumbnail': ('playlist_item', 'image', {urljoin('https://www.lrt.lt')}),
'formats': ('playlist_item', 'file', {lambda x: self._extract_m3u8_formats(x, video_id)}),
})
return {
'id': video_id,
'formats': self._extract_m3u8_formats(media['playlist_item']['file'], video_id),
**traverse_obj(media, {
'title': ('title', {str}),
'tags': ('tags', ..., 'name', {str}),
'categories': ('playlist_item', 'category', {str}, filter, all, filter),
'description': ('content', {clean_html}, {str}),
'timestamp': ('date', {lambda x: x.replace('.', '/')}, {unified_timestamp}),
'thumbnail': ('playlist_item', 'image', {urljoin('https://www.lrt.lt')}),
}),
}
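For reference, `traverse_obj` (from yt_dlp.utils) maps a dict template like the one above over a payload, dropping keys that resolve to None. A minimal sketch with a made-up media payload:

from yt_dlp.utils import traverse_obj

media = {  # hypothetical API response shape
    'title': 'Morning show',
    'tags': [{'name': 'news'}, {'name': 'radio'}],
}
print(traverse_obj(media, {
    'title': ('title', {str}),             # keep the value only if it is a str
    'tags': ('tags', ..., 'name', {str}),  # branch over the list, collect each 'name'
}))
# {'title': 'Morning show', 'tags': ['news', 'radio']}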

yt_dlp/extractor/niconico.py

@@ -32,7 +32,7 @@ from ..utils import (
urlencode_postdata,
urljoin,
)
from ..utils.traversal import find_element, traverse_obj
from ..utils.traversal import find_element, require, traverse_obj
class NiconicoBaseIE(InfoExtractor):
@@ -283,35 +283,54 @@ class NiconicoIE(NiconicoBaseIE):
lambda _, v: v['id'] == video_fmt['format_id'], 'qualityLevel', {int_or_none}, any)) or -1
yield video_fmt
def _extract_server_response(self, webpage, video_id, fatal=True):
try:
return traverse_obj(
self._parse_json(self._html_search_meta('server-response', webpage) or '', video_id),
('data', 'response', {dict}, {require('server response')}))
except ExtractorError:
if not fatal:
return {}
raise
def _real_extract(self, url):
video_id = self._match_id(url)
try:
webpage, handle = self._download_webpage_handle(
'https://www.nicovideo.jp/watch/' + video_id, video_id)
f'https://www.nicovideo.jp/watch/{video_id}', video_id,
headers=self.geo_verification_headers())
if video_id.startswith('so'):
video_id = self._match_id(handle.url)
api_data = traverse_obj(
self._parse_json(self._html_search_meta('server-response', webpage) or '', video_id),
('data', 'response', {dict}))
if not api_data:
raise ExtractorError('Server response data not found')
api_data = self._extract_server_response(webpage, video_id)
except ExtractorError as e:
try:
api_data = self._download_json(
f'https://www.nicovideo.jp/api/watch/v3/{video_id}?_frontendId=6&_frontendVersion=0&actionTrackId=AAAAAAAAAA_{round(time.time() * 1000)}', video_id,
note='Downloading API JSON', errnote='Unable to fetch data')['data']
f'https://www.nicovideo.jp/api/watch/v3/{video_id}', video_id,
'Downloading API JSON', 'Unable to fetch data', query={
'_frontendId': '6',
'_frontendVersion': '0',
'actionTrackId': f'AAAAAAAAAA_{round(time.time() * 1000)}',
}, headers=self.geo_verification_headers())['data']
except ExtractorError:
if not isinstance(e.cause, HTTPError):
# Raise if original exception was from _parse_json or utils.traversal.require
raise
# The webpage server response has more detailed error info than the API response
webpage = e.cause.response.read().decode('utf-8', 'replace')
error_msg = self._html_search_regex(
r'(?s)<section\s+class="(?:(?:ErrorMessage|WatchExceptionPage-message)\s*)+">(.+?)</section>',
webpage, 'error reason', default=None)
if not error_msg:
reason_code = self._extract_server_response(
webpage, video_id, fatal=False).get('reasonCode')
if not reason_code:
raise
raise ExtractorError(clean_html(error_msg), expected=True)
if reason_code in ('DOMESTIC_VIDEO', 'HIGH_RISK_COUNTRY_VIDEO'):
self.raise_geo_restricted(countries=self._GEO_COUNTRIES)
elif reason_code == 'HIDDEN_VIDEO':
raise ExtractorError(
'The viewing period of this video has expired', expected=True)
elif reason_code == 'DELETED_VIDEO':
raise ExtractorError('This video has been deleted', expected=True)
raise ExtractorError(f'Niconico says: {reason_code}')
availability = self._availability(**(traverse_obj(api_data, ('payment', 'video', {
'needs_premium': ('isPremium', {bool}),
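The `require` helper used in `_extract_server_response` turns a missing value into an ExtractorError instead of a silent None, which is exactly what the `fatal=False` branch catches. A minimal sketch (payloads are made up):

from yt_dlp.utils import ExtractorError
from yt_dlp.utils.traversal import require, traverse_obj

good = {'data': {'response': {'ok': True}}}
print(traverse_obj(good, ('data', 'response', {dict}, {require('server response')})))
# {'ok': True}

try:
    traverse_obj({'data': {}}, ('data', 'response', {dict}, {require('server response')}))
except ExtractorError as e:
    print(e.orig_msg)  # Unable to extract server response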

yt_dlp/extractor/patreon.py

@@ -340,8 +340,9 @@ class PatreonIE(PatreonBaseIE):
'channel_follower_count': ('attributes', 'patron_count', {int_or_none}),
}))
# all-lowercase 'referer' so we can smuggle it to Generic, SproutVideo, Vimeo
headers = {'referer': 'https://patreon.com/'}
# Must be all-lowercase 'referer' so we can smuggle it to Generic, SproutVideo, and Vimeo.
# patreon.com URLs redirect to www.patreon.com; this matters when requesting mux.com m3u8s
headers = {'referer': 'https://www.patreon.com/'}
# handle Vimeo embeds
if traverse_obj(attributes, ('embed', 'provider')) == 'Vimeo':
@@ -352,7 +353,7 @@ class PatreonIE(PatreonBaseIE):
v_url, video_id, 'Checking Vimeo embed URL', headers=headers,
fatal=False, errnote=False, expected_status=429): # 429 is TLS fingerprint rejection
entries.append(self.url_result(
VimeoIE._smuggle_referrer(v_url, 'https://patreon.com/'),
VimeoIE._smuggle_referrer(v_url, headers['referer']),
VimeoIE, url_transparent=True))
embed_url = traverse_obj(attributes, ('embed', 'url', {url_or_none}))
@@ -379,11 +380,13 @@ class PatreonIE(PatreonBaseIE):
'url': post_file['url'],
})
elif name == 'video' or determine_ext(post_file.get('url')) == 'm3u8':
formats, subtitles = self._extract_m3u8_formats_and_subtitles(post_file['url'], video_id)
formats, subtitles = self._extract_m3u8_formats_and_subtitles(
post_file['url'], video_id, headers=headers)
entries.append({
'id': video_id,
'formats': formats,
'subtitles': subtitles,
'http_headers': headers,
})
can_view_post = traverse_obj(attributes, 'current_user_can_view')
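The smuggling mentioned in the comment above relies on smuggle_url/unsmuggle_url (yt_dlp.utils), which stash JSON in a URL fragment so the Referer survives the hand-off into another extractor. A rough sketch (the video ID is a placeholder):

from yt_dlp.utils import smuggle_url, unsmuggle_url

url = smuggle_url('https://player.vimeo.com/video/123', {'referer': 'https://www.patreon.com/'})
# https://player.vimeo.com/video/123#__youtubedl_smuggle=%7B%22referer%22...

clean_url, data = unsmuggle_url(url, default={})
headers = {'Referer': data['referer']}  # re-applied by the receiving extractor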

yt_dlp/extractor/picarto.py

@@ -10,7 +10,8 @@ from ..utils import (
class PicartoIE(InfoExtractor):
_VALID_URL = r'https?://(?:www.)?picarto\.tv/(?P<id>[a-zA-Z0-9]+)'
IE_NAME = 'picarto'
_VALID_URL = r'https?://(?:www.)?picarto\.tv/(?P<id>[^/#?]+)/?(?:$|[?#])'
_TEST = {
'url': 'https://picarto.tv/Setz',
'info_dict': {
@@ -89,7 +90,8 @@ class PicartoIE(InfoExtractor):
class PicartoVodIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?picarto\.tv/(?:videopopout|\w+/videos)/(?P<id>[^/?#&]+)'
IE_NAME = 'picarto:vod'
_VALID_URL = r'https?://(?:www\.)?picarto\.tv/(?:videopopout|\w+(?:/profile)?/videos)/(?P<id>[^/?#&]+)'
_TESTS = [{
'url': 'https://picarto.tv/videopopout/ArtofZod_2017.12.12.00.13.23.flv',
'md5': '3ab45ba4352c52ee841a28fb73f2d9ca',
@@ -111,6 +113,18 @@ class PicartoVodIE(InfoExtractor):
'channel': 'ArtofZod',
'age_limit': 18,
},
}, {
'url': 'https://picarto.tv/DrechuArt/profile/videos/400347',
'md5': 'f9ea54868b1d9dec40eb554b484cc7bf',
'info_dict': {
'id': '400347',
'ext': 'mp4',
'title': 'Welcome to the Show',
'thumbnail': r're:^https?://.*\.jpg',
'channel': 'DrechuArt',
'age_limit': 0,
},
}, {
'url': 'https://picarto.tv/videopopout/Plague',
'only_matching': True,
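For illustration, the widened VOD pattern above run against the test URLs (standard-library re only):

import re

_VALID_URL = r'https?://(?:www\.)?picarto\.tv/(?:videopopout|\w+(?:/profile)?/videos)/(?P<id>[^/?#&]+)'

for test_url in (
        'https://picarto.tv/videopopout/Plague',
        'https://picarto.tv/DrechuArt/profile/videos/400347',
):
    print(re.match(_VALID_URL, test_url).group('id'))
# Plague
# 400347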

yt_dlp/extractor/playsuisse.py

@@ -9,11 +9,10 @@ from ..utils import (
int_or_none,
join_nonempty,
parse_qs,
traverse_obj,
update_url_query,
urlencode_postdata,
)
from ..utils.traversal import unpack
from ..utils.traversal import traverse_obj, unpack
class PlaySuisseIE(InfoExtractor):

yt_dlp/extractor/podchaser.py

@@ -5,11 +5,13 @@ from .common import InfoExtractor
from ..utils import (
OnDemandPagedList,
float_or_none,
int_or_none,
orderedSet,
str_or_none,
str_to_int,
traverse_obj,
unified_timestamp,
url_or_none,
)
from ..utils.traversal import require, traverse_obj
class PodchaserIE(InfoExtractor):
@@ -21,24 +23,25 @@ class PodchaserIE(InfoExtractor):
'id': '104365585',
'title': 'Ep. 285 freeze me off',
'description': 'cam ahn',
'thumbnail': r're:^https?://.*\.jpg$',
'thumbnail': r're:https?://.+/.+\.jpg',
'ext': 'mp3',
'categories': ['Comedy'],
'categories': ['Comedy', 'News', 'Politics', 'Arts'],
'tags': ['comedy', 'dark humor'],
'series': 'Cum Town',
'series': 'The Adam Friedland Show Podcast',
'duration': 3708,
'timestamp': 1636531259,
'upload_date': '20211110',
'average_rating': 4.0,
'series_id': '36924',
},
}, {
'url': 'https://www.podchaser.com/podcasts/the-bone-zone-28853',
'info_dict': {
'id': '28853',
'title': 'The Bone Zone',
'description': 'Podcast by The Bone Zone',
'description': r're:The official home of the Bone Zone podcast.+',
},
'playlist_count': 275,
'playlist_mincount': 275,
}, {
'url': 'https://www.podchaser.com/podcasts/sean-carrolls-mindscape-scienc-699349/episodes',
'info_dict': {
@@ -51,19 +54,33 @@ class PodchaserIE(InfoExtractor):
@staticmethod
def _parse_episode(episode, podcast):
return {
'id': str(episode.get('id')),
'title': episode.get('title'),
'description': episode.get('description'),
'url': episode.get('audio_url'),
'thumbnail': episode.get('image_url'),
'duration': str_to_int(episode.get('length')),
'timestamp': unified_timestamp(episode.get('air_date')),
'average_rating': float_or_none(episode.get('rating')),
'categories': list(set(traverse_obj(podcast, (('summary', None), 'categories', ..., 'text')))),
'tags': traverse_obj(podcast, ('tags', ..., 'text')),
'series': podcast.get('title'),
}
info = traverse_obj(episode, {
'id': ('id', {int}, {str_or_none}, {require('episode ID')}),
'title': ('title', {str}),
'description': ('description', {str}),
'url': ('audio_url', {url_or_none}),
'thumbnail': ('image_url', {url_or_none}),
'duration': ('length', {int_or_none}),
'timestamp': ('air_date', {unified_timestamp}),
'average_rating': ('rating', {float_or_none}),
})
info.update(traverse_obj(podcast, {
'series': ('title', {str}),
'series_id': ('id', {int}, {str_or_none}),
'categories': (('summary', None), 'categories', ..., 'text', {str}, filter, all, {orderedSet}),
'tags': ('tags', ..., 'text', {str}),
}))
info['vcodec'] = 'none'
if info.get('series_id'):
podcast_slug = traverse_obj(podcast, ('slug', {str})) or 'podcast'
episode_slug = traverse_obj(episode, ('slug', {str})) or 'episode'
info['webpage_url'] = '/'.join((
'https://www.podchaser.com/podcasts',
'-'.join((podcast_slug[:30].rstrip('-'), info['series_id'])),
'-'.join((episode_slug[:30].rstrip('-'), info['id']))))
return info
def _call_api(self, path, *args, **kwargs):
return self._download_json(f'https://api.podchaser.com/{path}', *args, **kwargs)
@@ -93,5 +110,5 @@ class PodchaserIE(InfoExtractor):
OnDemandPagedList(functools.partial(self._fetch_page, podcast_id, podcast), self._PAGE_SIZE),
str_or_none(podcast.get('id')), podcast.get('title'), podcast.get('description'))
episode = self._call_api(f'episodes/{episode_id}', episode_id)
episode = self._call_api(f'podcasts/{podcast_id}/episodes/{episode_id}/player_ids', episode_id)
return self._parse_episode(episode, podcast)
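A worked example of the webpage_url assembly above, using IDs from the first test case (the slug strings are assumed, since the tests do not show them):

podcast_slug, series_id = 'the-adam-friedland-show-podcast', '36924'  # assumed slug
episode_slug, episode_id = 'ep-285-freeze-me-off', '104365585'        # assumed slug

webpage_url = '/'.join((
    'https://www.podchaser.com/podcasts',
    '-'.join((podcast_slug[:30].rstrip('-'), series_id)),    # slug capped at 30 chars,
    '-'.join((episode_slug[:30].rstrip('-'), episode_id))))  # any trailing '-' trimmed
print(webpage_url)
# https://www.podchaser.com/podcasts/the-adam-friedland-show-podcas-36924/ep-285-freeze-me-off-104365585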

yt_dlp/extractor/soundcloud.py

@@ -697,7 +697,7 @@ class SoundcloudIE(SoundcloudBaseIE):
try:
return self._extract_info_dict(info, full_title, token)
except ExtractorError as e:
if not isinstance(e.cause, HTTPError) or not e.cause.status == 429:
if not isinstance(e.cause, HTTPError) or e.cause.status != 429:
raise
self.report_warning(
'You have reached the API rate limit, which is ~600 requests per '

yt_dlp/extractor/toutiao.py (new file, 121 lines)

@@ -0,0 +1,121 @@
import json
import urllib.parse
from .common import InfoExtractor
from ..utils import (
float_or_none,
int_or_none,
str_or_none,
try_call,
url_or_none,
)
from ..utils.traversal import find_element, traverse_obj
class ToutiaoIE(InfoExtractor):
IE_NAME = 'toutiao'
IE_DESC = '今日头条'
_VALID_URL = r'https?://www\.toutiao\.com/video/(?P<id>\d+)/?(?:[?#]|$)'
_TESTS = [{
'url': 'https://www.toutiao.com/video/7505382061495176511/',
'info_dict': {
'id': '7505382061495176511',
'ext': 'mp4',
'title': '新疆多地现不明飞行物,目击者称和月亮一样亮,几秒内突然加速消失,气象部门回应',
'comment_count': int,
'duration': 9.753,
'like_count': int,
'release_date': '20250517',
'release_timestamp': 1747483344,
'thumbnail': r're:https?://p\d+-sign\.toutiaoimg\.com/.+$',
'uploader': '极目新闻',
'uploader_id': 'MS4wLjABAAAAeateBb9Su8I3MJOZozmvyzWktmba5LMlliRDz1KffnM',
'view_count': int,
},
}, {
'url': 'https://www.toutiao.com/video/7479446610359878153/',
'info_dict': {
'id': '7479446610359878153',
'ext': 'mp4',
'title': '小伙竟然利用两块磁铁制作成磁力减震器,简直太有创意了!',
'comment_count': int,
'duration': 118.374,
'like_count': int,
'release_date': '20250308',
'release_timestamp': 1741444368,
'thumbnail': r're:https?://p\d+-sign\.toutiaoimg\.com/.+$',
'uploader': '小莉创意发明',
'uploader_id': 'MS4wLjABAAAA4f7d4mwtApALtHIiq-QM20dwXqe32NUz0DeWF7wbHKw',
'view_count': int,
},
}]
def _real_initialize(self):
if self._get_cookies('https://www.toutiao.com').get('ttwid'):
return
urlh = self._request_webpage(
'https://ttwid.bytedance.com/ttwid/union/register/', None,
'Fetching ttwid', 'Unable to fetch ttwid', headers={
'Content-Type': 'application/json',
}, data=json.dumps({
'aid': 24,
'needFid': False,
'region': 'cn',
'service': 'www.toutiao.com',
'union': True,
}).encode(),
)
if ttwid := try_call(lambda: self._get_cookies(urlh.url)['ttwid'].value):
self._set_cookie('.toutiao.com', 'ttwid', ttwid)
return
self.raise_login_required()
def _real_extract(self, url):
video_id = self._match_id(url)
webpage = self._download_webpage(url, video_id)
video_data = traverse_obj(webpage, (
{find_element(tag='script', id='RENDER_DATA')},
{urllib.parse.unquote}, {json.loads}, 'data', 'initialVideo',
))
formats = []
for video in traverse_obj(video_data, (
'videoPlayInfo', 'video_list', lambda _, v: v['main_url'],
)):
formats.append({
'url': video['main_url'],
**traverse_obj(video, ('video_meta', {
'acodec': ('audio_profile', {str}),
'asr': ('audio_sample_rate', {int_or_none}),
'audio_channels': ('audio_channels', {float_or_none}, {int_or_none}),
'ext': ('vtype', {str}),
'filesize': ('size', {int_or_none}),
'format_id': ('definition', {str}),
'fps': ('fps', {int_or_none}),
'height': ('vheight', {int_or_none}),
'tbr': ('real_bitrate', {float_or_none(scale=1000)}),
'vcodec': ('codec_type', {str}),
'width': ('vwidth', {int_or_none}),
})),
})
return {
'id': video_id,
'formats': formats,
**traverse_obj(video_data, {
'comment_count': ('commentCount', {int_or_none}),
'duration': ('videoPlayInfo', 'video_duration', {float_or_none}),
'like_count': ('repinCount', {int_or_none}),
'release_timestamp': ('publishTime', {int_or_none}),
'thumbnail': (('poster', 'coverUrl'), {url_or_none}, any),
'title': ('title', {str}),
'uploader': ('userInfo', 'name', {str}),
'uploader_id': ('userInfo', 'userId', {str_or_none}),
'view_count': ('playCount', {int_or_none}),
'webpage_url': ('detailUrl', {url_or_none}),
}),
}
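The RENDER_DATA handling above reduces to: locate the script tag, URL-decode its body, parse JSON. A self-contained sketch with a fabricated page (the real extractor uses the find_element traversal helper rather than a bare regex):

import json
import re
import urllib.parse

webpage = ('<script id="RENDER_DATA" type="application/json">'
           '%7B%22data%22%3A%7B%22initialVideo%22%3A%7B%22title%22%3A%22demo%22%7D%7D%7D</script>')

raw = re.search(r'<script[^>]+id="RENDER_DATA"[^>]*>([^<]+)</script>', webpage).group(1)
video_data = json.loads(urllib.parse.unquote(raw))['data']['initialVideo']
print(video_data['title'])  # demo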

yt_dlp/extractor/twitcasting.py

@@ -1,4 +1,5 @@
import base64
import hashlib
import itertools
import re
@@ -16,6 +17,7 @@ from ..utils import (
str_to_int,
try_get,
unified_timestamp,
update_url_query,
url_or_none,
urlencode_postdata,
urljoin,
@@ -171,6 +173,10 @@ class TwitCastingIE(InfoExtractor):
'player': 'pc_web',
})
password_params = {
'word': hashlib.md5(video_password.encode()).hexdigest(),
} if video_password else None
formats = []
# low: 640x360, medium: 1280x720, high: 1920x1080
qq = qualities(['low', 'medium', 'high'])
@@ -178,7 +184,7 @@ class TwitCastingIE(InfoExtractor):
'tc-hls', 'streams', {dict.items}, lambda _, v: url_or_none(v[1]),
)):
formats.append({
'url': m3u8_url,
'url': update_url_query(m3u8_url, password_params),
'format_id': f'hls-{quality}',
'ext': 'mp4',
'quality': qq(quality),
@@ -192,7 +198,7 @@ class TwitCastingIE(InfoExtractor):
'llfmp4', 'streams', {dict.items}, lambda _, v: url_or_none(v[1]),
)):
formats.append({
'url': ws_url,
'url': update_url_query(ws_url, password_params),
'format_id': f'ws-{mode}',
'ext': 'mp4',
'quality': qq(mode),
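In isolation, the password handling added above: hash the plaintext with MD5 and append it as a `word` query parameter (update_url_query is from yt_dlp.utils; the password is a placeholder):

import hashlib
from yt_dlp.utils import update_url_query

video_password = 'hunter2'  # placeholder
password_params = {'word': hashlib.md5(video_password.encode()).hexdigest()}
print(update_url_query('https://example.com/stream.m3u8', password_params))
# https://example.com/stream.m3u8?word=2ab96390c7dbe3439de74d0c9b0b1767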

yt_dlp/extractor/twitch.py

@@ -187,7 +187,7 @@ class TwitchBaseIE(InfoExtractor):
'url': thumbnail,
}] if thumbnail else None
def _extract_twitch_m3u8_formats(self, path, video_id, token, signature):
def _extract_twitch_m3u8_formats(self, path, video_id, token, signature, live_from_start=False):
formats = self._extract_m3u8_formats(
f'{self._USHER_BASE}/{path}/{video_id}.m3u8', video_id, 'mp4', query={
'allow_source': 'true',
@@ -204,7 +204,10 @@ class TwitchBaseIE(InfoExtractor):
for fmt in formats:
if fmt.get('vcodec') and fmt['vcodec'].startswith('av01'):
# mpegts does not yet have proper support for av1
fmt['downloader_options'] = {'ffmpeg_args_out': ['-f', 'mp4']}
fmt.setdefault('downloader_options', {}).update({'ffmpeg_args_out': ['-f', 'mp4']})
if live_from_start:
fmt.setdefault('downloader_options', {}).update({'ffmpeg_args': ['-live_start_index', '0']})
fmt['is_from_start'] = True
return formats
@@ -550,7 +553,8 @@ class TwitchVodIE(TwitchBaseIE):
access_token = self._download_access_token(vod_id, 'video', 'id')
formats = self._extract_twitch_m3u8_formats(
'vod', vod_id, access_token['value'], access_token['signature'])
'vod', vod_id, access_token['value'], access_token['signature'],
live_from_start=self.get_param('live_from_start'))
formats.extend(self._extract_storyboard(vod_id, video.get('storyboard'), info.get('duration')))
self._prefer_source(formats)
@@ -633,6 +637,10 @@ class TwitchPlaylistBaseIE(TwitchBaseIE):
_PAGE_LIMIT = 100
def _entries(self, channel_name, *args):
"""
Subclasses must define _make_variables() and _extract_entry(),
as well as set _OPERATION_NAME, _ENTRY_KIND, _EDGE_KIND, and _NODE_KIND
"""
cursor = None
variables_common = self._make_variables(channel_name, *args)
entries_key = f'{self._ENTRY_KIND}s'
@@ -672,7 +680,22 @@ class TwitchPlaylistBaseIE(TwitchBaseIE):
break
class TwitchVideosIE(TwitchPlaylistBaseIE):
class TwitchVideosBaseIE(TwitchPlaylistBaseIE):
_OPERATION_NAME = 'FilterableVideoTower_Videos'
_ENTRY_KIND = 'video'
_EDGE_KIND = 'VideoEdge'
_NODE_KIND = 'Video'
@staticmethod
def _make_variables(channel_name, broadcast_type, sort):
return {
'channelOwnerLogin': channel_name,
'broadcastType': broadcast_type,
'videoSort': sort.upper(),
}
class TwitchVideosIE(TwitchVideosBaseIE):
_VALID_URL = r'https?://(?:(?:www|go|m)\.)?twitch\.tv/(?P<id>[^/]+)/(?:videos|profile)'
_TESTS = [{
@@ -751,11 +774,6 @@ class TwitchVideosIE(TwitchPlaylistBaseIE):
'views': 'Popular',
}
_OPERATION_NAME = 'FilterableVideoTower_Videos'
_ENTRY_KIND = 'video'
_EDGE_KIND = 'VideoEdge'
_NODE_KIND = 'Video'
@classmethod
def suitable(cls, url):
return (False
@@ -764,14 +782,6 @@ class TwitchVideosIE(TwitchPlaylistBaseIE):
TwitchVideosCollectionsIE))
else super().suitable(url))
@staticmethod
def _make_variables(channel_name, broadcast_type, sort):
return {
'channelOwnerLogin': channel_name,
'broadcastType': broadcast_type,
'videoSort': sort.upper(),
}
@staticmethod
def _extract_entry(node):
return _make_video_result(node)
@@ -919,7 +929,7 @@ class TwitchVideosCollectionsIE(TwitchPlaylistBaseIE):
playlist_title=f'{channel_name} - Collections')
class TwitchStreamIE(TwitchBaseIE):
class TwitchStreamIE(TwitchVideosBaseIE):
IE_NAME = 'twitch:stream'
_VALID_URL = r'''(?x)
https?://
@@ -982,6 +992,7 @@ class TwitchStreamIE(TwitchBaseIE):
'skip_download': 'Livestream',
},
}]
_PAGE_LIMIT = 1
@classmethod
def suitable(cls, url):
@@ -995,6 +1006,20 @@ class TwitchStreamIE(TwitchBaseIE):
TwitchClipsIE))
else super().suitable(url))
@staticmethod
def _extract_entry(node):
if not isinstance(node, dict) or not node.get('id'):
return None
video_id = node['id']
return {
'_type': 'url',
'ie_key': TwitchVodIE.ie_key(),
'id': 'v' + video_id,
'url': f'https://www.twitch.tv/videos/{video_id}',
'title': node.get('title'),
'timestamp': unified_timestamp(node.get('publishedAt')) or 0,
}
def _real_extract(self, url):
channel_name = self._match_id(url).lower()
@@ -1029,6 +1054,16 @@ class TwitchStreamIE(TwitchBaseIE):
if not stream:
raise UserNotLive(video_id=channel_name)
timestamp = unified_timestamp(stream.get('createdAt'))
if self.get_param('live_from_start'):
self.to_screen(f'{channel_name}: Extracting VOD to download live from start')
entry = next(self._entries(channel_name, None, 'time'), None)
if entry and entry.pop('timestamp') >= (timestamp or float('inf')):
return entry
self.report_warning(
'Unable to extract the VOD associated with this livestream', video_id=channel_name)
access_token = self._download_access_token(
channel_name, 'stream', 'channelName')
@@ -1038,7 +1073,6 @@ class TwitchStreamIE(TwitchBaseIE):
self._prefer_source(formats)
view_count = stream.get('viewers')
timestamp = unified_timestamp(stream.get('createdAt'))
sq_user = try_get(gql, lambda x: x[1]['data']['user'], dict) or {}
uploader = sq_user.get('displayName')
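The setdefault(...).update(...) pattern above matters: a format may already carry the av01 ffmpeg_args_out workaround, and plain assignment would clobber it. A minimal sketch of the merge (the format dict is made up; the ffmpeg downloader consumes these downloader_options keys):

fmt = {'format_id': '1080p60',
       'downloader_options': {'ffmpeg_args_out': ['-f', 'mp4']}}  # hypothetical

fmt.setdefault('downloader_options', {}).update({'ffmpeg_args': ['-live_start_index', '0']})
fmt['is_from_start'] = True
print(fmt['downloader_options'])
# {'ffmpeg_args_out': ['-f', 'mp4'], 'ffmpeg_args': ['-live_start_index', '0']}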

yt_dlp/extractor/twitter.py

@@ -20,7 +20,6 @@ from ..utils import (
remove_end,
str_or_none,
strip_or_none,
traverse_obj,
truncate_string,
try_call,
try_get,
@@ -29,6 +28,7 @@ from ..utils import (
url_or_none,
xpath_text,
)
from ..utils.traversal import require, traverse_obj
class TwitterBaseIE(InfoExtractor):
@@ -1342,7 +1342,7 @@ class TwitterIE(TwitterBaseIE):
'tweet_mode': 'extended',
})
except ExtractorError as e:
if not isinstance(e.cause, HTTPError) or not e.cause.status == 429:
if not isinstance(e.cause, HTTPError) or e.cause.status != 429:
raise
self.report_warning('Rate-limit exceeded; falling back to syndication endpoint')
status = self._call_syndication_api(twid)
@@ -1596,8 +1596,8 @@ class TwitterAmplifyIE(TwitterBaseIE):
class TwitterBroadcastIE(TwitterBaseIE, PeriscopeBaseIE):
IE_NAME = 'twitter:broadcast'
_VALID_URL = TwitterBaseIE._BASE_REGEX + r'i/broadcasts/(?P<id>[0-9a-zA-Z]{13})'
_VALID_URL = TwitterBaseIE._BASE_REGEX + r'i/(?P<type>broadcasts|events)/(?P<id>\w+)'
_TESTS = [{
# untitled Periscope video
'url': 'https://twitter.com/i/broadcasts/1yNGaQLWpejGj',
@@ -1605,6 +1605,7 @@ class TwitterBroadcastIE(TwitterBaseIE, PeriscopeBaseIE):
'id': '1yNGaQLWpejGj',
'ext': 'mp4',
'title': 'Andrea May Sahouri - Periscope Broadcast',
'display_id': '1yNGaQLWpejGj',
'uploader': 'Andrea May Sahouri',
'uploader_id': 'andreamsahouri',
'uploader_url': 'https://twitter.com/andreamsahouri',
@@ -1612,6 +1613,8 @@ class TwitterBroadcastIE(TwitterBaseIE, PeriscopeBaseIE):
'upload_date': '20200601',
'thumbnail': r're:^https?://[^?#]+\.jpg\?token=',
'view_count': int,
'concurrent_view_count': int,
'live_status': 'was_live',
},
}, {
'url': 'https://twitter.com/i/broadcasts/1ZkKzeyrPbaxv',
@@ -1619,6 +1622,7 @@ class TwitterBroadcastIE(TwitterBaseIE, PeriscopeBaseIE):
'id': '1ZkKzeyrPbaxv',
'ext': 'mp4',
'title': 'Starship | SN10 | High-Altitude Flight Test',
'display_id': '1ZkKzeyrPbaxv',
'uploader': 'SpaceX',
'uploader_id': 'SpaceX',
'uploader_url': 'https://twitter.com/SpaceX',
@@ -1626,6 +1630,8 @@ class TwitterBroadcastIE(TwitterBaseIE, PeriscopeBaseIE):
'upload_date': '20210303',
'thumbnail': r're:^https?://[^?#]+\.jpg\?token=',
'view_count': int,
'concurrent_view_count': int,
'live_status': 'was_live',
},
}, {
'url': 'https://twitter.com/i/broadcasts/1OyKAVQrgzwGb',
@@ -1633,6 +1639,7 @@ class TwitterBroadcastIE(TwitterBaseIE, PeriscopeBaseIE):
'id': '1OyKAVQrgzwGb',
'ext': 'mp4',
'title': 'Starship Flight Test',
'display_id': '1OyKAVQrgzwGb',
'uploader': 'SpaceX',
'uploader_id': 'SpaceX',
'uploader_url': 'https://twitter.com/SpaceX',
@@ -1640,21 +1647,58 @@ class TwitterBroadcastIE(TwitterBaseIE, PeriscopeBaseIE):
'upload_date': '20230420',
'thumbnail': r're:^https?://[^?#]+\.jpg\?token=',
'view_count': int,
'concurrent_view_count': int,
'live_status': 'was_live',
},
}, {
'url': 'https://x.com/i/events/1910629646300762112',
'info_dict': {
'id': '1LyxBWDRNqyKN',
'ext': 'mp4',
'title': '#ガンニバル ウォッチパーティー',
'concurrent_view_count': int,
'display_id': '1910629646300762112',
'live_status': 'was_live',
'release_date': '20250423',
'release_timestamp': 1745409000,
'tags': ['ガンニバル'],
'thumbnail': r're:https?://[^?#]+\.jpg\?token=',
'timestamp': 1745403328,
'upload_date': '20250423',
'uploader': 'ディズニープラス公式',
'uploader_id': 'DisneyPlusJP',
'uploader_url': 'https://twitter.com/DisneyPlusJP',
'view_count': int,
},
}]
def _real_extract(self, url):
broadcast_id = self._match_id(url)
broadcast_type, display_id = self._match_valid_url(url).group('type', 'id')
if broadcast_type == 'events':
timeline = self._call_api(
f'live_event/1/{display_id}/timeline.json', display_id)
broadcast_id = traverse_obj(timeline, (
'twitter_objects', 'broadcasts', ..., ('id', 'broadcast_id'),
{str}, any, {require('broadcast ID')}))
else:
broadcast_id = display_id
broadcast = self._call_api(
'broadcasts/show.json', broadcast_id,
{'ids': broadcast_id})['broadcasts'][broadcast_id]
if not broadcast:
raise ExtractorError('Broadcast no longer exists', expected=True)
info = self._parse_broadcast_data(broadcast, broadcast_id)
info['title'] = broadcast.get('status') or info.get('title')
info['uploader_id'] = broadcast.get('twitter_username') or info.get('uploader_id')
info['uploader_url'] = format_field(broadcast, 'twitter_username', 'https://twitter.com/%s', default=None)
info.update({
'display_id': display_id,
'title': broadcast.get('status') or info.get('title'),
'uploader_id': broadcast.get('twitter_username') or info.get('uploader_id'),
'uploader_url': format_field(
broadcast, 'twitter_username', 'https://twitter.com/%s', default=None),
})
if info['live_status'] == 'is_upcoming':
self.raise_no_formats('This live broadcast has not yet started', expected=True)
return info
media_key = broadcast['media_key']
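A sketch of the events-timeline traversal above: branch over all broadcast objects, accept either key name, take the first string found, and let require raise cleanly on a miss (the payload is fabricated):

from yt_dlp.utils.traversal import require, traverse_obj

timeline = {'twitter_objects': {'broadcasts': {
    '1LyxBWDRNqyKN': {'id': '1LyxBWDRNqyKN'},
}}}
print(traverse_obj(timeline, (
    'twitter_objects', 'broadcasts', ..., ('id', 'broadcast_id'),
    {str}, any, {require('broadcast ID')})))
# 1LyxBWDRNqyKN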

yt_dlp/extractor/vimeo.py

@@ -3,6 +3,7 @@ import functools
import itertools
import json
import re
import time
import urllib.parse
from .common import InfoExtractor
@@ -13,10 +14,12 @@ from ..utils import (
OnDemandPagedList,
clean_html,
determine_ext,
filter_dict,
get_element_by_class,
int_or_none,
join_nonempty,
js_to_json,
jwt_decode_hs256,
merge_dicts,
parse_filesize,
parse_iso8601,
@@ -39,6 +42,9 @@ class VimeoBaseInfoExtractor(InfoExtractor):
_NETRC_MACHINE = 'vimeo'
_LOGIN_REQUIRED = False
_LOGIN_URL = 'https://vimeo.com/log_in'
_REFERER_HINT = (
'Cannot download embed-only video without embedding URL. Please call yt-dlp '
'with the URL of the page that embeds this video.')
_IOS_CLIENT_AUTH = 'MTMxNzViY2Y0NDE0YTQ5YzhjZTc0YmU0NjVjNDQxYzNkYWVjOWRlOTpHKzRvMmgzVUh4UkxjdU5FRW80cDNDbDhDWGR5dVJLNUJZZ055dHBHTTB4V1VzaG41bEx1a2hiN0NWYWNUcldSSW53dzRUdFRYZlJEZmFoTTArOTBUZkJHS3R4V2llYU04Qnl1bERSWWxUdXRidjNqR2J4SHFpVmtFSUcyRktuQw=='
_IOS_CLIENT_HEADERS = {
'Accept': 'application/vnd.vimeo.*+json; version=3.4.10',
@@ -47,6 +53,7 @@ class VimeoBaseInfoExtractor(InfoExtractor):
}
_IOS_OAUTH_CACHE_KEY = 'oauth-token-ios'
_ios_oauth_token = None
_viewer_info = None
@staticmethod
def _smuggle_referrer(url, referrer_url):
@@ -60,8 +67,21 @@ class VimeoBaseInfoExtractor(InfoExtractor):
headers['Referer'] = data['referer']
return url, data, headers
def _jwt_is_expired(self, token):
return jwt_decode_hs256(token)['exp'] - time.time() < 120
def _fetch_viewer_info(self, display_id=None, fatal=True):
if self._viewer_info and not self._jwt_is_expired(self._viewer_info['jwt']):
return self._viewer_info
self._viewer_info = self._download_json(
'https://vimeo.com/_next/viewer', display_id, 'Downloading web token info',
'Failed to download web token info', fatal=fatal, headers={'Accept': 'application/json'})
return self._viewer_info
def _perform_login(self, username, password):
viewer = self._download_json('https://vimeo.com/_next/viewer', None, 'Downloading login token')
viewer = self._fetch_viewer_info()
data = {
'action': 'login',
'email': username,
@@ -96,11 +116,10 @@ class VimeoBaseInfoExtractor(InfoExtractor):
expected=True)
return password
def _verify_video_password(self, video_id):
def _verify_video_password(self, video_id, path=None):
video_password = self._get_video_password()
token = self._download_json(
'https://vimeo.com/_next/viewer', video_id, 'Downloading viewer info')['xsrft']
url = f'https://vimeo.com/{video_id}'
token = self._fetch_viewer_info(video_id)['xsrft']
url = join_nonempty('https://vimeo.com', path, video_id, delim='/')
try:
self._request_webpage(
f'{url}/password', video_id,
@@ -117,6 +136,10 @@ class VimeoBaseInfoExtractor(InfoExtractor):
raise ExtractorError('Wrong password', expected=True)
raise
def _extract_config_url(self, webpage, **kwargs):
return self._html_search_regex(
r'\bdata-config-url="([^"]+)"', webpage, 'config URL', **kwargs)
def _extract_vimeo_config(self, webpage, video_id, *args, **kwargs):
vimeo_config = self._search_regex(
r'vimeo\.config\s*=\s*(?:({.+?})|_extend\([^,]+,\s+({.+?})\));',
@@ -164,6 +187,7 @@ class VimeoBaseInfoExtractor(InfoExtractor):
sep_pattern = r'/sep/video/'
for files_type in ('hls', 'dash'):
for cdn_name, cdn_data in (try_get(config_files, lambda x: x[files_type]['cdns']) or {}).items():
# TODO: Also extract 'avc_url'? Investigate if there are 'hevc_url', 'av1_url'?
manifest_url = cdn_data.get('url')
if not manifest_url:
continue
@@ -244,7 +268,10 @@ class VimeoBaseInfoExtractor(InfoExtractor):
'formats': formats,
'subtitles': subtitles,
'live_status': live_status,
'release_timestamp': traverse_obj(live_event, ('ingest', 'scheduled_start_time', {parse_iso8601})),
'release_timestamp': traverse_obj(live_event, ('ingest', (
('scheduled_start_time', {parse_iso8601}),
('start_time', {int_or_none}),
), any)),
# Note: Bitrates are completely broken. Single m3u8 may contain entries in kbps and bps
# at the same time without actual units specified.
'_format_sort_fields': ('quality', 'res', 'fps', 'hdr:12', 'source'),
@@ -353,7 +380,7 @@ class VimeoIE(VimeoBaseInfoExtractor):
(?:
(?P<u>user)|
(?!(?:channels|album|showcase)/[^/?#]+/?(?:$|[?#])|[^/]+/review/|ondemand/)
(?:.*?/)??
(?:(?!event/).*?/)??
(?P<q>
(?:
play_redirect_hls|
@@ -933,8 +960,7 @@ class VimeoIE(VimeoBaseInfoExtractor):
r'vimeo\.com/(?:album|showcase)/([^/]+)', url, 'album id', default=None)
if not album_id:
return
viewer = self._download_json(
'https://vimeo.com/_rv/viewer', album_id, fatal=False)
viewer = self._fetch_viewer_info(album_id, fatal=False)
if not viewer:
webpage = self._download_webpage(url, album_id)
viewer = self._parse_json(self._search_regex(
@@ -992,9 +1018,7 @@ class VimeoIE(VimeoBaseInfoExtractor):
raise
errmsg = error.cause.response.read()
if b'Because of its privacy settings, this video cannot be played here' in errmsg:
raise ExtractorError(
'Cannot download embed-only video without embedding URL. Please call yt-dlp '
'with the URL of the page that embeds this video.', expected=True)
raise ExtractorError(self._REFERER_HINT, expected=True)
# 403 == vimeo.com TLS fingerprint or DC IP block; 429 == player.vimeo.com TLS FP block
status = error.cause.status
dcip_msg = 'If you are using a data center IP or VPN/proxy, your IP may be blocked'
@@ -1039,8 +1063,7 @@ class VimeoIE(VimeoBaseInfoExtractor):
channel_id = self._search_regex(
r'vimeo\.com/channels/([^/]+)', url, 'channel id', default=None)
if channel_id:
config_url = self._html_search_regex(
r'\bdata-config-url="([^"]+)"', webpage, 'config URL', default=None)
config_url = self._extract_config_url(webpage, default=None)
video_description = clean_html(get_element_by_class('description', webpage))
info_dict.update({
'channel_id': channel_id,
@@ -1333,8 +1356,7 @@ class VimeoAlbumIE(VimeoBaseInfoExtractor):
def _real_extract(self, url):
album_id = self._match_id(url)
viewer = self._download_json(
'https://vimeo.com/_rv/viewer', album_id, fatal=False)
viewer = self._fetch_viewer_info(album_id, fatal=False)
if not viewer:
webpage = self._download_webpage(url, album_id)
viewer = self._parse_json(self._search_regex(
@@ -1626,3 +1648,377 @@ class VimeoProIE(VimeoBaseInfoExtractor):
return self.url_result(vimeo_url, VimeoIE, video_id, url_transparent=True,
description=description)
class VimeoEventIE(VimeoBaseInfoExtractor):
IE_NAME = 'vimeo:event'
_VALID_URL = r'''(?x)
https?://(?:www\.)?vimeo\.com/event/(?P<id>\d+)(?:/
(?:
(?:embed/)?(?P<unlisted_hash>[\da-f]{10})|
videos/(?P<video_id>\d+)
)
)?'''
_EMBED_REGEX = [r'<iframe\b[^>]+\bsrc=["\'](?P<url>https?://vimeo\.com/event/\d+/embed(?:[/?][^"\']*)?)["\'][^>]*>']
_TESTS = [{
# stream_privacy.view: 'anybody'
'url': 'https://vimeo.com/event/5116195',
'info_dict': {
'id': '1082194134',
'ext': 'mp4',
'display_id': '5116195',
'title': 'Skidmore College Commencement 2025',
'description': 'md5:1902dd5165d21f98aa198297cc729d23',
'uploader': 'Skidmore College',
'uploader_id': 'user116066434',
'uploader_url': 'https://vimeo.com/user116066434',
'comment_count': int,
'like_count': int,
'duration': 9810,
'thumbnail': r're:https://i\.vimeocdn\.com/video/\d+-[\da-f]+-d',
'timestamp': 1747502974,
'upload_date': '20250517',
'release_timestamp': 1747502998,
'release_date': '20250517',
'live_status': 'was_live',
},
'params': {'skip_download': 'm3u8'},
'expected_warnings': ['Failed to parse XML: not well-formed'],
}, {
# stream_privacy.view: 'embed_only'
'url': 'https://vimeo.com/event/5034253/embed',
'info_dict': {
'id': '1071439154',
'ext': 'mp4',
'display_id': '5034253',
'title': 'Advancing Humans with AI',
'description': r're:AI is here to stay, but how do we ensure that people flourish in a world of pervasive AI use.{322}$',
'uploader': 'MIT Media Lab',
'uploader_id': 'mitmedialab',
'uploader_url': 'https://vimeo.com/mitmedialab',
'duration': 23235,
'thumbnail': r're:https://i\.vimeocdn\.com/video/\d+-[\da-f]+-d',
'chapters': 'count:37',
'release_timestamp': 1744290000,
'release_date': '20250410',
'live_status': 'was_live',
},
'params': {
'skip_download': 'm3u8',
'http_headers': {'Referer': 'https://www.media.mit.edu/events/aha-symposium/'},
},
'expected_warnings': ['Failed to parse XML: not well-formed'],
}, {
# Last entry on 2nd page of the 37 video playlist, but use clip_to_play_id API param shortcut
'url': 'https://vimeo.com/event/4753126/videos/1046153257',
'info_dict': {
'id': '1046153257',
'ext': 'mp4',
'display_id': '4753126',
'title': 'January 12, 2025 The True Vine (Pastor John Mindrup)',
'description': 'The True Vine (Pastor \tJohn Mindrup)',
'uploader': 'Salem United Church of Christ',
'uploader_id': 'user230181094',
'uploader_url': 'https://vimeo.com/user230181094',
'comment_count': int,
'like_count': int,
'duration': 4962,
'thumbnail': r're:https://i\.vimeocdn\.com/video/\d+-[\da-f]+-d',
'timestamp': 1736702464,
'upload_date': '20250112',
'release_timestamp': 1736702543,
'release_date': '20250112',
'live_status': 'was_live',
},
'params': {'skip_download': 'm3u8'},
'expected_warnings': ['Failed to parse XML: not well-formed'],
}, {
# "24/7" livestream
'url': 'https://vimeo.com/event/4768062',
'info_dict': {
'id': '1079901414',
'ext': 'mp4',
'display_id': '4768062',
'title': r're:GRACELAND CAM \d{4}-\d{2}-\d{2} \d{2}:\d{2}$',
'description': '24/7 camera at Graceland Mansion',
'uploader': 'Elvis Presley\'s Graceland',
'uploader_id': 'visitgraceland',
'uploader_url': 'https://vimeo.com/visitgraceland',
'release_timestamp': 1745975450,
'release_date': '20250430',
'live_status': 'is_live',
},
'params': {'skip_download': 'livestream'},
}, {
# stream_privacy.view: 'unlisted' with unlisted_hash in URL path (stream_privacy.embed: 'whitelist')
'url': 'https://vimeo.com/event/4259978/3db517c479',
'info_dict': {
'id': '939104114',
'ext': 'mp4',
'display_id': '4259978',
'title': 'Enhancing Credibility in Your Community Science Project',
'description': 'md5:eab953341168b9c146bc3cfe3f716070',
'uploader': 'NOAA Research',
'uploader_id': 'noaaresearch',
'uploader_url': 'https://vimeo.com/noaaresearch',
'comment_count': int,
'like_count': int,
'duration': 3961,
'thumbnail': r're:https://i\.vimeocdn\.com/video/\d+-[\da-f]+-d',
'timestamp': 1716408008,
'upload_date': '20240522',
'release_timestamp': 1716408062,
'release_date': '20240522',
'live_status': 'was_live',
},
'params': {'skip_download': 'm3u8'},
'expected_warnings': ['Failed to parse XML: not well-formed'],
}, {
# "done" event with video_id in URL and unlisted_hash in VimeoIE URL
'url': 'https://vimeo.com/event/595460/videos/498149131/',
'info_dict': {
'id': '498149131',
'ext': 'mp4',
'display_id': '595460',
'title': '2021 Eighth Annual John Cardinal Foley Lecture on Social Communications',
'description': 'Replay: https://vimeo.com/catholicphilly/review/498149131/544f26a12f',
'uploader': 'Kearns Media Consulting LLC',
'uploader_id': 'kearnsmediaconsulting',
'uploader_url': 'https://vimeo.com/kearnsmediaconsulting',
'comment_count': int,
'like_count': int,
'duration': 4466,
'thumbnail': r're:https://i\.vimeocdn\.com/video/\d+-[\da-f]+-d',
'timestamp': 1612228466,
'upload_date': '20210202',
'release_timestamp': 1612228538,
'release_date': '20210202',
'live_status': 'was_live',
},
'params': {'skip_download': 'm3u8'},
'expected_warnings': ['Failed to parse XML: not well-formed'],
}, {
# stream_privacy.view: 'password'; stream_privacy.embed: 'public'
'url': 'https://vimeo.com/event/4940578',
'info_dict': {
'id': '1059263570',
'ext': 'mp4',
'display_id': '4940578',
'title': 'TMAC AKC AGILITY 2-22-2025',
'uploader': 'Paws \'N Effect',
'uploader_id': 'pawsneffect',
'uploader_url': 'https://vimeo.com/pawsneffect',
'comment_count': int,
'like_count': int,
'duration': 33115,
'thumbnail': r're:https://i\.vimeocdn\.com/video/\d+-[\da-f]+-d',
'timestamp': 1740261836,
'upload_date': '20250222',
'release_timestamp': 1740261873,
'release_date': '20250222',
'live_status': 'was_live',
},
'params': {
'videopassword': '22',
'skip_download': 'm3u8',
},
'expected_warnings': ['Failed to parse XML: not well-formed'],
}, {
# API serves a playlist of 37 videos, but the site only streams the newest one (changes every Sunday)
'url': 'https://vimeo.com/event/4753126',
'only_matching': True,
}, {
# Scheduled for 2025.05.15 but never started; "unavailable"; stream_privacy.view: "anybody"
'url': 'https://vimeo.com/event/5120811/embed',
'only_matching': True,
}, {
'url': 'https://vimeo.com/event/5112969/embed?muted=1',
'only_matching': True,
}, {
'url': 'https://vimeo.com/event/5097437/embed/interaction?muted=1',
'only_matching': True,
}, {
'url': 'https://vimeo.com/event/5113032/embed?autoplay=1&muted=1',
'only_matching': True,
}, {
# Ended livestream with video_id
'url': 'https://vimeo.com/event/595460/videos/507329569/',
'only_matching': True,
}, {
# stream_privacy.view: 'unlisted' with unlisted_hash in URL path (stream_privacy.embed: 'public')
'url': 'https://vimeo.com/event/4606123/embed/358d60ce2e',
'only_matching': True,
}]
_WEBPAGE_TESTS = [{
# Same result as https://vimeo.com/event/5034253/embed
'url': 'https://www.media.mit.edu/events/aha-symposium/',
'info_dict': {
'id': '1071439154',
'ext': 'mp4',
'display_id': '5034253',
'title': 'Advancing Humans with AI',
'description': r're:AI is here to stay, but how do we ensure that people flourish in a world of pervasive AI use.{322}$',
'uploader': 'MIT Media Lab',
'uploader_id': 'mitmedialab',
'uploader_url': 'https://vimeo.com/mitmedialab',
'duration': 23235,
'thumbnail': r're:https://i\.vimeocdn\.com/video/\d+-[\da-f]+-d',
'chapters': 'count:37',
'release_timestamp': 1744290000,
'release_date': '20250410',
'live_status': 'was_live',
},
'params': {'skip_download': 'm3u8'},
'expected_warnings': ['Failed to parse XML: not well-formed'],
}]
_EVENT_FIELDS = (
'title', 'uri', 'schedule', 'stream_description', 'stream_privacy.embed', 'stream_privacy.view',
'clip_to_play.name', 'clip_to_play.uri', 'clip_to_play.config_url', 'clip_to_play.live.status',
'clip_to_play.privacy.embed', 'clip_to_play.privacy.view', 'clip_to_play.password',
'streamable_clip.name', 'streamable_clip.uri', 'streamable_clip.config_url', 'streamable_clip.live.status',
)
_VIDEOS_FIELDS = ('items', 'uri', 'name', 'config_url', 'duration', 'live.status')
def _call_events_api(
self, event_id, ep=None, unlisted_hash=None, note=None,
fields=(), referrer=None, query=None, headers=None,
):
resource = join_nonempty('event', ep, note, 'API JSON', delim=' ')
return self._download_json(
join_nonempty(
'https://api.vimeo.com/live_events',
join_nonempty(event_id, unlisted_hash, delim=':'), ep, delim='/'),
event_id, f'Downloading {resource}', f'Failed to download {resource}',
query=filter_dict({
'fields': ','.join(fields) or [],
# Correct spelling with 4 R's is deliberate
'referrer': referrer,
**(query or {}),
}), headers=filter_dict({
'Accept': 'application/json',
'Authorization': f'jwt {self._fetch_viewer_info(event_id)["jwt"]}',
'Referer': referrer,
**(headers or {}),
}))
@staticmethod
def _extract_video_id_and_unlisted_hash(video):
if not traverse_obj(video, ('uri', {lambda x: x.startswith('/videos/')})):
return None, None
video_id, _, unlisted_hash = video['uri'][8:].partition(':')
return video_id, unlisted_hash or None
def _vimeo_url_result(self, video_id, unlisted_hash=None, event_id=None):
# VimeoIE can extract more metadata and formats for was_live event videos
return self.url_result(
join_nonempty('https://vimeo.com', video_id, unlisted_hash, delim='/'), VimeoIE,
video_id, display_id=event_id, live_status='was_live', url_transparent=True)
@classmethod
def _extract_embed_urls(cls, url, webpage):
for embed_url in super()._extract_embed_urls(url, webpage):
yield cls._smuggle_referrer(embed_url, url)
def _real_extract(self, url):
url, _, headers = self._unsmuggle_headers(url)
# XXX: Keep key name in sync with _unsmuggle_headers
referrer = headers.get('Referer')
event_id, unlisted_hash, video_id = self._match_valid_url(url).group('id', 'unlisted_hash', 'video_id')
for retry in (False, True):
try:
live_event_data = self._call_events_api(
event_id, unlisted_hash=unlisted_hash, fields=self._EVENT_FIELDS,
referrer=referrer, query={'clip_to_play_id': video_id or '0'},
headers={'Accept': 'application/vnd.vimeo.*+json;version=3.4.9'})
break
except ExtractorError as e:
if retry or not isinstance(e.cause, HTTPError) or e.cause.status not in (400, 403):
raise
response = traverse_obj(e.cause.response.read(), ({json.loads}, {dict})) or {}
error_code = response.get('error_code')
if error_code == 2204:
self._verify_video_password(event_id, path='event')
continue
if error_code == 3200:
raise ExtractorError(self._REFERER_HINT, expected=True)
if error_msg := response.get('error'):
raise ExtractorError(f'Vimeo says: {error_msg}', expected=True)
raise
# stream_privacy.view can be: 'anybody', 'embed_only', 'nobody', 'password', 'unlisted'
view_policy = live_event_data['stream_privacy']['view']
if view_policy == 'nobody':
raise ExtractorError('This event has not been made available to anyone', expected=True)
clip_data = traverse_obj(live_event_data, ('clip_to_play', {dict})) or {}
# live.status can be: 'streaming' (is_live), 'done' (was_live), 'unavailable' (is_upcoming OR dead)
clip_status = traverse_obj(clip_data, ('live', 'status', {str}))
start_time = traverse_obj(live_event_data, ('schedule', 'start_time', {str}))
release_timestamp = parse_iso8601(start_time)
if clip_status == 'unavailable' and release_timestamp and release_timestamp > time.time():
self.raise_no_formats(f'This live event is scheduled for {start_time}', expected=True)
live_status = 'is_upcoming'
config_url = None
elif view_policy == 'embed_only':
webpage = self._download_webpage(
join_nonempty('https://vimeo.com/event', event_id, 'embed', unlisted_hash, delim='/'),
event_id, 'Downloading embed iframe webpage', impersonate=True, headers=headers)
# The _parse_config result will overwrite live_status w/ 'is_live' if livestream is active
live_status = 'was_live'
config_url = self._extract_config_url(webpage)
else: # view_policy in ('anybody', 'password', 'unlisted')
if video_id:
clip_id, clip_hash = self._extract_video_id_and_unlisted_hash(clip_data)
if video_id == clip_id and clip_status == 'done' and (clip_hash or view_policy != 'unlisted'):
return self._vimeo_url_result(clip_id, clip_hash, event_id)
video_filter = lambda _, v: self._extract_video_id_and_unlisted_hash(v)[0] == video_id
else:
video_filter = lambda _, v: v['live']['status'] in ('streaming', 'done')
for page in itertools.count(1):
videos_data = self._call_events_api(
event_id, 'videos', unlisted_hash=unlisted_hash, note=f'page {page}',
fields=self._VIDEOS_FIELDS, referrer=referrer, query={'page': page},
headers={'Accept': 'application/vnd.vimeo.*;version=3.4.1'})
video = traverse_obj(videos_data, ('data', video_filter, any))
if video or not traverse_obj(videos_data, ('paging', 'next', {str})):
break
live_status = {
'streaming': 'is_live',
'done': 'was_live',
}.get(traverse_obj(video, ('live', 'status', {str})))
if not live_status: # requested video_id is unavailable or no videos are available
raise ExtractorError('This event video is unavailable', expected=True)
elif live_status == 'was_live':
return self._vimeo_url_result(*self._extract_video_id_and_unlisted_hash(video), event_id)
config_url = video['config_url']
if config_url: # view_policy == 'embed_only' or live_status == 'is_live'
info = filter_dict(self._parse_config(
self._download_json(config_url, event_id, 'Downloading config JSON'), event_id))
else: # live_status == 'is_upcoming'
info = {'id': event_id}
if info.get('live_status') == 'post_live':
self.report_warning('This live event recently ended and some formats may not yet be available')
return {
**traverse_obj(live_event_data, {
'title': ('title', {str}),
'description': ('stream_description', {str}),
}),
'display_id': event_id,
'live_status': live_status,
'release_timestamp': release_timestamp,
**info,
}
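The viewer-info cache above leans on _jwt_is_expired: jwt_decode_hs256 (yt_dlp.utils) only base64-decodes the JWT payload, with no signature check, so `exp` can be read locally. A sketch with a throwaway unsigned token:

import base64
import json
import time
from yt_dlp.utils import jwt_decode_hs256

b64 = lambda d: base64.urlsafe_b64encode(json.dumps(d).encode()).decode().rstrip('=')
token = f'{b64({"alg": "HS256"})}.{b64({"exp": int(time.time()) + 3600})}.dummysig'

print(jwt_decode_hs256(token)['exp'] - time.time() < 120)  # False -> token still usable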

yt_dlp/extractor/xinpianchang.py

@@ -45,7 +45,7 @@ class XinpianchangIE(InfoExtractor):
def _real_extract(self, url):
video_id = self._match_id(url)
webpage = self._download_webpage(url, video_id=video_id)
webpage = self._download_webpage(url, video_id=video_id, headers={'Referer': url})
video_data = self._search_nextjs_data(webpage, video_id)['props']['pageProps']['detail']['video']
data = self._download_json(

yt_dlp/extractor/youtube/_base.py

@@ -35,6 +35,7 @@ from ...utils import (
class _PoTokenContext(enum.Enum):
PLAYER = 'player'
GVS = 'gvs'
SUBS = 'subs'
# any clients starting with _ cannot be explicitly requested by the user
@@ -787,6 +788,7 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
def _download_ytcfg(self, client, video_id):
url = {
'mweb': 'https://m.youtube.com',
'web': 'https://www.youtube.com',
'web_music': 'https://music.youtube.com',
'web_embedded': f'https://www.youtube.com/embed/{video_id}?html5=1',

yt_dlp/extractor/youtube/_video.py

@@ -23,6 +23,8 @@ from ._base import (
_split_innertube_client,
short_client_name,
)
from .pot._director import initialize_pot_director
from .pot.provider import PoTokenContext, PoTokenRequest
from ...jsinterp import JSInterpreter, PhantomJSwrapper
from ...networking.exceptions import HTTPError
from ...utils import (
@@ -65,9 +67,13 @@ from ...utils import (
urljoin,
variadic,
)
from ...utils.networking import clean_headers, clean_proxies, select_proxy
STREAMING_DATA_CLIENT_NAME = '__yt_dlp_client'
STREAMING_DATA_INITIAL_PO_TOKEN = '__yt_dlp_po_token'
STREAMING_DATA_FETCH_SUBS_PO_TOKEN = '__yt_dlp_fetch_subs_po_token'
STREAMING_DATA_INNERTUBE_CONTEXT = '__yt_dlp_innertube_context'
PO_TOKEN_GUIDE_URL = 'https://github.com/yt-dlp/yt-dlp/wiki/PO-Token-Guide'
@@ -1808,6 +1814,11 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
super().__init__(*args, **kwargs)
self._code_cache = {}
self._player_cache = {}
self._pot_director = None
def _real_initialize(self):
super()._real_initialize()
self._pot_director = initialize_pot_director(self)
def _prepare_live_from_start_formats(self, formats, video_id, live_start_time, url, webpage_url, smuggled_data, is_live):
lock = threading.Lock()
@@ -2854,7 +2865,8 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
continue
def fetch_po_token(self, client='web', context=_PoTokenContext.GVS, ytcfg=None, visitor_data=None,
data_sync_id=None, session_index=None, player_url=None, video_id=None, **kwargs):
data_sync_id=None, session_index=None, player_url=None, video_id=None, webpage=None,
required=False, **kwargs):
"""
Fetch a PO Token for a given client and context. This function will validate required parameters for a given context and client.
@@ -2868,10 +2880,15 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
@param session_index: session index.
@param player_url: player URL.
@param video_id: video ID.
@param webpage: video webpage.
@param required: Whether the PO Token is required (i.e. try to fetch unless policy is "never").
@param kwargs: Additional arguments to pass down. May be more added in the future.
@return: The fetched PO Token. None if it could not be fetched.
"""
# TODO(future): This validation should be moved into pot framework.
# Some sort of middleware or validation provider perhaps?
# GVS WebPO Token is bound to visitor_data / Visitor ID when logged out.
# Must have visitor_data for it to function.
if player_url and context == _PoTokenContext.GVS and not visitor_data and not self.is_authenticated:
@@ -2893,6 +2910,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
f'Got a GVS PO Token for {client} client, but missing Data Sync ID for account. Formats may not work.'
f'You may need to pass a Data Sync ID with --extractor-args "youtube:data_sync_id=XXX"')
self.write_debug(f'{video_id}: Retrieved a {context.value} PO Token for {client} client from config')
return config_po_token
# Require GVS WebPO Token if logged in for external fetching
@@ -2902,7 +2920,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
f'You may need to pass a Data Sync ID with --extractor-args "youtube:data_sync_id=XXX"')
return
return self._fetch_po_token(
po_token = self._fetch_po_token(
client=client,
context=context.value,
ytcfg=ytcfg,
@@ -2911,11 +2929,68 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
session_index=session_index,
player_url=player_url,
video_id=video_id,
video_webpage=webpage,
required=required,
**kwargs,
)
if po_token:
self.write_debug(f'{video_id}: Retrieved a {context.value} PO Token for {client} client')
return po_token
def _fetch_po_token(self, client, **kwargs):
"""(Unstable) External PO Token fetch stub"""
context = kwargs.get('context')
# Avoid fetching PO Tokens when not required
fetch_pot_policy = self._configuration_arg('fetch_pot', [''], ie_key=YoutubeIE)[0]
if fetch_pot_policy not in ('never', 'auto', 'always'):
fetch_pot_policy = 'auto'
if (
fetch_pot_policy == 'never'
or (
fetch_pot_policy == 'auto'
and _PoTokenContext(context) not in self._get_default_ytcfg(client)['PO_TOKEN_REQUIRED_CONTEXTS']
and not kwargs.get('required', False)
)
):
return None
headers = self.get_param('http_headers').copy()
proxies = self._downloader.proxies.copy()
clean_headers(headers)
clean_proxies(proxies, headers)
innertube_host = self._select_api_hostname(None, default_client=client)
pot_request = PoTokenRequest(
context=PoTokenContext(context),
innertube_context=traverse_obj(kwargs, ('ytcfg', 'INNERTUBE_CONTEXT')),
innertube_host=innertube_host,
internal_client_name=client,
session_index=kwargs.get('session_index'),
player_url=kwargs.get('player_url'),
video_webpage=kwargs.get('video_webpage'),
is_authenticated=self.is_authenticated,
visitor_data=kwargs.get('visitor_data'),
data_sync_id=kwargs.get('data_sync_id'),
video_id=kwargs.get('video_id'),
request_cookiejar=self._downloader.cookiejar,
# All requests that would need to be proxied should be in the
# context of www.youtube.com or the innertube host
request_proxy=(
select_proxy('https://www.youtube.com', proxies)
or select_proxy(f'https://{innertube_host}', proxies)
),
request_headers=headers,
request_timeout=self.get_param('socket_timeout'),
request_verify_tls=not self.get_param('nocheckcertificate'),
request_source_address=self.get_param('source_address'),
bypass_cache=False,
)
return self._pot_director.get_po_token(pot_request)
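The gate at the top of _fetch_po_token can be steered with the fetch_pot extractor argument (e.g. --extractor-args "youtube:fetch_pot=never"); unrecognized values fall back to auto. The decision, condensed into a standalone sketch (in the real code the required-by-client flag comes from the client's default ytcfg):

def should_fetch_pot(policy, required_by_client, explicitly_required):
    if policy not in ('never', 'auto', 'always'):
        policy = 'auto'  # same fallback as above
    if policy == 'never':
        return False
    if policy == 'auto':
        return required_by_client or explicitly_required
    return True  # 'always'

assert should_fetch_pot('bogus', False, False) is False  # treated as 'auto'
assert should_fetch_pot('auto', False, True) is True     # required=True forces a fetch
assert should_fetch_pot('always', False, False) is True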
@staticmethod
def _is_agegated(player_response):
@@ -3064,6 +3139,8 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
player_url = self._download_player_url(video_id)
tried_iframe_fallback = True
pr = initial_pr if client == 'web' else None
visitor_data = visitor_data or self._extract_visitor_data(master_ytcfg, initial_pr, player_ytcfg)
data_sync_id = data_sync_id or self._extract_data_sync_id(master_ytcfg, initial_pr, player_ytcfg)
@@ -3073,16 +3150,24 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
'video_id': video_id,
'data_sync_id': data_sync_id if self.is_authenticated else None,
'player_url': player_url if require_js_player else None,
'webpage': webpage,
'session_index': self._extract_session_index(master_ytcfg, player_ytcfg),
'ytcfg': player_ytcfg,
'ytcfg': player_ytcfg or self._get_default_ytcfg(client),
}
player_po_token = self.fetch_po_token(
# Don't need a player PO token for WEB if using player response from webpage
player_po_token = None if pr else self.fetch_po_token(
context=_PoTokenContext.PLAYER, **fetch_po_token_args)
gvs_po_token = self.fetch_po_token(
context=_PoTokenContext.GVS, **fetch_po_token_args)
fetch_subs_po_token_func = functools.partial(
self.fetch_po_token,
context=_PoTokenContext.SUBS,
**fetch_po_token_args,
)
required_pot_contexts = self._get_default_ytcfg(client)['PO_TOKEN_REQUIRED_CONTEXTS']
if (
@@ -3109,7 +3194,6 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
only_once=True)
deprioritize_pr = True
pr = initial_pr if client == 'web' else None
try:
pr = pr or self._extract_player_response(
client, video_id,
@@ -3127,10 +3211,13 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
if pr_id := self._invalid_player_response(pr, video_id):
skipped_clients[client] = pr_id
elif pr:
# Save client name for introspection later
sd = traverse_obj(pr, ('streamingData', {dict})) or {}
# Save client details for introspection later
innertube_context = traverse_obj(player_ytcfg or self._get_default_ytcfg(client), 'INNERTUBE_CONTEXT')
sd = pr.setdefault('streamingData', {})
sd[STREAMING_DATA_CLIENT_NAME] = client
sd[STREAMING_DATA_INITIAL_PO_TOKEN] = gvs_po_token
sd[STREAMING_DATA_INNERTUBE_CONTEXT] = innertube_context
sd[STREAMING_DATA_FETCH_SUBS_PO_TOKEN] = fetch_subs_po_token_func
for f in traverse_obj(sd, (('formats', 'adaptiveFormats'), ..., {dict})):
f[STREAMING_DATA_CLIENT_NAME] = client
f[STREAMING_DATA_INITIAL_PO_TOKEN] = gvs_po_token
@@ -3192,6 +3279,25 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
else:
self.report_warning(msg, only_once=True)
def _report_pot_subtitles_skipped(self, video_id, client_name, msg=None):
msg = msg or (
f'{video_id}: Some {client_name} client subtitles require a PO Token which was not provided. '
'They will be discarded since they are not downloadable as-is. '
f'You can manually pass a Subtitles PO Token for this client with '
f'--extractor-args "youtube:po_token={client_name}.subs+XXX" . '
f'For more information, refer to {PO_TOKEN_GUIDE_URL}')
subs_wanted = any((
self.get_param('writesubtitles'),
self.get_param('writeautomaticsub'),
self.get_param('listsubtitles')))
# Only raise a warning for non-default clients, to not confuse users.
if not subs_wanted or client_name in (*self._DEFAULT_CLIENTS, *self._DEFAULT_AUTHED_CLIENTS):
self.write_debug(msg, only_once=True)
else:
self.report_warning(msg, only_once=True)
def _extract_formats_and_subtitles(self, streaming_data, video_id, player_url, live_status, duration):
CHUNK_SIZE = 10 << 20
PREFERRED_LANG_VALUE = 10
@@ -3483,6 +3589,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
hls_manifest_url = hls_manifest_url.rstrip('/') + f'/pot/{po_token}'
fmts, subs = self._extract_m3u8_formats_and_subtitles(
hls_manifest_url, video_id, 'mp4', fatal=False, live=live_status == 'is_live')
for sub in traverse_obj(subs, (..., ..., {dict})):
# HLS subs (m3u8) do not need a PO token; save client name for debugging
sub[STREAMING_DATA_CLIENT_NAME] = client_name
subtitles = self._merge_subtitles(subs, subtitles)
for f in fmts:
if process_manifest_format(f, 'hls', client_name, self._search_regex(
@@ -3494,6 +3603,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
if po_token:
dash_manifest_url = dash_manifest_url.rstrip('/') + f'/pot/{po_token}'
formats, subs = self._extract_mpd_formats_and_subtitles(dash_manifest_url, video_id, fatal=False)
for sub in traverse_obj(subs, (..., ..., {dict})):
# TODO: Investigate if DASH subs ever need a PO token; save client name for debugging
sub[STREAMING_DATA_CLIENT_NAME] = client_name
subtitles = self._merge_subtitles(subs, subtitles) # Prioritize HLS subs over DASH
for f in formats:
if process_manifest_format(f, 'dash', client_name, f['format_id'], po_token):
@@ -3685,7 +3797,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
reason = self._get_text(pemr, 'reason') or get_first(playability_statuses, 'reason')
subreason = clean_html(self._get_text(pemr, 'subreason') or '')
if subreason:
if subreason == 'The uploader has not made this video available in your country.':
if subreason.startswith('The uploader has not made this video available in your country'):
countries = get_first(microformats, 'availableCountries')
if not countries:
regions_allowed = search_meta('regionsAllowed')
@@ -3820,47 +3932,85 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
'quality', 'res', 'fps', 'hdr:12', 'source', 'vcodec', 'channels', 'acodec', 'lang', 'proto'),
}
def get_lang_code(track):
return (remove_start(track.get('vssId') or '', '.').replace('.', '-')
or track.get('languageCode'))
def process_language(container, base_url, lang_code, sub_name, client_name, query):
lang_subs = container.setdefault(lang_code, [])
for fmt in self._SUBTITLE_FORMATS:
query = {**query, 'fmt': fmt}
lang_subs.append({
'ext': fmt,
'url': urljoin('https://www.youtube.com', update_url_query(base_url, query)),
'name': sub_name,
STREAMING_DATA_CLIENT_NAME: client_name,
})
subtitles = {}
pctr = traverse_obj(player_responses, (..., 'captions', 'playerCaptionsTracklistRenderer'), expected_type=dict)
if pctr:
def get_lang_code(track):
return (remove_start(track.get('vssId') or '', '.').replace('.', '-')
or track.get('languageCode'))
skipped_subs_clients = set()
# Converted into dicts to remove duplicates
captions = {
get_lang_code(sub): sub
for sub in traverse_obj(pctr, (..., 'captionTracks', ...))}
translation_languages = {
lang.get('languageCode'): self._get_text(lang.get('languageName'), max_runs=1)
for lang in traverse_obj(pctr, (..., 'translationLanguages', ...))}
# Only web/mweb clients provide translationLanguages, so include initial_pr in the traversal
translation_languages = {
lang['languageCode']: self._get_text(lang['languageName'], max_runs=1)
for lang in traverse_obj(player_responses, (
..., 'captions', 'playerCaptionsTracklistRenderer', 'translationLanguages',
lambda _, v: v['languageCode'] and v['languageName']))
}
# NB: Constructing the full subtitle dictionary is slow
get_translated_subs = 'translated_subs' not in self._configuration_arg('skip') and (
self.get_param('writeautomaticsub', False) or self.get_param('listsubtitles'))
def process_language(container, base_url, lang_code, sub_name, query):
lang_subs = container.setdefault(lang_code, [])
for fmt in self._SUBTITLE_FORMATS:
query.update({
'fmt': fmt,
})
lang_subs.append({
'ext': fmt,
'url': urljoin('https://www.youtube.com', update_url_query(base_url, query)),
'name': sub_name,
})
# Filter out initial_pr which does not have streamingData (smuggled client context)
prs = traverse_obj(player_responses, (
lambda _, v: v['streamingData'] and v['captions']['playerCaptionsTracklistRenderer']))
all_captions = traverse_obj(prs, (
..., 'captions', 'playerCaptionsTracklistRenderer', 'captionTracks', ..., {dict}))
need_subs_langs = {get_lang_code(sub) for sub in all_captions if sub.get('kind') != 'asr'}
need_caps_langs = {
remove_start(get_lang_code(sub), 'a-')
for sub in all_captions if sub.get('kind') == 'asr'}
# NB: Constructing the full subtitle dictionary is slow
get_translated_subs = 'translated_subs' not in self._configuration_arg('skip') and (
self.get_param('writeautomaticsub', False) or self.get_param('listsubtitles'))
for lang_code, caption_track in captions.items():
base_url = caption_track.get('baseUrl')
orig_lang = parse_qs(base_url).get('lang', [None])[-1]
if not base_url:
continue
for pr in prs:
pctr = pr['captions']['playerCaptionsTracklistRenderer']
client_name = pr['streamingData'][STREAMING_DATA_CLIENT_NAME]
innertube_client_name = pr['streamingData'][STREAMING_DATA_INNERTUBE_CONTEXT]['client']['clientName']
required_contexts = self._get_default_ytcfg(client_name)['PO_TOKEN_REQUIRED_CONTEXTS']
fetch_subs_po_token_func = pr['streamingData'][STREAMING_DATA_FETCH_SUBS_PO_TOKEN]
pot_params = {}
already_fetched_pot = False
for caption_track in traverse_obj(pctr, ('captionTracks', lambda _, v: v['baseUrl'])):
base_url = caption_track['baseUrl']
qs = parse_qs(base_url)
lang_code = get_lang_code(caption_track)
requires_pot = (
# We can detect the experiment for now
any(e in traverse_obj(qs, ('exp', ...)) for e in ('xpe', 'xpv'))
or _PoTokenContext.SUBS in required_contexts)
if not already_fetched_pot:
already_fetched_pot = True
if subs_po_token := fetch_subs_po_token_func(required=requires_pot):
pot_params.update({
'pot': subs_po_token,
'potc': '1',
'c': innertube_client_name,
})
if not pot_params and requires_pot:
skipped_subs_clients.add(client_name)
self._report_pot_subtitles_skipped(video_id, client_name)
break
orig_lang = qs.get('lang', [None])[-1]
lang_name = self._get_text(caption_track, 'name', max_runs=1)
if caption_track.get('kind') != 'asr':
if not lang_code:
continue
process_language(
subtitles, base_url, lang_code, lang_name, {})
subtitles, base_url, lang_code, lang_name, client_name, pot_params)
if not caption_track.get('isTranslatable'):
continue
for trans_code, trans_name in translation_languages.items():
@@ -3880,10 +4030,25 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
# Add an "-orig" label to the original language so that it can be distinguished.
# The subs are returned without "-orig" as well for compatibility
process_language(
automatic_captions, base_url, f'{trans_code}-orig', f'{trans_name} (Original)', {})
automatic_captions, base_url, f'{trans_code}-orig',
f'{trans_name} (Original)', client_name, pot_params)
# Setting tlang=lang returns damaged subtitles.
process_language(automatic_captions, base_url, trans_code, trans_name,
{} if orig_lang == orig_trans_code else {'tlang': trans_code})
process_language(
automatic_captions, base_url, trans_code, trans_name, client_name,
pot_params if orig_lang == orig_trans_code else {'tlang': trans_code, **pot_params})
# Avoid duplication if we've already got everything we need
need_subs_langs.difference_update(subtitles)
need_caps_langs.difference_update(automatic_captions)
if not (need_subs_langs or need_caps_langs):
break
if skipped_subs_clients and (need_subs_langs or need_caps_langs):
self._report_pot_subtitles_skipped(video_id, True, msg=join_nonempty(
f'{video_id}: There are missing subtitles languages because a PO token was not provided.',
need_subs_langs and f'Subtitles for these languages are missing: {", ".join(need_subs_langs)}.',
need_caps_langs and f'Automatic captions for {len(need_caps_langs)} languages are missing.',
delim=' '))
info['automatic_captions'] = automatic_captions
info['subtitles'] = subtitles

View File

@@ -0,0 +1,309 @@
# YoutubeIE PO Token Provider Framework
As part of the YouTube extractor, we have a framework for providing PO Tokens programmatically. This can be used by plugins.
Refer to the [PO Token Guide](https://github.com/yt-dlp/yt-dlp/wiki/PO-Token-Guide) for more information on PO Tokens.
> [!TIP]
> If publishing a PO Token Provider plugin to GitHub, add the [yt-dlp-pot-provider](https://github.com/topics/yt-dlp-pot-provider) topic to your repository to help users find it.
## Public APIs
- `yt_dlp.extractor.youtube.pot.cache`
- `yt_dlp.extractor.youtube.pot.provider`
- `yt_dlp.extractor.youtube.pot.utils`
Everything else is internal-only and no guarantees are made about the API stability.
> [!WARNING]
> We will try our best to maintain stability with the public APIs.
> However, due to the nature of extractors and YouTube, we may need to remove or change APIs in the future.
> If you are using these APIs outside yt-dlp plugins, please account for this by importing them safely.
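For example, a guarded import along these lines (hypothetical consumer code, not part of yt-dlp) avoids a hard crash if the API moves in a future release:
```python
# Hypothetical consumer code: tolerate the public API moving or changing,
# since only plugin usage is covered by the stability note above.
try:
    from yt_dlp.extractor.youtube.pot.provider import PoTokenProvider
except ImportError:  # API relocated or removed in this yt-dlp version
    PoTokenProvider = None
```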
## PO Token Provider
`yt_dlp.extractor.youtube.pot.provider`
```python
from yt_dlp.extractor.youtube.pot.provider import (
PoTokenRequest,
PoTokenContext,
PoTokenProvider,
PoTokenResponse,
PoTokenProviderError,
PoTokenProviderRejectedRequest,
register_provider,
register_preference,
ExternalRequestFeature,
)
from yt_dlp.networking.common import Request
from yt_dlp.extractor.youtube.pot.utils import get_webpo_content_binding
from yt_dlp.utils import traverse_obj
from yt_dlp.networking.exceptions import RequestError
import json
@register_provider
class MyPoTokenProviderPTP(PoTokenProvider): # Provider class name must end with "PTP"
PROVIDER_VERSION = '0.2.1'
# Define a unique display name for the provider
PROVIDER_NAME = 'my-provider'
BUG_REPORT_LOCATION = 'https://issues.example.com/report'
# -- Validation shortcuts. Set these to None to disable. --
# Innertube Client Name.
# For example, "WEB", "ANDROID", "TVHTML5".
# For a list of WebPO client names,
# see yt_dlp.extractor.youtube.pot.utils.WEBPO_CLIENTS.
# Also see yt_dlp.extractor.youtube._base.INNERTUBE_CLIENTS
# for a list of client names currently supported by the YouTube extractor.
_SUPPORTED_CLIENTS = ('WEB', 'TVHTML5')
_SUPPORTED_CONTEXTS = (
PoTokenContext.GVS,
)
# If your provider makes external requests to websites (i.e. to youtube.com)
# using another library or service (i.e., not _request_webpage),
# set the request features that are supported here.
# If only using _request_webpage to make external requests, set this to None.
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = (
ExternalRequestFeature.PROXY_SCHEME_HTTP,
ExternalRequestFeature.SOURCE_ADDRESS,
ExternalRequestFeature.DISABLE_TLS_VERIFICATION
)
def is_available(self) -> bool:
"""
Check if the provider is available (e.g. all required dependencies are available)
This is used to determine if the provider should be used and to provide debug information.
IMPORTANT: This method SHOULD NOT make any network requests or perform any expensive operations.
Since this is called multiple times, we recommend caching the result.
"""
return True
def close(self):
# Optional close hook, called when YoutubeDL is closed.
pass
def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
# Validate the request here before making the request to the external source, if needed.
# Raise yt_dlp.extractor.youtube.pot.provider.PoTokenProviderRejectedRequest if the request is not supported.
if request.is_authenticated:
raise PoTokenProviderRejectedRequest(
'This provider does not support authenticated requests'
)
# Settings are pulled from extractor args passed to yt-dlp with the key `youtubepot-<PROVIDER_KEY>`.
# For this example, the extractor arg would be:
# `--extractor-args "youtubepot-mypotokenprovider:url=https://custom.example.com/get_pot"`
external_provider_url = self._configuration_arg(
'url', default=['https://provider.example.com/get_pot'])[0]
# See below for logging guidelines
self.logger.trace(f'Using external provider URL: {external_provider_url}')
# You should use the internal HTTP client to make requests where possible,
# as it will handle cookies and other networking settings passed to yt-dlp.
try:
# See docstring in _request_webpage method for request tips
response = self._request_webpage(
Request(external_provider_url, data=json.dumps({
'content_binding': get_webpo_content_binding(request)[0],  # (binding, type) tuple; send only the binding
'proxy': request.request_proxy,
'headers': request.request_headers,
'source_address': request.request_source_address,
'verify_tls': request.request_verify_tls,
# Important: If your provider has its own caching, please respect `bypass_cache`.
# This may be used in the future to request a fresh PO Token if required.
'do_not_cache': request.bypass_cache,
}).encode(), proxies={'all': None}),
pot_request=request,
note=(
f'Requesting {request.context.value} PO Token '
f'for {request.internal_client_name} client from external provider'),
)
except RequestError as e:
# If there is an error, raise PoTokenProviderError.
# You can specify whether it is expected or not. If it is unexpected,
# the log will include a link to the bug report location (BUG_REPORT_LOCATION).
raise PoTokenProviderError(
'Networking error while fetching PO Token from external provider',
expected=True
) from e
# Note: PO Token is expected to be base64url encoded
po_token = traverse_obj(json.loads(response.read()), 'po_token')
if not po_token:
raise PoTokenProviderError(
'Bad PO Token Response from external provider',
expected=False
)
return PoTokenResponse(
po_token=po_token,
# Optional, add a custom expiration timestamp for the token. Use for caching.
# By default, yt-dlp will use the default ttl from a registered cache spec (see below)
# Set to 0 or -1 to not cache this response.
expires_at=None,
)
# If there are multiple PO Token Providers that can handle the same PoTokenRequest,
# you can define a preference function to increase/decrease the priority of providers.
@register_preference(MyPoTokenProviderPTP)
def my_provider_preference(provider: PoTokenProvider, request: PoTokenRequest) -> int:
return 50
```
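Once installed as a plugin, the provider should appear in the `PO Token Providers` line of `yt-dlp -v` debug output, and `--extractor-args "youtube:pot_trace=true"` (see Debugging below) traces which provider ends up handling each request.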
## Logging Guidelines
- Use the `self.logger` object to log messages.
- When making HTTP requests or any other expensive operation, use `self.logger.info` to log a message to standard non-verbose output.
- This lets users know what is happening when a time-expensive operation is taking place.
- It is recommended to include the PO Token context and internal client name in the message if possible.
- For example, `self.logger.info(f'Requesting {request.context.value} PO Token for {request.internal_client_name} client from external provider')`.
- Use `self.logger.debug` to log a message to the verbose output (`--verbose`).
- For debugging information visible to users posting verbose logs.
- Try not to log too much; prefer trace logging for detailed debug messages.
- Use `self.logger.trace` to log a message to the PO Token debug output (`--extractor-args "youtube:pot_trace=true"`).
- Log as much as you like here as needed for debugging your provider.
- Avoid logging PO Tokens or any sensitive information to debug or info output.
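As a rough sketch of how the levels line up inside a provider method (a fragment reusing `request` and `external_provider_url` from the example provider above):
```python
# Sketch only: which level each kind of message belongs to
self.logger.info(
    f'Requesting {request.context.value} PO Token '
    f'for {request.internal_client_name} client from external provider')  # time-expensive operation
self.logger.debug('External provider responded OK')  # shown with --verbose
self.logger.trace(f'Using external provider URL: {external_provider_url}')  # shown with pot_trace=true
```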
## Debugging
- Use `-v --extractor-args "youtube:pot_trace=true"` to enable PO Token debug output.
## Caching
> [!WARNING]
> The following describes more advanced features that most users/developers will not need to use.
> [!IMPORTANT]
> yt-dlp currently has a built-in LRU Memory Cache Provider and a cache spec provider for WebPO Tokens.
> You should only need to implement cache providers if you want an external cache, or a cache spec if you are handling non-WebPO Tokens.
### Cache Providers
`yt_dlp.extractor.youtube.pot.cache`
```python
from yt_dlp.extractor.youtube.pot.cache import (
PoTokenCacheProvider,
register_preference,
register_provider
)
from yt_dlp.extractor.youtube.pot.provider import PoTokenRequest
@register_provider
class MyCacheProviderPCP(PoTokenCacheProvider): # Provider class name must end with "PCP"
PROVIDER_VERSION = '0.1.0'
# Define a unique display name for the provider
PROVIDER_NAME = 'my-cache-provider'
BUG_REPORT_LOCATION = 'https://issues.example.com/report'
def is_available(self) -> bool:
"""
Check if the provider is available (e.g. all required dependencies are available)
This is used to determine if the provider should be used and to provide debug information.
IMPORTANT: This method SHOULD NOT make any network requests or perform any expensive operations.
Since this is called multiple times, we recommend caching the result.
"""
return True
def get(self, key: str):
# Similar to PO Token Providers, Cache Providers and Cache Spec Providers
# are passed the extractor args matching the key youtubepot-<PROVIDER_KEY>.
some_setting = self._configuration_arg('some_setting', default=['default_value'])[0]
return self.my_cache.get(key)
def store(self, key: str, value: str, expires_at: int):
# ⚠ expires_at MUST be respected.
# Cache entries should not be returned if they have expired.
self.my_cache.store(key, value, expires_at)
def delete(self, key: str):
self.my_cache.delete(key)
def close(self):
# Optional close hook, called when the YoutubeDL instance is closed.
pass
# If there are multiple PO Token Cache Providers available, you can
# define a preference function to increase/decrease the priority of providers.
# IMPORTANT: Providers should be preferred in order of cache lookup speed.
# For example, a memory cache should have a higher preference than a disk cache.
# VERY IMPORTANT: yt-dlp has a built-in memory cache with a priority of 10000.
# Your cache provider's preference should be lower than this.
@register_preference(MyCacheProviderPCP)
def my_cache_preference(provider: PoTokenCacheProvider, request: PoTokenRequest) -> int:
return 50
```
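Since `store()` receives `expires_at` but `get()` must enforce it, a lookup has to check expiry itself. A minimal sketch over a plain dict backing store (hypothetical; maps key to a `(value, expires_at)` tuple):
```python
import time

def cache_get(store: dict, key: str):
    # Return a cached value only if it has not expired
    entry = store.get(key)
    if entry is None:
        return None
    value, expires_at = entry
    if expires_at <= int(time.time()):  # expired entries must never be returned
        del store[key]
        return None
    return value
```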
### Cache Specs
`yt_dlp.extractor.youtube.pot.cache`
These are used to provide information on how to cache a particular PO Token Request.
You might have a different cache spec for different kinds of PO Tokens.
```python
from yt_dlp.extractor.youtube.pot.cache import (
PoTokenCacheSpec,
PoTokenCacheSpecProvider,
CacheProviderWritePolicy,
register_spec,
)
from yt_dlp.utils import traverse_obj
from yt_dlp.extractor.youtube.pot.provider import PoTokenRequest
@register_spec
class MyCacheSpecProviderPCSP(PoTokenCacheSpecProvider): # Provider class name must end with "PCSP"
PROVIDER_VERSION = '0.1.0'
# Define a unique display name for the provider
PROVIDER_NAME = 'mycachespec'
BUG_REPORT_LOCATION = 'https://issues.example.com/report'
def generate_cache_spec(self, request: PoTokenRequest):
client_name = traverse_obj(request.innertube_context, ('client', 'clientName'))
if client_name != 'ANDROID':
# If the request is not supported by the cache spec, return None
return None
# Generate a cache spec for the request
return PoTokenCacheSpec(
# Key bindings to uniquely identify the request. These are used to generate a cache key.
key_bindings={
'client_name': client_name,
'content_binding': 'unique_content_binding',
'ip': traverse_obj(request.innertube_context, ('client', 'remoteHost')),
'source_address': request.request_source_address,
'proxy': request.request_proxy,
},
# Default Cache TTL in seconds
default_ttl=21600,
# Optional: Specify a write policy.
# WRITE_FIRST will write to the highest priority provider only,
# whereas WRITE_ALL will write to all providers.
# WRITE_FIRST may be useful if the PO Token is short-lived
# and there is no use writing to all providers.
write_policy=CacheProviderWritePolicy.WRITE_ALL,
)
```
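For reference, yt-dlp derives the cache key by hashing the sorted key bindings; conceptually something like the sketch below (the real implementation is internal and may change):
```python
import hashlib

def cache_key(bindings: dict) -> str:
    # Sorting makes the key deterministic regardless of binding insertion order
    return hashlib.sha256(repr(dict(sorted(bindings.items()))).encode()).hexdigest()
```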

View File

@@ -0,0 +1,3 @@
# Trigger import of built-in providers
from ._builtin.memory_cache import MemoryLRUPCP as _MemoryLRUPCP # noqa: F401
from ._builtin.webpo_cachespec import WebPoPCSP as _WebPoPCSP # noqa: F401

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
import datetime as dt
import typing
from threading import Lock
from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider
from yt_dlp.extractor.youtube.pot._registry import _pot_memory_cache
from yt_dlp.extractor.youtube.pot.cache import (
PoTokenCacheProvider,
register_preference,
register_provider,
)
def initialize_global_cache(max_size: int):
if _pot_memory_cache.value.get('cache') is None:
_pot_memory_cache.value['cache'] = {}
_pot_memory_cache.value['lock'] = Lock()
_pot_memory_cache.value['max_size'] = max_size
if _pot_memory_cache.value['max_size'] != max_size:
raise ValueError('Cannot change max_size of initialized global memory cache')
return (
_pot_memory_cache.value['cache'],
_pot_memory_cache.value['lock'],
_pot_memory_cache.value['max_size'],
)
@register_provider
class MemoryLRUPCP(PoTokenCacheProvider, BuiltinIEContentProvider):
PROVIDER_NAME = 'memory'
DEFAULT_CACHE_SIZE = 25
def __init__(
self,
*args,
initialize_cache: typing.Callable[[int], tuple[dict[str, tuple[str, int]], Lock, int]] = initialize_global_cache,
**kwargs,
):
super().__init__(*args, **kwargs)
self.cache, self.lock, self.max_size = initialize_cache(self.DEFAULT_CACHE_SIZE)
def is_available(self) -> bool:
return True
def get(self, key: str) -> str | None:
with self.lock:
if key not in self.cache:
return None
value, expires_at = self.cache.pop(key)
if expires_at < int(dt.datetime.now(dt.timezone.utc).timestamp()):
return None
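# Re-insert so the entry becomes the most recently used (dicts keep insertion order)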
self.cache[key] = (value, expires_at)
return value
def store(self, key: str, value: str, expires_at: int):
with self.lock:
if expires_at < int(dt.datetime.now(dt.timezone.utc).timestamp()):
return
if key in self.cache:
self.cache.pop(key)
self.cache[key] = (value, expires_at)
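# Evict the least recently used entry once over capacity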
if len(self.cache) > self.max_size:
oldest_key = next(iter(self.cache))
self.cache.pop(oldest_key)
def delete(self, key: str):
with self.lock:
self.cache.pop(key, None)
@register_preference(MemoryLRUPCP)
def memorylru_preference(*_, **__):
# Memory LRU Cache SHOULD be the highest priority
return 10000

View File

@@ -0,0 +1,48 @@
from __future__ import annotations
from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider
from yt_dlp.extractor.youtube.pot.cache import (
CacheProviderWritePolicy,
PoTokenCacheSpec,
PoTokenCacheSpecProvider,
register_spec,
)
from yt_dlp.extractor.youtube.pot.provider import (
PoTokenRequest,
)
from yt_dlp.extractor.youtube.pot.utils import ContentBindingType, get_webpo_content_binding
from yt_dlp.utils import traverse_obj
@register_spec
class WebPoPCSP(PoTokenCacheSpecProvider, BuiltinIEContentProvider):
PROVIDER_NAME = 'webpo'
def generate_cache_spec(self, request: PoTokenRequest) -> PoTokenCacheSpec | None:
bind_to_visitor_id = self._configuration_arg(
'bind_to_visitor_id', default=['true'])[0] == 'true'
content_binding, content_binding_type = get_webpo_content_binding(
request, bind_to_visitor_id=bind_to_visitor_id)
if not content_binding or not content_binding_type:
return None
write_policy = CacheProviderWritePolicy.WRITE_ALL
if content_binding_type == ContentBindingType.VIDEO_ID:
write_policy = CacheProviderWritePolicy.WRITE_FIRST
return PoTokenCacheSpec(
key_bindings={
't': 'webpo',
'cb': content_binding,
'cbt': content_binding_type.value,
'ip': traverse_obj(request.innertube_context, ('client', 'remoteHost')),
'sa': request.request_source_address,
'px': request.request_proxy,
},
# Integrity token response usually states it has a ttl of 12 hours (43200 seconds).
# We will default to 6 hours to be safe.
default_ttl=21600,
write_policy=write_policy,
)

View File

@@ -0,0 +1,468 @@
from __future__ import annotations
import base64
import binascii
import dataclasses
import datetime as dt
import hashlib
import json
import typing
import urllib.parse
from collections.abc import Iterable
from yt_dlp.extractor.youtube.pot._provider import (
BuiltinIEContentProvider,
IEContentProvider,
IEContentProviderLogger,
)
from yt_dlp.extractor.youtube.pot._registry import (
_pot_cache_provider_preferences,
_pot_cache_providers,
_pot_pcs_providers,
_pot_providers,
_ptp_preferences,
)
from yt_dlp.extractor.youtube.pot.cache import (
CacheProviderWritePolicy,
PoTokenCacheProvider,
PoTokenCacheProviderError,
PoTokenCacheSpec,
PoTokenCacheSpecProvider,
)
from yt_dlp.extractor.youtube.pot.provider import (
PoTokenProvider,
PoTokenProviderError,
PoTokenProviderRejectedRequest,
PoTokenRequest,
PoTokenResponse,
provider_bug_report_message,
)
from yt_dlp.utils import bug_reports_message, format_field, join_nonempty
if typing.TYPE_CHECKING:
from yt_dlp.extractor.youtube.pot.cache import CacheProviderPreference
from yt_dlp.extractor.youtube.pot.provider import Preference
class YoutubeIEContentProviderLogger(IEContentProviderLogger):
def __init__(self, ie, prefix, log_level: IEContentProviderLogger.LogLevel | None = None):
self.__ie = ie
self.prefix = prefix
self.log_level = log_level if log_level is not None else self.LogLevel.INFO
def _format_msg(self, message: str):
prefixstr = format_field(self.prefix, None, '[%s] ')
return f'{prefixstr}{message}'
def trace(self, message: str):
if self.log_level <= self.LogLevel.TRACE:
self.__ie.write_debug(self._format_msg('TRACE: ' + message))
def debug(self, message: str):
if self.log_level <= self.LogLevel.DEBUG:
self.__ie.write_debug(self._format_msg(message))
def info(self, message: str):
if self.log_level <= self.LogLevel.INFO:
self.__ie.to_screen(self._format_msg(message))
def warning(self, message: str, *, once=False):
if self.log_level <= self.LogLevel.WARNING:
self.__ie.report_warning(self._format_msg(message), only_once=once)
def error(self, message: str):
if self.log_level <= self.LogLevel.ERROR:
self.__ie._downloader.report_error(self._format_msg(message), is_error=False)
class PoTokenCache:
def __init__(
self,
logger: IEContentProviderLogger,
cache_providers: list[PoTokenCacheProvider],
cache_spec_providers: list[PoTokenCacheSpecProvider],
cache_provider_preferences: list[CacheProviderPreference] | None = None,
):
self.cache_providers: dict[str, PoTokenCacheProvider] = {
provider.PROVIDER_KEY: provider for provider in (cache_providers or [])}
self.cache_provider_preferences: list[CacheProviderPreference] = cache_provider_preferences or []
self.cache_spec_providers: dict[str, PoTokenCacheSpecProvider] = {
provider.PROVIDER_KEY: provider for provider in (cache_spec_providers or [])}
self.logger = logger
def _get_cache_providers(self, request: PoTokenRequest) -> Iterable[PoTokenCacheProvider]:
"""Sorts available cache providers by preference, given a request"""
preferences = {
provider: sum(pref(provider, request) for pref in self.cache_provider_preferences)
for provider in self.cache_providers.values()
}
if self.logger.log_level <= self.logger.LogLevel.TRACE:
# calling is_available() for every cache provider upfront may have some overhead
self.logger.trace(f'PO Token Cache Providers: {provider_display_list(self.cache_providers.values())}')
self.logger.trace('Cache Provider preferences for this request: {}'.format(', '.join(
f'{provider.PROVIDER_KEY}={pref}' for provider, pref in preferences.items())))
return (
provider for provider in sorted(
self.cache_providers.values(), key=preferences.get, reverse=True) if provider.is_available())
def _get_cache_spec(self, request: PoTokenRequest) -> PoTokenCacheSpec | None:
for provider in self.cache_spec_providers.values():
if not provider.is_available():
continue
try:
spec = provider.generate_cache_spec(request)
if not spec:
continue
if not validate_cache_spec(spec):
self.logger.error(
f'PoTokenCacheSpecProvider "{provider.PROVIDER_KEY}" generate_cache_spec() '
f'returned invalid spec {spec}{provider_bug_report_message(provider)}')
continue
spec = dataclasses.replace(spec, _provider=provider)
self.logger.trace(
f'Retrieved cache spec {spec} from cache spec provider "{provider.PROVIDER_NAME}"')
return spec
except Exception as e:
self.logger.error(
f'Error occurred with "{provider.PROVIDER_NAME}" PO Token cache spec provider: '
f'{e!r}{provider_bug_report_message(provider)}')
continue
return None
def _generate_key_bindings(self, spec: PoTokenCacheSpec) -> dict[str, str]:
bindings_cleaned = {
**{k: v for k, v in spec.key_bindings.items() if v is not None},
# Allow us to invalidate caches if such a need arises
'_dlp_cache': 'v1',
}
if spec._provider:
bindings_cleaned['_p'] = spec._provider.PROVIDER_KEY
self.logger.trace(f'Generated cache key bindings: {bindings_cleaned}')
return bindings_cleaned
def _generate_key(self, bindings: dict) -> str:
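# Hash the sorted bindings so equivalent requests always map to the same key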
binding_string = repr(dict(sorted(bindings.items())))
return hashlib.sha256(binding_string.encode()).hexdigest()
def get(self, request: PoTokenRequest) -> PoTokenResponse | None:
spec = self._get_cache_spec(request)
if not spec:
self.logger.trace('No cache spec available for this request, unable to fetch from cache')
return None
cache_key = self._generate_key(self._generate_key_bindings(spec))
self.logger.trace(f'Attempting to access PO Token cache using key: {cache_key}')
for idx, provider in enumerate(self._get_cache_providers(request)):
try:
self.logger.trace(
f'Attempting to fetch PO Token response from "{provider.PROVIDER_NAME}" cache provider')
cache_response = provider.get(cache_key)
if not cache_response:
continue
try:
po_token_response = PoTokenResponse(**json.loads(cache_response))
except (TypeError, ValueError, json.JSONDecodeError):
po_token_response = None
if not validate_response(po_token_response):
self.logger.error(
f'Invalid PO Token response retrieved from cache provider "{provider.PROVIDER_NAME}": '
f'{cache_response}{provider_bug_report_message(provider)}')
provider.delete(cache_key)
continue
self.logger.trace(
f'PO Token response retrieved from cache using "{provider.PROVIDER_NAME}" provider: '
f'{po_token_response}')
if idx > 0:
# Write back to the highest priority cache provider,
# so we stop trying to fetch from lower priority providers
self.logger.trace('Writing PO Token response to highest priority cache provider')
self.store(request, po_token_response, write_policy=CacheProviderWritePolicy.WRITE_FIRST)
return po_token_response
except PoTokenCacheProviderError as e:
self.logger.warning(
f'Error from "{provider.PROVIDER_NAME}" PO Token cache provider: '
f'{e!r}{provider_bug_report_message(provider) if not e.expected else ""}')
continue
except Exception as e:
self.logger.error(
f'Error occurred with "{provider.PROVIDER_NAME}" PO Token cache provider: '
f'{e!r}{provider_bug_report_message(provider)}',
)
continue
return None
def store(
self,
request: PoTokenRequest,
response: PoTokenResponse,
write_policy: CacheProviderWritePolicy | None = None,
):
spec = self._get_cache_spec(request)
if not spec:
self.logger.trace('No cache spec available for this request. Not caching.')
return
if not validate_response(response):
self.logger.error(
f'Invalid PO Token response provided to PoTokenCache.store(): '
f'{response}{bug_reports_message()}')
return
cache_key = self._generate_key(self._generate_key_bindings(spec))
self.logger.trace(f'Attempting to access PO Token cache using key: {cache_key}')
default_expires_at = int(dt.datetime.now(dt.timezone.utc).timestamp()) + spec.default_ttl
cache_response = dataclasses.replace(response, expires_at=response.expires_at or default_expires_at)
write_policy = write_policy or spec.write_policy
self.logger.trace(f'Using write policy: {write_policy}')
for idx, provider in enumerate(self._get_cache_providers(request)):
try:
self.logger.trace(
f'Caching PO Token response in "{provider.PROVIDER_NAME}" cache provider '
f'(key={cache_key}, expires_at={cache_response.expires_at})')
provider.store(
key=cache_key,
value=json.dumps(dataclasses.asdict(cache_response)),
expires_at=cache_response.expires_at)
except PoTokenCacheProviderError as e:
self.logger.warning(
f'Error from "{provider.PROVIDER_NAME}" PO Token cache provider: '
f'{e!r}{provider_bug_report_message(provider) if not e.expected else ""}')
except Exception as e:
self.logger.error(
f'Error occurred with "{provider.PROVIDER_NAME}" PO Token cache provider: '
f'{e!r}{provider_bug_report_message(provider)}')
# WRITE_FIRST should not write to lower priority providers in the case the highest priority provider fails
if idx == 0 and write_policy == CacheProviderWritePolicy.WRITE_FIRST:
return
def close(self):
for provider in self.cache_providers.values():
provider.close()
for spec_provider in self.cache_spec_providers.values():
spec_provider.close()
class PoTokenRequestDirector:
def __init__(self, logger: IEContentProviderLogger, cache: PoTokenCache):
self.providers: dict[str, PoTokenProvider] = {}
self.preferences: list[Preference] = []
self.cache = cache
self.logger = logger
def register_provider(self, provider: PoTokenProvider):
self.providers[provider.PROVIDER_KEY] = provider
def register_preference(self, preference: Preference):
self.preferences.append(preference)
def _get_providers(self, request: PoTokenRequest) -> Iterable[PoTokenProvider]:
"""Sorts available providers by preference, given a request"""
preferences = {
provider: sum(pref(provider, request) for pref in self.preferences)
for provider in self.providers.values()
}
if self.logger.log_level <= self.logger.LogLevel.TRACE:
# calling is_available() for every PO Token provider upfront may have some overhead
self.logger.trace(f'PO Token Providers: {provider_display_list(self.providers.values())}')
self.logger.trace('Provider preferences for this request: {}'.format(', '.join(
f'{provider.PROVIDER_NAME}={pref}' for provider, pref in preferences.items())))
return (
provider for provider in sorted(
self.providers.values(), key=preferences.get, reverse=True)
if provider.is_available()
)
def _get_po_token(self, request) -> PoTokenResponse | None:
for provider in self._get_providers(request):
try:
self.logger.trace(
f'Attempting to fetch a PO Token from "{provider.PROVIDER_NAME}" provider')
response = provider.request_pot(request.copy())
except PoTokenProviderRejectedRequest as e:
self.logger.trace(
f'PO Token Provider "{provider.PROVIDER_NAME}" rejected this request, '
f'trying next available provider. Reason: {e}')
continue
except PoTokenProviderError as e:
self.logger.warning(
f'Error fetching PO Token from "{provider.PROVIDER_NAME}" provider: '
f'{e!r}{provider_bug_report_message(provider) if not e.expected else ""}')
continue
except Exception as e:
self.logger.error(
f'Unexpected error when fetching PO Token from "{provider.PROVIDER_NAME}" provider: '
f'{e!r}{provider_bug_report_message(provider)}')
continue
self.logger.trace(f'PO Token response from "{provider.PROVIDER_NAME}" provider: {response}')
if not validate_response(response):
self.logger.error(
f'Invalid PO Token response received from "{provider.PROVIDER_NAME}" provider: '
f'{response}{provider_bug_report_message(provider)}')
continue
return response
self.logger.trace('No PO Token providers were able to provide a valid PO Token')
return None
def get_po_token(self, request: PoTokenRequest) -> str | None:
if not request.bypass_cache:
if pot_response := self.cache.get(request):
return clean_pot(pot_response.po_token)
if not self.providers:
self.logger.trace('No PO Token providers registered')
return None
pot_response = self._get_po_token(request)
if not pot_response:
return None
pot_response.po_token = clean_pot(pot_response.po_token)
if pot_response.expires_at is None or pot_response.expires_at > 0:
self.cache.store(request, pot_response)
else:
self.logger.trace(
f'PO Token response will not be cached (expires_at={pot_response.expires_at})')
return pot_response.po_token
def close(self):
for provider in self.providers.values():
provider.close()
self.cache.close()
EXTRACTOR_ARG_PREFIX = 'youtubepot'
def initialize_pot_director(ie):
assert ie._downloader is not None, 'Downloader not set'
enable_trace = ie._configuration_arg(
'pot_trace', ['false'], ie_key='youtube', casesense=False)[0] == 'true'
if enable_trace:
log_level = IEContentProviderLogger.LogLevel.TRACE
elif ie.get_param('verbose', False):
log_level = IEContentProviderLogger.LogLevel.DEBUG
else:
log_level = IEContentProviderLogger.LogLevel.INFO
def get_provider_logger_and_settings(provider, logger_key):
logger_prefix = f'{logger_key}:{provider.PROVIDER_NAME}'
extractor_key = f'{EXTRACTOR_ARG_PREFIX}-{provider.PROVIDER_KEY.lower()}'
return (
YoutubeIEContentProviderLogger(ie, logger_prefix, log_level=log_level),
ie.get_param('extractor_args', {}).get(extractor_key, {}))
cache_providers = []
for cache_provider in _pot_cache_providers.value.values():
logger, settings = get_provider_logger_and_settings(cache_provider, 'pot:cache')
cache_providers.append(cache_provider(ie, logger, settings))
cache_spec_providers = []
for cache_spec_provider in _pot_pcs_providers.value.values():
logger, settings = get_provider_logger_and_settings(cache_spec_provider, 'pot:cache:spec')
cache_spec_providers.append(cache_spec_provider(ie, logger, settings))
cache = PoTokenCache(
logger=YoutubeIEContentProviderLogger(ie, 'pot:cache', log_level=log_level),
cache_providers=cache_providers,
cache_spec_providers=cache_spec_providers,
cache_provider_preferences=list(_pot_cache_provider_preferences.value),
)
director = PoTokenRequestDirector(
logger=YoutubeIEContentProviderLogger(ie, 'pot', log_level=log_level),
cache=cache,
)
ie._downloader.add_close_hook(director.close)
for provider in _pot_providers.value.values():
logger, settings = get_provider_logger_and_settings(provider, 'pot')
director.register_provider(provider(ie, logger, settings))
for preference in _ptp_preferences.value:
director.register_preference(preference)
if director.logger.log_level <= director.logger.LogLevel.DEBUG:
# calling is_available() for every PO Token provider upfront may have some overhead
director.logger.debug(f'PO Token Providers: {provider_display_list(director.providers.values())}')
director.logger.debug(f'PO Token Cache Providers: {provider_display_list(cache.cache_providers.values())}')
director.logger.debug(f'PO Token Cache Spec Providers: {provider_display_list(cache.cache_spec_providers.values())}')
director.logger.trace(f'Registered {len(director.preferences)} provider preferences')
director.logger.trace(f'Registered {len(cache.cache_provider_preferences)} cache provider preferences')
return director
def provider_display_list(providers: Iterable[IEContentProvider]):
def provider_display_name(provider):
display_str = join_nonempty(
provider.PROVIDER_NAME,
provider.PROVIDER_VERSION if not isinstance(provider, BuiltinIEContentProvider) else None)
statuses = []
if not isinstance(provider, BuiltinIEContentProvider):
statuses.append('external')
if not provider.is_available():
statuses.append('unavailable')
if statuses:
display_str += f' ({", ".join(statuses)})'
return display_str
return ', '.join(provider_display_name(provider) for provider in providers) or 'none'
def clean_pot(po_token: str):
# Clean and validate the PO Token. This will strip invalid characters off
# (e.g. additional url params the user may accidentally include)
try:
return base64.urlsafe_b64encode(
base64.urlsafe_b64decode(urllib.parse.unquote(po_token))).decode()
except (binascii.Error, ValueError):
raise ValueError('Invalid PO Token')
def validate_response(response: PoTokenResponse | None):
if (
not isinstance(response, PoTokenResponse)
or not isinstance(response.po_token, str)
or not response.po_token
): # noqa: SIM103
return False
try:
clean_pot(response.po_token)
except ValueError:
return False
if not isinstance(response.expires_at, int):
return response.expires_at is None
return response.expires_at <= 0 or response.expires_at > int(dt.datetime.now(dt.timezone.utc).timestamp())
def validate_cache_spec(spec: PoTokenCacheSpec):
return (
isinstance(spec, PoTokenCacheSpec)
and isinstance(spec.write_policy, CacheProviderWritePolicy)
and isinstance(spec.default_ttl, int)
and isinstance(spec.key_bindings, dict)
and all(isinstance(k, str) for k in spec.key_bindings)
and all(v is None or isinstance(v, str) for v in spec.key_bindings.values())
and bool([v for v in spec.key_bindings.values() if v is not None])
)

View File

@@ -0,0 +1,156 @@
from __future__ import annotations
import abc
import enum
import functools
from yt_dlp.extractor.common import InfoExtractor
from yt_dlp.utils import NO_DEFAULT, bug_reports_message, classproperty, traverse_obj
from yt_dlp.version import __version__
# xxx: these could be generalized outside YoutubeIE eventually
class IEContentProviderLogger(abc.ABC):
class LogLevel(enum.IntEnum):
TRACE = 0
DEBUG = 10
INFO = 20
WARNING = 30
ERROR = 40
@classmethod
def _missing_(cls, value):
if isinstance(value, str):
value = value.upper()
if value in dir(cls):
return cls[value]
return cls.INFO
log_level = LogLevel.INFO
@abc.abstractmethod
def trace(self, message: str):
pass
@abc.abstractmethod
def debug(self, message: str):
pass
@abc.abstractmethod
def info(self, message: str):
pass
@abc.abstractmethod
def warning(self, message: str, *, once=False):
pass
@abc.abstractmethod
def error(self, message: str):
pass
class IEContentProviderError(Exception):
def __init__(self, msg=None, expected=False):
super().__init__(msg)
self.expected = expected
class IEContentProvider(abc.ABC):
PROVIDER_VERSION: str = '0.0.0'
BUG_REPORT_LOCATION: str = '(developer has not provided a bug report location)'
def __init__(
self,
ie: InfoExtractor,
logger: IEContentProviderLogger,
settings: dict[str, list[str]], *_, **__,
):
self.ie = ie
self.settings = settings or {}
self.logger = logger
super().__init__()
@classmethod
def __init_subclass__(cls, *, suffix=None, **kwargs):
if suffix:
cls._PROVIDER_KEY_SUFFIX = suffix
return super().__init_subclass__(**kwargs)
@classproperty
def PROVIDER_NAME(cls) -> str:
return cls.__name__[:-len(cls._PROVIDER_KEY_SUFFIX)]
@classproperty
def BUG_REPORT_MESSAGE(cls):
return f'please report this issue to the provider developer at {cls.BUG_REPORT_LOCATION} .'
@classproperty
def PROVIDER_KEY(cls) -> str:
assert hasattr(cls, '_PROVIDER_KEY_SUFFIX'), 'Content Provider implementation must define a suffix for the provider key'
assert cls.__name__.endswith(cls._PROVIDER_KEY_SUFFIX), f'Content Provider class names must end with "{cls._PROVIDER_KEY_SUFFIX}"'
return cls.__name__[:-len(cls._PROVIDER_KEY_SUFFIX)]
@abc.abstractmethod
def is_available(self) -> bool:
"""
Check if the provider is available (e.g. all required dependencies are available)
This is used to determine if the provider should be used and to provide debug information.
IMPORTANT: This method should not make any network requests or perform any expensive operations.
It is called multiple times.
"""
raise NotImplementedError
def close(self): # noqa: B027
pass
def _configuration_arg(self, key, default=NO_DEFAULT, *, casesense=False):
"""
@returns A list of values for the setting given by "key"
or "default" if no such key is present
@param default The default value to return when the key is not present (default: [])
@param casesense When false, the values are converted to lower case
"""
val = traverse_obj(self.settings, key)
if val is None:
return [] if default is NO_DEFAULT else default
return list(val) if casesense else [x.lower() for x in val]
class BuiltinIEContentProvider(IEContentProvider, abc.ABC):
PROVIDER_VERSION = __version__
BUG_REPORT_MESSAGE = bug_reports_message(before='')
def register_provider_generic(
provider,
base_class,
registry,
):
"""Generic function to register a provider class"""
assert issubclass(provider, base_class), f'{provider} must be a subclass of {base_class.__name__}'
assert provider.PROVIDER_KEY not in registry, f'{base_class.__name__} {provider.PROVIDER_KEY} already registered'
registry[provider.PROVIDER_KEY] = provider
return provider
def register_preference_generic(
base_class,
registry,
*providers,
):
"""Generic function to register a preference for a provider"""
assert all(issubclass(provider, base_class) for provider in providers)
def outer(preference):
@functools.wraps(preference)
def inner(provider, *args, **kwargs):
if not providers or isinstance(provider, providers):
return preference(provider, *args, **kwargs)
return 0
registry.add(inner)
return preference
return outer

View File

@@ -0,0 +1,8 @@
from yt_dlp.globals import Indirect
_pot_providers = Indirect({})
_ptp_preferences = Indirect(set())
_pot_pcs_providers = Indirect({})
_pot_cache_providers = Indirect({})
_pot_cache_provider_preferences = Indirect(set())
_pot_memory_cache = Indirect({})

View File

@@ -0,0 +1,97 @@
"""PUBLIC API"""
from __future__ import annotations
import abc
import dataclasses
import enum
import typing
from yt_dlp.extractor.youtube.pot._provider import (
IEContentProvider,
IEContentProviderError,
register_preference_generic,
register_provider_generic,
)
from yt_dlp.extractor.youtube.pot._registry import (
_pot_cache_provider_preferences,
_pot_cache_providers,
_pot_pcs_providers,
)
from yt_dlp.extractor.youtube.pot.provider import PoTokenRequest
class PoTokenCacheProviderError(IEContentProviderError):
"""An error occurred while fetching a PO Token"""
class PoTokenCacheProvider(IEContentProvider, abc.ABC, suffix='PCP'):
@abc.abstractmethod
def get(self, key: str) -> str | None:
pass
@abc.abstractmethod
def store(self, key: str, value: str, expires_at: int):
pass
@abc.abstractmethod
def delete(self, key: str):
pass
class CacheProviderWritePolicy(enum.Enum):
WRITE_ALL = enum.auto() # Write to all cache providers
WRITE_FIRST = enum.auto() # Write to only the first cache provider
@dataclasses.dataclass
class PoTokenCacheSpec:
key_bindings: dict[str, str | None]
default_ttl: int
write_policy: CacheProviderWritePolicy = CacheProviderWritePolicy.WRITE_ALL
# Internal
_provider: PoTokenCacheSpecProvider | None = None
class PoTokenCacheSpecProvider(IEContentProvider, abc.ABC, suffix='PCSP'):
def is_available(self) -> bool:
return True
@abc.abstractmethod
def generate_cache_spec(self, request: PoTokenRequest) -> PoTokenCacheSpec | None:
"""Generate a cache spec for the given request"""
pass
def register_provider(provider: type[PoTokenCacheProvider]):
"""Register a PoTokenCacheProvider class"""
return register_provider_generic(
provider=provider,
base_class=PoTokenCacheProvider,
registry=_pot_cache_providers.value,
)
def register_spec(provider: type[PoTokenCacheSpecProvider]):
"""Register a PoTokenCacheSpecProvider class"""
return register_provider_generic(
provider=provider,
base_class=PoTokenCacheSpecProvider,
registry=_pot_pcs_providers.value,
)
def register_preference(
*providers: type[PoTokenCacheProvider]) -> typing.Callable[[CacheProviderPreference], CacheProviderPreference]:
"""Register a preference for a PoTokenCacheProvider"""
return register_preference_generic(
PoTokenCacheProvider,
_pot_cache_provider_preferences.value,
*providers,
)
if typing.TYPE_CHECKING:
CacheProviderPreference = typing.Callable[[PoTokenCacheProvider, PoTokenRequest], int]

View File

@@ -0,0 +1,281 @@
"""PUBLIC API"""
from __future__ import annotations
import abc
import copy
import dataclasses
import enum
import functools
import typing
import urllib.parse
from yt_dlp.cookies import YoutubeDLCookieJar
from yt_dlp.extractor.youtube.pot._provider import (
IEContentProvider,
IEContentProviderError,
register_preference_generic,
register_provider_generic,
)
from yt_dlp.extractor.youtube.pot._registry import _pot_providers, _ptp_preferences
from yt_dlp.networking import Request, Response
from yt_dlp.utils import traverse_obj
from yt_dlp.utils.networking import HTTPHeaderDict
__all__ = [
'ExternalRequestFeature',
'PoTokenContext',
'PoTokenProvider',
'PoTokenProviderError',
'PoTokenProviderRejectedRequest',
'PoTokenRequest',
'PoTokenResponse',
'provider_bug_report_message',
'register_preference',
'register_provider',
]
class PoTokenContext(enum.Enum):
GVS = 'gvs'
PLAYER = 'player'
SUBS = 'subs'
@dataclasses.dataclass
class PoTokenRequest:
# YouTube parameters
context: PoTokenContext
innertube_context: InnertubeContext
innertube_host: str | None = None
session_index: str | None = None
player_url: str | None = None
is_authenticated: bool = False
video_webpage: str | None = None
internal_client_name: str | None = None
# Content binding parameters
visitor_data: str | None = None
data_sync_id: str | None = None
video_id: str | None = None
# Networking parameters
request_cookiejar: YoutubeDLCookieJar = dataclasses.field(default_factory=YoutubeDLCookieJar)
request_proxy: str | None = None
request_headers: HTTPHeaderDict = dataclasses.field(default_factory=HTTPHeaderDict)
request_timeout: float | None = None
request_source_address: str | None = None
request_verify_tls: bool = True
# Generate a new token, do not use a cached token
# The token should still be cached for future requests
bypass_cache: bool = False
def copy(self):
return dataclasses.replace(
self,
request_headers=HTTPHeaderDict(self.request_headers),
innertube_context=copy.deepcopy(self.innertube_context),
)
@dataclasses.dataclass
class PoTokenResponse:
po_token: str
expires_at: int | None = None
class PoTokenProviderRejectedRequest(IEContentProviderError):
"""Reject the PoTokenRequest (cannot handle the request)"""
class PoTokenProviderError(IEContentProviderError):
"""An error occurred while fetching a PO Token"""
class ExternalRequestFeature(enum.Enum):
PROXY_SCHEME_HTTP = enum.auto()
PROXY_SCHEME_HTTPS = enum.auto()
PROXY_SCHEME_SOCKS4 = enum.auto()
PROXY_SCHEME_SOCKS4A = enum.auto()
PROXY_SCHEME_SOCKS5 = enum.auto()
PROXY_SCHEME_SOCKS5H = enum.auto()
SOURCE_ADDRESS = enum.auto()
DISABLE_TLS_VERIFICATION = enum.auto()
class PoTokenProvider(IEContentProvider, abc.ABC, suffix='PTP'):
# Set to None to disable the check
_SUPPORTED_CONTEXTS: tuple[PoTokenContext] | None = ()
# Innertube Client Name.
# For example, "WEB", "ANDROID", "TVHTML5".
# For a list of WebPO client names, see yt_dlp.extractor.youtube.pot.utils.WEBPO_CLIENTS.
# Also see yt_dlp.extractor.youtube._base.INNERTUBE_CLIENTS
# for a list of client names currently supported by the YouTube extractor.
_SUPPORTED_CLIENTS: tuple[str] | None = ()
# If making external requests to websites (i.e. to youtube.com)
# using another library or service (i.e., not _request_webpage),
# add the request features that are supported.
# If only using _request_webpage to make external requests, set this to None.
_SUPPORTED_EXTERNAL_REQUEST_FEATURES: tuple[ExternalRequestFeature] | None = ()
def __validate_request(self, request: PoTokenRequest):
if not self.is_available():
raise PoTokenProviderRejectedRequest(f'{self.PROVIDER_NAME} is not available')
# Validate request using built-in settings
if (
self._SUPPORTED_CONTEXTS is not None
and request.context not in self._SUPPORTED_CONTEXTS
):
raise PoTokenProviderRejectedRequest(
f'PO Token Context "{request.context}" is not supported by {self.PROVIDER_NAME}')
if self._SUPPORTED_CLIENTS is not None:
client_name = traverse_obj(
request.innertube_context, ('client', 'clientName'))
if client_name not in self._SUPPORTED_CLIENTS:
raise PoTokenProviderRejectedRequest(
f'Client "{client_name}" is not supported by {self.PROVIDER_NAME}. '
f'Supported clients: {", ".join(self._SUPPORTED_CLIENTS) or "none"}')
self.__validate_external_request_features(request)
@functools.cached_property
def _supported_proxy_schemes(self):
return {
scheme: feature
for scheme, feature in {
'http': ExternalRequestFeature.PROXY_SCHEME_HTTP,
'https': ExternalRequestFeature.PROXY_SCHEME_HTTPS,
'socks4': ExternalRequestFeature.PROXY_SCHEME_SOCKS4,
'socks4a': ExternalRequestFeature.PROXY_SCHEME_SOCKS4A,
'socks5': ExternalRequestFeature.PROXY_SCHEME_SOCKS5,
'socks5h': ExternalRequestFeature.PROXY_SCHEME_SOCKS5H,
}.items()
if feature in (self._SUPPORTED_EXTERNAL_REQUEST_FEATURES or [])
}
def __validate_external_request_features(self, request: PoTokenRequest):
if self._SUPPORTED_EXTERNAL_REQUEST_FEATURES is None:
return
if request.request_proxy:
scheme = urllib.parse.urlparse(request.request_proxy).scheme
if scheme.lower() not in self._supported_proxy_schemes:
raise PoTokenProviderRejectedRequest(
f'External requests by "{self.PROVIDER_NAME}" provider do not '
f'support proxy scheme "{scheme}". Supported proxy schemes: '
f'{", ".join(self._supported_proxy_schemes) or "none"}')
if (
request.request_source_address
and ExternalRequestFeature.SOURCE_ADDRESS not in self._SUPPORTED_EXTERNAL_REQUEST_FEATURES
):
raise PoTokenProviderRejectedRequest(
f'External requests by "{self.PROVIDER_NAME}" provider '
f'do not support setting source address')
if (
not request.request_verify_tls
and ExternalRequestFeature.DISABLE_TLS_VERIFICATION not in self._SUPPORTED_EXTERNAL_REQUEST_FEATURES
):
raise PoTokenProviderRejectedRequest(
f'External requests by "{self.PROVIDER_NAME}" provider '
f'do not support ignoring TLS certificate failures')
def request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
self.__validate_request(request)
return self._real_request_pot(request)
@abc.abstractmethod
def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
"""To be implemented by subclasses"""
pass
# Helper functions
def _request_webpage(self, request: Request, pot_request: PoTokenRequest | None = None, note=None, **kwargs) -> Response:
"""Make a request using the internal HTTP Client.
Use this instead of calling requests, urllib3 or other HTTP client libraries directly!
YouTube cookies will be automatically applied if this request is made to YouTube.
@param request: The request to make
@param pot_request: The PoTokenRequest to use. Request parameters will be merged from it.
@param note: Custom log message to display when making the request. Set to `False` to disable logging.
Tips:
- Disable proxy (e.g. if calling a local service): Request(..., proxies={'all': None})
- Set request timeout: Request(..., extensions={'timeout': 5.0})
"""
req = request.copy()
# Merge some ctx request settings into the request
# Most of these will already be used by the configured ydl instance,
# however, the YouTube extractor may override some.
if pot_request is not None:
req.headers = HTTPHeaderDict(pot_request.request_headers, req.headers)
req.proxies = req.proxies or ({'all': pot_request.request_proxy} if pot_request.request_proxy else {})
if pot_request.request_cookiejar is not None:
req.extensions['cookiejar'] = req.extensions.get('cookiejar', pot_request.request_cookiejar)
if note is not False:
self.logger.info(str(note) if note else 'Requesting webpage')
return self.ie._downloader.urlopen(req)
def register_provider(provider: type[PoTokenProvider]):
"""Register a PoTokenProvider class"""
return register_provider_generic(
provider=provider,
base_class=PoTokenProvider,
registry=_pot_providers.value,
)
def provider_bug_report_message(provider: IEContentProvider, before=';'):
msg = provider.BUG_REPORT_MESSAGE
before = before.rstrip()
if not before or before.endswith(('.', '!', '?')):
msg = msg[0].title() + msg[1:]
return f'{before} {msg}' if before else msg
def register_preference(*providers: type[PoTokenProvider]) -> typing.Callable[[Preference], Preference]:
"""Register a preference for a PoTokenProvider"""
return register_preference_generic(
PoTokenProvider,
_ptp_preferences.value,
*providers,
)
if typing.TYPE_CHECKING:
Preference = typing.Callable[[PoTokenProvider, PoTokenRequest], int]
__all__.append('Preference')
# Barebones innertube context. There may be more fields.
class ClientInfo(typing.TypedDict, total=False):
hl: str | None
gl: str | None
remoteHost: str | None
deviceMake: str | None
deviceModel: str | None
visitorData: str | None
userAgent: str | None
clientName: str
clientVersion: str
osName: str | None
osVersion: str | None
class InnertubeContext(typing.TypedDict, total=False):
client: ClientInfo
request: dict
user: dict

View File

@@ -0,0 +1,73 @@
"""PUBLIC API"""
from __future__ import annotations
import base64
import contextlib
import enum
import re
import urllib.parse
from yt_dlp.extractor.youtube.pot.provider import PoTokenContext, PoTokenRequest
from yt_dlp.utils import traverse_obj
__all__ = ['WEBPO_CLIENTS', 'ContentBindingType', 'get_webpo_content_binding']
WEBPO_CLIENTS = (
'WEB',
'MWEB',
'TVHTML5',
'WEB_EMBEDDED_PLAYER',
'WEB_CREATOR',
'WEB_REMIX',
'TVHTML5_SIMPLY_EMBEDDED_PLAYER',
)
class ContentBindingType(enum.Enum):
VISITOR_DATA = 'visitor_data'
DATASYNC_ID = 'datasync_id'
VIDEO_ID = 'video_id'
VISITOR_ID = 'visitor_id'
def get_webpo_content_binding(
request: PoTokenRequest,
webpo_clients=WEBPO_CLIENTS,
bind_to_visitor_id=False,
) -> tuple[str | None, ContentBindingType | None]:
client_name = traverse_obj(request.innertube_context, ('client', 'clientName'))
if not client_name or client_name not in webpo_clients:
return None, None
if request.context == PoTokenContext.GVS or client_name in ('WEB_REMIX', ):
if request.is_authenticated:
return request.data_sync_id, ContentBindingType.DATASYNC_ID
else:
if bind_to_visitor_id:
visitor_id = _extract_visitor_id(request.visitor_data)
if visitor_id:
return visitor_id, ContentBindingType.VISITOR_ID
return request.visitor_data, ContentBindingType.VISITOR_DATA
elif request.context in (PoTokenContext.PLAYER, PoTokenContext.SUBS):
return request.video_id, ContentBindingType.VIDEO_ID
return None, None
def _extract_visitor_id(visitor_data):
if not visitor_data:
return None
# Attempt to extract the visitor ID from the visitor_data protobuf
# xxx: ideally should use a protobuf parser
with contextlib.suppress(Exception):
visitor_id = base64.urlsafe_b64decode(
urllib.parse.unquote_plus(visitor_data))[2:13].decode()
# Check that the visitor ID looks like 11 base64url characters
if re.fullmatch(r'[A-Za-z0-9_-]{11}', visitor_id):
return visitor_id
return None