diff --git a/yt_dlp/extractor/ciscolive.py b/yt_dlp/extractor/ciscolive.py
index 8d847d582..3e895f197 100644
--- a/yt_dlp/extractor/ciscolive.py
+++ b/yt_dlp/extractor/ciscolive.py
@@ -1,13 +1,15 @@
 import itertools
+import json
+import urllib
 
 from .common import InfoExtractor
 from ..utils import (
     clean_html,
-    urllib,  # Add this import
     float_or_none,
     int_or_none,
     parse_qs,
-    try_get,
+    traverse_obj,
+    try_get, write_string,
     urlencode_postdata,
 )
 
@@ -16,9 +18,9 @@ class CiscoLiveBaseIE(InfoExtractor):
     # These appear to be constant across all Cisco Live presentations
     # and are not tied to any user session or event
-    RAINFOCUS_API_URL = 'https://events.rainfocus.com/api/%s'  # Keep this as is
-    RAINFOCUS_API_PROFILE_ID = 'HEedDIRblcZk7Ld3KHm1T0VUtZog9eG9'  # Updated from browser
-    RAINFOCUS_WIDGET_ID = 'M7n14I8sz0pklW1vybwVRdKrgdREj8sR'  # Updated from browser
+    RAINFOCUS_API_URL = 'https://events.rainfocus.com/api/%s'
+    RAINFOCUS_API_PROFILE_ID = 'HEedDIRblcZk7Ld3KHm1T0VUtZog9eG9'
+    RAINFOCUS_WIDGET_ID = 'M7n14I8sz0pklW1vybwVRdKrgdREj8sR'
     BRIGHTCOVE_URL_TEMPLATE = 'http://players.brightcove.net/5647924234001/SyK2FdqjM_default/index.html?videoId=%s'
     # Origin header will be set dynamically in _call_api
     HEADERS = {
@@ -47,14 +49,14 @@ def _parse_rf_item(self, rf_item):
         presenter_name = try_get(rf_item, lambda x: x['participants'][0]['fullName'])
         bc_id = rf_item['videos'][0]['url']
         bc_url = self.BRIGHTCOVE_URL_TEMPLATE % bc_id
-        duration = float_or_none(try_get(rf_item, lambda x: x['times'][0]['length']))
-        location = try_get(rf_item, lambda x: x['times'][0]['room'])
+        duration = float_or_none(traverse_obj(rf_item, ('times', 0, 'length')))
+        location = traverse_obj(rf_item, ('times', 0, 'room'))
         if duration:
             duration = duration * 60
 
         return {
             '_type': 'url_transparent',
             'url': bc_url,
             'ie_key': 'BrightcoveNew',
             'title': title,
@@ -119,19 +121,15 @@ def suitable(cls, url):
         return False if CiscoLiveSessionIE.suitable(url) else super().suitable(url)
 
     def _check_bc_id_exists(self, rf_item):
-        # Ensure rf_item is a dictionary and 'videos' key exists and is a list
         if not isinstance(rf_item, dict) or not isinstance(rf_item.get('videos'), list) or not rf_item['videos']:
-            self.write_debug(f"Item missing 'videos' list or 'videos' is not a list/empty: {rf_item.get('title', rf_item.get('id', 'Unknown item'))}")
+            self.write_debug(f'Item missing "videos" list or "videos" is not a list/empty: {rf_item.get("title", rf_item.get("id", "Unknown item"))}')
             return False
-        # Ensure the first video entry has a 'url'
         if not isinstance(rf_item['videos'][0], dict) or 'url' not in rf_item['videos'][0]:
-            self.write_debug(f"Item's first video entry missing 'url': {rf_item.get('title', rf_item.get('id', 'Unknown item'))}")
+            self.write_debug(f'Item\'s first video entry missing "url": {rf_item.get("title", rf_item.get("id", "Unknown item"))}')
             return False
         return int_or_none(try_get(rf_item, lambda x: x['videos'][0]['url'])) is not None
 
     def _entries(self, query, url):
-        # query is the processed query from _real_extract
-        # It should contain parameters like 'technology', 'event', 'type'
         current_page_query = query.copy()
         current_page_query['size'] = 50
         current_page_query['from'] = 0
@@ -143,34 +141,29 @@ def _entries(self, query, url):
                 f'Downloading search JSON page {page_num}')
 
             if self.get_param('verbose'):
-                write_string(f"\n\n[debug] API response for page {page_num}:\n")
-                import json
-                write_string(json.dumps(results, indent=2) + "\n\n")
+                write_string(f'\n\n[debug] API response for page {page_num}:\n')
+                write_string(json.dumps(results, indent=2) + '\n\n')
 
-            sl = try_get(results, lambda x: x['sectionList'][0], dict)
+            sl = traverse_obj(results, ('sectionList', 0, {dict}))
 
-            # Determine the source of items and pagination data
-            items_data_source = results  # Default to root
-            source_name_for_debug = "root of results"
+            items_data_source = results
+            source_name_for_debug = 'root of results'
 
-            if sl:  # if sectionList[0] exists
-                # Check if this first section itself contains an 'items' list
+            if sl:
                 if isinstance(sl.get('items'), list):
-                    self.write_debug("Using items, total, and size from sectionList[0]")
+                    self.write_debug('Using items, total, and size from sectionList[0]')
                     items_data_source = sl
-                    source_name_for_debug = "sectionList[0]"
+                    source_name_for_debug = 'sectionList[0]'
                 else:
                     self.write_debug(
-                        "sectionList[0] exists but has no 'items' key. "
-                        "Using items, total, and size from root of results (if available).")
-                # items_data_source remains 'results' (root)
+                        'sectionList[0] exists but has no "items" key. '
+                        'Using items, total, and size from root of results (if available).')
             else:
-                self.write_debug("No sectionList found. Using items, total, and size from root of results.")
-                # items_data_source remains 'results' (root)
+                self.write_debug('No sectionList found. Using items, total, and size from root of results.')
 
             items = items_data_source.get('items')
             if not items or not isinstance(items, list):
-                self.write_debug(f"No 'items' list found in {source_name_for_debug}.")
+                self.write_debug(f'No "items" list found in {source_name_for_debug}.')
                 break
 
             for item in items:
@@ -178,7 +171,7 @@ def _entries(self, query, url):
                 continue
             if not self._check_bc_id_exists(item):
-                self.write_debug(f"Skipping item without Brightcove ID: {item.get('title', item.get('id', 'Unknown item'))}")
+                self.write_debug(f'Skipping item without Brightcove ID: {item.get("title", item.get("id", "Unknown item"))}')
                 continue
             yield self._parse_rf_item(item)
 
         # Pagination logic using items_data_source for total and size
@@ -188,27 +181,27 @@ def _entries(self, query, url):
             total = int_or_none(items_data_source.get('total'))
             if total is not None and (current_page_query['from'] + current_page_size) >= total:
                 self.write_debug(
-                    f"Reached end of results based on 'total': {total} from {source_name_for_debug}, "
-                    f"current 'from': {current_page_query['from']}, 'size': {current_page_size}.")
+                    f'Reached end of results based on "total": {total} from {source_name_for_debug}, '
+                    f'current "from": {current_page_query["from"]}, "size": {current_page_size}.')
                 break
-            if not items and page_num > 1:  # No items on a subsequent page
-                self.write_debug("No items found on subsequent page, stopping pagination.")
+            if not items and page_num > 1:
+                self.write_debug('No items found on subsequent page, stopping pagination.')
                 break
             current_page_query['from'] += current_page_size
-            if page_size_from_response is not None:  # API might dictate a different page size
+            if page_size_from_response is not None:
                 current_page_query['size'] = page_size_from_response
 
     def _real_extract(self, url):
         raw_query_params = parse_qs(url)
-        self.write_debug(f"Raw query parameters from URL: {raw_query_params}")
+        self.write_debug(f'Raw query parameters from URL: {raw_query_params}')
 
         # Initialize api_query with parameters that are always sent or have defaults,
         # based on browser inspection.
         api_query = {
             'type': 'session',
             'search': '',  # Static parameter from browser payload
             'browserTimezone': 'America/New_York',  # Default, as seen in browser
             'catalogDisplay': 'grid',  # Default, as seen in browser
         }
@@ -223,12 +216,12 @@ def _real_extract(self, url):
             # The 'search: ""' is added above.
             # We only care about actual filter parameters like 'search.technology', 'search.event'.
             if key == 'search' and value_list[0].startswith('#'):
-                self.write_debug(f"Skipping fragment query parameter: {key}={value_list[0]}")
+                self.write_debug(f'Skipping fragment query parameter: {key}={value_list[0]}')
                 continue
             api_query[key] = value_list[0]  # Keep original key name, e.g., 'search.technology'
 
-        self.write_debug(f"Processed API query parameters (before size/from): {api_query}")
+        self.write_debug(f'Processed API query parameters (before size/from): {api_query}')
 
         return self.playlist_result(
             self._entries(api_query, url), playlist_title='Search query')
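
Notes for reviewers:

The try_get -> traverse_obj changes are behavior-preserving: both return None
instead of raising when a path is missing. traverse_obj takes a declarative
tuple path, and a one-element set in the path acts as a type filter, which is
what ('sectionList', 0, {dict}) relies on. A minimal sketch of the semantics
(the sample dicts are invented for illustration):

    from yt_dlp.utils import traverse_obj, try_get

    rf_item = {'times': [{'length': 90, 'room': 'Hall B'}]}

    # Old style: one lambda per lookup, try_get swallows KeyError/IndexError
    assert try_get(rf_item, lambda x: x['times'][0]['room']) == 'Hall B'

    # New style: a tuple path; returns None if any step is missing
    assert traverse_obj(rf_item, ('times', 0, 'room')) == 'Hall B'
    assert traverse_obj(rf_item, ('times', 0, 'missing')) is None

    # {dict} keeps the value only if it is a dict, like try_get's expected_type
    assert traverse_obj({'sectionList': ['x']}, ('sectionList', 0, {dict})) is None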
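The pagination in _entries() follows the RainFocus from/size contract: request
a window, advance 'from' by the page size, and stop once 'from' plus the page
size reaches the reported 'total', or when a later page comes back empty in
case 'total' is absent or wrong. A standalone sketch of that loop; paginate()
and fetch_page() are illustrative names, not part of this patch:

    import itertools

    def paginate(fetch_page, page_size=50):
        # fetch_page(offset, size) -> dict with 'items', 'total' and,
        # optionally, a server-side 'size' override
        offset, size = 0, page_size
        for page_num in itertools.count(1):
            page = fetch_page(offset, size)
            items = page.get('items') or []
            yield from items
            if page.get('total') is not None and offset + size >= page['total']:
                break  # 'total' says there is nothing past this window
            if not items and page_num > 1:
                break  # defensive stop when the API under-reports 'total'
            offset += size
            size = page.get('size') or size  # the API may dictate its own page size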
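_real_extract() builds the API query directly from the page URL's query string
via parse_qs, keeping the first value of each parameter and skipping the
fragment-style 'search=#/...' entries the site sometimes puts in links. A
sketch of that mapping with a made-up URL (the parameter values are only
illustrative):

    from yt_dlp.utils import parse_qs

    url = 'https://www.ciscolive.com/global/on-demand-library.html?search.technology=scpsTechnology_networking&type=session'
    api_query = {
        key: values[0]
        for key, values in parse_qs(url).items()
        if not (key == 'search' and values[0].startswith('#'))
    }
    # -> {'search.technology': 'scpsTechnology_networking', 'type': 'session'}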