1
0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-11-13 13:05:13 +00:00
Files
yt-dlp/test/test_jsc/test_provider.py
Simon Sawicki 6224a38988 [ie/youtube] Implement external n/sig solver (#14157)
Closes #14404, Closes #14431, Closes #14680, Closes #14707

Authored by: bashonly, coletdjnz, seproDev, Grub4K

Co-authored-by: coletdjnz <coletdjnz@protonmail.com>
Co-authored-by: bashonly <bashonly@protonmail.com>
Co-authored-by: sepro <sepro@sepr0.com>
2025-10-31 23:13:04 +01:00

195 lines
7.6 KiB
Python

import pytest
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeRequest,
JsChallengeProviderResponse,
JsChallengeProviderRejectedRequest,
JsChallengeType,
JsChallengeResponse,
NChallengeOutput,
NChallengeInput,
JsChallengeProviderError,
register_provider,
register_preference,
)
from yt_dlp.extractor.youtube.pot._provider import IEContentProvider
from yt_dlp.utils import ExtractorError
from yt_dlp.extractor.youtube.jsc._registry import _jsc_preferences, _jsc_providers
class ExampleJCP(JsChallengeProvider):
    """Test-double provider that supports only N challenges.

    Every challenge string in a request is mapped to the fixed solution
    ``'example-solution'``.
    """

    PROVIDER_NAME = 'example-provider'
    PROVIDER_VERSION = '0.0.1'
    BUG_REPORT_LOCATION = 'https://example.com/issues'
    _SUPPORTED_TYPES = [JsChallengeType.N]

    def is_available(self) -> bool:
        """Report the provider as always usable."""
        return True

    def _real_bulk_solve(self, requests):
        """Yield one successful response per request, in request order."""
        for req in requests:
            solved = NChallengeOutput(
                results=dict.fromkeys(req.input.challenges, 'example-solution'))
            yield JsChallengeProviderResponse(
                request=req,
                response=JsChallengeResponse(type=req.type, output=solved),
            )
