1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-06-27 08:58:30 +00:00
This commit is contained in:
doe1080 2025-06-27 11:10:59 +09:00 committed by GitHub
commit cb402776ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -3,7 +3,6 @@
import itertools
import json
import re
import time
from .common import InfoExtractor, SearchInfoExtractor
from ..networking.exceptions import HTTPError
@ -16,12 +15,12 @@
float_or_none,
int_or_none,
parse_bitrate,
parse_duration,
parse_iso8601,
parse_qs,
parse_resolution,
qualities,
str_or_none,
time_seconds,
truncate_string,
unified_timestamp,
update_url_query,
@ -38,8 +37,14 @@
class NiconicoBaseIE(InfoExtractor):
_API_BASE = 'https://nvapi.nicovideo.jp'
_BASE_URL = 'https://www.nicovideo.jp'
_GEO_BYPASS = False
_GEO_COUNTRIES = ['JP']
_HEADERS = {
'X-Frontend-ID': '6',
'X-Frontend-Version': '0',
}
_LOGIN_BASE = 'https://account.nicovideo.jp'
_NETRC_MACHINE = 'niconico'
@ -99,6 +104,42 @@ class NiconicoIE(NiconicoBaseIE):
IE_NAME = 'niconico'
IE_DESC = 'ニコニコ動画'
_VALID_URL = r'https?://(?:(?:embed|sp|www)\.)?nicovideo\.jp/watch/(?P<id>(?:[a-z]{2})?\d+)'
_ERROR_MAP = {
'FORBIDDEN': {
'ADMINISTRATOR_DELETE_VIDEO': 'Video unavailable, possibly removed by admins',
'CHANNEL_MEMBER_ONLY': 'Channel members only',
'DELETED_CHANNEL_VIDEO': 'Video unavailable, channel was closed',
'DELETED_COMMUNITY_VIDEO': 'Video unavailable, community deleted or missing',
'DEFAULT': 'Page unavailable, check the URL',
'HARMFUL_VIDEO': 'Sensitive content, login required',
'HIDDEN_VIDEO': 'Video unavailable, set to private',
'NOT_ALLOWED': 'No parmission',
'PPV_VIDEO': 'PPV video, payment information required',
'PREMIUM_ONLY': 'Premium members only',
},
'INVALID_PARAMETER': {
'DEFAULT': 'Video unavailable, may not exist or was deleted',
},
'MAINTENANCE': {
'DEFAULT': 'Maintenance is in progress',
},
'NOT_FOUND': {
'DEFAULT': 'Video unavailable, may not exist or was deleted',
'RIGHT_HOLDER_DELETE_VIDEO': 'Removed by rights-holder request',
},
'UNAUTHORIZED': {
'DEFAULT': 'Invalid session, re-login required',
},
'UNKNOWN': {
'DEFAULT': 'Failed to fetch content',
},
}
_STATUS_MAP = {
'needs_auth': 'PPV video, payment information required',
'premium_only': 'Premium members only',
'subscriber_only': 'Channel members only',
}
_TESTS = [{
'url': 'http://www.nicovideo.jp/watch/sm22312215',
'info_dict': {
@ -118,6 +159,7 @@ class NiconicoIE(NiconicoBaseIE):
'tags': [],
},
'params': {'skip_download': 'm3u8'},
'skip': 'temporarily skip to keep the diff small',
}, {
# File downloaded with and without credentials are different, so omit
# the md5 field
@ -139,6 +181,7 @@ class NiconicoIE(NiconicoBaseIE):
'tags': ['Translation_Request', 'Kagamine_Rin', 'Rin_Original'],
},
'params': {'skip_download': 'm3u8'},
'skip': 'temporarily skip to keep the diff small',
}, {
# 'video exists but is marked as "deleted"
# md5 is unstable
@ -189,6 +232,7 @@ class NiconicoIE(NiconicoBaseIE):
'tags': [],
},
'params': {'skip_download': 'm3u8'},
'skip': 'temporarily skip to keep the diff small',
}, {
# "New" HTML5 video
'url': 'http://www.nicovideo.jp/watch/sm31464864',
@ -209,6 +253,7 @@ class NiconicoIE(NiconicoBaseIE):
'tags': [],
},
'params': {'skip_download': 'm3u8'},
'skip': 'temporarily skip to keep the diff small',
}, {
# Video without owner
'url': 'http://www.nicovideo.jp/watch/sm18238488',
@ -227,6 +272,7 @@ class NiconicoIE(NiconicoBaseIE):
'tags': [],
},
'params': {'skip_download': 'm3u8'},
'skip': 'temporarily skip to keep the diff small',
}, {
'url': 'http://sp.nicovideo.jp/watch/sm28964488?ss_pos=1&cp_in=wt_tg',
'only_matching': True,
@ -236,9 +282,7 @@ class NiconicoIE(NiconicoBaseIE):
'only_matching': True,
}]
_VALID_URL = r'https?://(?:(?:www\.|secure\.|sp\.)?nicovideo\.jp/watch|nico\.ms)/(?P<id>(?:[a-z]{2})?[0-9]+)'
def _yield_dms_formats(self, api_data, video_id):
def _extract_formats(self, api_data, video_id):
fmt_filter = lambda _, v: v['isAvailable'] and v['id']
videos = traverse_obj(api_data, ('media', 'domand', 'videos', fmt_filter))
audios = traverse_obj(api_data, ('media', 'domand', 'audios', fmt_filter))
@ -247,164 +291,137 @@ def _yield_dms_formats(self, api_data, video_id):
if not all((videos, audios, access_key, track_id)):
return
dms_m3u8_url = self._download_json(
f'https://nvapi.nicovideo.jp/v1/watch/{video_id}/access-rights/hls', video_id,
data=json.dumps({
m3u8_url = self._download_json(
f'{self._API_BASE}/v1/watch/{video_id}/access-rights/hls',
video_id, headers={
'Accept': 'application/json;charset=utf-8',
'Content-Type': 'application/json',
'X-Access-Right-Key': access_key,
'X-Request-With': self._BASE_URL,
**self._HEADERS,
}, query={
'actionTrackId': track_id,
}, data=json.dumps({
'outputs': list(itertools.product((v['id'] for v in videos), (a['id'] for a in audios))),
}).encode(), query={'actionTrackId': track_id}, headers={
'x-access-right-key': access_key,
'x-frontend-id': 6,
'x-frontend-version': 0,
'x-request-with': 'https://www.nicovideo.jp',
})['data']['contentUrl']
# Getting all audio formats results in duplicate video formats which we filter out later
dms_fmts = self._extract_m3u8_formats(dms_m3u8_url, video_id, 'mp4')
}).encode(),
)['data']['contentUrl']
raw_fmts = self._extract_m3u8_formats(m3u8_url, video_id, 'mp4')
# m3u8 extraction does not provide audio bitrates, so extract from the API data and fix
for audio_fmt in traverse_obj(dms_fmts, lambda _, v: v['vcodec'] == 'none'):
yield {
**audio_fmt,
**traverse_obj(audios, (lambda _, v: audio_fmt['format_id'].startswith(v['id']), {
'format_id': ('id', {str}),
formats = []
for a_fmt in traverse_obj(raw_fmts, lambda _, v: v['vcodec'] == 'none'):
formats.append({
**a_fmt,
**traverse_obj(audios, (lambda _, v: a_fmt['format_id'].startswith(v['id']), {
'abr': ('bitRate', {float_or_none(scale=1000)}),
'asr': ('samplingRate', {int_or_none}),
'format_id': ('id', {str}),
'quality': ('qualityLevel', {int_or_none}),
}), get_all=False),
}, any)),
'acodec': 'aac',
}
})
# Sort before removing dupes to keep the format dicts with the lowest tbr
video_fmts = sorted((fmt for fmt in dms_fmts if fmt['vcodec'] != 'none'), key=lambda f: f['tbr'])
self._remove_duplicate_formats(video_fmts)
# Sort first, keeping the lowest-tbr formats
v_fmts = sorted((fmt for fmt in raw_fmts if fmt['vcodec'] != 'none'), key=lambda f: f['tbr'])
self._remove_duplicate_formats(v_fmts)
# Calculate the true vbr/tbr by subtracting the lowest abr
min_abr = min(traverse_obj(audios, (..., 'bitRate', {float_or_none})), default=0) / 1000
for video_fmt in video_fmts:
video_fmt['tbr'] -= min_abr
video_fmt['format_id'] = url_basename(video_fmt['url']).rpartition('.')[0]
video_fmt['quality'] = traverse_obj(videos, (
lambda _, v: v['id'] == video_fmt['format_id'], 'qualityLevel', {int_or_none}, any)) or -1
yield video_fmt
min_abr = traverse_obj(audios, (
..., 'bitRate', {float_or_none(scale=1000)}, all, {min}), default=0)
for v_fmt in v_fmts:
v_fmt['format_id'] = url_basename(v_fmt['url']).rpartition('.')[0]
v_fmt['quality'] = traverse_obj(videos, (
lambda _, v: v['id'] == v_fmt['format_id'], 'qualityLevel', {int_or_none}, any), default=-1)
v_fmt['tbr'] -= min_abr
formats.extend(v_fmts)
def _extract_server_response(self, webpage, video_id, fatal=True):
try:
return traverse_obj(
self._parse_json(self._html_search_meta('server-response', webpage) or '', video_id),
('data', 'response', {dict}, {require('server response')}))
except ExtractorError:
if not fatal:
return {}
raise
return formats
def _real_extract(self, url):
video_id = self._match_id(url)
try:
webpage, handle = self._download_webpage_handle(
f'https://www.nicovideo.jp/watch/{video_id}', video_id,
headers=self.geo_verification_headers())
if video_id.startswith('so'):
video_id = self._match_id(handle.url)
path = 'v3' if self.is_logged_in else 'v3_guest'
api_resp = self._download_json(
f'{self._BASE_URL}/api/watch/{path}/{video_id}', video_id,
'Downloading API JSON', 'Unable to fetch data', headers={
**self._HEADERS,
**self.geo_verification_headers(),
}, query={
'actionTrackId': f'AAAAAAAAAA_{round(time_seconds() * 1000)}',
}, expected_status=[400, 404])
api_data = self._extract_server_response(webpage, video_id)
except ExtractorError as e:
try:
api_data = self._download_json(
f'https://www.nicovideo.jp/api/watch/v3/{video_id}', video_id,
'Downloading API JSON', 'Unable to fetch data', query={
'_frontendId': '6',
'_frontendVersion': '0',
'actionTrackId': f'AAAAAAAAAA_{round(time.time() * 1000)}',
}, headers=self.geo_verification_headers())['data']
except ExtractorError:
if not isinstance(e.cause, HTTPError):
# Raise if original exception was from _parse_json or utils.traversal.require
raise
# The webpage server response has more detailed error info than the API response
webpage = e.cause.response.read().decode('utf-8', 'replace')
reason_code = self._extract_server_response(
webpage, video_id, fatal=False).get('reasonCode')
if not reason_code:
raise
if reason_code in ('DOMESTIC_VIDEO', 'HIGH_RISK_COUNTRY_VIDEO'):
self.raise_geo_restricted(countries=self._GEO_COUNTRIES)
elif reason_code == 'HIDDEN_VIDEO':
raise ExtractorError(
'The viewing period of this video has expired', expected=True)
elif reason_code == 'DELETED_VIDEO':
raise ExtractorError('This video has been deleted', expected=True)
raise ExtractorError(f'Niconico says: {reason_code}')
api_data = api_resp['data']
release_timestamp = traverse_obj(api_data, ('publishScheduledAt', {parse_iso8601}))
availability = self._availability(**(traverse_obj(api_data, ('payment', 'video', {
'needs_premium': ('isPremium', {bool}),
'needs_subscription': ('isAdmission', {bool}),
})) or {'needs_auth': True}))
meta = api_resp['meta']
if meta.get('status') != 200:
err_code = meta['errorCode']
reason_code = traverse_obj(api_data, ('reasonCode', {str_or_none}))
err_msg = 'Server busy, service temporarily unavailable'
formats = list(self._yield_dms_formats(api_data, video_id))
if reason_code in ('DOMESTIC_VIDEO', 'HIGH_RISK_COUNTRY_VIDEO'):
self.raise_geo_restricted(countries=self._GEO_COUNTRIES)
elif reason_code == 'HARMFUL_VIDEO' and traverse_obj(api_data, (
'viewer', 'allowSensitiveContents', {bool},
)) is False:
err_msg = 'Sensitive content, adjust display settings to watch'
elif reason_code == 'HIDDEN_VIDEO' and release_timestamp:
err_msg = f'Scheduled release, please wait. Release time: {release_timestamp}'
elif msg := traverse_obj(self._ERROR_MAP, (
err_code.upper(), (reason_code, 'DEFAULT'), {str}, any,
)):
err_msg = msg
raise ExtractorError(err_msg, expected=True)
availability = self._availability(**{
**dict.fromkeys(('is_private', 'is_unlisted'), False),
**traverse_obj(api_data, ('payment', 'video', {
'needs_auth': (('isContinuationBenefit', 'isPpv'), {bool}, any),
'needs_premium': ('isPremium', {bool}),
'needs_subscription': ('isAdmission', {bool}),
})),
})
formats = self._extract_formats(api_data, video_id)
if not formats:
fail_msg = clean_html(self._html_search_regex(
r'<p[^>]+\bclass="fail-message"[^>]*>(?P<msg>.+?)</p>',
webpage, 'fail message', default=None, group='msg'))
if fail_msg:
self.to_screen(f'Niconico said: {fail_msg}')
if fail_msg and 'された地域と同じ地域からのみ視聴できます。' in fail_msg:
availability = None
self.raise_geo_restricted(countries=self._GEO_COUNTRIES, metadata_available=True)
elif availability == 'premium_only':
self.raise_login_required('This video requires premium', metadata_available=True)
elif availability == 'subscriber_only':
self.raise_login_required('This video is for members only', metadata_available=True)
elif availability == 'needs_auth':
self.raise_login_required(metadata_available=False)
# Start extracting information
tags = None
if webpage:
# use og:video:tag (not logged in)
og_video_tags = re.finditer(r'<meta\s+property="og:video:tag"\s*content="(.*?)">', webpage)
tags = list(filter(None, (clean_html(x.group(1)) for x in og_video_tags)))
if not tags:
# use keywords and split with comma (not logged in)
kwds = self._html_search_meta('keywords', webpage, default=None)
if kwds:
tags = [x for x in kwds.split(',') if x]
if not tags:
# find in json (logged in)
tags = traverse_obj(api_data, ('tag', 'items', ..., 'name'))
if err_msg := self._STATUS_MAP.get(availability):
self.raise_login_required(err_msg, metadata_available=True)
thumb_prefs = qualities(['url', 'middleUrl', 'largeUrl', 'player', 'ogp'])
def get_video_info(*items, get_first=True, **kwargs):
return traverse_obj(api_data, ('video', *items), get_all=not get_first, **kwargs)
return {
'id': video_id,
'_api_data': api_data,
'title': get_video_info(('originalTitle', 'title')) or self._og_search_title(webpage, default=None),
'formats': formats,
'availability': availability,
'thumbnails': [{
'id': key,
'url': url,
'ext': 'jpg',
'preference': thumb_prefs(key),
**parse_resolution(url, lenient=True),
} for key, url in (get_video_info('thumbnail') or {}).items() if url],
'description': clean_html(get_video_info('description')),
'uploader': traverse_obj(api_data, ('owner', 'nickname'), ('channel', 'name'), ('community', 'name')),
'uploader_id': str_or_none(traverse_obj(api_data, ('owner', 'id'), ('channel', 'id'), ('community', 'id'))),
'timestamp': parse_iso8601(get_video_info('registeredAt')) or parse_iso8601(
self._html_search_meta('video:release_date', webpage, 'date published', default=None)),
'channel': traverse_obj(api_data, ('channel', 'name'), ('community', 'name')),
'channel_id': traverse_obj(api_data, ('channel', 'id'), ('community', 'id')),
'view_count': int_or_none(get_video_info('count', 'view')),
'tags': tags,
'genre': traverse_obj(api_data, ('genre', 'label'), ('genre', 'key')),
'comment_count': get_video_info('count', 'comment', expected_type=int),
'duration': (
parse_duration(self._html_search_meta('video:duration', webpage, 'video duration', default=None))
or get_video_info('duration')),
'webpage_url': url_or_none(url) or f'https://www.nicovideo.jp/watch/{video_id}',
'display_id': video_id,
'formats': formats,
'genres': traverse_obj(api_data, ('genre', 'label', {str}, filter, all, filter)),
'release_timestamp': release_timestamp,
'subtitles': self.extract_subtitles(video_id, api_data),
'tags': traverse_obj(api_data, ('tag', 'items', ..., 'name', {str}, filter, all, filter)),
'thumbnails': [{
'ext': 'jpg',
'id': key,
'preference': thumb_prefs(key),
'url': url,
**parse_resolution(url, lenient=True),
} for key, url in traverse_obj(api_data, (
'video', 'thumbnail', {dict}), default={}).items()],
**traverse_obj(api_data, (('channel', 'owner'), any, {
'channel': (('name', 'nickname'), {str}, any),
'channel_id': ('id', {str_or_none}),
'uploader': (('name', 'nickname'), {str}, any),
'uploader_id': ('id', {str_or_none}),
})),
**traverse_obj(api_data, ('video', {
'id': ('id', {str_or_none}),
'title': ('title', {str}),
'description': ('description', {clean_html}, filter),
'duration': ('duration', {int_or_none}),
'timestamp': ('registeredAt', {parse_iso8601}),
})),
**traverse_obj(api_data, ('video', 'count', {
'comment_count': ('comment', {int_or_none}),
'like_count': ('like', {int_or_none}),
'view_count': ('view', {int_or_none}),
})),
}
def _get_subtitles(self, video_id, api_data):
@ -413,21 +430,19 @@ def _get_subtitles(self, video_id, api_data):
return
danmaku = traverse_obj(self._download_json(
f'{comments_info["server"]}/v1/threads', video_id, data=json.dumps({
f'{comments_info["server"]}/v1/threads', video_id,
'Downloading comments', 'Failed to download comments', headers={
'Content-Type': 'text/plain;charset=UTF-8',
'Origin': self._BASE_URL,
'Referer': f'{self._BASE_URL}/',
'X-Client-Os-Type': 'others',
**self._HEADERS,
}, data=json.dumps({
'additionals': {},
'params': comments_info.get('params'),
'threadKey': comments_info.get('threadKey'),
}).encode(), fatal=False,
headers={
'Referer': 'https://www.nicovideo.jp/',
'Origin': 'https://www.nicovideo.jp',
'Content-Type': 'text/plain;charset=UTF-8',
'x-client-os-type': 'others',
'x-frontend-id': '6',
'x-frontend-version': '0',
},
note='Downloading comments', errnote='Failed to download comments'),
('data', 'threads', ..., 'comments', ...))
), ('data', 'threads', ..., 'comments', ...))
return {
'comments': [{