import base64
import functools
import hashlib
import itertools
import json
import random
import re
import string
import time
import urllib.parse
import uuid

from .common import InfoExtractor
from ..networking import HEADRequest
from ..utils import (
    ExtractorError,
    UnsupportedError,
    UserNotLive,
    determine_ext,
    extract_attributes,
    filter_dict,
    format_field,
    int_or_none,
    join_nonempty,
    merge_dicts,
    mimetype2ext,
    parse_qs,
    qualities,
    srt_subtitles_timecode,
    str_or_none,
    truncate_string,
    try_call,
    try_get,
    url_or_none,
    urlencode_postdata,
)
from ..utils.traversal import find_element, require, traverse_obj


class TikTokBaseIE(InfoExtractor):
    """Shared helpers for the TikTok extractors.

    Provides the mobile-app API client (app-info rotation, query building),
    webpage rehydration-data extraction including the JS challenge solver,
    and the parsers that turn app/web "aweme" payloads into info dicts.
    """

    _UPLOADER_URL_FORMAT = 'https://www.tiktok.com/@%s'
    _WEBPAGE_HOST = 'https://www.tiktok.com/'
    QUALITIES = ('360p', '540p', '720p', '1080p')

    _APP_INFO_DEFAULTS = {
        # unique "install id"
        'iid': None,
        # TikTok (KR/PH/TW/TH/VN) = trill, TikTok (rest of world) = musical_ly, Douyin = aweme
        'app_name': 'musical_ly',
        'app_version': '35.1.3',
        'manifest_app_version': '2023501030',
        # "app id": aweme = 1128, trill = 1180, musical_ly = 1233, universal = 0
        'aid': '0',
    }
    _APP_INFO_POOL = None
    _APP_INFO = None
    _APP_USER_AGENT = None

    @functools.cached_property
    def _KNOWN_APP_INFO(self):
        """User-supplied ``app_info`` strings from the extractor arguments."""
        # If we have a genuine device ID, we may not need any IID
        default = [''] if self._KNOWN_DEVICE_ID else []
        return self._configuration_arg('app_info', default, ie_key=TikTokIE)

    @functools.cached_property
    def _KNOWN_DEVICE_ID(self):
        """User-supplied ``device_id`` extractor argument, or None."""
        return self._configuration_arg('device_id', [None], ie_key=TikTokIE)[0]

    @functools.cached_property
    def _DEVICE_ID(self):
        """The known device ID, or a random one generated once per instance."""
        return self._KNOWN_DEVICE_ID or str(random.randint(7250000000000000000, 7325099899999994577))

    @functools.cached_property
    def _API_HOSTNAME(self):
        """API hostname; overridable through the ``api_hostname`` extractor argument."""
        return self._configuration_arg(
            'api_hostname', ['api16-normal-c-useast1a.tiktokv.com'], ie_key=TikTokIE)[0]

    def _get_next_app_info(self):
        """Activate the next app info from the pool.

        The pool is built lazily on first use from ``_KNOWN_APP_INFO``; each
        entry is a '/'-separated string whose fields follow the key order of
        ``_APP_INFO_DEFAULTS``. Empty fields fall back to the defaults.

        Sets ``_APP_INFO`` and ``_APP_USER_AGENT``.
        Returns True on success, False when the pool is exhausted.
        """
        if self._APP_INFO_POOL is None:
            defaults = {
                key: self._configuration_arg(key, [default], ie_key=TikTokIE)[0]
                for key, default in self._APP_INFO_DEFAULTS.items()
                if key != 'iid'
            }
            self._APP_INFO_POOL = [
                {
                    **defaults,
                    **{
                        k: v
                        for k, v in zip(self._APP_INFO_DEFAULTS, app_info.split('/'), strict=False)
                        if v
                    },
                }
                for app_info in self._KNOWN_APP_INFO
            ]

        if not self._APP_INFO_POOL:
            return False

        self._APP_INFO = self._APP_INFO_POOL.pop(0)

        app_name = self._APP_INFO['app_name']
        version = self._APP_INFO['manifest_app_version']
        if app_name == 'musical_ly':
            package = f'com.zhiliaoapp.musically/{version}'
        else:  # trill, aweme
            package = f'com.ss.android.ugc.{app_name}/{version}'
        self._APP_USER_AGENT = f'{package} (Linux; U; Android 13; en_US; Pixel 7; Build/TD1A.220804.031; Cronet/58.0.2991.0)'

        return True

    @staticmethod
    def _create_url(user_id, video_id):
        """Build a canonical video URL; '_' stands in for a missing user ID."""
        return f'https://www.tiktok.com/@{user_id or "_"}/video/{video_id}'

    def _get_sigi_state(self, webpage, display_id):
        """Return the SIGI state JSON embedded in the webpage, or {} if absent."""
        return self._search_json(
            r'<script[^>]+\bid="(?:SIGI_STATE|sigi-persisted-data)"[^>]*>', webpage,
            'sigi state', display_id, end_pattern=r'</script>', default={})

    def _get_universal_data(self, webpage, display_id):
        """Return the ``__DEFAULT_SCOPE__`` dict of the rehydration JSON, or {}."""
        return traverse_obj(self._search_json(
            r'<script[^>]+\bid="__UNIVERSAL_DATA_FOR_REHYDRATION__"[^>]*>', webpage,
            'universal data', display_id, end_pattern=r'</script>', default={}),
            ('__DEFAULT_SCOPE__', {dict})) or {}

    def _call_api_impl(self, ep, video_id, query=None, data=None, headers=None, fatal=True,
                       note='Downloading API JSON', errnote='Unable to download API page'):
        """Perform a single API request against endpoint ``ep`` with the current app user agent."""
        self._set_cookie(self._API_HOSTNAME, 'odin_tt', ''.join(random.choices('0123456789abcdef', k=160)))
        webpage_cookies = self._get_cookies(self._WEBPAGE_HOST)
        # Carry the web session cookie over to the API host when one is present
        if webpage_cookies.get('sid_tt'):
            self._set_cookie(self._API_HOSTNAME, 'sid_tt', webpage_cookies['sid_tt'].value)
        return self._download_json(
            f'https://{self._API_HOSTNAME}/aweme/v1/{ep}/', video_id=video_id,
            fatal=fatal, note=note, errnote=errnote, headers={
                'User-Agent': self._APP_USER_AGENT,
                'Accept': 'application/json',
                **(headers or {}),
            }, query=query, data=data)

    def _build_api_query(self, query):
        """Merge ``query`` with the device/app parameters; None-valued entries are dropped by filter_dict."""
        return filter_dict({
            **query,
            'device_platform': 'android',
            'os': 'android',
            'ssmix': 'a',
            '_rticket': int(time.time() * 1000),
            'cdid': str(uuid.uuid4()),
            'channel': 'googleplay',
            'aid': self._APP_INFO['aid'],
            'app_name': self._APP_INFO['app_name'],
            'version_code': ''.join(f'{int(v):02d}' for v in self._APP_INFO['app_version'].split('.')),
            'version_name': self._APP_INFO['app_version'],
            'manifest_version_code': self._APP_INFO['manifest_app_version'],
            'update_version_code': self._APP_INFO['manifest_app_version'],
            'ab_version': self._APP_INFO['app_version'],
            'resolution': '1080*2400',
            'dpi': 420,
            'device_type': 'Pixel 7',
            'device_brand': 'Google',
            'language': 'en',
            'os_api': '29',
            'os_version': '13',
            'ac': 'wifi',
            'is_pad': '0',
            'current_region': 'US',
            'app_type': 'normal',
            'sys_region': 'US',
            'last_install_time': int(time.time()) - random.randint(86400, 1123200),
            'timezone_name': 'America/New_York',
            'residence': 'US',
            'app_language': 'en',
            'timezone_offset': '-14400',
            'host_abi': 'armeabi-v7a',
            'locale': 'en',
            'ac2': 'wifi5g',
            'uoo': '1',
            'carrier_region': 'US',
            'op_region': 'US',
            'build_number': self._APP_INFO['app_version'],
            'region': 'US',
            'ts': int(time.time()),
            'iid': self._APP_INFO.get('iid'),
            'device_id': self._DEVICE_ID,
            'openudid': ''.join(random.choices('0123456789abcdef', k=16)),
        })

    def _call_api(self, ep, video_id, query=None, data=None, headers=None, fatal=True,
                  note='Downloading API JSON', errnote='Unable to download API page'):
        """Call the API, rotating to the next app info when the response is empty.

        A JSON decode error at position 0 is treated as "this app info does
        not work"; any other ExtractorError is re-raised unchanged.
        When not ``fatal``, a warning is reported and None is returned once
        no app info is left.
        """
        if not self._APP_INFO and not self._get_next_app_info():
            message = 'No working app info is available'
            if fatal:
                raise ExtractorError(message, expected=True)
            self.report_warning(message)
            return

        max_tries = len(self._APP_INFO_POOL) + 1  # _APP_INFO_POOL + _APP_INFO
        for count in itertools.count(1):
            self.write_debug(str(self._APP_INFO))
            real_query = self._build_api_query(query or {})
            try:
                return self._call_api_impl(
                    ep, video_id, query=real_query, data=data, headers=headers,
                    fatal=fatal, note=note, errnote=errnote)
            except ExtractorError as e:
                if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0:
                    message = str(e.cause or e.msg)
                    if not self._get_next_app_info():
                        if fatal:
                            raise
                        self.report_warning(message)
                        return
                    self.report_warning(f'{message}. Retrying... (attempt {count} of {max_tries})')
                    continue
                raise

    def _extract_aweme_app(self, aweme_id):
        """Fetch the aweme detail for ``aweme_id`` from the app API and parse it."""
        aweme_detail = traverse_obj(
            self._call_api('multi/aweme/detail', aweme_id, data=urlencode_postdata({
                'aweme_ids': f'[{aweme_id}]',
                'request_source': '0',
            }), headers={'X-Argus': ''}), ('aweme_details', 0, {dict}))
        if not aweme_detail:
            raise ExtractorError('Unable to extract aweme detail info', video_id=aweme_id)
        return self._parse_aweme_video_app(aweme_detail)

    def _solve_challenge_and_set_cookie(self, webpage):
        """Solve the proof-of-work challenge embedded in ``webpage`` and set its cookie.

        The challenge data is base64-encoded JSON stored in the ``class``
        attribute of the element with id ``cs``. The solution is the decimal
        number whose bytes, appended to the base input, produce the expected
        SHA-256 digest.

        Raises ExtractorError if the challenge cannot be found or solved.
        """
        challenge_data = traverse_obj(webpage, (
            {find_element(id='cs', html=True)}, {extract_attributes}, 'class',
            # Appending '===' guarantees sufficient base64 padding
            filter, {lambda x: f'{x}==='}, {base64.b64decode}, {json.loads}))

        if not challenge_data:
            if 'Please wait...' in webpage:
                raise ExtractorError('Unable to extract challenge data')
            raise ExtractorError('Unexpected response from webpage request')

        self.to_screen('Solving JS challenge using native Python implementation')

        expected_digest = traverse_obj(challenge_data, (
            'v', 'c', {str}, {base64.b64decode},
            {require('challenge expected digest')}))

        base_hash = traverse_obj(challenge_data, (
            'v', 'a', {str}, {base64.b64decode},
            {hashlib.sha256}, {require('challenge base hash')}))

        for i in range(1_000_001):
            number = str(i).encode()
            # Copy the pre-seeded hash so the base input is only hashed once
            test_hash = base_hash.copy()
            test_hash.update(number)
            if test_hash.digest() == expected_digest:
                challenge_data['d'] = base64.b64encode(number).decode()
                break
        else:
            raise ExtractorError('Unable to solve JS challenge')

        cookie_value = base64.b64encode(
            json.dumps(challenge_data, separators=(',', ':')).encode()).decode()

        # At time of writing, the cookie name was _wafchallengeid
        cookie_name = traverse_obj(webpage, (
            {find_element(id='wci', html=True)}, {extract_attributes},
            'class', {require('challenge cookie name')}))

        # Actual JS sets Max-Age=1, but we need to adjust for --sleep-requests and Python slowness
        expire_time = int(time.time()) + (self.get_param('sleep_interval_requests') or 0) + 2
        self._set_cookie('.tiktok.com', cookie_name, cookie_value, expire_time=expire_time)

    def _extract_web_data_and_status(self, url, video_id, fatal=True):
        """Download the webpage and return ``(video_data, status)``.

        Returns ``({}, -1)`` when the page or its rehydration data could not
        be obtained and ``fatal`` is False. If the first response carries no
        rehydration data, the JS challenge is solved and the page is
        requested once more.
        """
        video_data, status = {}, -1

        def get_webpage(note='Downloading webpage'):
            res = self._download_webpage_handle(url, video_id, note, fatal=fatal, impersonate=True)
            if res is False:
                return False

            webpage, urlh = res
            if urllib.parse.urlparse(urlh.url).path == '/login':
                message = 'TikTok is requiring login for access to this content'
                if fatal:
                    self.raise_login_required(message)
                self.report_warning(f'{message}. {self._login_hint()}', video_id=video_id)
                return False

            return webpage

        webpage = get_webpage()
        if webpage is False:
            return video_data, status

        universal_data = self._get_universal_data(webpage, video_id)
        if not universal_data:
            try:
                self._solve_challenge_and_set_cookie(webpage)
            except ExtractorError as e:
                if fatal:
                    raise
                self.report_warning(e.orig_msg, video_id=video_id)
                return video_data, status

            webpage = get_webpage(note='Downloading webpage with challenge cookie')
            if webpage is False:
                return video_data, status
            universal_data = self._get_universal_data(webpage, video_id)

        if not universal_data:
            message = 'Unable to extract universal data for rehydration'
            if fatal:
                raise ExtractorError(message)
            self.report_warning(message, video_id=video_id)
            return video_data, status

        status = traverse_obj(universal_data, ('webapp.video-detail', 'statusCode', {int})) or 0
        video_data = traverse_obj(universal_data, ('webapp.video-detail', 'itemInfo', 'itemStruct', {dict}))

        if not traverse_obj(video_data, ('video', {dict})) and traverse_obj(video_data, ('isContentClassified', {bool})):
            message = 'This post may not be comfortable for some audiences. Log in for access'
            if fatal:
                self.raise_login_required(message)
            self.report_warning(f'{message}. {self._login_hint()}', video_id=video_id)

        return video_data, status

    def _get_subtitles(self, aweme_detail, aweme_id, user_name):
        """Collect subtitles, trying app-detail, feed and webpage data in that order.

        When ``user_name`` is given and the API data has no subtitles, the
        webpage is downloaded (non-fatally) to look for them there.
        """
        # TODO: Extract text positioning info

        EXT_MAP = {  # From lowest to highest preference
            'creator_caption': 'json',
            'srt': 'srt',
            'webvtt': 'vtt',
        }
        preference = qualities(tuple(EXT_MAP.values()))

        subtitles = {}

        # aweme/detail endpoint subs
        captions_info = traverse_obj(
            aweme_detail, ('interaction_stickers', ..., 'auto_video_caption_info', 'auto_captions', ...), expected_type=dict)
        for caption in captions_info:
            caption_url = traverse_obj(caption, ('url', 'url_list', ...), expected_type=url_or_none, get_all=False)
            if not caption_url:
                continue
            caption_json = self._download_json(
                caption_url, aweme_id, note='Downloading captions',
                errnote='Unable to download captions', fatal=False)
            if not caption_json:
                continue
            subtitles.setdefault(caption.get('language', 'en'), []).append({
                'ext': 'srt',
                'data': '\n\n'.join(
                    f'{i + 1}\n{srt_subtitles_timecode(line["start_time"] / 1000)} --> {srt_subtitles_timecode(line["end_time"] / 1000)}\n{line["text"]}'
                    for i, line in enumerate(caption_json['utterances']) if line.get('text')),
            })
        # feed endpoint subs
        if not subtitles:
            for caption in traverse_obj(aweme_detail, ('video', 'cla_info', 'caption_infos', ...), expected_type=dict):
                if not caption.get('url'):
                    continue
                subtitles.setdefault(caption.get('lang') or 'en', []).append({
                    'url': caption['url'],
                    'ext': EXT_MAP.get(caption.get('Format')),
                })
        # webpage subs
        if not subtitles:
            if user_name:  # only _parse_aweme_video_app needs to extract the webpage here
                aweme_detail, _ = self._extract_web_data_and_status(
                    self._create_url(user_name, aweme_id), aweme_id, fatal=False)
            for caption in traverse_obj(aweme_detail, ('video', 'subtitleInfos', lambda _, v: v['Url'])):
                subtitles.setdefault(caption.get('LanguageCodeName') or 'en', []).append({
                    'url': caption['Url'],
                    'ext': EXT_MAP.get(caption.get('Format')),
                })

        # Deprioritize creator_caption json since it can't be embedded or used by media players
        for lang, subs_list in subtitles.items():
            subtitles[lang] = sorted(subs_list, key=lambda x: preference(x['ext']))

        return subtitles

    def _parse_url_key(self, url_key):
        """Parse a format "url key" into ``(format_meta, resolution)``.

        Returns ``({}, None)`` when ``url_key`` does not match the expected
        ``v<...>_<codec>_<res>p_<bitrate>`` shape.
        """
        format_id, codec, res, bitrate = self._search_regex(
            r'v[^_]+_(?P<id>(?P<codec>[^_]+)_(?P<res>\d+p)_(?P<bitrate>\d+))', url_key,
            'url key', default=(None, None, None, None), group=('id', 'codec', 'res', 'bitrate'))
        if not format_id:
            return {}, None
        return {
            'format_id': format_id,
            'vcodec': 'h265' if codec == 'bytevc1' else codec,
            'tbr': int_or_none(bitrate, scale=1000) or None,
            'quality': qualities(self.QUALITIES)(res),
        }, res

    def _parse_aweme_video_app(self, aweme_detail):
        """Build an info dict from an app API aweme detail payload.

        Requires the ``aweme_id`` and ``video`` keys to be present.
        """
        aweme_id = aweme_detail['aweme_id']
        video_info = aweme_detail['video']
        # Dimensions seen so far per resolution string, shared between formats
        known_resolutions = {}

        def audio_meta(url):
            ext = determine_ext(url, default_ext='m4a')
            return {
                'format_note': 'Music track',
                'ext': ext,
                'acodec': 'aac' if ext == 'm4a' else ext,
                'vcodec': 'none',
                'width': None,
                'height': None,
            } if ext == 'mp3' or '-music-' in url else {}

        def extract_addr(addr, add_meta=None):
            # Work on a private copy: the dict is mutated below and must never
            # be shared between calls or leak back into the caller
            add_meta = dict(add_meta or {})
            parsed_meta, res = self._parse_url_key(addr.get('url_key', ''))
            is_bytevc2 = parsed_meta.get('vcodec') == 'bytevc2'
            if res:
                known_resolutions.setdefault(res, {}).setdefault('height', int_or_none(addr.get('height')))
                known_resolutions[res].setdefault('width', int_or_none(addr.get('width')))
                parsed_meta.update(known_resolutions.get(res, {}))
                add_meta.setdefault('height', int_or_none(res[:-1]))
            return [{
                'url': url,
                'filesize': int_or_none(addr.get('data_size')),
                'ext': 'mp4',
                'acodec': 'aac',
                'source_preference': -2 if 'aweme/v1' in url else -1,  # Downloads from API might get blocked
                **add_meta, **parsed_meta,
                # bytevc2 is bytedance's own custom h266/vvc codec, as-of-yet unplayable.
                # Otherwise honor a caller-supplied preference (e.g. watermarked downloads)
                'preference': -100 if is_bytevc2 else add_meta.get('preference', -1),
                'format_note': join_nonempty(
                    add_meta.get('format_note'), '(API)' if 'aweme/v1' in url else None,
                    '(UNPLAYABLE)' if is_bytevc2 else None, delim=' '),
                **audio_meta(url),
            } for url in addr.get('url_list') or []]

        # Hack: Add direct video links first to prioritize them when removing duplicate formats
        formats = []
        width = int_or_none(video_info.get('width'))
        height = int_or_none(video_info.get('height'))
        ratio = try_call(lambda: width / height) or 0.5625
        if video_info.get('play_addr'):
            formats.extend(extract_addr(video_info['play_addr'], {
                'format_id': 'play_addr',
                'format_note': 'Direct video',
                'vcodec': 'h265' if traverse_obj(
                    video_info, 'is_bytevc1', 'is_h265') else 'h264',  # TODO: Check for "direct iOS" videos, like https://www.tiktok.com/@cookierun_dev/video/7039716639834656002
                'width': width,
                'height': height,
            }))
        if video_info.get('download_addr'):
            download_addr = video_info['download_addr']
            dl_width = int_or_none(download_addr.get('width'))
            formats.extend(extract_addr(download_addr, {
                'format_id': 'download_addr',
                'format_note': 'Download video%s' % (', watermarked' if video_info.get('has_watermark') else ''),
                'vcodec': 'h264',
                'width': dl_width,
                'height': try_call(lambda: int(dl_width / ratio)),  # download_addr['height'] is wrong
                'preference': -2 if video_info.get('has_watermark') else -1,
            }))
        if video_info.get('play_addr_h264'):
            formats.extend(extract_addr(video_info['play_addr_h264'], {
                'format_id': 'play_addr_h264',
                'format_note': 'Direct video',
                'vcodec': 'h264',
            }))
        if video_info.get('play_addr_bytevc1'):
            formats.extend(extract_addr(video_info['play_addr_bytevc1'], {
                'format_id': 'play_addr_bytevc1',
                'format_note': 'Direct video',
                'vcodec': 'h265',
            }))

        # `or []` also covers a present-but-null 'bit_rate' value
        for bitrate in video_info.get('bit_rate') or []:
            if bitrate.get('play_addr'):
                formats.extend(extract_addr(bitrate['play_addr'], {
                    'format_id': bitrate.get('gear_name'),
                    'format_note': 'Playback video',
                    'tbr': try_get(bitrate, lambda x: x['bit_rate'] / 1000),
                    'vcodec': 'h265' if traverse_obj(
                        bitrate, 'is_bytevc1', 'is_h265') else 'h264',
                    'fps': bitrate.get('FPS'),
                }))

        self._remove_duplicate_formats(formats)
        auth_cookie = self._get_cookies(self._WEBPAGE_HOST).get('sid_tt')
        if auth_cookie:
            # Make the session cookie available on every format host
            for f in formats:
                self._set_cookie(urllib.parse.urlparse(f['url']).hostname, 'sid_tt', auth_cookie.value)

        stats_info = aweme_detail.get('statistics') or {}
        music_info = aweme_detail.get('music') or {}
        labels = traverse_obj(aweme_detail, ('hybrid_label', ..., 'text'), expected_type=str)

        contained_music_track = traverse_obj(
            music_info, ('matched_song', 'title'), ('matched_pgc_sound', 'title'), expected_type=str)
        contained_music_author = traverse_obj(
            music_info, ('matched_song', 'author'), ('matched_pgc_sound', 'author'), 'author', expected_type=str)

        is_generic_og_trackname = music_info.get('is_original_sound') and music_info.get('title') == 'original sound - {}'.format(music_info.get('owner_handle'))
        if is_generic_og_trackname:
            music_track, music_author = contained_music_track or 'original sound', contained_music_author
        else:
            music_track, music_author = music_info.get('title'), traverse_obj(music_info, ('author', {str}))

        # Fall back to {} so the ** expansion below cannot fail on a missing author
        author_info = traverse_obj(aweme_detail, ('author', {
            'uploader': ('unique_id', {str}),
            'uploader_id': ('uid', {str_or_none}),
            'channel': ('nickname', {str}),
            'channel_id': ('sec_uid', {str}),
        })) or {}

        return {
            'id': aweme_id,
            **traverse_obj(aweme_detail, {
                'title': ('desc', {truncate_string(left=72)}),
                'description': ('desc', {str}),
                'timestamp': ('create_time', {int_or_none}),
            }),
            **traverse_obj(stats_info, {
                'view_count': 'play_count',
                'like_count': 'digg_count',
                'repost_count': 'share_count',
                'comment_count': 'comment_count',
                'save_count': 'collect_count',
            }, expected_type=int_or_none),
            **author_info,
            'channel_url': format_field(author_info, 'channel_id', self._UPLOADER_URL_FORMAT, default=None),
            'uploader_url': format_field(
                author_info, ['uploader', 'uploader_id'], self._UPLOADER_URL_FORMAT, default=None),
            'track': music_track,
            'album': str_or_none(music_info.get('album')) or None,
            'artists': re.split(r'(?:, | & )', music_author) if music_author else None,
            'formats': formats,
            'subtitles': self.extract_subtitles(
                aweme_detail, aweme_id, traverse_obj(author_info, 'uploader', 'uploader_id', 'channel_id')),
            'thumbnails': [
                {
                    'id': cover_id,
                    'url': cover_url,
                    'preference': -1 if cover_id in ('cover', 'origin_cover') else -2,
                }
                for cover_id in (
                    'cover', 'ai_dynamic_cover', 'animated_cover',
                    'ai_dynamic_cover_bak', 'origin_cover', 'dynamic_cover')
                for cover_url in traverse_obj(video_info, (cover_id, 'url_list', ...))
            ],
            'duration': (traverse_obj(video_info, (
                (None, 'download_addr'), 'duration', {int_or_none(scale=1000)}, any))
                or traverse_obj(music_info, ('duration', {int_or_none}))),
            'availability': self._availability(
                is_private='Private' in labels,
                needs_subscription='Friends only' in labels,
                is_unlisted='Followers only' in labels),
            '_format_sort_fields': ('quality', 'codec', 'size', 'br'),
        }

    def _extract_web_formats(self, aweme_detail):
        """Build the format list from webpage item data.

        Falls back to an audio-only format when no video formats exist but a
        music URL does. Formats hosted on ``www.tiktok.com`` are dropped.
        """
        COMMON_FORMAT_INFO = {
            'ext': 'mp4',
            'vcodec': 'h264',
            'acodec': 'aac',
        }
        video_info = traverse_obj(aweme_detail, ('video', {dict})) or {}
        play_width = int_or_none(video_info.get('width'))
        play_height = int_or_none(video_info.get('height'))
        ratio = try_call(lambda: play_width / play_height) or 0.5625
        formats = []

        for bitrate_info in traverse_obj(video_info, ('bitrateInfo', lambda _, v: v['PlayAddr']['UrlList'])):
            format_info, res = self._parse_url_key(
                traverse_obj(bitrate_info, ('PlayAddr', 'UrlKey', {str})) or '')
            # bytevc2 is bytedance's own custom h266/vvc codec, as-of-yet unplayable
            is_bytevc2 = format_info.get('vcodec') == 'bytevc2'
            format_info.update({
                'format_note': 'UNPLAYABLE' if is_bytevc2 else None,
                'preference': -100 if is_bytevc2 else -1,
                'filesize': traverse_obj(bitrate_info, ('PlayAddr', 'DataSize', {int_or_none})),
            })

            if dimension := (res and int(res[:-1])):
                if dimension == 540:  # '540p' is actually 576p
                    dimension = 576
                if ratio < 1:  # portrait: res/dimension is width
                    y = int(dimension / ratio)
                    format_info.update({
                        'width': dimension,
                        'height': y - (y % 2),
                    })
                else:  # landscape: res/dimension is height
                    x = int(dimension * ratio)
                    format_info.update({
                        'width': x + (x % 2),
                        'height': dimension,
                    })

            for video_url in traverse_obj(bitrate_info, ('PlayAddr', 'UrlList', ..., {url_or_none})):
                formats.append({
                    **COMMON_FORMAT_INFO,
                    **format_info,
                    'url': self._proto_relative_url(video_url),
                })

        # We don't have res string for play formats, but need quality for sorting & de-duplication
        play_quality = traverse_obj(formats, (lambda _, v: v['width'] == play_width, 'quality', any))

        for play_url in traverse_obj(video_info, ('playAddr', ((..., 'src'), None), {url_or_none})):
            formats.append({
                **COMMON_FORMAT_INFO,
                'format_id': 'play',
                'url': self._proto_relative_url(play_url),
                'width': play_width,
                'height': play_height,
                'quality': play_quality,
            })

        for download_url in traverse_obj(video_info, (('downloadAddr', ('download', 'url')), {url_or_none})):
            formats.append({
                **COMMON_FORMAT_INFO,
                'format_id': 'download',
                'url': self._proto_relative_url(download_url),
                'format_note': 'watermarked',
                'preference': -2,
            })

        self._remove_duplicate_formats(formats)

        # Is it a slideshow with only audio for download?
        if not formats and traverse_obj(aweme_detail, ('music', 'playUrl', {url_or_none})):
            audio_url = aweme_detail['music']['playUrl']
            ext = traverse_obj(parse_qs(audio_url), (
                'mime_type', -1, {lambda x: x.replace('_', '/')}, {mimetype2ext})) or 'm4a'
            formats.append({
                'format_id': 'audio',
                'url': self._proto_relative_url(audio_url),
                'ext': ext,
                'acodec': 'aac' if ext == 'm4a' else ext,
                'vcodec': 'none',
            })

        # Filter out broken formats, see https://github.com/yt-dlp/yt-dlp/issues/11034
        return [f for f in formats if urllib.parse.urlparse(f['url']).hostname != 'www.tiktok.com']

    def _parse_aweme_video_web(self, aweme_detail, webpage_url, video_id, extract_flat=False):
        """Build an info dict from webpage item data.

        With ``extract_flat`` set, formats and subtitles are left as None.
        """
        # Fall back to {} so the ** expansion below cannot fail when no author data matches
        author_info = traverse_obj(aweme_detail, (('authorInfo', 'author', None), {
            'channel': ('nickname', {str}),
            'channel_id': (('authorSecId', 'secUid'), {str}),
            'uploader': (('uniqueId', 'author'), {str}),
            'uploader_id': (('authorId', 'uid', 'id'), {str_or_none}),
        }), get_all=False) or {}

        return {
            'id': video_id,
            'formats': None if extract_flat else self._extract_web_formats(aweme_detail),
            'subtitles': None if extract_flat else self.extract_subtitles(aweme_detail, video_id, None),
            'http_headers': {'Referer': webpage_url},
            **author_info,
            'channel_url': format_field(author_info, 'channel_id', self._UPLOADER_URL_FORMAT, default=None),
            'uploader_url': format_field(
                author_info, ['uploader', 'uploader_id'], self._UPLOADER_URL_FORMAT, default=None),
            **traverse_obj(aweme_detail, ('music', {
                'track': ('title', {str}),
                'album': ('album', {str}, filter),
                'artists': ('authorName', {str}, {lambda x: re.split(r'(?:, | & )', x) if x else None}),
                'duration': ('duration', {int_or_none}),
            })),
            **traverse_obj(aweme_detail, {
                'title': ('desc', {truncate_string(left=72)}),
                'description': ('desc', {str}),
                # audio-only slideshows have a video duration of 0 and an actual audio duration
                'duration': ('video', 'duration', {int_or_none}, filter),
                'timestamp': ('createTime', {int_or_none}),
            }),
            **traverse_obj(aweme_detail, ('stats', {
                'view_count': 'playCount',
                'like_count': 'diggCount',
                'repost_count': 'shareCount',
                'comment_count': 'commentCount',
                'save_count': 'collectCount',
            }), expected_type=int_or_none),
            'thumbnails': [
                {
                    'id': cover_id,
                    'url': self._proto_relative_url(cover_url),
                    'preference': -2 if cover_id == 'dynamicCover' else -1,
                }
                for cover_id in ('thumbnail', 'cover', 'dynamicCover', 'originCover')
                for cover_url in traverse_obj(aweme_detail, ((None, 'video'), cover_id, {url_or_none}))
            ],
        }
