diff --git a/yt_dlp/extractor/onsen.py b/yt_dlp/extractor/onsen.py index 9bc2c9b222..809d4ca282 100644 --- a/yt_dlp/extractor/onsen.py +++ b/yt_dlp/extractor/onsen.py @@ -1,4 +1,5 @@ import base64 +import contextlib import datetime as dt import json @@ -8,6 +9,7 @@ parse_qs, str_or_none, strip_or_none, + url_or_none, ) from ..utils.traversal import traverse_obj @@ -19,7 +21,7 @@ class OnsenIE(InfoExtractor): _BASE_URL = 'https://www.onsen.ag/' _HEADERS = {'Referer': _BASE_URL} _NETRC_MACHINE = 'onsen' - _VALID_URL = r'https?://(?:(?:share|www)\.)onsen\.ag/program/(?P[\w-]+)' + _VALID_URL = r'https?://(?:(?:share|www)\.)onsen\.ag/program/(?P[^/?#]+)' _TESTS = [{ 'url': 'https://share.onsen.ag/program/onsenking?p=90&c=MTA0NjI', 'info_dict': { @@ -35,7 +37,7 @@ class OnsenIE(InfoExtractor): 'series': '音泉キング「下野紘」のラジオ きみはもちろん、<音泉>ファミリーだよね?', 'series_id': 'onsenking', 'tags': ['音泉キング', '音泉ジュニア'], - 'thumbnail': r're:https://d3bzklg4lms4gh\.cloudfront\.net/program_info/image/default/production/.+$', + 'thumbnail': r're:https?://d3bzklg4lms4gh\.cloudfront\.net/program_info/image/default/production/.+$', 'upload_date': '20220627', 'webpage_url': 'https://www.onsen.ag/program/onsenking?c=MTA0NjI=', }, @@ -54,7 +56,7 @@ class OnsenIE(InfoExtractor): 'series': 'TVアニメ『ガールズバンドクライ』WEBラジオ「ガールズバンドクライ~ラジオにも全部ぶち込め。~」', 'series_id': 'girls-band-cry-radio', 'tags': ['ガールズバンドクライ', 'ガルクラ', 'ガルクラジオ'], - 'thumbnail': r're:https://d3bzklg4lms4gh\.cloudfront\.net/program_info/image/default/production/.+$', + 'thumbnail': r're:https?://d3bzklg4lms4gh\.cloudfront\.net/program_info/image/default/production/.+$', 'upload_date': '20240425', 'webpage_url': 'https://www.onsen.ag/program/girls-band-cry-radio?c=MTgwMDE=', }, @@ -69,9 +71,10 @@ class OnsenIE(InfoExtractor): }] def _perform_login(self, username, password): - signin = self._download_json( + sign_in = self._download_json( f'{self._BASE_URL}web_api/signin', None, 'Logging in', headers={ - 'content-type': 'application/json; charset=UTF-8', + 'Accept': 'application/json, text/plain, */*', + 'Content-Type': 'application/json; charset=UTF-8', }, data=json.dumps({ 'session': { 'email': username, @@ -79,65 +82,57 @@ def _perform_login(self, username, password): }, }).encode(), expected_status=401) - if signin.get('error'): + if sign_in.get('error'): raise ExtractorError('Invalid username or password', expected=True) - def _get_info(self, program, program_id, metadata): - m3u8 = program['streaming_url'] - display_id = base64.b64encode(str(program['id']).encode()).decode() - if ud := self._search_regex(rf'{program_id}0?(\d{{6}})', m3u8, 'upload_date', default=None): - try: - ud = dt.datetime.strptime(f'20{ud}', '%Y%m%d').strftime('%Y%m%d') - except ValueError: - ud = None - - return { - 'display_id': display_id, - 'formats': self._extract_m3u8_formats(m3u8, program_id, headers=self._HEADERS), - 'http_headers': self._HEADERS, - 'upload_date': ud, - 'webpage_url': f'{self._BASE_URL}program/{program_id}?c={display_id}', - **metadata, - **traverse_obj(program, { - 'id': ('id', {str_or_none}), - 'title': ('title', {strip_or_none}), - 'media_type': ('media_type', {str}), - 'thumbnail': ('poster_image_url', {lambda x: x.split('?')[0]}), - }), - 'cast': metadata['cast'] + traverse_obj(program, ('guests', ..., 'name', {str})), - } - def _real_extract(self, url): program_id = self._match_id(url) qs = {k: v[0] for k, v in parse_qs(url).items() if v} programs = self._download_json( f'{self._BASE_URL}web_api/programs/{program_id}', program_id) + enc_id = lambda p: base64.b64encode(str(p['id']).encode()).decode() - metadata = { - 'cast': traverse_obj(programs, ('performers', ..., 'name', {str})), + if 'c' not in qs: + entries = [self.url_result( + f'{url}?c={enc_id(program)}', OnsenIE, + ) for program in traverse_obj(programs, ('contents', lambda _, v: v['id']))] + + return self.playlist_result( + entries, program_id, programs['program_info']['title']) + + raw_id = base64.b64decode(qs['c'] + '=' * (-len(qs['c']) % 4)).decode() + p_keys = ('contents', lambda _, v: v['id'] == int(raw_id)) + if not (program := traverse_obj(programs, (*p_keys, any))): + raise ExtractorError('This program is no longer available', expected=True) + if not (m3u8_url := traverse_obj(program, ('streaming_url', {url_or_none}))): + self.raise_login_required('This program is only available for premium supporters') + + display_id = enc_id(program) + upload_date = None + if date_str := self._search_regex(rf'{program_id}0?(\d{{6}})', m3u8_url, 'date string', default=None): + with contextlib.suppress(ValueError): + upload_date = dt.datetime.strptime(f'20{date_str}', '%Y%m%d').strftime('%Y%m%d') + + return { + 'display_id': display_id, + 'formats': self._extract_m3u8_formats(m3u8_url, display_id, headers=self._HEADERS), + 'http_headers': self._HEADERS, 'section_start': int(qs.get('t', 0)), - 'series_id': program_id, - **traverse_obj(programs['program_info'], { + 'upload_date': upload_date, + 'webpage_url': f'{self._BASE_URL}program/{program_id}?c={display_id}', + **traverse_obj(program, { + 'id': ('id', {str_or_none}), + 'title': ('title', {strip_or_none}), + 'media_type': ('media_type', {str}), + 'thumbnail': ('poster_image_url', {url_or_none}, {lambda x: x.partition('?')[0]}), + }), + **traverse_obj(programs, { + 'cast': (('performers', (*p_keys, 'guests')), ..., 'name', {str}), + 'series_id': ('directory_name', {str_or_none}), + }), + **traverse_obj(programs, ('program_info', { 'description': ('description', {str}), 'series': ('title', {str}), - 'tags': ('hashtag_list', {list}), - }), + 'tags': ('hashtag_list', ..., {str}), + })), } - - if 'c' in qs: - p_id = base64.b64decode(qs['c'] + '=' * (-len(qs['c']) % 4)).decode() - program = traverse_obj( - programs, ('contents', lambda _, v: v['id'] == int(p_id)), get_all=False) - if not program: - raise ExtractorError('This program is no longer available', expected=True) - if not program['streaming_url']: - self.raise_login_required('This program is only available for premium supporters') - - return self._get_info(program, program_id, metadata) - else: - entries = [ - self._get_info(program, program_id, metadata) - for program in programs['contents'] if program['streaming_url'] - ] - - return self.playlist_result(entries, program_id, metadata['series'])