1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-10-26 12:10:59 +00:00
yt-dlp/yt_dlp/extractor/wppilot.py
Lauren Liberda c0599d4fe4
[wppilot] Add extractors (#1401)
Authored by: selfisekai
2021-11-10 06:16:18 +05:30

178 lines
6.1 KiB
Python

# coding: utf-8
from .common import InfoExtractor
from ..utils import (
try_get,
ExtractorError,
)
import json
import random
import re
class WPPilotBaseIE(InfoExtractor):
_VIDEO_URL = 'https://pilot.wp.pl/api/v1/channel/%s'
_VIDEO_GUEST_URL = 'https://pilot.wp.pl/api/v1/guest/channel/%s'
_HEADERS_WEB = {
'Content-Type': 'application/json; charset=UTF-8',
'Referer': 'https://pilot.wp.pl/tv/',
}
def _get_channel_list(self, cache=True):
if cache is True:
cache_res = self._downloader.cache.load('wppilot', 'channel-list')
if cache_res:
return cache_res, True
webpage = self._download_webpage('https://pilot.wp.pl/tv/', None, 'Downloading webpage')
page_data_base_url = self._search_regex(
r'<script src="(https://wp-pilot-gatsby\.wpcdn\.pl/v[\d.-]+/desktop)',
webpage, 'gatsby build version') + '/page-data'
page_data = self._download_json(f'{page_data_base_url}/tv/page-data.json', None, 'Downloading page data')
for qhash in page_data['staticQueryHashes']:
qhash_content = self._download_json(
f'{page_data_base_url}/sq/d/{qhash}.json', None,
'Searching for channel list')
channel_list = try_get(qhash_content, lambda x: x['data']['allChannels']['nodes'])
if channel_list is None:
continue
self._downloader.cache.store('wppilot', 'channel-list', channel_list)
return channel_list, False
raise ExtractorError('Unable to find the channel list')
def _parse_channel(self, chan):
return {
'id': str(chan['id']),
'title': chan['name'],
'is_live': True,
'thumbnails': [{
'id': key,
'url': chan[key],
} for key in ('thumbnail', 'thumbnail_mobile', 'icon') if chan.get(key)],
}
class WPPilotIE(WPPilotBaseIE):
_VALID_URL = r'(?:https?://pilot\.wp\.pl/tv/?#|wppilot:)(?P<id>[a-z\d-]+)'
IE_NAME = 'wppilot'
_TESTS = [{
'url': 'https://pilot.wp.pl/tv/#telewizja-wp-hd',
'info_dict': {
'id': '158',
'ext': 'mp4',
'title': 'Telewizja WP HD',
},
'params': {
'format': 'bestvideo',
},
}, {
# audio only
'url': 'https://pilot.wp.pl/tv/#radio-nowy-swiat',
'info_dict': {
'id': '238',
'ext': 'm4a',
'title': 'Radio Nowy Świat',
},
'params': {
'format': 'bestaudio',
},
}, {
'url': 'wppilot:9',
'only_matching': True,
}]
def _get_channel(self, id_or_slug):
video_list, is_cached = self._get_channel_list(cache=True)
key = 'id' if re.match(r'^\d+$', id_or_slug) else 'slug'
for video in video_list:
if video.get(key) == id_or_slug:
return self._parse_channel(video)
# if cached channel not found, download and retry
if is_cached:
video_list, _ = self._get_channel_list(cache=False)
for video in video_list:
if video.get(key) == id_or_slug:
return self._parse_channel(video)
raise ExtractorError('Channel not found')
def _real_extract(self, url):
video_id = self._match_id(url)
channel = self._get_channel(video_id)
video_id = str(channel['id'])
is_authorized = next((c for c in self._downloader.cookiejar if c.name == 'netviapisessid'), None)
# cookies starting with "g:" are assigned to guests
is_authorized = True if is_authorized is not None and not is_authorized.value.startswith('g:') else False
video = self._download_json(
(self._VIDEO_URL if is_authorized else self._VIDEO_GUEST_URL) % video_id,
video_id, query={
'device_type': 'web',
}, headers=self._HEADERS_WEB,
expected_status=(200, 422))
stream_token = try_get(video, lambda x: x['_meta']['error']['info']['stream_token'])
if stream_token:
close = self._download_json(
'https://pilot.wp.pl/api/v1/channels/close', video_id,
'Invalidating previous stream session', headers=self._HEADERS_WEB,
data=json.dumps({
'channelId': video_id,
't': stream_token,
}).encode('utf-8'))
if try_get(close, lambda x: x['data']['status']) == 'ok':
return self.url_result(url, ie=WPPilotIE.ie_key())
formats = []
for fmt in video['data']['stream_channel']['streams']:
# live DASH does not work for now
# if fmt['type'] == 'dash@live:abr':
# formats.extend(
# self._extract_mpd_formats(
# random.choice(fmt['url']), video_id))
if fmt['type'] == 'hls@live:abr':
formats.extend(
self._extract_m3u8_formats(
random.choice(fmt['url']),
video_id, live=True))
self._sort_formats(formats)
channel['formats'] = formats
return channel
class WPPilotChannelsIE(WPPilotBaseIE):
_VALID_URL = r'(?:https?://pilot\.wp\.pl/(?:tv/?)?(?:\?[^#]*)?#?|wppilot:)$'
IE_NAME = 'wppilot:channels'
_TESTS = [{
'url': 'wppilot:',
'info_dict': {
'id': 'wppilot',
'title': 'WP Pilot',
},
'playlist_mincount': 100,
}, {
'url': 'https://pilot.wp.pl/',
'only_matching': True,
}]
def _entries(self):
channel_list, _ = self._get_channel_list()
for chan in channel_list:
entry = self._parse_channel(chan)
entry.update({
'_type': 'url_transparent',
'url': f'wppilot:{chan["id"]}',
'ie_key': WPPilotIE.ie_key(),
})
yield entry
def _real_extract(self, url):
return self.playlist_result(self._entries(), 'wppilot', 'WP Pilot')