1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-07-08 22:38:33 +00:00

[ie/NHKRadiru] rework extended metadata to use new API

NHK introduced a new API and rearranged all the information, which makes things more complicated.
They then wrote lots of JavaScript to un-rearrange it back to the way it was before, for display on the site.
Most of this commit replicates that JavaScript, I'm afraid, i.e. bashing strings together.

There is some new metadata though, which is nice.
This commit is contained in:
garret1317 2025-03-23 07:02:06 +00:00
parent 159dbb37e1
commit 010cee8c65

View File

@ -8,6 +8,7 @@
get_element_by_class, get_element_by_class,
int_or_none, int_or_none,
join_nonempty, join_nonempty,
orderedSet,
parse_duration, parse_duration,
remove_end, remove_end,
traverse_obj, traverse_obj,
@ -683,10 +684,120 @@ class NhkRadiruIE(InfoExtractor):
_API_URL_TMPL = None _API_URL_TMPL = None
# the following few functions are ported from https://www.nhk.or.jp/radio/assets/js/timetable_detail_new.js
def _format_actlist(self, act_list):
    """Render the cast list as one string, grouped by role.

    Entries are grouped by their 'role' value in first-seen order. The role
    is emitted once, in front of the first entry of its group; every entry
    then contributes its optional 'title' followed by its 'name'.

    Entries that are not dicts are ignored, and a missing 'role', 'title'
    or 'name' contributes nothing instead of raising.

    @param act_list  iterable of dicts with optional 'role', 'title', 'name'
    @returns         the formatted string ('' if nothing could be formatted)
    """
    role_groups = {}
    for act in act_list or []:
        if not isinstance(act, dict):
            continue
        # .get() so that an entry without a role cannot abort the whole description
        role_groups.setdefault(act.get('role'), []).append(act)

    formatted_roles = []
    for role, acts in role_groups.items():
        for i, act in enumerate(acts):
            # Only the first member of a group carries the role label
            res = f'{role}' if i == 0 and role else ''
            if title := act.get('title'):
                res += f'{title}'
            if name := act.get('name'):
                res += f'{name}'
            formatted_roles.append(res)
    return join_nonempty(*formatted_roles, delim='')
def _fetch_artists(self, by_artist):
if not by_artist or len(by_artist) == 0:
return None
artists = []
for artist in by_artist:
res = ''
name = artist.get('name')
role = artist.get('role')
part = artist.get('part')
if role != '':
res += f'{role}'
if part != '':
res += f'{part}'
res += name
if res != '':
artists.append(res)
if len(artists) == 0:
return None
return ''.join(artists)
def _fetch_duration(self, duration):
    """Turn a duration value into its display text.

    The value is passed through parse_duration(); if that yields None,
    None is returned. Otherwise the result is split into hours, minutes and
    seconds: hours and minutes are only included when greater than zero,
    while the zero-padded seconds part is always present.
    """
    total = parse_duration(duration)
    if total is None:
        return None

    hours, rest = divmod(total, 3600)
    minutes, seconds = divmod(rest, 60)

    pieces = []
    if hours > 0:
        pieces.append(f'{int(hours)}時間')
    if minutes > 0:
        pieces.append(f'{int(minutes)}')
    pieces.append(f'{int(seconds):02}秒)')
    return ''.join(pieces)
def _format_musiclist(self, music_list):
    """Render the track list as text, one paragraph per track.

    For each track the available details are emitted on separate lines in
    this order: name, lyricist, composer, arranger, performers, duration,
    label/code and location. Tracks are separated by a blank line.

    Returns None for an empty or missing list.
    """
    if not music_list:
        return None

    credit_suffixes = (
        ('lyricist', ':作詞'),
        ('composer', ':作曲'),
        ('arranger', ':編曲'),
    )

    def describe(track):
        lines = []
        if name := track.get('name'):
            lines.append(f'{name}')
        for key, suffix in credit_suffixes:
            if credit := track.get(key):
                lines.append(f'{credit}{suffix}')
        # Either helper may return None; join_nonempty() below drops those
        lines += [
            self._fetch_artists(track.get('byArtist')),
            self._fetch_duration(track.get('duration')),
        ]
        if track.get('label') or track.get('code'):
            lines.append('' + join_nonempty('label', 'code', delim=' ', from_dict=track) + '')
        if location := track.get('location'):
            lines.append(f'{location}')
        return join_nonempty(*lines, delim='\n')

    return '\n\n'.join(map(describe, music_list))
def _format_description(self, response):
    """Build the description text from the API response.

    Joins, separated by blank lines: the 'epg80' and 'epg200' texts of
    'detailedDescription', the formatted cast list ('misc' -> 'actList')
    and the formatted music list ('misc' -> 'musicList'). Parts that are
    missing or empty are left out.
    """
    act = traverse_obj(response, ('misc', 'actList', {self._format_actlist}))
    music = traverse_obj(response, ('misc', 'musicList', {self._format_musiclist}))
    # join_nonempty() only looks the names up as keys when from_dict is not None;
    # given None it would join the literal strings 'epg80' and 'epg200'.
    # Fall back to an empty dict so a missing 'detailedDescription' yields nothing.
    detailed = traverse_obj(response, ('detailedDescription', {dict})) or {}
    desc = join_nonempty('epg80', 'epg200', delim='\n\n', from_dict=detailed)
    return join_nonempty(desc, act, music, delim='\n\n')
