mirror of
				https://github.com/yt-dlp/yt-dlp.git
				synced 2025-11-04 00:25:15 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			296 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			296 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import itertools
 | 
						|
import json
 | 
						|
 | 
						|
from .common import InfoExtractor
 | 
						|
from ..utils import (
 | 
						|
    ExtractorError,
 | 
						|
    int_or_none,
 | 
						|
    str_or_none,
 | 
						|
    traverse_obj,
 | 
						|
    unified_strdate,
 | 
						|
    unified_timestamp,
 | 
						|
    url_basename,
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
class TrillerBaseIE(InfoExtractor):
 | 
						|
    _NETRC_MACHINE = 'triller'
 | 
						|
    _API_BASE_URL = 'https://social.triller.co/v1.5'
 | 
						|
    _API_HEADERS = {'Origin': 'https://triller.co'}
 | 
						|
 | 
						|
    def _perform_login(self, username, password):
 | 
						|
        if self._API_HEADERS.get('Authorization'):
 | 
						|
            return
 | 
						|
 | 
						|
        user_check = self._download_json(
 | 
						|
            f'{self._API_BASE_URL}/api/user/is-valid-username', None, note='Checking username',
 | 
						|
            fatal=False, expected_status=400, headers={
 | 
						|
                'Content-Type': 'application/json',
 | 
						|
                'Origin': 'https://triller.co',
 | 
						|
            }, data=json.dumps({'username': username}, separators=(',', ':')).encode('utf-8'))
 | 
						|
        if user_check.get('status'):  # endpoint returns "status":false if username exists
 | 
						|
            raise ExtractorError('Unable to login: Invalid username', expected=True)
 | 
						|
 | 
						|
        credentials = {
 | 
						|
            'username': username,
 | 
						|
            'password': password,
 | 
						|
        }
 | 
						|
        login = self._download_json(
 | 
						|
            f'{self._API_BASE_URL}/user/auth', None, note='Logging in',
 | 
						|
            fatal=False, expected_status=400, headers={
 | 
						|
                'Content-Type': 'application/json',
 | 
						|
                'Origin': 'https://triller.co',
 | 
						|
            }, data=json.dumps(credentials, separators=(',', ':')).encode('utf-8'))
 | 
						|
        if not login.get('auth_token'):
 | 
						|
            if login.get('error') == 1008:
 | 
						|
                raise ExtractorError('Unable to login: Incorrect password', expected=True)
 | 
						|
            raise ExtractorError('Unable to login')
 | 
						|
 | 
						|
        self._API_HEADERS['Authorization'] = f'Bearer {login["auth_token"]}'
 | 
						|
 | 
						|
    def _get_comments(self, video_id, limit=15):
 | 
						|
        comment_info = self._download_json(
 | 
						|
            f'{self._API_BASE_URL}/api/videos/{video_id}/comments_v2',
 | 
						|
            video_id, fatal=False, note='Downloading comments API JSON',
 | 
						|
            headers=self._API_HEADERS, query={'limit': limit}) or {}
 | 
						|
        if not comment_info.get('comments'):
 | 
						|
            return
 | 
						|
        for comment_dict in comment_info['comments']:
 | 
						|
            yield {
 | 
						|
                'author': traverse_obj(comment_dict, ('author', 'username')),
 | 
						|
                'author_id': traverse_obj(comment_dict, ('author', 'user_id')),
 | 
						|
                'id': comment_dict.get('id'),
 | 
						|
                'text': comment_dict.get('body'),
 | 
						|
                'timestamp': unified_timestamp(comment_dict.get('timestamp')),
 | 
						|
            }
 | 
						|
 | 
						|
    def _check_user_info(self, user_info):
 | 
						|
        if not user_info:
 | 
						|
            self.report_warning('Unable to extract user info')
 | 
						|
        elif user_info.get('private') and not user_info.get('followed_by_me'):
 | 
						|
            raise ExtractorError('This video is private', expected=True)
 | 
						|
        elif traverse_obj(user_info, 'blocked_by_user', 'blocking_user'):
 | 
						|
            raise ExtractorError('The author of the video is blocked', expected=True)
 | 
						|
        return user_info
 | 
						|
 | 
						|
    def _parse_video_info(self, video_info, username, user_info=None):
 | 
						|
        video_uuid = video_info.get('video_uuid')
 | 
						|
        video_id = video_info.get('id')
 | 
						|
 | 
						|
        formats = []
 | 
						|
        video_url = traverse_obj(video_info, 'video_url', 'stream_url')
 | 
						|
        if video_url:
 | 
						|
            formats.append({
 | 
						|
                'url': video_url,
 | 
						|
                'ext': 'mp4',
 | 
						|
                'vcodec': 'h264',
 | 
						|
                'width': video_info.get('width'),
 | 
						|
                'height': video_info.get('height'),
 | 
						|
                'format_id': url_basename(video_url).split('.')[0],
 | 
						|
                'filesize': video_info.get('filesize'),
 | 
						|
            })
 | 
						|
        video_set = video_info.get('video_set') or []
 | 
						|
        for video in video_set:
 | 
						|
            resolution = video.get('resolution') or ''
 | 
						|
            formats.append({
 | 
						|
                'url': video['url'],
 | 
						|
                'ext': 'mp4',
 | 
						|
                'vcodec': video.get('codec'),
 | 
						|
                'vbr': int_or_none(video.get('bitrate'), 1000),
 | 
						|
                'width': int_or_none(resolution.split('x')[0]),
 | 
						|
                'height': int_or_none(resolution.split('x')[1]),
 | 
						|
                'format_id': url_basename(video['url']).split('.')[0],
 | 
						|
            })
 | 
						|
        audio_url = video_info.get('audio_url')
 | 
						|
        if audio_url:
 | 
						|
            formats.append({
 | 
						|
                'url': audio_url,
 | 
						|
                'ext': 'm4a',
 | 
						|
                'format_id': url_basename(audio_url).split('.')[0],
 | 
						|
            })
 | 
						|
 | 
						|
        manifest_url = video_info.get('transcoded_url')
 | 
						|
        if manifest_url:
 | 
						|
            formats.extend(self._extract_m3u8_formats(
 | 
						|
                manifest_url, video_id, 'mp4', entry_protocol='m3u8_native',
 | 
						|
                m3u8_id='hls', fatal=False))
 | 
						|
        self._sort_formats(formats)
 | 
						|
 | 
						|
        comment_count = int_or_none(video_info.get('comment_count'))
 | 
						|
 | 
						|
        user_info = user_info or traverse_obj(video_info, 'user', default={})
 | 
						|
 | 
						|
        return {
 | 
						|
            'id': str_or_none(video_id) or video_uuid,
 | 
						|
            'title': video_info.get('description') or f'Video by {username}',
 | 
						|
            'thumbnail': video_info.get('thumbnail_url'),
 | 
						|
            'description': video_info.get('description'),
 | 
						|
            'uploader': str_or_none(username),
 | 
						|
            'uploader_id': str_or_none(user_info.get('user_id')),
 | 
						|
            'creator': str_or_none(user_info.get('name')),
 | 
						|
            'timestamp': unified_timestamp(video_info.get('timestamp')),
 | 
						|
            'upload_date': unified_strdate(video_info.get('timestamp')),
 | 
						|
            'duration': int_or_none(video_info.get('duration')),
 | 
						|
            'view_count': int_or_none(video_info.get('play_count')),
 | 
						|
            'like_count': int_or_none(video_info.get('likes_count')),
 | 
						|
            'artist': str_or_none(video_info.get('song_artist')),
 | 
						|
            'track': str_or_none(video_info.get('song_title')),
 | 
						|
            'webpage_url': f'https://triller.co/@{username}/video/{video_uuid}',
 | 
						|
            'uploader_url': f'https://triller.co/@{username}',
 | 
						|
            'extractor_key': TrillerIE.ie_key(),
 | 
						|
            'extractor': TrillerIE.IE_NAME,
 | 
						|
            'formats': formats,
 | 
						|
            'comment_count': comment_count,
 | 
						|
            '__post_extractor': self.extract_comments(video_id, comment_count),
 | 
						|
        }
 | 
						|
 | 
						|
 | 
						|
class TrillerIE(TrillerBaseIE):
 | 
						|
    _VALID_URL = r'''(?x)
 | 
						|
            https?://(?:www\.)?triller\.co/
 | 
						|
            @(?P<username>[\w\._]+)/video/
 | 
						|
            (?P<id>[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12})
 | 
						|
        '''
 | 
						|
    _TESTS = [{
 | 
						|
        'url': 'https://triller.co/@theestallion/video/2358fcd7-3df2-4c77-84c8-1d091610a6cf',
 | 
						|
        'md5': '228662d783923b60d78395fedddc0a20',
 | 
						|
        'info_dict': {
 | 
						|
            'id': '71595734',
 | 
						|
            'ext': 'mp4',
 | 
						|
            'title': 'md5:9a2bf9435c5c4292678996a464669416',
 | 
						|
            'thumbnail': r're:^https://uploads\.cdn\.triller\.co/.+\.jpg$',
 | 
						|
            'description': 'md5:9a2bf9435c5c4292678996a464669416',
 | 
						|
            'uploader': 'theestallion',
 | 
						|
            'uploader_id': '18992236',
 | 
						|
            'creator': 'Megan Thee Stallion',
 | 
						|
            'timestamp': 1660598222,
 | 
						|
            'upload_date': '20220815',
 | 
						|
            'duration': 47,
 | 
						|
            'height': 3840,
 | 
						|
            'width': 2160,
 | 
						|
            'view_count': int,
 | 
						|
            'like_count': int,
 | 
						|
            'artist': 'Megan Thee Stallion',
 | 
						|
            'track': 'Her',
 | 
						|
            'webpage_url': 'https://triller.co/@theestallion/video/2358fcd7-3df2-4c77-84c8-1d091610a6cf',
 | 
						|
            'uploader_url': 'https://triller.co/@theestallion',
 | 
						|
            'comment_count': int,
 | 
						|
        }
 | 
						|
    }, {
 | 
						|
        'url': 'https://triller.co/@charlidamelio/video/46c6fcfa-aa9e-4503-a50c-68444f44cddc',
 | 
						|
        'md5': '874055f462af5b0699b9dbb527a505a0',
 | 
						|
        'info_dict': {
 | 
						|
            'id': '71621339',
 | 
						|
            'ext': 'mp4',
 | 
						|
            'title': 'md5:4c91ea82760fe0fffb71b8c3aa7295fc',
 | 
						|
            'thumbnail': r're:^https://uploads\.cdn\.triller\.co/.+\.jpg$',
 | 
						|
            'description': 'md5:4c91ea82760fe0fffb71b8c3aa7295fc',
 | 
						|
            'uploader': 'charlidamelio',
 | 
						|
            'uploader_id': '1875551',
 | 
						|
            'creator': 'charli damelio',
 | 
						|
            'timestamp': 1660773354,
 | 
						|
            'upload_date': '20220817',
 | 
						|
            'duration': 16,
 | 
						|
            'height': 1920,
 | 
						|
            'width': 1080,
 | 
						|
            'view_count': int,
 | 
						|
            'like_count': int,
 | 
						|
            'artist': 'Dixie',
 | 
						|
            'track': 'Someone to Blame',
 | 
						|
            'webpage_url': 'https://triller.co/@charlidamelio/video/46c6fcfa-aa9e-4503-a50c-68444f44cddc',
 | 
						|
            'uploader_url': 'https://triller.co/@charlidamelio',
 | 
						|
            'comment_count': int,
 | 
						|
        }
 | 
						|
    }]
 | 
						|
 | 
						|
    def _real_extract(self, url):
 | 
						|
        username, video_uuid = self._match_valid_url(url).group('username', 'id')
 | 
						|
 | 
						|
        video_info = traverse_obj(self._download_json(
 | 
						|
            f'{self._API_BASE_URL}/api/videos/{video_uuid}',
 | 
						|
            video_uuid, note='Downloading video info API JSON',
 | 
						|
            errnote='Unable to download video info API JSON',
 | 
						|
            headers=self._API_HEADERS), ('videos', 0))
 | 
						|
        if not video_info:
 | 
						|
            raise ExtractorError('No video info found in API response')
 | 
						|
 | 
						|
        user_info = self._check_user_info(video_info.get('user') or {})
 | 
						|
        return self._parse_video_info(video_info, username, user_info)
 | 
						|
 | 
						|
 | 
						|
class TrillerUserIE(TrillerBaseIE):
 | 
						|
    _VALID_URL = r'https?://(?:www\.)?triller\.co/@(?P<id>[\w\._]+)/?(?:$|[#?])'
 | 
						|
    _TESTS = [{
 | 
						|
        # first videos request only returns 2 videos
 | 
						|
        'url': 'https://triller.co/@theestallion',
 | 
						|
        'playlist_mincount': 9,
 | 
						|
        'info_dict': {
 | 
						|
            'id': '18992236',
 | 
						|
            'title': 'theestallion',
 | 
						|
            'thumbnail': r're:^https://uploads\.cdn\.triller\.co/.+\.jpg$',
 | 
						|
        }
 | 
						|
    }, {
 | 
						|
        'url': 'https://triller.co/@charlidamelio',
 | 
						|
        'playlist_mincount': 25,
 | 
						|
        'info_dict': {
 | 
						|
            'id': '1875551',
 | 
						|
            'title': 'charlidamelio',
 | 
						|
            'thumbnail': r're:^https://uploads\.cdn\.triller\.co/.+\.jpg$',
 | 
						|
        }
 | 
						|
    }]
 | 
						|
 | 
						|
    def _real_initialize(self):
 | 
						|
        if not self._API_HEADERS.get('Authorization'):
 | 
						|
            guest = self._download_json(
 | 
						|
                f'{self._API_BASE_URL}/user/create_guest',
 | 
						|
                None, note='Creating guest session', data=b'', headers=self._API_HEADERS, query={
 | 
						|
                    'platform': 'Web',
 | 
						|
                    'app_version': '',
 | 
						|
                })
 | 
						|
            if not guest.get('auth_token'):
 | 
						|
                raise ExtractorError('Unable to fetch required auth token for user extraction')
 | 
						|
 | 
						|
            self._API_HEADERS['Authorization'] = f'Bearer {guest["auth_token"]}'
 | 
						|
 | 
						|
    def _extract_video_list(self, username, user_id, limit=6):
 | 
						|
        query = {
 | 
						|
            'limit': limit,
 | 
						|
        }
 | 
						|
        for page in itertools.count(1):
 | 
						|
            for retry in self.RetryManager():
 | 
						|
                try:
 | 
						|
                    video_list = self._download_json(
 | 
						|
                        f'{self._API_BASE_URL}/api/users/{user_id}/videos',
 | 
						|
                        username, note=f'Downloading user video list page {page}',
 | 
						|
                        errnote='Unable to download user video list', headers=self._API_HEADERS,
 | 
						|
                        query=query)
 | 
						|
                except ExtractorError as e:
 | 
						|
                    if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0:
 | 
						|
                        retry.error = e
 | 
						|
                        continue
 | 
						|
                    raise
 | 
						|
            if not video_list.get('videos'):
 | 
						|
                break
 | 
						|
            yield from video_list['videos']
 | 
						|
            query['before_time'] = traverse_obj(video_list, ('videos', -1, 'timestamp'))
 | 
						|
            if not query['before_time']:
 | 
						|
                break
 | 
						|
 | 
						|
    def _entries(self, videos, username, user_info):
 | 
						|
        for video in videos:
 | 
						|
            yield self._parse_video_info(video, username, user_info)
 | 
						|
 | 
						|
    def _real_extract(self, url):
 | 
						|
        username = self._match_id(url)
 | 
						|
        user_info = self._check_user_info(self._download_json(
 | 
						|
            f'{self._API_BASE_URL}/api/users/by_username/{username}',
 | 
						|
            username, note='Downloading user info',
 | 
						|
            errnote='Failed to download user info', headers=self._API_HEADERS).get('user', {}))
 | 
						|
 | 
						|
        user_id = str_or_none(user_info.get('user_id'))
 | 
						|
        videos = self._extract_video_list(username, user_id)
 | 
						|
        thumbnail = user_info.get('avatar_url')
 | 
						|
 | 
						|
        return self.playlist_result(
 | 
						|
            self._entries(videos, username, user_info), user_id, username, thumbnail=thumbnail)
 |