import base64
import functools
import hashlib
import itertools
import json
import random
import re
import string
import time
import urllib.parse
import uuid
from .common import InfoExtractor
from ..networking import HEADRequest
from ..utils import (
ExtractorError,
UnsupportedError,
UserNotLive,
determine_ext,
extract_attributes,
filter_dict,
format_field,
int_or_none,
join_nonempty,
merge_dicts,
mimetype2ext,
parse_qs,
qualities,
srt_subtitles_timecode,
str_or_none,
truncate_string,
try_call,
try_get,
url_or_none,
urlencode_postdata,
)
from ..utils.traversal import find_element, require, traverse_obj
class TikTokBaseIE(InfoExtractor):
_UPLOADER_URL_FORMAT = 'https://www.tiktok.com/@%s'
_WEBPAGE_HOST = 'https://www.tiktok.com/'
QUALITIES = ('360p', '540p', '720p', '1080p')
_APP_INFO_DEFAULTS = {
# unique "install id"
'iid': None,
# TikTok (KR/PH/TW/TH/VN) = trill, TikTok (rest of world) = musical_ly, Douyin = aweme
'app_name': 'musical_ly',
'app_version': '35.1.3',
'manifest_app_version': '2023501030',
# "app id": aweme = 1128, trill = 1180, musical_ly = 1233, universal = 0
'aid': '0',
}
_APP_INFO_POOL = None
_APP_INFO = None
_APP_USER_AGENT = None
@functools.cached_property
def _KNOWN_APP_INFO(self):
# If we have a genuine device ID, we may not need any IID
default = [''] if self._KNOWN_DEVICE_ID else []
return self._configuration_arg('app_info', default, ie_key=TikTokIE)
@functools.cached_property
def _KNOWN_DEVICE_ID(self):
return self._configuration_arg('device_id', [None], ie_key=TikTokIE)[0]
@functools.cached_property
def _DEVICE_ID(self):
return self._KNOWN_DEVICE_ID or str(random.randint(7250000000000000000, 7325099899999994577))
@functools.cached_property
def _API_HOSTNAME(self):
return self._configuration_arg(
'api_hostname', ['api16-normal-c-useast1a.tiktokv.com'], ie_key=TikTokIE)[0]
def _get_next_app_info(self):
if self._APP_INFO_POOL is None:
defaults = {
key: self._configuration_arg(key, [default], ie_key=TikTokIE)[0]
for key, default in self._APP_INFO_DEFAULTS.items()
if key != 'iid'
}
self._APP_INFO_POOL = [
{**defaults, **dict(
(k, v) for k, v in zip(self._APP_INFO_DEFAULTS, app_info.split('/'), strict=False) if v
)} for app_info in self._KNOWN_APP_INFO
]
if not self._APP_INFO_POOL:
return False
self._APP_INFO = self._APP_INFO_POOL.pop(0)
app_name = self._APP_INFO['app_name']
version = self._APP_INFO['manifest_app_version']
if app_name == 'musical_ly':
package = f'com.zhiliaoapp.musically/{version}'
else: # trill, aweme
package = f'com.ss.android.ugc.{app_name}/{version}'
self._APP_USER_AGENT = f'{package} (Linux; U; Android 13; en_US; Pixel 7; Build/TD1A.220804.031; Cronet/58.0.2991.0)'
return True
@staticmethod
def _create_url(user_id, video_id):
return f'https://www.tiktok.com/@{user_id or "_"}/video/{video_id}'
def _get_sigi_state(self, webpage, display_id):
return self._search_json(
r'', default={})
def _get_universal_data(self, webpage, display_id):
return traverse_obj(self._search_json(
r'', default={}),
('__DEFAULT_SCOPE__', {dict})) or {}
def _call_api_impl(self, ep, video_id, query=None, data=None, headers=None, fatal=True,
note='Downloading API JSON', errnote='Unable to download API page'):
self._set_cookie(self._API_HOSTNAME, 'odin_tt', ''.join(random.choices('0123456789abcdef', k=160)))
webpage_cookies = self._get_cookies(self._WEBPAGE_HOST)
if webpage_cookies.get('sid_tt'):
self._set_cookie(self._API_HOSTNAME, 'sid_tt', webpage_cookies['sid_tt'].value)
return self._download_json(
f'https://{self._API_HOSTNAME}/aweme/v1/{ep}/', video_id=video_id,
fatal=fatal, note=note, errnote=errnote, headers={
'User-Agent': self._APP_USER_AGENT,
'Accept': 'application/json',
**(headers or {}),
}, query=query, data=data)
def _build_api_query(self, query):
return filter_dict({
**query,
'device_platform': 'android',
'os': 'android',
'ssmix': 'a',
'_rticket': int(time.time() * 1000),
'cdid': str(uuid.uuid4()),
'channel': 'googleplay',
'aid': self._APP_INFO['aid'],
'app_name': self._APP_INFO['app_name'],
'version_code': ''.join(f'{int(v):02d}' for v in self._APP_INFO['app_version'].split('.')),
'version_name': self._APP_INFO['app_version'],
'manifest_version_code': self._APP_INFO['manifest_app_version'],
'update_version_code': self._APP_INFO['manifest_app_version'],
'ab_version': self._APP_INFO['app_version'],
'resolution': '1080*2400',
'dpi': 420,
'device_type': 'Pixel 7',
'device_brand': 'Google',
'language': 'en',
'os_api': '29',
'os_version': '13',
'ac': 'wifi',
'is_pad': '0',
'current_region': 'US',
'app_type': 'normal',
'sys_region': 'US',
'last_install_time': int(time.time()) - random.randint(86400, 1123200),
'timezone_name': 'America/New_York',
'residence': 'US',
'app_language': 'en',
'timezone_offset': '-14400',
'host_abi': 'armeabi-v7a',
'locale': 'en',
'ac2': 'wifi5g',
'uoo': '1',
'carrier_region': 'US',
'op_region': 'US',
'build_number': self._APP_INFO['app_version'],
'region': 'US',
'ts': int(time.time()),
'iid': self._APP_INFO.get('iid'),
'device_id': self._DEVICE_ID,
'openudid': ''.join(random.choices('0123456789abcdef', k=16)),
})
def _call_api(self, ep, video_id, query=None, data=None, headers=None, fatal=True,
note='Downloading API JSON', errnote='Unable to download API page'):
if not self._APP_INFO and not self._get_next_app_info():
message = 'No working app info is available'
if fatal:
raise ExtractorError(message, expected=True)
else:
self.report_warning(message)
return
max_tries = len(self._APP_INFO_POOL) + 1 # _APP_INFO_POOL + _APP_INFO
for count in itertools.count(1):
self.write_debug(str(self._APP_INFO))
real_query = self._build_api_query(query or {})
try:
return self._call_api_impl(
ep, video_id, query=real_query, data=data, headers=headers,
fatal=fatal, note=note, errnote=errnote)
except ExtractorError as e:
if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0:
message = str(e.cause or e.msg)
if not self._get_next_app_info():
if fatal:
raise
else:
self.report_warning(message)
return
self.report_warning(f'{message}. Retrying... (attempt {count} of {max_tries})')
continue
raise
def _extract_aweme_app(self, aweme_id):
aweme_detail = traverse_obj(
self._call_api('multi/aweme/detail', aweme_id, data=urlencode_postdata({
'aweme_ids': f'[{aweme_id}]',
'request_source': '0',
}), headers={'X-Argus': ''}), ('aweme_details', 0, {dict}))
if not aweme_detail:
raise ExtractorError('Unable to extract aweme detail info', video_id=aweme_id)
return self._parse_aweme_video_app(aweme_detail)
def _solve_challenge_and_set_cookie(self, webpage):
challenge_data = traverse_obj(webpage, (
{find_element(id='cs', html=True)}, {extract_attributes}, 'class',
filter, {lambda x: f'{x}==='}, {base64.b64decode}, {json.loads}))
if not challenge_data:
if 'Please wait...' in webpage:
raise ExtractorError('Unable to extract challenge data')
raise ExtractorError('Unexpected response from webpage request')
self.to_screen('Solving JS challenge using native Python implementation')
expected_digest = traverse_obj(challenge_data, (
'v', 'c', {str}, {base64.b64decode},
{require('challenge expected digest')}))
base_hash = traverse_obj(challenge_data, (
'v', 'a', {str}, {base64.b64decode},
{hashlib.sha256}, {require('challenge base hash')}))
for i in range(1_000_001):
number = str(i).encode()
test_hash = base_hash.copy()
test_hash.update(number)
if test_hash.digest() == expected_digest:
challenge_data['d'] = base64.b64encode(number).decode()
break
else:
raise ExtractorError('Unable to solve JS challenge')
cookie_value = base64.b64encode(
json.dumps(challenge_data, separators=(',', ':')).encode()).decode()
# At time of writing, the cookie name was _wafchallengeid
cookie_name = traverse_obj(webpage, (
{find_element(id='wci', html=True)}, {extract_attributes},
'class', {require('challenge cookie name')}))
# Actual JS sets Max-Age=1, but we need to adjust for --sleep-requests and Python slowness
expire_time = int(time.time()) + (self.get_param('sleep_interval_requests') or 0) + 2
self._set_cookie('.tiktok.com', cookie_name, cookie_value, expire_time=expire_time)
def _extract_web_data_and_status(self, url, video_id, fatal=True):
video_data, status = {}, -1
def get_webpage(note='Downloading webpage'):
res = self._download_webpage_handle(url, video_id, note, fatal=fatal, impersonate=True)
if res is False:
return False
webpage, urlh = res
if urllib.parse.urlparse(urlh.url).path == '/login':
message = 'TikTok is requiring login for access to this content'
if fatal:
self.raise_login_required(message)
self.report_warning(f'{message}. {self._login_hint()}', video_id=video_id)
return False
return webpage
webpage = get_webpage()
if webpage is False:
return video_data, status
universal_data = self._get_universal_data(webpage, video_id)
if not universal_data:
try:
self._solve_challenge_and_set_cookie(webpage)
except ExtractorError as e:
if fatal:
raise
self.report_warning(e.orig_msg, video_id=video_id)
return video_data, status
webpage = get_webpage(note='Downloading webpage with challenge cookie')
if webpage is False:
return video_data, status
universal_data = self._get_universal_data(webpage, video_id)
if not universal_data:
message = 'Unable to extract universal data for rehydration'
if fatal:
raise ExtractorError(message)
self.report_warning(message, video_id=video_id)
return video_data, status
status = traverse_obj(universal_data, ('webapp.video-detail', 'statusCode', {int})) or 0
video_data = traverse_obj(universal_data, ('webapp.video-detail', 'itemInfo', 'itemStruct', {dict}))
if not traverse_obj(video_data, ('video', {dict})) and traverse_obj(video_data, ('isContentClassified', {bool})):
message = 'This post may not be comfortable for some audiences. Log in for access'
if fatal:
self.raise_login_required(message)
self.report_warning(f'{message}. {self._login_hint()}', video_id=video_id)
return video_data, status
def _get_subtitles(self, aweme_detail, aweme_id, user_name):
# TODO: Extract text positioning info
EXT_MAP = { # From lowest to highest preference
'creator_caption': 'json',
'srt': 'srt',
'webvtt': 'vtt',
}
preference = qualities(tuple(EXT_MAP.values()))
subtitles = {}
# aweme/detail endpoint subs
captions_info = traverse_obj(
aweme_detail, ('interaction_stickers', ..., 'auto_video_caption_info', 'auto_captions', ...), expected_type=dict)
for caption in captions_info:
caption_url = traverse_obj(caption, ('url', 'url_list', ...), expected_type=url_or_none, get_all=False)
if not caption_url:
continue
caption_json = self._download_json(
caption_url, aweme_id, note='Downloading captions', errnote='Unable to download captions', fatal=False)
if not caption_json:
continue
subtitles.setdefault(caption.get('language', 'en'), []).append({
'ext': 'srt',
'data': '\n\n'.join(
f'{i + 1}\n{srt_subtitles_timecode(line["start_time"] / 1000)} --> {srt_subtitles_timecode(line["end_time"] / 1000)}\n{line["text"]}'
for i, line in enumerate(caption_json['utterances']) if line.get('text')),
})
# feed endpoint subs
if not subtitles:
for caption in traverse_obj(aweme_detail, ('video', 'cla_info', 'caption_infos', ...), expected_type=dict):
if not caption.get('url'):
continue
subtitles.setdefault(caption.get('lang') or 'en', []).append({
'url': caption['url'],
'ext': EXT_MAP.get(caption.get('Format')),
})
# webpage subs
if not subtitles:
if user_name: # only _parse_aweme_video_app needs to extract the webpage here
aweme_detail, _ = self._extract_web_data_and_status(
self._create_url(user_name, aweme_id), aweme_id, fatal=False)
for caption in traverse_obj(aweme_detail, ('video', 'subtitleInfos', lambda _, v: v['Url'])):
subtitles.setdefault(caption.get('LanguageCodeName') or 'en', []).append({
'url': caption['Url'],
'ext': EXT_MAP.get(caption.get('Format')),
})
# Deprioritize creator_caption json since it can't be embedded or used by media players
for lang, subs_list in subtitles.items():
subtitles[lang] = sorted(subs_list, key=lambda x: preference(x['ext']))
return subtitles
def _parse_url_key(self, url_key):
format_id, codec, res, bitrate = self._search_regex(
r'v[^_]+_(?P(?P[^_]+)_(?P\d+p)_(?P\d+))', url_key,
'url key', default=(None, None, None, None), group=('id', 'codec', 'res', 'bitrate'))
if not format_id:
return {}, None
return {
'format_id': format_id,
'vcodec': 'h265' if codec == 'bytevc1' else codec,
'tbr': int_or_none(bitrate, scale=1000) or None,
'quality': qualities(self.QUALITIES)(res),
}, res
def _parse_aweme_video_app(self, aweme_detail):
aweme_id = aweme_detail['aweme_id']
video_info = aweme_detail['video']
known_resolutions = {}
def audio_meta(url):
ext = determine_ext(url, default_ext='m4a')
return {
'format_note': 'Music track',
'ext': ext,
'acodec': 'aac' if ext == 'm4a' else ext,
'vcodec': 'none',
'width': None,
'height': None,
} if ext == 'mp3' or '-music-' in url else {}
def extract_addr(addr, add_meta={}):
parsed_meta, res = self._parse_url_key(addr.get('url_key', ''))
is_bytevc2 = parsed_meta.get('vcodec') == 'bytevc2'
if res:
known_resolutions.setdefault(res, {}).setdefault('height', int_or_none(addr.get('height')))
known_resolutions[res].setdefault('width', int_or_none(addr.get('width')))
parsed_meta.update(known_resolutions.get(res, {}))
add_meta.setdefault('height', int_or_none(res[:-1]))
return [{
'url': url,
'filesize': int_or_none(addr.get('data_size')),
'ext': 'mp4',
'acodec': 'aac',
'source_preference': -2 if 'aweme/v1' in url else -1, # Downloads from API might get blocked
**add_meta, **parsed_meta,
# bytevc2 is bytedance's own custom h266/vvc codec, as-of-yet unplayable
'preference': -100 if is_bytevc2 else -1,
'format_note': join_nonempty(
add_meta.get('format_note'), '(API)' if 'aweme/v1' in url else None,
'(UNPLAYABLE)' if is_bytevc2 else None, delim=' '),
**audio_meta(url),
} for url in addr.get('url_list') or []]
# Hack: Add direct video links first to prioritize them when removing duplicate formats
formats = []
width = int_or_none(video_info.get('width'))
height = int_or_none(video_info.get('height'))
ratio = try_call(lambda: width / height) or 0.5625
if video_info.get('play_addr'):
formats.extend(extract_addr(video_info['play_addr'], {
'format_id': 'play_addr',
'format_note': 'Direct video',
'vcodec': 'h265' if traverse_obj(
video_info, 'is_bytevc1', 'is_h265') else 'h264', # TODO: Check for "direct iOS" videos, like https://www.tiktok.com/@cookierun_dev/video/7039716639834656002
'width': width,
'height': height,
}))
if video_info.get('download_addr'):
download_addr = video_info['download_addr']
dl_width = int_or_none(download_addr.get('width'))
formats.extend(extract_addr(download_addr, {
'format_id': 'download_addr',
'format_note': 'Download video%s' % (', watermarked' if video_info.get('has_watermark') else ''),
'vcodec': 'h264',
'width': dl_width,
'height': try_call(lambda: int(dl_width / ratio)), # download_addr['height'] is wrong
'preference': -2 if video_info.get('has_watermark') else -1,
}))
if video_info.get('play_addr_h264'):
formats.extend(extract_addr(video_info['play_addr_h264'], {
'format_id': 'play_addr_h264',
'format_note': 'Direct video',
'vcodec': 'h264',
}))
if video_info.get('play_addr_bytevc1'):
formats.extend(extract_addr(video_info['play_addr_bytevc1'], {
'format_id': 'play_addr_bytevc1',
'format_note': 'Direct video',
'vcodec': 'h265',
}))
for bitrate in video_info.get('bit_rate', []):
if bitrate.get('play_addr'):
formats.extend(extract_addr(bitrate['play_addr'], {
'format_id': bitrate.get('gear_name'),
'format_note': 'Playback video',
'tbr': try_get(bitrate, lambda x: x['bit_rate'] / 1000),
'vcodec': 'h265' if traverse_obj(
bitrate, 'is_bytevc1', 'is_h265') else 'h264',
'fps': bitrate.get('FPS'),
}))
self._remove_duplicate_formats(formats)
auth_cookie = self._get_cookies(self._WEBPAGE_HOST).get('sid_tt')
if auth_cookie:
for f in formats:
self._set_cookie(urllib.parse.urlparse(f['url']).hostname, 'sid_tt', auth_cookie.value)
stats_info = aweme_detail.get('statistics') or {}
music_info = aweme_detail.get('music') or {}
labels = traverse_obj(aweme_detail, ('hybrid_label', ..., 'text'), expected_type=str)
contained_music_track = traverse_obj(
music_info, ('matched_song', 'title'), ('matched_pgc_sound', 'title'), expected_type=str)
contained_music_author = traverse_obj(
music_info, ('matched_song', 'author'), ('matched_pgc_sound', 'author'), 'author', expected_type=str)
is_generic_og_trackname = music_info.get('is_original_sound') and music_info.get('title') == 'original sound - {}'.format(music_info.get('owner_handle'))
if is_generic_og_trackname:
music_track, music_author = contained_music_track or 'original sound', contained_music_author
else:
music_track, music_author = music_info.get('title'), traverse_obj(music_info, ('author', {str}))
author_info = traverse_obj(aweme_detail, ('author', {
'uploader': ('unique_id', {str}),
'uploader_id': ('uid', {str_or_none}),
'channel': ('nickname', {str}),
'channel_id': ('sec_uid', {str}),
}))
return {
'id': aweme_id,
**traverse_obj(aweme_detail, {
'title': ('desc', {truncate_string(left=72)}),
'description': ('desc', {str}),
'timestamp': ('create_time', {int_or_none}),
}),
**traverse_obj(stats_info, {
'view_count': 'play_count',
'like_count': 'digg_count',
'repost_count': 'share_count',
'comment_count': 'comment_count',
'save_count': 'collect_count',
}, expected_type=int_or_none),
**author_info,
'channel_url': format_field(author_info, 'channel_id', self._UPLOADER_URL_FORMAT, default=None),
'uploader_url': format_field(
author_info, ['uploader', 'uploader_id'], self._UPLOADER_URL_FORMAT, default=None),
'track': music_track,
'album': str_or_none(music_info.get('album')) or None,
'artists': re.split(r'(?:, | & )', music_author) if music_author else None,
'formats': formats,
'subtitles': self.extract_subtitles(
aweme_detail, aweme_id, traverse_obj(author_info, 'uploader', 'uploader_id', 'channel_id')),
'thumbnails': [
{
'id': cover_id,
'url': cover_url,
'preference': -1 if cover_id in ('cover', 'origin_cover') else -2,
}
for cover_id in (
'cover', 'ai_dynamic_cover', 'animated_cover',
'ai_dynamic_cover_bak', 'origin_cover', 'dynamic_cover')
for cover_url in traverse_obj(video_info, (cover_id, 'url_list', ...))
],
'duration': (traverse_obj(video_info, (
(None, 'download_addr'), 'duration', {int_or_none(scale=1000)}, any))
or traverse_obj(music_info, ('duration', {int_or_none}))),
'availability': self._availability(
is_private='Private' in labels,
needs_subscription='Friends only' in labels,
is_unlisted='Followers only' in labels),
'_format_sort_fields': ('quality', 'codec', 'size', 'br'),
}
def _extract_web_formats(self, aweme_detail):
COMMON_FORMAT_INFO = {
'ext': 'mp4',
'vcodec': 'h264',
'acodec': 'aac',
}
video_info = traverse_obj(aweme_detail, ('video', {dict})) or {}
play_width = int_or_none(video_info.get('width'))
play_height = int_or_none(video_info.get('height'))
ratio = try_call(lambda: play_width / play_height) or 0.5625
formats = []
for bitrate_info in traverse_obj(video_info, ('bitrateInfo', lambda _, v: v['PlayAddr']['UrlList'])):
format_info, res = self._parse_url_key(
traverse_obj(bitrate_info, ('PlayAddr', 'UrlKey', {str})) or '')
# bytevc2 is bytedance's own custom h266/vvc codec, as-of-yet unplayable
is_bytevc2 = format_info.get('vcodec') == 'bytevc2'
format_info.update({
'format_note': 'UNPLAYABLE' if is_bytevc2 else None,
'preference': -100 if is_bytevc2 else -1,
'filesize': traverse_obj(bitrate_info, ('PlayAddr', 'DataSize', {int_or_none})),
})
if dimension := (res and int(res[:-1])):
if dimension == 540: # '540p' is actually 576p
dimension = 576
if ratio < 1: # portrait: res/dimension is width
y = int(dimension / ratio)
format_info.update({
'width': dimension,
'height': y - (y % 2),
})
else: # landscape: res/dimension is height
x = int(dimension * ratio)
format_info.update({
'width': x + (x % 2),
'height': dimension,
})
for video_url in traverse_obj(bitrate_info, ('PlayAddr', 'UrlList', ..., {url_or_none})):
formats.append({
**COMMON_FORMAT_INFO,
**format_info,
'url': self._proto_relative_url(video_url),
})
# We don't have res string for play formats, but need quality for sorting & de-duplication
play_quality = traverse_obj(formats, (lambda _, v: v['width'] == play_width, 'quality', any))
for play_url in traverse_obj(video_info, ('playAddr', ((..., 'src'), None), {url_or_none})):
formats.append({
**COMMON_FORMAT_INFO,
'format_id': 'play',
'url': self._proto_relative_url(play_url),
'width': play_width,
'height': play_height,
'quality': play_quality,
})
for download_url in traverse_obj(video_info, (('downloadAddr', ('download', 'url')), {url_or_none})):
formats.append({
**COMMON_FORMAT_INFO,
'format_id': 'download',
'url': self._proto_relative_url(download_url),
'format_note': 'watermarked',
'preference': -2,
})
self._remove_duplicate_formats(formats)
# Is it a slideshow with only audio for download?
if not formats and traverse_obj(aweme_detail, ('music', 'playUrl', {url_or_none})):
audio_url = aweme_detail['music']['playUrl']
ext = traverse_obj(parse_qs(audio_url), (
'mime_type', -1, {lambda x: x.replace('_', '/')}, {mimetype2ext})) or 'm4a'
formats.append({
'format_id': 'audio',
'url': self._proto_relative_url(audio_url),
'ext': ext,
'acodec': 'aac' if ext == 'm4a' else ext,
'vcodec': 'none',
})
# Filter out broken formats, see https://github.com/yt-dlp/yt-dlp/issues/11034
return [f for f in formats if urllib.parse.urlparse(f['url']).hostname != 'www.tiktok.com']
def _parse_aweme_video_web(self, aweme_detail, webpage_url, video_id, extract_flat=False):
author_info = traverse_obj(aweme_detail, (('authorInfo', 'author', None), {
'channel': ('nickname', {str}),
'channel_id': (('authorSecId', 'secUid'), {str}),
'uploader': (('uniqueId', 'author'), {str}),
'uploader_id': (('authorId', 'uid', 'id'), {str_or_none}),
}), get_all=False)
return {
'id': video_id,
'formats': None if extract_flat else self._extract_web_formats(aweme_detail),
'subtitles': None if extract_flat else self.extract_subtitles(aweme_detail, video_id, None),
'http_headers': {'Referer': webpage_url},
**author_info,
'channel_url': format_field(author_info, 'channel_id', self._UPLOADER_URL_FORMAT, default=None),
'uploader_url': format_field(
author_info, ['uploader', 'uploader_id'], self._UPLOADER_URL_FORMAT, default=None),
**traverse_obj(aweme_detail, ('music', {
'track': ('title', {str}),
'album': ('album', {str}, filter),
'artists': ('authorName', {str}, {lambda x: re.split(r'(?:, | & )', x) if x else None}),
'duration': ('duration', {int_or_none}),
})),
**traverse_obj(aweme_detail, {
'title': ('desc', {truncate_string(left=72)}),
'description': ('desc', {str}),
# audio-only slideshows have a video duration of 0 and an actual audio duration
'duration': ('video', 'duration', {int_or_none}, filter),
'timestamp': ('createTime', {int_or_none}),
}),
**traverse_obj(aweme_detail, ('stats', {
'view_count': 'playCount',
'like_count': 'diggCount',
'repost_count': 'shareCount',
'comment_count': 'commentCount',
'save_count': 'collectCount',
}), expected_type=int_or_none),
'thumbnails': [
{
'id': cover_id,
'url': self._proto_relative_url(cover_url),
'preference': -2 if cover_id == 'dynamicCover' else -1,
}
for cover_id in ('thumbnail', 'cover', 'dynamicCover', 'originCover')
for cover_url in traverse_obj(aweme_detail, ((None, 'video'), cover_id, {url_or_none}))
],
}
class TikTokIE(TikTokBaseIE):
_VALID_URL = r'https?://www\.tiktok\.com/(?:embed|@(?P[\w\.-]+)?/video)/(?P\d+)'
_EMBED_REGEX = [rf'<(?:script|iframe)[^>]+\bsrc=(["\'])(?P{_VALID_URL})']
_TESTS = [{
'url': 'https://www.tiktok.com/@leenabhushan/video/6748451240264420610',
'md5': '736bb7a466c6f0a6afeb597da1e6f5b7',
'info_dict': {
'id': '6748451240264420610',
'ext': 'mp4',
'title': '#jassmanak #lehanga #leenabhushan',
'description': '#jassmanak #lehanga #leenabhushan',
'duration': 13,
'height': 1024,
'width': 576,
'uploader': 'leenabhushan',
'uploader_id': '6691488002098119685',
'uploader_url': 'https://www.tiktok.com/@MS4wLjABAAAA_Eb4t1vodM1IuTy_cvp9CY22RAb59xqrO0Xtz9CYQJvgXaDvZxYnZYRzDWhhgJmy',
'creator': 'facestoriesbyleenabh',
'thumbnail': r're:^https?://[\w\/\.\-]+(~[\w\-]+\.image)?',
'upload_date': '20191016',
'timestamp': 1571246252,
'view_count': int,
'like_count': int,
'repost_count': int,
'comment_count': int,
'save_count': int,
'artist': 'Ysrbeats',
'album': 'Lehanga',
'track': 'Lehanga',
},
'skip': '404 Not Found',
}, {
'url': 'https://www.tiktok.com/@patroxofficial/video/6742501081818877190?langCountry=en',
'md5': 'f21112672ee4ce05ca390fb6522e1b6f',
'info_dict': {
'id': '6742501081818877190',
'ext': 'mp4',
'title': 'Tag 1 Friend reverse this Video and look what happens 🤩😱 @skyandtami ...',
'description': 'md5:5e2a23877420bb85ce6521dbee39ba94',
'duration': 27,
'height': 1024,
'width': 576,
'uploader': 'patrox',
'uploader_id': '18702747',
'uploader_url': 'https://www.tiktok.com/@patrox',
'channel_url': 'https://www.tiktok.com/@MS4wLjABAAAAiFnldaILebi5heDoVU6bn4jBWWycX6-9U3xuNPqZ8Ws',
'channel_id': 'MS4wLjABAAAAiFnldaILebi5heDoVU6bn4jBWWycX6-9U3xuNPqZ8Ws',
'channel': 'patroX',
'thumbnail': r're:^https?://[\w\/\.\-]+(~[\w\-]+\.image)?',
'upload_date': '20190930',
'timestamp': 1569860870,
'view_count': int,
'like_count': int,
'repost_count': int,
'comment_count': int,
'save_count': int,
'artists': ['Evan Todd', 'Jessica Keenan Wynn', 'Alice Lee', 'Barrett Wilbert Weed', 'Jon Eidson'],
'track': 'Big Fun',
},
}, {
# Banned audio, was available on the app, now works with web too
'url': 'https://www.tiktok.com/@barudakhb_/video/6984138651336838402',
'info_dict': {
'id': '6984138651336838402',
'ext': 'mp4',
'title': 'Balas @yolaaftwsr hayu yu ? #SquadRandom_ 🔥',
'description': 'Balas @yolaaftwsr hayu yu ? #SquadRandom_ 🔥',
'uploader': 'barudakhb_',
'channel': 'md5:29f238c49bc0c176cb3cef1a9cea9fa6',
'uploader_id': '6974687867511718913',
'uploader_url': 'https://www.tiktok.com/@barudakhb_',
'channel_url': 'https://www.tiktok.com/@MS4wLjABAAAAbhBwQC-R1iKoix6jDFsF-vBdfx2ABoDjaZrM9fX6arU3w71q3cOWgWuTXn1soZ7d',
'channel_id': 'MS4wLjABAAAAbhBwQC-R1iKoix6jDFsF-vBdfx2ABoDjaZrM9fX6arU3w71q3cOWgWuTXn1soZ7d',
'track': 'Boka Dance',
'artists': ['md5:29f238c49bc0c176cb3cef1a9cea9fa6'],
'timestamp': 1626121503,
'duration': 18,
'thumbnail': r're:^https?://[\w\/\.\-]+(~[\w\-]+\.image)?',
'upload_date': '20210712',
'view_count': int,
'like_count': int,
'repost_count': int,
'comment_count': int,
'save_count': int,
},
}, {
# Sponsored video, only available with feed workaround
'url': 'https://www.tiktok.com/@MS4wLjABAAAATh8Vewkn0LYM7Fo03iec3qKdeCUOcBIouRk1mkiag6h3o_pQu_dUXvZ2EZlGST7_/video/7042692929109986561',
'info_dict': {
'id': '7042692929109986561',
'ext': 'mp4',
'title': 'Slap and Run!',
'description': 'Slap and Run!',
'uploader': 'user440922249',
'channel': 'Slap And Run',
'uploader_id': '7036055384943690754',
'uploader_url': 'https://www.tiktok.com/@MS4wLjABAAAATh8Vewkn0LYM7Fo03iec3qKdeCUOcBIouRk1mkiag6h3o_pQu_dUXvZ2EZlGST7_',
'channel_id': 'MS4wLjABAAAATh8Vewkn0LYM7Fo03iec3qKdeCUOcBIouRk1mkiag6h3o_pQu_dUXvZ2EZlGST7_',
'track': 'Promoted Music',
'timestamp': 1639754738,
'duration': 30,
'thumbnail': r're:^https?://[\w\/\.\-]+(~[\w\-]+\.image)?',
'upload_date': '20211217',
'view_count': int,
'like_count': int,
'repost_count': int,
'comment_count': int,
'save_count': int,
},
'skip': 'This video is unavailable',
}, {
# Video without title and description
'url': 'https://www.tiktok.com/@pokemonlife22/video/7059698374567611694',
'info_dict': {
'id': '7059698374567611694',
'ext': 'mp4',
'title': 'TikTok video #7059698374567611694',
'description': '',
'uploader': 'pokemonlife22',
'channel': 'Pokemon',
'uploader_id': '6820838815978423302',
'uploader_url': 'https://www.tiktok.com/@pokemonlife22',
'channel_url': 'https://www.tiktok.com/@MS4wLjABAAAA0tF1nBwQVVMyrGu3CqttkNgM68Do1OXUFuCY0CRQk8fEtSVDj89HqoqvbSTmUP2W',
'channel_id': 'MS4wLjABAAAA0tF1nBwQVVMyrGu3CqttkNgM68Do1OXUFuCY0CRQk8fEtSVDj89HqoqvbSTmUP2W',
'track': 'original sound',
'timestamp': 1643714123,
'duration': 6,
'thumbnail': r're:^https?://[\w\/\.\-]+(~[\w\-]+\.image)?',
'upload_date': '20220201',
'artists': ['Pokemon'],
'view_count': int,
'like_count': int,
'repost_count': int,
'comment_count': int,
'save_count': int,
},
}, {
# hydration JSON is sent in a