def _get_thumbnails(self, thumbs, name, preference=-1):
    """Convert (size, image) pairs into a list of thumbnail dicts.

    @param thumbs      iterable of (size, image) pairs, e.g. dict.items(); may be None
    @param name        prefix for the thumbnail id; joined with the size
    @param preference  preference given to the first thumbnail; every
                       following thumbnail gets one less than the previous
    @returns           list of thumbnail dicts (possibly empty)

    The 'copyright' entry is skipped, and so is any value that is not a
    dict with a 'url': it cannot be unpacked into a thumbnail dict and
    would not be a usable thumbnail anyway.
    """
    thumbnails = []
    for size, thumb in thumbs or ():
        if size == 'copyright' or not isinstance(thumb, dict) or not thumb.get('url'):
            continue
        thumbnails.append({
            **thumb,
            'preference': preference,
            'id': join_nonempty(name, size),
        })
        preference -= 1
    return thumbnails
def _extract_extended_metadata(self, episode_id, aa_vinfo): def _extract_extended_metadata(self, episode_id, aa_vinfo):
service, _, area = traverse_obj(aa_vinfo, (2, {str}, {lambda x: (x or '').partition(',')})) service, _, area = traverse_obj(aa_vinfo, (2, {str}, {lambda x: (x or '').partition(',')}))
dateid = aa_vinfo[3]
detail_url = try_call( detail_url = try_call(
lambda: self._API_URL_TMPL.format(area=area, service=service, dateid=aa_vinfo[3])) lambda: self._API_URL_TMPL.format(broadcastEventId=join_nonempty(service, area, dateid, delim='-')))
if not detail_url: if not detail_url:
return {} return {}
@ -699,36 +810,38 @@ def _extract_extended_metadata(self, episode_id, aa_vinfo):
if error := traverse_obj(response, ('error', {dict})): if error := traverse_obj(response, ('error', {dict})):
self.report_warning( self.report_warning(
'Failed to get extended metadata. API returned ' 'Failed to get extended metadata. API returned '
f'Error {join_nonempty("code", "message", from_dict=error, delim=": ")}') f'Error {join_nonempty("statuscode", "message", from_dict=error, delim=": ")}')
return {} return {}
full_meta = traverse_obj(response, ('list', service, 0, {dict})) station = traverse_obj(response, ('publishedOn', 'broadcastDisplayName'))
if not full_meta:
self.report_warning('Failed to get extended metadata. API returned empty list.')
return {}
station = ' '.join(traverse_obj(full_meta, (('service', 'area'), 'name', {str}))) or None about = response.get('about')
thumbnails = [{ thumbnails = []
'id': str(id_), thumbnails.extend(self._get_thumbnails(traverse_obj(about, ('eyecatch', {dict.items})), ''))
'preference': 1 if id_.startswith('thumbnail') else -2 if id_.startswith('logo') else -1, if eyecatch_list := about.get('eyecatchList'):
**traverse_obj(thumb, { for num, v in enumerate(eyecatch_list):
'url': 'url', thumbnails.extend(self._get_thumbnails(v.items(), join_nonempty('list', num), preference=-2))
'width': ('width', {int_or_none}), thumbnails.extend(self._get_thumbnails(traverse_obj(about, ('partOfSeries', 'eyecatch', {dict.items})), 'series', preference=-3))
'height': ('height', {int_or_none}),
}),
} for id_, thumb in traverse_obj(full_meta, ('images', {dict.items}, lambda _, v: v[1]['url']))]
return filter_dict({ return filter_dict({
'description': self._format_description(response),
'cast': traverse_obj(response, ('misc', 'actList', ..., 'name')),
'thumbnails': thumbnails,
**traverse_obj(response, {
'title': ('name', {str}),
'timestamp': ('endDate', {unified_timestamp}),
'release_timestamp': ('startDate', {unified_timestamp}),
'duration': ('duration', {parse_duration}),
}),
**traverse_obj(response, ('identifierGroup', {
'series': 'radioSeriesName',
'series_id': 'radioSeriesId',
'episode': 'radioEpisodeName',
'episode_id': 'radioEpisodeId',
'categories': ('genre', ..., ['name1', 'name2'], all, {orderedSet}),
})),
'channel': station, 'channel': station,
'uploader': station, 'uploader': station,
'description': join_nonempty(
'subtitle', 'content', 'act', 'music', delim='\n\n', from_dict=full_meta),
'thumbnails': thumbnails,
**traverse_obj(full_meta, {
'title': ('title', {str}),
'timestamp': ('end_time', {unified_timestamp}),
'release_timestamp': ('start_time', {unified_timestamp}),
}),
}) })
def _extract_episode_info(self, episode, programme_id, series_meta): def _extract_episode_info(self, episode, programme_id, series_meta):