diff --git a/yt_dlp/extractor/vk.py b/yt_dlp/extractor/vk.py index 17cdcab4cb..3831f04260 100644 --- a/yt_dlp/extractor/vk.py +++ b/yt_dlp/extractor/vk.py @@ -756,11 +756,102 @@ def _real_extract(self, url): clean_html(get_element_by_class('wall_post_text', webpage))) -class VKMusicIE(VKBaseIE): - IE_NAME = 'vk:music' +class VKMusicBaseIE(VKBaseIE): + _BASE64_CHARS = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN0PQRSTUVWXYZO123456789+/=' - # Debug and test on https://regexr.com/8dlot - _VALID_URL = r'https?://(?:(?:m|new)\.)?vk\.com/(?:audio(?P-?\d+_\d+)|(?:.*[\?&](?:act|z)=audio_playlist|music/[a-z]+/)(?P(?P-?\d+)_(?P\d+)))(?:(?:%2F|_|[?&]access_hash=)(?P[0-9a-f]+))?' + def _b64_decode(self, enc): + dec = '' + e = n = 0 + for c in enc: + r = self._BASE64_CHARS.index(c) + cond = n % 4 + e = 64 * e + r if cond else r + n += 1 + if cond: + dec += chr(255 & e >> (-2 * n & 6)) + return dec + + def _unmask_url(self, mask_url, vk_id): + if 'audio_api_unavailable' in mask_url: + extra = mask_url.split('?extra=')[1].split('#') + _, base = self._b64_decode(extra[1]).split(chr(11)) + mask_url = list(self._b64_decode(extra[0])) + url_len = len(mask_url) + indexes = [None] * url_len + index = int(base) ^ vk_id + for n in range(url_len - 1, -1, -1): + index = (url_len * (n + 1) ^ index + n) % url_len + indexes[n] = index + for n in range(1, url_len): + c = mask_url[n] + index = indexes[url_len - 1 - n] + mask_url[n] = mask_url[index] + mask_url[index] = c + mask_url = ''.join(mask_url) + return mask_url + + def _parse_track_meta(self, meta, track_id=None): + len_ = len(meta) + info = {} + + info['id'] = f'{meta[1]}_{meta[0]}' \ + if len_ >= 2 and meta[1] and meta[0] \ + else track_id + + title = unescapeHTML(meta[3]) if len_ >= 3 else None # TODO: fallback + artist = unescapeHTML(meta[4]) if len_ >= 4 else None # artists in one string, may include "feat." + info['title'] = join_nonempty(artist, title, delim=' - ') + info['track'] = title + info['uploader'] = artist + + # artists as list + info['artists'] = ( + # not htmlescaped unlike meta[4] + traverse_obj((*meta[17], *meta[18]), (..., 'name')) + if len_ >= 18 else None + ) or [artist] + + info['duration'] = int_or_none(meta[5]) if len_ >= 5 else None + info['thumbnails'] = [{'url': meta[14]}] if len_ >= 14 else [] + + # meta[30] is 2 bits + # most significant: isExplicit + # least significant: isForeignAgent + # i. e. + # 00 = safe + # 01 = marked by RKN as "foreign agent" + # 10 = explicit lyrics + # 11 = both E lyrics and "foreign agent" + if len_ >= 30 and meta[30]: + info['age_limit'] = 18 + + return info + + def _raise_if_blocked(self, meta, track_id): + reason = traverse_obj( + self._parse_json( + meta[12] if len(meta) >= 12 else None, + track_id, fatal=False), + ('claim', 'reason')) + + if reason == 'geo': + self.raise_geo_restricted() + # can be an empty string + elif reason is not None: + raise ExtractorError( + 'This track is unavailable. ' + f'Reason code: {reason:r}') + + +class VKMusicTrackIE(VKMusicBaseIE): + IE_NAME = 'vkmusic:track' + + _VALID_URL = r'''(?x) + https?:// + (?:(?:m|new)\.)?vk\.(?:com|ru)/ + audio(?P-?\d+_\d+) + (?:(?:%2F|_)(?P[0-9a-f]+))? + ''' _TESTS = [ { @@ -814,6 +905,102 @@ class VKMusicIE(VKBaseIE): 'skip_download': True, }, }, + { + 'note': 'special symbols in title and artist must be unescaped', + 'url': 'https://vk.com/audio-2001069891_6069891', + 'info_dict': { + 'id': '-2001069891_6069891', + 'ext': 'm4a', + 'title': 'Jack Thomas feat. Nico & Vinz - Rivers (feat. Nico & Vinz)', + 'track': 'Rivers (feat. Nico & Vinz)', + 'uploader': 'Jack Thomas feat. Nico & Vinz', + 'artists': ['Jack Thomas', 'Nico & Vinz'], + 'duration': 207, + 'thumbnail': r're:https?://.*\.jpg', + }, + 'params': { + 'skip_download': True, + }, + }, + ] + + def _real_extract(self, url): + mobj = self._match_valid_url(url) + track_id = mobj.group('id') + access_hash = mobj.group('hash') + + if not access_hash: + webpage = self._download_webpage(url, track_id) + + data_audio = self._search_regex( + r'data-audio="([^"]+)', webpage, 'data-audio attr', + default=None, group=1) + + if data_audio: + meta = self._parse_json(unescapeHTML(data_audio), track_id) + else: + if self._parse_vk_id() == 0: + self.raise_login_required( + 'This track is unavailable. ' + 'Log in or provide a link with access hash') + + data_exec = self._search_regex( + r'class="AudioPlayerBlock__root"[^>]+data-exec="([^"]+)', + webpage, 'AudioPlayerBlock data-exec', group=1) + + meta = traverse_obj( + self._parse_json(unescapeHTML(data_exec), track_id), + ('AudioPlayerBlock/init', 'firstAudio')) + + del data_exec + + del data_audio + del webpage + + self._raise_if_blocked(meta, track_id) + + access_hash = meta[24] + + meta = self._download_payload('al_audio', track_id, { + 'act': 'reload_audios', + 'audio_ids': f'{track_id}_{access_hash}', + })[0] + + # vk sends an empty list when auth required + if not meta: + self.raise_login_required() + + meta = meta[0] + self._raise_if_blocked(meta, track_id) + url = self._unmask_url(meta[2], self._parse_vk_id()) + + return { + **self._parse_track_meta(meta, track_id), + 'formats': [{ + 'url': url, + 'ext': 'm4a', + 'vcodec': 'none', + 'acodec': 'mp3', + 'container': 'm4a_dash', + }], + } + + +class VKMusicPlaylistIE(VKMusicBaseIE): + IE_NAME = 'vkmusic:playlist' + + _VALID_URL = r'''(?x) + https?:// + (?:(?:m|new)\.)?vk\.(?:com|ru)/ + (?: + music/(?:album|playlist)/| + .*[?&](?:act|z)=audio_playlist + ) + (?P(?P-?\d+)_(?P\d+)) + (?:(?:%2F|_|[?&]access_hash=)(?P[0-9a-f]+))? + ''' + + _TESTS = [ { 'url': 'https://vk.com/artist/linkinpark/releases?z=audio_playlist-2000984503_984503%2Fc468f3a862b6f73b55', 'info_dict': { @@ -835,23 +1022,6 @@ class VKMusicIE(VKBaseIE): 'skip_download': True, }, }, - { - 'note': 'special symbols in title and artist must be unescaped', - 'url': 'https://vk.com/audio-2001069891_6069891', - 'info_dict': { - 'id': '-2001069891_6069891', - 'ext': 'm4a', - 'title': 'Jack Thomas feat. Nico & Vinz - Rivers (feat. Nico & Vinz)', - 'track': 'Rivers (feat. Nico & Vinz)', - 'uploader': 'Jack Thomas feat. Nico & Vinz', - 'artists': ['Jack Thomas', 'Nico & Vinz'], - 'duration': 207, - 'thumbnail': r're:https?://.*\.jpg', - }, - 'params': { - 'skip_download': True, - }, - }, { 'url': 'https://vk.com/audios877252112?block=playlists§ion=general&z=audio_playlist-147845620_2390', 'info_dict': { @@ -871,173 +1041,58 @@ class VKMusicIE(VKBaseIE): }, ] - def _parse_track_meta(self, meta, track_id=None): - len_ = len(meta) - info = {} - - info['id'] = f'{meta[1]}_{meta[0]}' \ - if len_ >= 2 and meta[1] and meta[0] \ - else track_id - - title = unescapeHTML(meta[3]) if len_ >= 3 else None # TODO: fallback - artist = unescapeHTML(meta[4]) if len_ >= 4 else None # artists in one string, may include "feat." - info['title'] = join_nonempty(artist, title, delim=' - ') - info['track'] = title - info['uploader'] = artist - - # artists as list - info['artists'] = ( - # not htmlescaped unlike meta[4] - traverse_obj((*meta[17], *meta[18]), (..., 'name')) - if len_ >= 18 else None - ) or [artist] - - info['duration'] = int_or_none(meta[5]) if len_ >= 5 else None - info['thumbnails'] = [{'url': meta[14]}] if len_ >= 14 else [] - - # meta[30] is 2 bits - # most significant: isExplicit - # least significant: isForeignAgent - # i. e. - # 00 = safe - # 01 = marked by RKN as "foreign agent" - # 10 = explicit lyrics - # 11 = both E lyrics and "foreign agent" - if len_ >= 30 and meta[30]: - info['age_limit'] = 18 - - return info - - def _raise_if_blocked(self, meta, track_id): - reason = traverse_obj( - self._parse_json( - meta[12] if len(meta) >= 12 else None, - track_id, fatal=False), - ('claim', 'reason')) - - if reason == 'geo': - self.raise_geo_restricted() - # can be an empty string - elif reason is not None: - raise ExtractorError( - 'This track is unavailable. ' - f'Reason code: {reason:r}') - def _real_extract(self, url): mobj = self._match_valid_url(url) - track_id = mobj.group('track_id') - playlist_id = mobj.group('playlist_id') - access_hash = mobj.group('access_hash') + playlist_id = mobj.group('full_id') - if track_id: - if not access_hash: - webpage = self._download_webpage(url, track_id) + meta = self._download_payload('al_audio', playlist_id, { + 'act': 'load_section', + 'access_hash': mobj.group('hash') or '', + 'claim': '0', + 'context': '', + 'from_id': self._parse_vk_id(), + 'is_loading_all': '1', + 'is_preload': '0', + 'offset': '0', + 'owner_id': mobj.group('oid'), + 'playlist_id': mobj.group('id'), + 'ref': '', + 'type': 'playlist', + })[0] + tracks = meta['list'] - data_audio = self._search_regex( - r'data-audio="([^"]+)', webpage, 'data-audio attr', - default=None, group=1) + entries = [] + for ent in tracks: + info = self._parse_track_meta(ent) + track_id = info.pop('id') + title = info.pop('title') - if data_audio: - meta = self._parse_json(unescapeHTML(data_audio), track_id) - else: - if self._parse_vk_id() == 0: - self.raise_login_required( - 'This track is unavailable. ' - 'Log in or provide a link with access hash') + ent_hash = f'_{ent[24]}' if len(ent) >= 24 and ent[24] else '' + audio_url = f'https://vk.com/audio{track_id}{ent_hash}' - data_exec = self._search_regex( - r'class="AudioPlayerBlock__root"[^>]+data-exec="([^"]+)', - webpage, 'AudioPlayerBlock data-exec', group=1) + entries.append(self.url_result( + audio_url, VKMusicTrackIE, track_id, title, **info)) - meta = traverse_obj( - self._parse_json(unescapeHTML(data_exec), track_id), - ('AudioPlayerBlock/init', 'firstAudio')) + title = unescapeHTML(meta.get('title')) # TODO: fallback + artist = unescapeHTML(meta.get('authorName')) + genre, year = self._search_regex( + r'^([^<]+)<\s*span[^>]*>[^<]*(\d+)$', + meta.get('infoLine1'), 'genre and release year', + default=(None, None), fatal=False, group=(1, 2)) + is_album = year is not None - del data_exec - - del data_audio - del webpage - - self._raise_if_blocked(meta, track_id) - - access_hash = meta[24] - - meta = self._download_payload('al_audio', track_id, { - 'act': 'reload_audios', - 'audio_ids': f'{track_id}_{access_hash}', - })[0] - - # vk sends an empty list when auth required - if not meta: - self.raise_login_required() - - meta = meta[0] - - self._raise_if_blocked(meta, track_id) - - url = _unmask_url(meta[2], self._parse_vk_id()) - - return { - **self._parse_track_meta(meta, track_id), - 'formats': [{ - 'url': url, - 'ext': 'm4a', - 'vcodec': 'none', - 'acodec': 'mp3', - 'container': 'm4a_dash', - }], - } - - elif playlist_id: - meta = self._download_payload('al_audio', playlist_id, { - 'act': 'load_section', - 'access_hash': access_hash or '', - 'claim': '0', - 'context': '', - 'from_id': self._parse_vk_id(), - 'is_loading_all': '1', - 'is_preload': '0', - 'offset': '0', - 'owner_id': mobj.group('pl_oid'), - 'playlist_id': mobj.group('pl_id'), - 'ref': '', - 'type': 'playlist', - })[0] - - tracks = meta['list'] - - entries = [] - - for ent in tracks: - info = self._parse_track_meta(ent) - ent_access = f'_{ent[24]}' if len(ent) >= 24 and ent[24] else '' - track_id = info.pop('id') - title = info.pop('title') - audio_url = f'https://vk.com/audio{track_id}{ent_access}' - - entries.append(self.url_result( - audio_url, VKMusicIE, track_id, title, **info)) - - title = unescapeHTML(meta.get('title')) # TODO: fallback - artist = unescapeHTML(meta.get('authorName')) - genre, year = self._search_regex( - r'^([^<]+)<\s*span[^>]*>[^<]*(\d+)$', - meta.get('infoLine1'), 'genre and release year', - default=(None, None), fatal=False, group=(1, 2)) - is_album = year is not None - - return self.playlist_result( - entries, playlist_id, - join_nonempty(artist, title, delim=' - ') if is_album else title, - unescapeHTML(meta.get('rawDescription')), - album=title if is_album else None, - uploader=artist, - artists=[artist] if is_album else None, - thumbnail=meta.get('coverUrl'), # XXX: should i also specify `thumbnails`? - genres=[unescapeHTML(genre)] if genre else None, - release_year=int_or_none(year), - modified_timestamp=int_or_none(meta.get('lastUpdated')), - view_count=int_or_none(meta.get('listens'))) + return self.playlist_result( + entries, playlist_id, + join_nonempty(artist, title, delim=' - ') if is_album else title, + unescapeHTML(meta.get('rawDescription')), + album=title if is_album else None, + uploader=artist, + artists=[artist] if is_album else None, + thumbnail=meta.get('coverUrl'), # XXX: should i also specify `thumbnails`? + genres=[unescapeHTML(genre)] if genre else None, + release_year=int_or_none(year), + modified_timestamp=int_or_none(meta.get('lastUpdated')), + view_count=int_or_none(meta.get('listens'))) class VKPlayBaseIE(InfoExtractor): @@ -1187,39 +1242,3 @@ def _real_extract(self, url): **self._extract_common_meta(stream_info), 'formats': formats, } - - -_BASE64_CHARS = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN0PQRSTUVWXYZO123456789+/=' - - -def _b64_decode(enc): - dec = '' - e = n = 0 - for c in enc: - r = _BASE64_CHARS.index(c) - cond = n % 4 - e = 64 * e + r if cond else r - n += 1 - if cond: - dec += chr(255 & e >> (-2 * n & 6)) - return dec - - -def _unmask_url(mask_url, vk_id): - if 'audio_api_unavailable' in mask_url: - extra = mask_url.split('?extra=')[1].split('#') - func, base = _b64_decode(extra[1]).split(chr(11)) - mask_url = list(_b64_decode(extra[0])) - url_len = len(mask_url) - indexes = [None] * url_len - index = int(base) ^ vk_id - for n in range(url_len - 1, -1, -1): - index = (url_len * (n + 1) ^ index + n) % url_len - indexes[n] = index - for n in range(1, url_len): - c = mask_url[n] - index = indexes[url_len - 1 - n] - mask_url[n] = mask_url[index] - mask_url[index] = c - mask_url = ''.join(mask_url) - return mask_url