mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2025-10-24 03:08:34 +00:00

* [utils] Create `RetryManager`
* Migrate all retries to use the manager
* [extractor] Add wrapper methods for convenience
* Standardize console messages for retries
* Add `--retry-sleep` for extractors
232 lines
11 KiB
Python
import json
|
|
import time
|
|
import urllib.error
|
|
|
|
from .fragment import FragmentFD
|
|
from ..utils import (
|
|
RegexNotFoundError,
|
|
RetryManager,
|
|
dict_get,
|
|
int_or_none,
|
|
try_get,
|
|
)
|
|
|
|
|
|
class YoutubeLiveChatFD(FragmentFD):
    """Download YouTube live chats (live or replay) fragment by fragment.

    Every chat action is appended to the output as one JSON document per line.
    """

    def real_download(self, filename, info_dict):
        """Download the live chat described by ``info_dict`` into ``filename``.

        The first request fetches ``info_dict['url']`` to obtain the initial
        chat continuation and the ytcfg (API key and innertube context).
        Fragment 1 is then the HTML chat page; every later fragment is a JSON
        request to the ``youtubei/v1/live_chat`` endpoint. The loop ends when
        no further continuation is returned, or after one fragment when the
        ``test`` param is set.

        Returns True on success. Returns False when a fragment could not be
        downloaded, when the first page lacks the initial data, ytcfg, API key,
        innertube context or chat continuation, or when ``info_dict['protocol']``
        is not one of ``youtube_live_chat`` / ``youtube_live_chat_replay``.
        """
        video_id = info_dict['video_id']
        protocol = info_dict['protocol']
        is_live = protocol == 'youtube_live_chat'

        self.to_screen('[%s] Downloading live chat' % self.FD_NAME)
        if not self.params.get('skip_download') and is_live:
            self.report_warning('Live chat download runs until the livestream ends. '
                                'If you wish to download the video simultaneously, run a separate yt-dlp instance')

        test = self.params.get('test', False)

        ctx = {
            'filename': filename,
            'live': True,
            'total_frags': None,
        }

        # NOTE(review): imported locally rather than at module level,
        # presumably to avoid a circular import - confirm
        from ..extractor.youtube import YoutubeBaseInfoExtractor

        ie = YoutubeBaseInfoExtractor(self.ydl)

        # Wall-clock start in milliseconds; live offsets are measured from here
        start_time = int(time.time() * 1000)

        def dl_fragment(url, data=None, headers=None):
            """Download ``url`` as the next fragment, adding ``headers`` on top of the format's own headers."""
            http_headers = info_dict.get('http_headers', {})
            if headers:
                # Work on a copy so the headers stored in info_dict stay untouched
                http_headers = http_headers.copy()
                http_headers.update(headers)
            return self._download_fragment(ctx, url, info_dict, http_headers, data)

        def parse_actions_replay(live_chat_continuation):
            """Append all replay actions to the output.

            Returns ``(continuation_id, offset, click_tracking_params)``.
            ``offset`` is the ``videoOffsetTimeMsec`` of the last replay item
            seen; the continuation is only looked up when such an item exists,
            so a fragment without replay items ends the download.
            """
            offset = continuation_id = click_tracking_params = None
            processed_fragment = bytearray()
            for action in live_chat_continuation.get('actions', []):
                if 'replayChatItemAction' in action:
                    replay_chat_item_action = action['replayChatItemAction']
                    offset = int(replay_chat_item_action['videoOffsetTimeMsec'])
                processed_fragment.extend(
                    json.dumps(action, ensure_ascii=False).encode() + b'\n')
            if offset is not None:
                continuation = try_get(
                    live_chat_continuation,
                    lambda x: x['continuations'][0]['liveChatReplayContinuationData'], dict)
                if continuation:
                    continuation_id = continuation.get('continuation')
                    click_tracking_params = continuation.get('clickTrackingParams')
            self._append_fragment(ctx, processed_fragment)
            return continuation_id, offset, click_tracking_params

        def try_refresh_replay_beginning(live_chat_continuation):
            """Switch to the unfiltered replay if the chat header offers it.

            Used for the first replay fragment only. When the second view
            selector entry carries a reload continuation, nothing is written
            and the replay restarts from offset 0 with that continuation;
            otherwise the fragment is parsed as a normal replay fragment.
            """
            # choose the second option that contains the unfiltered live chat replay
            refresh_continuation = try_get(
                live_chat_continuation,
                lambda x: x['header']['liveChatHeaderRenderer']['viewSelector']['sortFilterSubMenuRenderer']['subMenuItems'][1]['continuation']['reloadContinuationData'], dict)
            if not refresh_continuation:
                return parse_actions_replay(live_chat_continuation)

            # no data yet but required to call _append_fragment
            self._append_fragment(ctx, b'')
            return (
                refresh_continuation.get('continuation'),
                0,
                refresh_continuation.get('trackingParams'),
            )

        live_offset = 0

        def parse_actions_live(live_chat_continuation):
            """Append all live actions to the output, wrapped in the replay format.

            Returns ``(continuation_id, live_offset, click_tracking_params)``.
            ``live_offset`` is kept across calls, so an action without a
            timestamp reuses the offset of the previous timestamped action.
            """
            nonlocal live_offset
            continuation_id = click_tracking_params = None
            processed_fragment = bytearray()
            for action in live_chat_continuation.get('actions', []):
                timestamp = self.parse_live_timestamp(action)
                if timestamp is not None:
                    live_offset = timestamp - start_time
                # compatibility with replay format
                pseudo_action = {
                    'replayChatItemAction': {'actions': [action]},
                    'videoOffsetTimeMsec': str(live_offset),
                    'isLive': True,
                }
                processed_fragment.extend(
                    json.dumps(pseudo_action, ensure_ascii=False).encode() + b'\n')
            continuation_data_getters = [
                lambda x: x['continuations'][0]['invalidationContinuationData'],
                lambda x: x['continuations'][0]['timedContinuationData'],
            ]
            continuation_data = try_get(live_chat_continuation, continuation_data_getters, dict)
            if continuation_data:
                continuation_id = continuation_data.get('continuation')
                click_tracking_params = continuation_data.get('clickTrackingParams')
                timeout_ms = int_or_none(continuation_data.get('timeoutMs'))
                if timeout_ms is not None:
                    # Wait for the interval given in the continuation data;
                    # this delays the next request
                    time.sleep(timeout_ms / 1000)
            self._append_fragment(ctx, processed_fragment)
            return continuation_id, live_offset, click_tracking_params

        def download_and_parse_fragment(url, frag_index, request_data=None, headers=None):
            """Download one fragment and hand its chat continuation to the right parser.

            Returns ``(success, continuation_id, offset, click_tracking_params)``;
            the last three are None whenever ``success`` is False.
            """
            for retry in RetryManager(self.params.get('fragment_retries'), self.report_retry, frag_index=frag_index):
                try:
                    if not dl_fragment(url, request_data, headers):
                        return False, None, None, None
                    raw_fragment = self._read_fragment(ctx)
                    # The fragment is either an HTML page embedding the initial
                    # data or a bare JSON API response
                    try:
                        data = ie.extract_yt_initial_data(video_id, raw_fragment.decode('utf-8', 'replace'))
                    except RegexNotFoundError:
                        data = None
                    if not data:
                        data = json.loads(raw_fragment)
                    live_chat_continuation = try_get(
                        data,
                        lambda x: x['continuationContents']['liveChatContinuation'], dict) or {}

                    if is_live:
                        parse_actions = parse_actions_live
                    elif frag_index == 1:
                        parse_actions = try_refresh_replay_beginning
                    else:
                        parse_actions = parse_actions_replay
                    return (True, *parse_actions(live_chat_continuation))
                except urllib.error.HTTPError as err:
                    # Record the error on the retry object; RetryManager
                    # presumably decides from it whether to go around again
                    retry.error = err
            return False, None, None, None

        self._prepare_and_start_frag_download(ctx, info_dict)

        if not dl_fragment(info_dict['url']):
            return False
        # Decoded once; both the initial data and the ytcfg are read from it
        webpage = self._read_fragment(ctx).decode('utf-8', 'replace')
        try:
            data = ie.extract_yt_initial_data(video_id, webpage)
        except RegexNotFoundError:
            return False
        continuation_id = try_get(
            data,
            lambda x: x['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['continuations'][0]['reloadContinuationData']['continuation'],
            str)
        # no data yet but required to call _append_fragment
        self._append_fragment(ctx, b'')

        ytcfg = ie.extract_ytcfg(video_id, webpage)
        if not ytcfg:
            return False
        api_key = try_get(ytcfg, lambda x: x['INNERTUBE_API_KEY'])
        innertube_context = try_get(ytcfg, lambda x: x['INNERTUBE_CONTEXT'])
        if not api_key or not innertube_context:
            return False
        if continuation_id is None:
            # Without an initial continuation there is no chat page to request;
            # bail out instead of failing on the URL concatenation below
            return False
        visitor_data = try_get(innertube_context, lambda x: x['client']['visitorData'], str)

        if protocol == 'youtube_live_chat_replay':
            url = 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat_replay?key=' + api_key
            chat_page_url = 'https://www.youtube.com/live_chat_replay?continuation=' + continuation_id
        elif protocol == 'youtube_live_chat':
            url = 'https://www.youtube.com/youtubei/v1/live_chat/get_live_chat?key=' + api_key
            chat_page_url = 'https://www.youtube.com/live_chat?continuation=' + continuation_id
        else:
            # Unknown protocol: there is no endpoint to talk to
            return False

        frag_index = offset = 0
        click_tracking_params = None
        while continuation_id is not None:
            frag_index += 1
            if frag_index == 1:
                # The first fragment is the HTML chat page; it needs no request body
                result = download_and_parse_fragment(chat_page_url, frag_index)
            else:
                request_data = {
                    'context': innertube_context,
                    'continuation': continuation_id,
                    'currentPlayerState': {'playerOffsetMs': str(max(offset - 5000, 0))},
                }
                if click_tracking_params:
                    # 'context' is the innertube_context dict itself, so this
                    # entry stays in place for the following requests as well
                    request_data['context']['clickTracking'] = {'clickTrackingParams': click_tracking_params}
                headers = ie.generate_api_headers(ytcfg=ytcfg, visitor_data=visitor_data)
                headers.update({'content-type': 'application/json'})
                fragment_request_data = json.dumps(request_data, ensure_ascii=False).encode() + b'\n'
                result = download_and_parse_fragment(
                    url, frag_index, fragment_request_data, headers)
            success, continuation_id, offset, click_tracking_params = result
            if not success:
                return False
            if test:
                break

        self._finish_frag_download(ctx, info_dict)
        return True

    @staticmethod
    def parse_live_timestamp(action):
        """Return the timestamp of a live chat ``action``, or None if it has none.

        The action is unwrapped step by step (action content, then item, then
        renderer); ticker and banner renderers are unwrapped once more to the
        chat message they contain. None is returned as soon as one level is
        missing or is not a dict.
        """
        message_renderers = [
            'liveChatTextMessageRenderer', 'liveChatPaidMessageRenderer',
            'liveChatMembershipItemRenderer', 'liveChatPaidStickerRenderer',
        ]
        action_content = dict_get(
            action,
            ['addChatItemAction', 'addLiveChatTickerItemAction', 'addBannerToLiveChatCommand'])
        if not isinstance(action_content, dict):
            return None
        item = dict_get(action_content, ['item', 'bannerRenderer'])
        if not isinstance(item, dict):
            return None
        renderer = dict_get(item, [
            # text
            *message_renderers,
            # ticker
            'liveChatTickerPaidMessageItemRenderer',
            'liveChatTickerSponsorItemRenderer',
            # banner
            'liveChatBannerRenderer',
        ])
        if not isinstance(renderer, dict):
            return None
        parent_item_getters = [
            lambda x: x['showItemEndpoint']['showLiveChatItemEndpoint']['renderer'],
            lambda x: x['contents'],
        ]
        parent_item = try_get(renderer, parent_item_getters, dict)
        if parent_item:
            renderer = dict_get(parent_item, message_renderers)
            if not isinstance(renderer, dict):
                return None
        # NOTE(review): the 1000 is presumably a scale turning the microsecond
        # 'timestampUsec' into milliseconds - confirm against int_or_none
        return int_or_none(renderer.get('timestampUsec'), 1000)