# Placeholder player URL passed to the challenge inputs and to `_get_player` in the tests below.
PLAYER_URL = 'https://example.com/player.js'
class TestJsChallengeProvider:
    """Tests for the ``JsChallengeProvider`` base class, exercised via small subclasses.

    Note: some behaviour is covered in TestPoTokenProvider, which shares the same base class.
    """

    def test_base_type(self):
        """The JS challenge provider base derives from the shared content provider base."""
        assert issubclass(JsChallengeProvider, IEContentProvider)

    def test_create_provider_missing_bulk_solve_method(self, ie, logger):
        """Instantiating a subclass without ``_real_bulk_solve`` raises ``TypeError``."""

        class MissingMethodsJCP(JsChallengeProvider):
            def is_available(self) -> bool:
                return True

        with pytest.raises(TypeError, match='bulk_solve'):
            MissingMethodsJCP(ie=ie, logger=logger, settings={})

    def test_create_provider_missing_available_method(self, ie, logger):
        """Instantiating a subclass without ``is_available`` raises ``TypeError``."""

        class MissingMethodsJCP(JsChallengeProvider):
            def _real_bulk_solve(self, requests):
                raise JsChallengeProviderRejectedRequest('Not implemented')

        with pytest.raises(TypeError, match='is_available'):
            MissingMethodsJCP(ie=ie, logger=logger, settings={})

    def test_barebones_provider(self, ie, logger):
        """A subclass that sets no metadata attributes exposes the expected defaults."""

        class BarebonesProviderJCP(JsChallengeProvider):
            def _real_bulk_solve(self, requests):
                raise JsChallengeProviderRejectedRequest('Not implemented')

            def is_available(self) -> bool:
                return True

        expected_bug_report = 'please report this issue to the provider developer at (developer has not provided a bug report location) .'

        provider = BarebonesProviderJCP(ie=ie, logger=logger, settings={})
        assert provider.PROVIDER_NAME == 'BarebonesProvider'
        assert provider.PROVIDER_KEY == 'BarebonesProvider'
        assert provider.PROVIDER_VERSION == '0.0.0'
        assert provider.BUG_REPORT_MESSAGE == expected_bug_report

    def test_example_provider_success(self, ie, logger):
        """Two N requests are each answered with a successful response, in order."""
        provider = ExampleJCP(ie=ie, logger=logger, settings={})
        first = JsChallengeRequest(
            type=JsChallengeType.N,
            input=NChallengeInput(player_url=PLAYER_URL, challenges=['example-challenge']))
        second = JsChallengeRequest(
            type=JsChallengeType.N,
            input=NChallengeInput(player_url=PLAYER_URL, challenges=['example-challenge-2']))

        responses = [*provider.bulk_solve([first, second])]

        assert len(responses) == 2
        for item in responses:
            assert isinstance(item, JsChallengeProviderResponse)

        expected = [
            JsChallengeProviderResponse(
                request=req,
                response=JsChallengeResponse(
                    type=JsChallengeType.N,
                    output=NChallengeOutput(results={challenge: 'example-solution'}),
                ),
            )
            for req, challenge in (
                (first, 'example-challenge'),
                (second, 'example-challenge-2'),
            )
        ]
        assert responses == expected

    def test_provider_unsupported_challenge_type(self, ie, logger):
        """A SIG request is rejected by the N-only provider; the N requests still succeed."""
        provider = ExampleJCP(ie=ie, logger=logger, settings={})
        supported = JsChallengeRequest(
            type=JsChallengeType.N,
            input=NChallengeInput(player_url=PLAYER_URL, challenges=['example-challenge']))
        unsupported = JsChallengeRequest(
            type=JsChallengeType.SIG,
            input=NChallengeInput(player_url=PLAYER_URL, challenges=['example-challenge']))

        responses = [*provider.bulk_solve([supported, unsupported, supported])]
        assert len(responses) == 3

        # Requests are validated first before continuing to _real_bulk_solve,
        # so the rejection comes out ahead of the two solved responses.
        rejected, *solved = responses
        assert isinstance(rejected, JsChallengeProviderResponse)
        assert isinstance(rejected.error, JsChallengeProviderRejectedRequest)
        assert rejected.request is unsupported
        assert str(rejected.error) == 'JS Challenge type "JsChallengeType.SIG" is not supported by example-provider'

        expected_success = JsChallengeProviderResponse(
            request=supported,
            response=JsChallengeResponse(
                type=JsChallengeType.N,
                output=NChallengeOutput(results={'example-challenge': 'example-solution'}),
            ),
        )
        assert solved == [expected_success, expected_success]

    def test_provider_get_player(self, ie, logger):
        """``_get_player`` returns whatever the extractor's ``_load_player`` returns."""

        def echo_load_player(video_id, player_url, fatal):
            # Hand the arguments back so the test can see what was passed in
            return video_id, player_url, fatal

        ie._load_player = echo_load_player
        provider = ExampleJCP(ie=ie, logger=logger, settings={})

        loaded = provider._get_player('video123', PLAYER_URL)
        assert loaded == ('video123', PLAYER_URL, True)

    def test_provider_get_player_error(self, ie, logger):
        """An ``ExtractorError`` from ``_load_player`` surfaces as ``JsChallengeProviderError``."""

        def failing_load_player(video_id, player_url, fatal):
            raise ExtractorError('Failed to load player')

        ie._load_player = failing_load_player
        provider = ExampleJCP(ie=ie, logger=logger, settings={})

        with pytest.raises(JsChallengeProviderError, match='Failed to load player for JS challenge'):
            provider._get_player('video123', PLAYER_URL)

    def test_require_class_end_with_suffix(self, ie, logger):
        """Reading ``PROVIDER_KEY`` on a class without the ``JCP`` name suffix raises ``AssertionError``."""

        class InvalidSuffix(JsChallengeProvider):
            PROVIDER_NAME = 'invalid-suffix'

            def is_available(self) -> bool:
                return True

            def _real_bulk_solve(self, requests):
                raise JsChallengeProviderRejectedRequest('Not implemented')

        provider = InvalidSuffix(ie=ie, logger=logger, settings={})

        with pytest.raises(AssertionError):
            # The attribute access itself is what is expected to raise
            _ = provider.PROVIDER_KEY
def test_register_provider(ie):
    """``register_provider`` stores the decorated class in ``_jsc_providers``.

    The class ``UnavailableProviderJCP`` is expected to be registered under
    the key ``'UnavailableProvider'``. The entry is removed again afterwards
    so the shared registry is left as it was found.
    """

    @register_provider
    class UnavailableProviderJCP(JsChallengeProvider):
        def is_available(self) -> bool:
            return False

        def _real_bulk_solve(self, requests):
            raise JsChallengeProviderRejectedRequest('Not implemented')

    try:
        assert _jsc_providers.value.get('UnavailableProvider') == UnavailableProviderJCP
    finally:
        # Clean up even when the assertion fails; otherwise the throwaway
        # provider would stay in the module-level registry and could affect
        # later tests. The default avoids a KeyError masking the real failure
        # when nothing was registered under this key.
        _jsc_providers.value.pop('UnavailableProvider', None)
def test_register_preference(ie):
    """Decorating a function with ``register_preference`` adds exactly one entry to ``_jsc_preferences``."""
    count_before = len(_jsc_preferences.value)

    @register_preference(ExampleJCP)
    def unavailable_preference(*args, **kwargs):
        return 1

    # NOTE(review): the registered preference is not removed afterwards —
    # confirm whether the shared registry needs cleanup here.
    count_after = len(_jsc_preferences.value)
    assert count_after - count_before == 1