'channel_url': 'https://www.tiktok.com/@MS4wLjABAAAAiFnldaILebi5heDoVU6bn4jBWWycX6-9U3xuNPqZ8Ws', 'channel_id': 'MS4wLjABAAAAiFnldaILebi5heDoVU6bn4jBWWycX6-9U3xuNPqZ8Ws', 'channel': 'patroX', 'thumbnail': r're:^https?://[\w\/\.\-]+(~[\w\-]+\.image)?', 'upload_date': '20190930', 'timestamp': 1569860870, 'view_count': int, 'like_count': int, 'repost_count': int, 'comment_count': int, 'save_count': int, 'artists': ['Evan Todd', 'Jessica Keenan Wynn', 'Alice Lee', 'Barrett Wilbert Weed', 'Jon Eidson'], 'track': 'Big Fun', }, }, { # Banned audio, was available on the app, now works with web too 'url': 'https://www.tiktok.com/@barudakhb_/video/6984138651336838402', 'info_dict': { 'id': '6984138651336838402', 'ext': 'mp4', 'title': 'Balas @yolaaftwsr hayu yu ? #SquadRandom_ 🔥', 'description': 'Balas @yolaaftwsr hayu yu ? #SquadRandom_ 🔥', 'uploader': 'barudakhb_', 'channel': 'md5:29f238c49bc0c176cb3cef1a9cea9fa6', 'uploader_id': '6974687867511718913', 'uploader_url': 'https://www.tiktok.com/@barudakhb_', 'channel_url': 'https://www.tiktok.com/@MS4wLjABAAAAbhBwQC-R1iKoix6jDFsF-vBdfx2ABoDjaZrM9fX6arU3w71q3cOWgWuTXn1soZ7d', 'channel_id': 'MS4wLjABAAAAbhBwQC-R1iKoix6jDFsF-vBdfx2ABoDjaZrM9fX6arU3w71q3cOWgWuTXn1soZ7d', 'track': 'Boka Dance', 'artists': ['md5:29f238c49bc0c176cb3cef1a9cea9fa6'], 'timestamp': 1626121503, 'duration': 18, 'thumbnail': r're:^https?://[\w\/\.\-]+(~[\w\-]+\.image)?', 'upload_date': '20210712', 'view_count': int, 'like_count': int, 'repost_count': int, 'comment_count': int, 'save_count': int, }, }, { # Sponsored video, only available with feed workaround 'url': 'https://www.tiktok.com/@MS4wLjABAAAATh8Vewkn0LYM7Fo03iec3qKdeCUOcBIouRk1mkiag6h3o_pQu_dUXvZ2EZlGST7_/video/7042692929109986561', 'info_dict': { 'id': '7042692929109986561', 'ext': 'mp4', 'title': 'Slap and Run!', 'description': 'Slap and Run!', 'uploader': 'user440922249', 'channel': 'Slap And Run', 'uploader_id': '7036055384943690754', 'uploader_url': 
'https://www.tiktok.com/@MS4wLjABAAAATh8Vewkn0LYM7Fo03iec3qKdeCUOcBIouRk1mkiag6h3o_pQu_dUXvZ2EZlGST7_', 'channel_id': 'MS4wLjABAAAATh8Vewkn0LYM7Fo03iec3qKdeCUOcBIouRk1mkiag6h3o_pQu_dUXvZ2EZlGST7_', 'track': 'Promoted Music', 'timestamp': 1639754738, 'duration': 30, 'thumbnail': r're:^https?://[\w\/\.\-]+(~[\w\-]+\.image)?', 'upload_date': '20211217', 'view_count': int, 'like_count': int, 'repost_count': int, 'comment_count': int, 'save_count': int, }, 'skip': 'This video is unavailable', }, { # Video without title and description 'url': 'https://www.tiktok.com/@pokemonlife22/video/7059698374567611694', 'info_dict': { 'id': '7059698374567611694', 'ext': 'mp4', 'title': 'TikTok video #7059698374567611694', 'description': '', 'uploader': 'pokemonlife22', 'channel': 'Pokemon', 'uploader_id': '6820838815978423302', 'uploader_url': 'https://www.tiktok.com/@pokemonlife22', 'channel_url': 'https://www.tiktok.com/@MS4wLjABAAAA0tF1nBwQVVMyrGu3CqttkNgM68Do1OXUFuCY0CRQk8fEtSVDj89HqoqvbSTmUP2W', 'channel_id': 'MS4wLjABAAAA0tF1nBwQVVMyrGu3CqttkNgM68Do1OXUFuCY0CRQk8fEtSVDj89HqoqvbSTmUP2W', 'track': 'original sound', 'timestamp': 1643714123, 'duration': 6, 'thumbnail': r're:^https?://[\w\/\.\-]+(~[\w\-]+\.image)?', 'upload_date': '20220201', 'artists': ['Pokemon'], 'view_count': int, 'like_count': int, 'repost_count': int, 'comment_count': int, 'save_count': int, }, }, { # hydration JSON is sent in a