mirror of
				https://github.com/yt-dlp/yt-dlp.git
				synced 2025-10-26 20:21:05 +00:00 
			
		
		
		
	 2685654a37
			
		
	
	
		2685654a37
		
			
		
	
	
	
	
		
			
			https://github.com/yt-dlp/yt-dlp/tree/master/yt_dlp/extractor/youtube/pot/README.md Authored by: coletdjnz
		
			
				
	
	
		
			1530 lines
		
	
	
		
			66 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1530 lines
		
	
	
		
			66 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import annotations
 | |
| import abc
 | |
| import base64
 | |
| import dataclasses
 | |
| import hashlib
 | |
| import json
 | |
| import time
 | |
| import pytest
 | |
| 
 | |
| from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider, IEContentProvider
 | |
| 
 | |
| from yt_dlp.extractor.youtube.pot.provider import (
 | |
|     PoTokenRequest,
 | |
|     PoTokenContext,
 | |
|     PoTokenProviderError,
 | |
|     PoTokenProviderRejectedRequest,
 | |
| )
 | |
| from yt_dlp.extractor.youtube.pot._director import (
 | |
|     PoTokenCache,
 | |
|     validate_cache_spec,
 | |
|     clean_pot,
 | |
|     validate_response,
 | |
|     PoTokenRequestDirector,
 | |
|     provider_display_list,
 | |
| )
 | |
| 
 | |
| from yt_dlp.extractor.youtube.pot.cache import (
 | |
|     PoTokenCacheSpec,
 | |
|     PoTokenCacheSpecProvider,
 | |
|     PoTokenCacheProvider,
 | |
|     CacheProviderWritePolicy,
 | |
|     PoTokenCacheProviderError,
 | |
| )
 | |
| 
 | |
| 
 | |
| from yt_dlp.extractor.youtube.pot.provider import (
 | |
|     PoTokenResponse,
 | |
|     PoTokenProvider,
 | |
| )
 | |
| 
 | |
| 
 | |
class BaseMockPoTokenProvider(PoTokenProvider, abc.ABC):
    """Abstract PO Token provider test double that records how it is used.

    Tracks the number of is_available() and request_pot() calls and whether
    close() has been invoked, so tests can assert on those values.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Usage bookkeeping inspected by the tests
        self.close_called = False
        self.request_called_times = 0
        self.available_called_times = 0

    def is_available(self) -> bool:
        """Count the availability check; this base mock always reports True."""
        self.available_called_times = self.available_called_times + 1
        return True

    def request_pot(self, *args, **kwargs):
        """Count the request, then hand over to the parent implementation."""
        self.request_called_times = self.request_called_times + 1
        return super().request_pot(*args, **kwargs)

    def close(self):
        """Remember that close() was called before delegating to the parent."""
        self.close_called = True
        super().close()
 | |
| 
 | |
| 
 | |
class ExamplePTP(BaseMockPoTokenProvider):
    """Mock provider declaring support for the WEB client and the GVS context."""

    BUG_REPORT_LOCATION = 'https://example.com/issues'
    PROVIDER_VERSION = '0.0.1'
    PROVIDER_NAME = 'example'

    _SUPPORTED_CONTEXTS = (PoTokenContext.GVS, )
    _SUPPORTED_CLIENTS = ('WEB',)

    def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
        """Return the video id as the token when data_sync_id is 'example', else EXAMPLE_PO_TOKEN."""
        token = request.video_id if request.data_sync_id == 'example' else EXAMPLE_PO_TOKEN
        return PoTokenResponse(token)
 | |
| 
 | |
| 
 | |
def success_ptp(response: PoTokenResponse | None = None, key: str | None = None):
    """Create a provider class whose token requests always succeed.

    response: the response the provider returns; when falsy, a response
        wrapping EXAMPLE_PO_TOKEN is returned instead.
    key: when truthy, assigned to the class as PROVIDER_KEY.

    Returns the provider class itself (not an instance).
    """
    class SuccessPTP(BaseMockPoTokenProvider):
        BUG_REPORT_LOCATION = 'https://success.example.com/issues'
        PROVIDER_VERSION = '0.0.1'
        PROVIDER_NAME = 'success'

        _SUPPORTED_CONTEXTS = (PoTokenContext.GVS,)
        _SUPPORTED_CLIENTS = ('WEB',)

        def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
            # Prefer the response captured from the enclosing factory call
            if response:
                return response
            return PoTokenResponse(EXAMPLE_PO_TOKEN)

    if not key:
        return SuccessPTP

    SuccessPTP.PROVIDER_KEY = key
    return SuccessPTP
 | |
| 
 | |
| 
 | |
@pytest.fixture
def pot_provider(ie, logger):
    """Fixture: an instance of a default success provider with empty settings."""
    provider_cls = success_ptp()
    return provider_cls(ie=ie, logger=logger, settings={})
 | |
| 
 | |
| 
 | |
class UnavailablePTP(BaseMockPoTokenProvider):
    """Mock provider whose is_available() always returns False."""

    _SUPPORTED_CONTEXTS = None
    _SUPPORTED_CLIENTS = None
    BUG_REPORT_LOCATION = 'https://unavailable.example.com/issues'
    PROVIDER_NAME = 'unavailable'

    def is_available(self) -> bool:
        # The parent is called only for its call counting; its result is discarded
        super().is_available()
        return False

    def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
        """Always fail with a PoTokenProviderError."""
        raise PoTokenProviderError('something went wrong')
 | |
| 
 | |
| 
 | |
class UnsupportedPTP(BaseMockPoTokenProvider):
    """Mock provider that rejects every request it receives."""

    _SUPPORTED_CONTEXTS = None
    _SUPPORTED_CLIENTS = None
    BUG_REPORT_LOCATION = 'https://unsupported.example.com/issues'
    PROVIDER_NAME = 'unsupported'

    def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
        """Always raise PoTokenProviderRejectedRequest."""
        raise PoTokenProviderRejectedRequest('unsupported request')
 | |
| 
 | |
| 
 | |
class ErrorPTP(BaseMockPoTokenProvider):
    """Mock provider that always raises a PoTokenProviderError."""

    _SUPPORTED_CONTEXTS = None
    _SUPPORTED_CLIENTS = None
    BUG_REPORT_LOCATION = 'https://error.example.com/issues'
    PROVIDER_NAME = 'error'

    def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
        """Raise the error, flagged as expected only for the video id 'expected'."""
        raise PoTokenProviderError(
            'an error occurred',
            expected=(request.video_id == 'expected'),
        )
 | |
| 
 | |
| 
 | |
class UnexpectedErrorPTP(BaseMockPoTokenProvider):
    """Mock provider that fails with a plain ValueError instead of a provider error."""

    _SUPPORTED_CONTEXTS = None
    _SUPPORTED_CLIENTS = None
    BUG_REPORT_LOCATION = 'https://unexpected.example.com/issues'
    PROVIDER_NAME = 'unexpected_error'

    def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
        """Always raise ValueError."""
        raise ValueError('an unexpected error occurred')
 | |
| 
 | |
| 
 | |
class InvalidPTP(BaseMockPoTokenProvider):
    """Mock provider that returns malformed results."""

    _SUPPORTED_CONTEXTS = None
    _SUPPORTED_CLIENTS = None
    BUG_REPORT_LOCATION = 'https://invalid.example.com/issues'
    PROVIDER_NAME = 'invalid'

    def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
        """Return a bare string for video id 'invalid_type', otherwise a response with a string expires_at."""
        if request.video_id != 'invalid_type':
            return PoTokenResponse('example-token?', expires_at='123')
        # Deliberately not a PoTokenResponse
        return 'invalid-response'
 | |
| 
 | |
| 
 | |
class BaseMockCacheSpecProvider(PoTokenCacheSpecProvider, abc.ABC):
    """Abstract cache spec provider test double that records how it is used.

    Tracks the number of is_available() and generate_cache_spec() calls and
    whether close() has been invoked.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Usage bookkeeping inspected by the tests
        self.close_called = False
        self.is_available_called_times = 0
        self.generate_called_times = 0

    def is_available(self) -> bool:
        """Count the check and return whatever the parent reports."""
        self.is_available_called_times = self.is_available_called_times + 1
        return super().is_available()

    def generate_cache_spec(self, request: PoTokenRequest):
        """Count the call only; subclasses call this and then produce their own result."""
        self.generate_called_times = self.generate_called_times + 1

    def close(self):
        """Remember that close() was called before delegating to the parent."""
        self.close_called = True
        super().close()
 | |
| 
 | |
| 
 | |
class ExampleCacheSpecProviderPCSP(BaseMockCacheSpecProvider):
    """Mock spec provider that produces a valid spec keyed on the video id."""

    BUG_REPORT_LOCATION = 'https://example.com/issues'
    PROVIDER_VERSION = '0.0.1'
    PROVIDER_NAME = 'example'

    def generate_cache_spec(self, request: PoTokenRequest):
        """Count the call and return a spec with a 60 second default TTL."""
        super().generate_cache_spec(request)
        bindings = {'v': request.video_id, 'e': None}
        return PoTokenCacheSpec(key_bindings=bindings, default_ttl=60)
 | |
| 
 | |
| 
 | |
class UnavailableCacheSpecProviderPCSP(BaseMockCacheSpecProvider):
    """Mock spec provider whose is_available() always returns False."""

    PROVIDER_VERSION = '0.0.1'
    PROVIDER_NAME = 'unavailable'

    def is_available(self) -> bool:
        # The parent is called only for its call counting; its result is discarded
        super().is_available()
        return False

    def generate_cache_spec(self, request: PoTokenRequest):
        """Count the call and return no spec."""
        super().generate_cache_spec(request)
        return None
 | |
| 
 | |
| 
 | |
class UnsupportedCacheSpecProviderPCSP(BaseMockCacheSpecProvider):
    """Mock spec provider that never produces a spec for a request."""

    PROVIDER_VERSION = '0.0.1'
    PROVIDER_NAME = 'unsupported'

    def generate_cache_spec(self, request: PoTokenRequest):
        """Count the call and return None."""
        super().generate_cache_spec(request)
        return None
 | |
| 
 | |
| 
 | |
class InvalidSpecCacheSpecProviderPCSP(BaseMockCacheSpecProvider):
    """Mock spec provider that returns a string instead of a PoTokenCacheSpec."""

    PROVIDER_VERSION = '0.0.1'
    PROVIDER_NAME = 'invalid'

    def generate_cache_spec(self, request: PoTokenRequest):
        """Count the call and return the bogus value 'invalid-spec'."""
        super().generate_cache_spec(request)
        bogus_spec = 'invalid-spec'
        return bogus_spec
 | |
| 
 | |
| 
 | |
class ErrorSpecCacheSpecProviderPCSP(BaseMockCacheSpecProvider):
    """Mock spec provider whose generate_cache_spec() raises ValueError."""

    PROVIDER_VERSION = '0.0.1'
    # NOTE: tests in this file match the name 'invalid' in logged error messages
    PROVIDER_NAME = 'invalid'

    def generate_cache_spec(self, request: PoTokenRequest):
        """Count the call, then fail."""
        super().generate_cache_spec(request)
        raise ValueError('something went wrong')
 | |
| 
 | |
| 
 | |
class BaseMockCacheProvider(PoTokenCacheProvider, abc.ABC):
    """Abstract cache provider test double that counts the calls made to it.

    store(), delete() and get() only increment their counters here and
    return None; subclasses layer real behaviour (or errors) on top.
    """
    BUG_REPORT_MESSAGE = 'example bug report message'

    def __init__(self, *args, available=True, **kwargs):
        """Set up the counters.

        available: the value is_available() will report.
        All other arguments are forwarded to the parent constructor.
        """
        super().__init__(*args, **kwargs)
        self.store_calls = 0
        self.delete_calls = 0
        self.get_calls = 0
        self.available_called_times = 0
        self.available = available
        # Initialised up front (as the other base mocks in this file do) so the
        # flag can be read before close() has ever been called.
        self.close_called = False

    def is_available(self) -> bool:
        """Count the check and return the configured availability."""
        self.available_called_times += 1
        return self.available

    def store(self, *args, **kwargs):
        """Count a store call."""
        self.store_calls += 1

    def delete(self, *args, **kwargs):
        """Count a delete call."""
        self.delete_calls += 1

    def get(self, *args, **kwargs):
        """Count a get call."""
        self.get_calls += 1

    def close(self):
        """Remember that close() was called before delegating to the parent."""
        self.close_called = True
        super().close()
 | |
| 
 | |
| 
 | |
class ErrorPCP(BaseMockCacheProvider):
    """Mock cache provider whose store() and get() raise PoTokenCacheProviderError."""

    PROVIDER_NAME = 'error'

    def store(self, *args, **kwargs):
        # Let the base class count the call before failing
        super().store(*args, **kwargs)
        raise PoTokenCacheProviderError('something went wrong')

    def get(self, *args, **kwargs):
        # Let the base class count the call before failing
        super().get(*args, **kwargs)
        raise PoTokenCacheProviderError('something went wrong')
 | |
| 
 | |
| 
 | |
class UnexpectedErrorPCP(BaseMockCacheProvider):
    """Mock cache provider whose store() and get() raise a plain ValueError."""

    PROVIDER_NAME = 'unexpected_error'

    def store(self, *args, **kwargs):
        # Let the base class count the call before failing
        super().store(*args, **kwargs)
        raise ValueError('something went wrong')

    def get(self, *args, **kwargs):
        # Let the base class count the call before failing
        super().get(*args, **kwargs)
        raise ValueError('something went wrong')
 | |
| 
 | |
| 
 | |
class MockMemoryPCP(BaseMockCacheProvider):
    """Mock cache provider that keeps entries in a plain dict.

    Entries are stored as ``key -> (value, expires_at)``; expires_at is
    recorded but not checked when reading.
    """

    PROVIDER_NAME = 'memory'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache = {}

    def store(self, key, value, expires_at):
        """Count the call and save (value, expires_at) under key."""
        super().store(key, value, expires_at)
        entry = (value, expires_at)
        self.cache[key] = entry

    def delete(self, key):
        """Count the call and drop key if it is present."""
        super().delete(key)
        if key in self.cache:
            del self.cache[key]

    def get(self, key):
        """Count the call and return the stored value, or None when key is absent."""
        super().get(key)
        try:
            entry = self.cache[key]
        except KeyError:
            return None
        return entry[0]
 | |
| 
 | |
| 
 | |
def create_memory_pcp(ie, logger, provider_key='memory', provider_name='memory', available=True):
    """Build a MockMemoryPCP with empty settings and the given key, name and availability."""
    provider = MockMemoryPCP(ie, logger, {}, available=available)
    # Key and name are set on the instance after construction
    provider.PROVIDER_KEY = provider_key
    provider.PROVIDER_NAME = provider_name
    return provider
 | |
| 
 | |
| 
 | |
@pytest.fixture
def memorypcp(ie, logger) -> MockMemoryPCP:
    """Fixture: a memory cache provider created with the default key and name."""
    provider = create_memory_pcp(ie, logger)
    return provider
 | |
| 
 | |
| 
 | |
@pytest.fixture
def pot_cache(ie, logger):
    """Fixture: a call-recording PoTokenCache using a memory provider and the example spec provider."""
    class MockPoTokenCache(PoTokenCache):
        """PoTokenCache that counts get()/store() calls and records close()."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.close_called = False
            self.store_calls = 0
            self.get_calls = 0

        def get(self, *args, **kwargs):
            self.get_calls = self.get_calls + 1
            return super().get(*args, **kwargs)

        def store(self, *args, **kwargs):
            self.store_calls = self.store_calls + 1
            return super().store(*args, **kwargs)

        def close(self):
            self.close_called = True
            super().close()

    memory_provider = MockMemoryPCP(ie, logger, {})
    spec_provider = ExampleCacheSpecProviderPCSP(ie, logger, settings={})
    return MockPoTokenCache(
        cache_providers=[memory_provider],
        cache_spec_providers=[spec_provider],
        logger=logger,
    )
 | |
| 
 | |
| 
 | |
# URL-safe base64 text of b'example-token', used as the default token throughout the tests
EXAMPLE_PO_TOKEN = base64.b64encode(b'example-token', altchars=b'-_').decode()
 | |
| 
 | |
| 
 | |
| class TestPoTokenCache:
 | |
| 
 | |
|     def test_cache_success(self, memorypcp, pot_request, ie, logger):
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
| 
 | |
|         assert cache.get(pot_request) is None
 | |
|         cache.store(pot_request, response)
 | |
| 
 | |
|         cached_response = cache.get(pot_request)
 | |
|         assert cached_response is not None
 | |
|         assert cached_response.po_token == EXAMPLE_PO_TOKEN
 | |
|         assert cached_response.expires_at is not None
 | |
| 
 | |
|         assert cache.get(dataclasses.replace(pot_request, video_id='another-video-id')) is None
 | |
| 
 | |
|     def test_unsupported_cache_spec_no_fallback(self, memorypcp, pot_request, ie, logger):
 | |
|         unsupported_provider = UnsupportedCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[unsupported_provider],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert unsupported_provider.generate_called_times == 1
 | |
|         cache.store(pot_request, response)
 | |
|         assert len(memorypcp.cache) == 0
 | |
|         assert unsupported_provider.generate_called_times == 2
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert unsupported_provider.generate_called_times == 3
 | |
|         assert len(logger.messages.get('error', [])) == 0
 | |
| 
 | |
|     def test_unsupported_cache_spec_fallback(self, memorypcp, pot_request, ie, logger):
 | |
|         unsupported_provider = UnsupportedCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         example_provider = ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[unsupported_provider, example_provider],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
| 
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert unsupported_provider.generate_called_times == 1
 | |
|         assert example_provider.generate_called_times == 1
 | |
| 
 | |
|         cache.store(pot_request, response)
 | |
|         assert unsupported_provider.generate_called_times == 2
 | |
|         assert example_provider.generate_called_times == 2
 | |
| 
 | |
|         cached_response = cache.get(pot_request)
 | |
|         assert unsupported_provider.generate_called_times == 3
 | |
|         assert example_provider.generate_called_times == 3
 | |
|         assert cached_response is not None
 | |
|         assert cached_response.po_token == EXAMPLE_PO_TOKEN
 | |
|         assert cached_response.expires_at is not None
 | |
| 
 | |
|         assert len(logger.messages.get('error', [])) == 0
 | |
| 
 | |
|     def test_invalid_cache_spec_no_fallback(self, memorypcp, pot_request, ie, logger):
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[InvalidSpecCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
| 
 | |
|         assert cache.get(pot_request) is None
 | |
|         cache.store(pot_request, response)
 | |
| 
 | |
|         assert cache.get(pot_request) is None
 | |
| 
 | |
|         assert 'PoTokenCacheSpecProvider "InvalidSpecCacheSpecProvider" generate_cache_spec() returned invalid spec invalid-spec; please report this issue to the provider developer at  (developer has not provided a bug report location)  .' in logger.messages['error']
 | |
| 
 | |
|     def test_invalid_cache_spec_fallback(self, memorypcp, pot_request, ie, logger):
 | |
| 
 | |
|         invalid_provider = InvalidSpecCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         example_provider = ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[invalid_provider, example_provider],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
| 
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert invalid_provider.generate_called_times == example_provider.generate_called_times == 1
 | |
| 
 | |
|         cache.store(pot_request, response)
 | |
|         assert invalid_provider.generate_called_times == example_provider.generate_called_times == 2
 | |
| 
 | |
|         cached_response = cache.get(pot_request)
 | |
|         assert invalid_provider.generate_called_times == example_provider.generate_called_times == 3
 | |
|         assert cached_response is not None
 | |
|         assert cached_response.po_token == EXAMPLE_PO_TOKEN
 | |
|         assert cached_response.expires_at is not None
 | |
| 
 | |
|         assert 'PoTokenCacheSpecProvider "InvalidSpecCacheSpecProvider" generate_cache_spec() returned invalid spec invalid-spec; please report this issue to the provider developer at  (developer has not provided a bug report location)  .' in logger.messages['error']
 | |
| 
 | |
|     def test_unavailable_cache_spec_no_fallback(self, memorypcp, pot_request, ie, logger):
 | |
|         unavailable_provider = UnavailableCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[unavailable_provider],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
| 
 | |
|         assert cache.get(pot_request) is None
 | |
|         cache.store(pot_request, response)
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert unavailable_provider.generate_called_times == 0
 | |
| 
 | |
|     def test_unavailable_cache_spec_fallback(self, memorypcp, pot_request, ie, logger):
 | |
|         unavailable_provider = UnavailableCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         example_provider = ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[unavailable_provider, example_provider],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
| 
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert unavailable_provider.generate_called_times == 0
 | |
|         assert unavailable_provider.is_available_called_times == 1
 | |
|         assert example_provider.generate_called_times == 1
 | |
| 
 | |
|         cache.store(pot_request, response)
 | |
|         assert unavailable_provider.generate_called_times == 0
 | |
|         assert unavailable_provider.is_available_called_times == 2
 | |
|         assert example_provider.generate_called_times == 2
 | |
| 
 | |
|         cached_response = cache.get(pot_request)
 | |
|         assert unavailable_provider.generate_called_times == 0
 | |
|         assert unavailable_provider.is_available_called_times == 3
 | |
|         assert example_provider.generate_called_times == 3
 | |
|         assert example_provider.is_available_called_times == 3
 | |
|         assert cached_response is not None
 | |
|         assert cached_response.po_token == EXAMPLE_PO_TOKEN
 | |
|         assert cached_response.expires_at is not None
 | |
| 
 | |
|     def test_unexpected_error_cache_spec(self, memorypcp, pot_request, ie, logger):
 | |
|         error_provider = ErrorSpecCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[error_provider],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
| 
 | |
|         assert cache.get(pot_request) is None
 | |
|         cache.store(pot_request, response)
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert error_provider.generate_called_times == 3
 | |
|         assert error_provider.is_available_called_times == 3
 | |
| 
 | |
|         assert 'Error occurred with "invalid" PO Token cache spec provider: ValueError(\'something went wrong\'); please report this issue to the provider developer at  (developer has not provided a bug report location)  .' in logger.messages['error']
 | |
| 
 | |
|     def test_unexpected_error_cache_spec_fallback(self, memorypcp, pot_request, ie, logger):
 | |
|         error_provider = ErrorSpecCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         example_provider = ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[error_provider, example_provider],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
| 
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert error_provider.generate_called_times == 1
 | |
|         assert error_provider.is_available_called_times == 1
 | |
|         assert example_provider.generate_called_times == 1
 | |
| 
 | |
|         cache.store(pot_request, response)
 | |
|         assert error_provider.generate_called_times == 2
 | |
|         assert error_provider.is_available_called_times == 2
 | |
|         assert example_provider.generate_called_times == 2
 | |
| 
 | |
|         cached_response = cache.get(pot_request)
 | |
|         assert error_provider.generate_called_times == 3
 | |
|         assert error_provider.is_available_called_times == 3
 | |
|         assert example_provider.generate_called_times == 3
 | |
|         assert example_provider.is_available_called_times == 3
 | |
|         assert cached_response is not None
 | |
|         assert cached_response.po_token == EXAMPLE_PO_TOKEN
 | |
|         assert cached_response.expires_at is not None
 | |
| 
 | |
|         assert 'Error occurred with "invalid" PO Token cache spec provider: ValueError(\'something went wrong\'); please report this issue to the provider developer at  (developer has not provided a bug report location)  .' in logger.messages['error']
 | |
| 
 | |
|     def test_key_bindings_spec_provider(self, memorypcp, pot_request, ie, logger):
 | |
| 
 | |
|         class ExampleProviderPCSP(PoTokenCacheSpecProvider):
 | |
|             PROVIDER_NAME = 'example'
 | |
| 
 | |
|             def generate_cache_spec(self, request: PoTokenRequest):
 | |
|                 return PoTokenCacheSpec(
 | |
|                     key_bindings={'v': request.video_id},
 | |
|                     default_ttl=60,
 | |
|                 )
 | |
| 
 | |
|         class ExampleProviderTwoPCSP(ExampleProviderPCSP):
 | |
|             pass
 | |
| 
 | |
|         example_provider = ExampleProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         example_provider_two = ExampleProviderTwoPCSP(ie=ie, logger=logger, settings={})
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[example_provider],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         assert cache.get(pot_request) is None
 | |
|         cache.store(pot_request, response)
 | |
|         assert len(memorypcp.cache) == 1
 | |
|         assert hashlib.sha256(
 | |
|             f"{{'_dlp_cache': 'v1', '_p': 'ExampleProvider', 'v': '{pot_request.video_id}'}}".encode()).hexdigest() in memorypcp.cache
 | |
| 
 | |
|         # The second spec provider returns the exact same key bindings as the first one,
 | |
|         # however the PoTokenCache should use the provider key to differentiate between them
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[example_provider_two],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         assert cache.get(pot_request) is None
 | |
|         cache.store(pot_request, response)
 | |
|         assert len(memorypcp.cache) == 2
 | |
|         assert hashlib.sha256(
 | |
|             f"{{'_dlp_cache': 'v1', '_p': 'ExampleProviderTwo', 'v': '{pot_request.video_id}'}}".encode()).hexdigest() in memorypcp.cache
 | |
| 
 | |
|     def test_cache_provider_preferences(self, pot_request, ie, logger):
 | |
|         pcp_one = create_memory_pcp(ie, logger, provider_key='memory_pcp_one')
 | |
|         pcp_two = create_memory_pcp(ie, logger, provider_key='memory_pcp_two')
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[pcp_one, pcp_two],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         cache.store(pot_request, PoTokenResponse(EXAMPLE_PO_TOKEN), write_policy=CacheProviderWritePolicy.WRITE_FIRST)
 | |
|         assert len(pcp_one.cache) == 1
 | |
|         assert len(pcp_two.cache) == 0
 | |
| 
 | |
|         assert cache.get(pot_request)
 | |
|         assert pcp_one.get_calls == 1
 | |
|         assert pcp_two.get_calls == 0
 | |
| 
 | |
|         standard_preference_called = False
 | |
|         pcp_one_preference_claled = False
 | |
| 
 | |
|         def standard_preference(provider, request, *_, **__):
 | |
|             nonlocal standard_preference_called
 | |
|             standard_preference_called = True
 | |
|             assert isinstance(provider, PoTokenCacheProvider)
 | |
|             assert isinstance(request, PoTokenRequest)
 | |
|             return 1
 | |
| 
 | |
|         def pcp_one_preference(provider, request, *_, **__):
 | |
|             nonlocal pcp_one_preference_claled
 | |
|             pcp_one_preference_claled = True
 | |
|             assert isinstance(provider, PoTokenCacheProvider)
 | |
|             assert isinstance(request, PoTokenRequest)
 | |
|             if provider.PROVIDER_KEY == pcp_one.PROVIDER_KEY:
 | |
|                 return -100
 | |
|             return 0
 | |
| 
 | |
|         # test that it can hanldle multiple preferences
 | |
|         cache.cache_provider_preferences.append(standard_preference)
 | |
|         cache.cache_provider_preferences.append(pcp_one_preference)
 | |
| 
 | |
|         cache.store(pot_request, PoTokenResponse(EXAMPLE_PO_TOKEN), write_policy=CacheProviderWritePolicy.WRITE_FIRST)
 | |
|         assert cache.get(pot_request)
 | |
|         assert len(pcp_one.cache) == len(pcp_two.cache) == 1
 | |
|         assert pcp_two.get_calls == pcp_one.get_calls == 1
 | |
|         assert pcp_one.store_calls == pcp_two.store_calls == 1
 | |
|         assert standard_preference_called
 | |
|         assert pcp_one_preference_claled
 | |
| 
 | |
|     def test_secondary_cache_provider_hit(self, pot_request, ie, logger):
 | |
|         pcp_one = create_memory_pcp(ie, logger, provider_key='memory_pcp_one')
 | |
|         pcp_two = create_memory_pcp(ie, logger, provider_key='memory_pcp_two')
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[pcp_two],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         # Given the lower priority provider has the cache hit, store the response in the higher priority provider
 | |
|         cache.store(pot_request, PoTokenResponse(EXAMPLE_PO_TOKEN))
 | |
|         assert cache.get(pot_request)
 | |
| 
 | |
|         cache.cache_providers[pcp_one.PROVIDER_KEY] = pcp_one
 | |
| 
 | |
|         def pcp_one_pref(provider, *_, **__):
 | |
|             if provider.PROVIDER_KEY == pcp_one.PROVIDER_KEY:
 | |
|                 return 1
 | |
|             return -1
 | |
| 
 | |
|         cache.cache_provider_preferences.append(pcp_one_pref)
 | |
| 
 | |
|         assert cache.get(pot_request)
 | |
|         assert pcp_one.get_calls == 1
 | |
|         assert pcp_two.get_calls == 2
 | |
|         # Should write back to pcp_one (now the highest priority cache provider)
 | |
|         assert pcp_one.store_calls == pcp_two.store_calls == 1
 | |
|         assert 'Writing PO Token response to highest priority cache provider' in logger.messages['trace']
 | |
| 
 | |
|     def test_cache_provider_no_hits(self, pot_request, ie, logger):
 | |
|         pcp_one = create_memory_pcp(ie, logger, provider_key='memory_pcp_one')
 | |
|         pcp_two = create_memory_pcp(ie, logger, provider_key='memory_pcp_two')
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[pcp_one, pcp_two],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert pcp_one.get_calls == pcp_two.get_calls == 1
 | |
| 
 | |
|     def test_get_invalid_po_token_response(self, pot_request, ie, logger):
 | |
|         # Test various scenarios where the po token response stored in the cache provider is invalid
 | |
|         pcp_one = create_memory_pcp(ie, logger, provider_key='memory_pcp_one')
 | |
|         pcp_two = create_memory_pcp(ie, logger, provider_key='memory_pcp_two')
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[pcp_one, pcp_two],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         valid_response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
|         cache.store(pot_request, valid_response)
 | |
|         assert len(pcp_one.cache) == len(pcp_two.cache) == 1
 | |
|         # Overwrite the valid response with an invalid one in the cache
 | |
|         pcp_one.store(next(iter(pcp_one.cache.keys())), json.dumps(dataclasses.asdict(PoTokenResponse(None))), int(time.time() + 1000))
 | |
|         assert cache.get(pot_request).po_token == valid_response.po_token
 | |
|         assert pcp_one.get_calls == pcp_two.get_calls == 1
 | |
|         assert pcp_one.delete_calls == 1  # Invalid response should be deleted from cache
 | |
|         assert pcp_one.store_calls == 3  # Since response was fetched from second cache provider, it should be stored in the first one
 | |
|         assert len(pcp_one.cache) == 1
 | |
|         assert 'Invalid PO Token response retrieved from cache provider "memory": {"po_token": null, "expires_at": null}; example bug report message' in logger.messages['error']
 | |
| 
 | |
|         # Overwrite the valid response with an invalid json in the cache
 | |
|         pcp_one.store(next(iter(pcp_one.cache.keys())), 'invalid-json', int(time.time() + 1000))
 | |
|         assert cache.get(pot_request).po_token == valid_response.po_token
 | |
|         assert pcp_one.get_calls == pcp_two.get_calls == 2
 | |
|         assert pcp_one.delete_calls == 2
 | |
|         assert pcp_one.store_calls == 5  # 3 + 1 store we made in the test + 1 store from lower priority cache provider
 | |
|         assert len(pcp_one.cache) == 1
 | |
| 
 | |
|         assert 'Invalid PO Token response retrieved from cache provider "memory": invalid-json; example bug report message' in logger.messages['error']
 | |
| 
 | |
|         # Valid json, but missing required fields
 | |
|         pcp_one.store(next(iter(pcp_one.cache.keys())), '{"unknown_param": 0}', int(time.time() + 1000))
 | |
|         assert cache.get(pot_request).po_token == valid_response.po_token
 | |
|         assert pcp_one.get_calls == pcp_two.get_calls == 3
 | |
|         assert pcp_one.delete_calls == 3
 | |
|         assert pcp_one.store_calls == 7  # 5 + 1 store from test + 1 store from lower priority cache provider
 | |
|         assert len(pcp_one.cache) == 1
 | |
| 
 | |
|         assert 'Invalid PO Token response retrieved from cache provider "memory": {"unknown_param": 0}; example bug report message' in logger.messages['error']
 | |
| 
 | |
|     def test_store_invalid_po_token_response(self, pot_request, ie, logger):
 | |
|         # Should not store an invalid po token response
 | |
|         pcp_one = create_memory_pcp(ie, logger, provider_key='memory_pcp_one')
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[pcp_one],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         cache.store(pot_request, PoTokenResponse(po_token=EXAMPLE_PO_TOKEN, expires_at=80))
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert pcp_one.store_calls == 0
 | |
|         assert 'Invalid PO Token response provided to PoTokenCache.store()' in logger.messages['error'][0]
 | |
| 
 | |
|     def test_store_write_policy(self, pot_request, ie, logger):
 | |
|         pcp_one = create_memory_pcp(ie, logger, provider_key='memory_pcp_one')
 | |
|         pcp_two = create_memory_pcp(ie, logger, provider_key='memory_pcp_two')
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[pcp_one, pcp_two],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         cache.store(pot_request, PoTokenResponse(EXAMPLE_PO_TOKEN), write_policy=CacheProviderWritePolicy.WRITE_FIRST)
 | |
|         assert pcp_one.store_calls == 1
 | |
|         assert pcp_two.store_calls == 0
 | |
| 
 | |
|         cache.store(pot_request, PoTokenResponse(EXAMPLE_PO_TOKEN), write_policy=CacheProviderWritePolicy.WRITE_ALL)
 | |
|         assert pcp_one.store_calls == 2
 | |
|         assert pcp_two.store_calls == 1
 | |
| 
 | |
|     def test_store_write_first_policy_cache_spec(self, pot_request, ie, logger):
 | |
|         pcp_one = create_memory_pcp(ie, logger, provider_key='memory_pcp_one')
 | |
|         pcp_two = create_memory_pcp(ie, logger, provider_key='memory_pcp_two')
 | |
| 
 | |
|         class WriteFirstPCSP(BaseMockCacheSpecProvider):
 | |
|             def generate_cache_spec(self, request: PoTokenRequest):
 | |
|                 super().generate_cache_spec(request)
 | |
|                 return PoTokenCacheSpec(
 | |
|                     key_bindings={'v': request.video_id, 'e': None},
 | |
|                     default_ttl=60,
 | |
|                     write_policy=CacheProviderWritePolicy.WRITE_FIRST,
 | |
|                 )
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[pcp_one, pcp_two],
 | |
|             cache_spec_providers=[WriteFirstPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         cache.store(pot_request, PoTokenResponse(EXAMPLE_PO_TOKEN))
 | |
|         assert pcp_one.store_calls == 1
 | |
|         assert pcp_two.store_calls == 0
 | |
| 
 | |
|     def test_store_write_all_policy_cache_spec(self, pot_request, ie, logger):
 | |
|         pcp_one = create_memory_pcp(ie, logger, provider_key='memory_pcp_one')
 | |
|         pcp_two = create_memory_pcp(ie, logger, provider_key='memory_pcp_two')
 | |
| 
 | |
|         class WriteAllPCSP(BaseMockCacheSpecProvider):
 | |
|             def generate_cache_spec(self, request: PoTokenRequest):
 | |
|                 super().generate_cache_spec(request)
 | |
|                 return PoTokenCacheSpec(
 | |
|                     key_bindings={'v': request.video_id, 'e': None},
 | |
|                     default_ttl=60,
 | |
|                     write_policy=CacheProviderWritePolicy.WRITE_ALL,
 | |
|                 )
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[pcp_one, pcp_two],
 | |
|             cache_spec_providers=[WriteAllPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         cache.store(pot_request, PoTokenResponse(EXAMPLE_PO_TOKEN))
 | |
|         assert pcp_one.store_calls == 1
 | |
|         assert pcp_two.store_calls == 1
 | |
| 
 | |
|     def test_expires_at_pot_response(self, pot_request, memorypcp, ie, logger):
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN, expires_at=10000000000)
 | |
|         cache.store(pot_request, response)
 | |
|         assert next(iter(memorypcp.cache.values()))[1] == 10000000000
 | |
| 
 | |
|     def test_expires_at_default_spec(self, pot_request, memorypcp, ie, logger):
 | |
| 
 | |
|         class TtlPCSP(BaseMockCacheSpecProvider):
 | |
|             def generate_cache_spec(self, request: PoTokenRequest):
 | |
|                 super().generate_cache_spec(request)
 | |
|                 return PoTokenCacheSpec(
 | |
|                     key_bindings={'v': request.video_id, 'e': None},
 | |
|                     default_ttl=10000000000,
 | |
|                 )
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memorypcp],
 | |
|             cache_spec_providers=[TtlPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
|         cache.store(pot_request, response)
 | |
|         assert next(iter(memorypcp.cache.values()))[1] >= 10000000000
 | |
| 
 | |
|     def test_cache_provider_error_no_fallback(self, pot_request, ie, logger):
 | |
|         error_pcp = ErrorPCP(ie, logger, {})
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[error_pcp],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
|         cache.store(pot_request, response)
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert error_pcp.get_calls == 1
 | |
|         assert error_pcp.store_calls == 1
 | |
| 
 | |
|         assert logger.messages['warning'].count("Error from \"error\" PO Token cache provider: PoTokenCacheProviderError('something went wrong'); example bug report message") == 2
 | |
| 
 | |
|     def test_cache_provider_error_fallback(self, pot_request, ie, logger):
 | |
|         error_pcp = ErrorPCP(ie, logger, {})
 | |
|         memory_pcp = create_memory_pcp(ie, logger, provider_key='memory')
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[error_pcp, memory_pcp],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
|         cache.store(pot_request, response)
 | |
| 
 | |
|         # 1. Store fails for error_pcp, stored in memory_pcp
 | |
|         # 2. Get fails for error_pcp, fetched from memory_pcp
 | |
|         # 3. Since fetched from lower priority, it should be stored in the highest priority cache provider
 | |
|         # 4. Store fails in error_pcp. Since write policy is WRITE_FIRST, it should not try to store in memory_pcp regardless of if the store in error_pcp fails
 | |
| 
 | |
|         assert cache.get(pot_request)
 | |
|         assert error_pcp.get_calls == 1
 | |
|         assert error_pcp.store_calls == 2  # since highest priority, when fetched from lower priority, it should be stored in the highest priority cache provider
 | |
|         assert memory_pcp.get_calls == 1
 | |
|         assert memory_pcp.store_calls == 1
 | |
| 
 | |
|         assert logger.messages['warning'].count("Error from \"error\" PO Token cache provider: PoTokenCacheProviderError('something went wrong'); example bug report message") == 3
 | |
| 
 | |
|     def test_cache_provider_unexpected_error_no_fallback(self, pot_request, ie, logger):
 | |
|         error_pcp = UnexpectedErrorPCP(ie, logger, {})
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[error_pcp],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
|         cache.store(pot_request, response)
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert error_pcp.get_calls == 1
 | |
|         assert error_pcp.store_calls == 1
 | |
| 
 | |
|         assert logger.messages['error'].count("Error occurred with \"unexpected_error\" PO Token cache provider: ValueError('something went wrong'); example bug report message") == 2
 | |
| 
 | |
|     def test_cache_provider_unexpected_error_fallback(self, pot_request, ie, logger):
 | |
|         error_pcp = UnexpectedErrorPCP(ie, logger, {})
 | |
|         memory_pcp = create_memory_pcp(ie, logger, provider_key='memory')
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[error_pcp, memory_pcp],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
|         cache.store(pot_request, response)
 | |
| 
 | |
|         # 1. Store fails for error_pcp, stored in memory_pcp
 | |
|         # 2. Get fails for error_pcp, fetched from memory_pcp
 | |
|         # 3. Since fetched from lower priority, it should be stored in the highest priority cache provider
 | |
|         # 4. Store fails in error_pcp. Since write policy is WRITE_FIRST, it should not try to store in memory_pcp regardless of if the store in error_pcp fails
 | |
| 
 | |
|         assert cache.get(pot_request)
 | |
|         assert error_pcp.get_calls == 1
 | |
|         assert error_pcp.store_calls == 2  # since highest priority, when fetched from lower priority, it should be stored in the highest priority cache provider
 | |
|         assert memory_pcp.get_calls == 1
 | |
|         assert memory_pcp.store_calls == 1
 | |
| 
 | |
|         assert logger.messages['error'].count("Error occurred with \"unexpected_error\" PO Token cache provider: ValueError('something went wrong'); example bug report message") == 3
 | |
| 
 | |
|     def test_cache_provider_unavailable_no_fallback(self, pot_request, ie, logger):
 | |
|         provider = create_memory_pcp(ie, logger, available=False)
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[provider],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
|         cache.store(pot_request, response)
 | |
|         assert cache.get(pot_request) is None
 | |
|         assert provider.get_calls == 0
 | |
|         assert provider.store_calls == 0
 | |
|         assert provider.available_called_times
 | |
| 
 | |
|     def test_cache_provider_unavailable_fallback(self, pot_request, ie, logger):
 | |
|         provider_unavailable = create_memory_pcp(ie, logger, provider_key='unavailable', provider_name='unavailable', available=False)
 | |
|         provider_available = create_memory_pcp(ie, logger, provider_key='available', provider_name='available')
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[provider_unavailable, provider_available],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
|         cache.store(pot_request, response)
 | |
|         assert cache.get(pot_request) is not None
 | |
|         assert provider_unavailable.get_calls == 0
 | |
|         assert provider_unavailable.store_calls == 0
 | |
|         assert provider_available.get_calls == 1
 | |
|         assert provider_available.store_calls == 1
 | |
|         assert provider_unavailable.available_called_times
 | |
|         assert provider_available.available_called_times
 | |
| 
 | |
|         # should not even try to use the provider for the request
 | |
|         assert 'Attempting to fetch a PO Token response from "unavailable" provider' not in logger.messages['trace']
 | |
|         assert 'Attempting to fetch a PO Token response from "available" provider' not in logger.messages['trace']
 | |
| 
 | |
|     def test_available_not_called(self, ie, pot_request, logger):
 | |
|         # Test that the available method is not called when provider higher in the list is available
 | |
|         provider_unavailable = create_memory_pcp(
 | |
|             ie, logger, provider_key='unavailable', provider_name='unavailable', available=False)
 | |
|         provider_available = create_memory_pcp(ie, logger, provider_key='available', provider_name='available')
 | |
| 
 | |
|         logger.log_level = logger.LogLevel.INFO
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[provider_available, provider_unavailable],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
|         cache.store(pot_request, response, write_policy=CacheProviderWritePolicy.WRITE_FIRST)
 | |
|         assert cache.get(pot_request) is not None
 | |
|         assert provider_unavailable.get_calls == 0
 | |
|         assert provider_unavailable.store_calls == 0
 | |
|         assert provider_available.get_calls == 1
 | |
|         assert provider_available.store_calls == 1
 | |
|         assert provider_unavailable.available_called_times == 0
 | |
|         assert provider_available.available_called_times
 | |
|         assert 'PO Token Cache Providers: available-0.0.0 (external), unavailable-0.0.0 (external, unavailable)' not in logger.messages.get('trace', [])
 | |
| 
 | |
|     def test_available_called_trace(self, ie, pot_request, logger):
 | |
|         # But if logging level is trace should call available (as part of debug logging)
 | |
|         provider_unavailable = create_memory_pcp(
 | |
|             ie, logger, provider_key='unavailable', provider_name='unavailable', available=False)
 | |
|         provider_available = create_memory_pcp(ie, logger, provider_key='available', provider_name='available')
 | |
| 
 | |
|         logger.log_level = logger.LogLevel.TRACE
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[provider_available, provider_unavailable],
 | |
|             cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         response = PoTokenResponse(EXAMPLE_PO_TOKEN)
 | |
|         cache.store(pot_request, response, write_policy=CacheProviderWritePolicy.WRITE_FIRST)
 | |
|         assert cache.get(pot_request) is not None
 | |
|         assert provider_unavailable.get_calls == 0
 | |
|         assert provider_unavailable.store_calls == 0
 | |
|         assert provider_available.get_calls == 1
 | |
|         assert provider_available.store_calls == 1
 | |
|         assert provider_unavailable.available_called_times
 | |
|         assert provider_available.available_called_times
 | |
|         assert 'PO Token Cache Providers: available-0.0.0 (external), unavailable-0.0.0 (external, unavailable)' in logger.messages.get('trace', [])
 | |
| 
 | |
|     def test_close(self, ie, pot_request, logger):
 | |
|         # Should call close on the cache providers and cache specs
 | |
|         memory_pcp = create_memory_pcp(ie, logger, provider_key='memory')
 | |
|         memory2_pcp = create_memory_pcp(ie, logger, provider_key='memory2')
 | |
| 
 | |
|         spec1 = ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
|         spec2 = UnavailableCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})
 | |
| 
 | |
|         cache = PoTokenCache(
 | |
|             cache_providers=[memory2_pcp, memory_pcp],
 | |
|             cache_spec_providers=[spec1, spec2],
 | |
|             logger=logger,
 | |
|         )
 | |
| 
 | |
|         cache.close()
 | |
|         assert memory_pcp.close_called
 | |
|         assert memory2_pcp.close_called
 | |
|         assert spec1.close_called
 | |
|         assert spec2.close_called
 | |
| 
 | |
| 
 | |
| class TestPoTokenRequestDirector:
 | |
| 
 | |
|     def test_request_pot_success(self, ie, pot_request, pot_cache, pot_provider, logger):
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         director.register_provider(pot_provider)
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
| 
 | |
|     def test_request_and_cache(self, ie, pot_request, pot_cache, pot_provider, logger):
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         director.register_provider(pot_provider)
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert pot_provider.request_called_times == 1
 | |
|         assert pot_cache.get_calls == 1
 | |
|         assert pot_cache.store_calls == 1
 | |
| 
 | |
|         # Second request, should be cached
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert pot_cache.get_calls == 2
 | |
|         assert pot_cache.store_calls == 1
 | |
|         assert pot_provider.request_called_times == 1
 | |
| 
 | |
|     def test_bypass_cache(self, ie, pot_request, pot_cache, logger, pot_provider):
 | |
|         pot_request.bypass_cache = True
 | |
| 
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         director.register_provider(pot_provider)
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert pot_provider.request_called_times == 1
 | |
|         assert pot_cache.get_calls == 0
 | |
|         assert pot_cache.store_calls == 1
 | |
| 
 | |
|         # Second request, should not get from cache
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert pot_provider.request_called_times == 2
 | |
|         assert pot_cache.get_calls == 0
 | |
|         assert pot_cache.store_calls == 2
 | |
| 
 | |
|         # POT is still cached, should get from cache
 | |
|         pot_request.bypass_cache = False
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert pot_provider.request_called_times == 2
 | |
|         assert pot_cache.get_calls == 1
 | |
|         assert pot_cache.store_calls == 2
 | |
| 
 | |
|     def test_clean_pot_generate(self, ie, pot_request, pot_cache, logger):
 | |
|         # Token should be cleaned before returning
 | |
|         base_token = base64.urlsafe_b64encode(b'token').decode()
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = success_ptp(PoTokenResponse(base_token + '?extra=params'))(ie, logger, settings={})
 | |
|         director.register_provider(provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == base_token
 | |
|         assert provider.request_called_times == 1
 | |
| 
 | |
|         # Confirm the cleaned version was stored in the cache
 | |
|         cached_token = pot_cache.get(pot_request)
 | |
|         assert cached_token.po_token == base_token
 | |
| 
 | |
|     def test_clean_pot_cache(self, ie, pot_request, pot_cache, logger, pot_provider):
 | |
|         # Token retrieved from cache should be cleaned before returning
 | |
|         base_token = base64.urlsafe_b64encode(b'token').decode()
 | |
|         pot_cache.store(pot_request, PoTokenResponse(base_token + '?extra=params'))
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         director.register_provider(pot_provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == base_token
 | |
|         assert pot_cache.get_calls == 1
 | |
|         assert pot_provider.request_called_times == 0
 | |
| 
 | |
|     def test_cache_expires_at_none(self, ie, pot_request, pot_cache, logger, pot_provider):
 | |
|         # Should cache if expires_at=None in the response
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = success_ptp(PoTokenResponse(EXAMPLE_PO_TOKEN, expires_at=None))(ie, logger, settings={})
 | |
|         director.register_provider(provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert pot_cache.store_calls == 1
 | |
|         assert pot_cache.get(pot_request).po_token == EXAMPLE_PO_TOKEN
 | |
| 
 | |
|     def test_cache_expires_at_positive(self, ie, pot_request, pot_cache, logger, pot_provider):
 | |
|         # Should cache if expires_at is a positive number in the response
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = success_ptp(PoTokenResponse(EXAMPLE_PO_TOKEN, expires_at=99999999999))(ie, logger, settings={})
 | |
|         director.register_provider(provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert pot_cache.store_calls == 1
 | |
|         assert pot_cache.get(pot_request).po_token == EXAMPLE_PO_TOKEN
 | |
| 
 | |
|     @pytest.mark.parametrize('expires_at', [0, -1])
 | |
|     def test_not_cache_expires_at(self, ie, pot_request, pot_cache, logger, pot_provider, expires_at):
 | |
|         # Should not cache if expires_at <= 0 in the response
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = success_ptp(PoTokenResponse(EXAMPLE_PO_TOKEN, expires_at=expires_at))(ie, logger, settings={})
 | |
|         director.register_provider(provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert pot_cache.store_calls == 0
 | |
|         assert pot_cache.get(pot_request) is None
 | |
| 
 | |
|     def test_no_providers(self, ie, pot_request, pot_cache, logger):
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response is None
 | |
| 
 | |
|     def test_try_cache_no_providers(self, ie, pot_request, pot_cache, logger):
 | |
|         # Should still try the cache even if no providers are configured
 | |
|         pot_cache.store(pot_request, PoTokenResponse(EXAMPLE_PO_TOKEN))
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
| 
 | |
|     def test_close(self, ie, pot_request, pot_cache, pot_provider, logger):
 | |
|         # Should call close on the pot cache and any providers
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
| 
 | |
|         provider2 = UnavailablePTP(ie, logger, {})
 | |
|         director.register_provider(pot_provider)
 | |
|         director.register_provider(provider2)
 | |
| 
 | |
|         director.close()
 | |
|         assert pot_provider.close_called
 | |
|         assert provider2.close_called
 | |
|         assert pot_cache.close_called
 | |
| 
 | |
|     def test_pot_provider_preferences(self, pot_request, pot_cache, ie, logger):
 | |
|         pot_request.bypass_cache = True
 | |
|         provider_two_pot = base64.urlsafe_b64encode(b'token2').decode()
 | |
| 
 | |
|         example_provider = success_ptp(response=PoTokenResponse(EXAMPLE_PO_TOKEN), key='exampleone')(ie, logger, settings={})
 | |
|         example_provider_two = success_ptp(response=PoTokenResponse(provider_two_pot), key='exampletwo')(ie, logger, settings={})
 | |
| 
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         director.register_provider(example_provider)
 | |
|         director.register_provider(example_provider_two)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert example_provider.request_called_times == 1
 | |
|         assert example_provider_two.request_called_times == 0
 | |
| 
 | |
|         standard_preference_called = False
 | |
|         example_preference_called = False
 | |
| 
 | |
|         # Test that the provider preferences are respected
 | |
|         def standard_preference(provider, request, *_, **__):
 | |
|             nonlocal standard_preference_called
 | |
|             standard_preference_called = True
 | |
|             assert isinstance(provider, PoTokenProvider)
 | |
|             assert isinstance(request, PoTokenRequest)
 | |
|             return 1
 | |
| 
 | |
|         def example_preference(provider, request, *_, **__):
 | |
|             nonlocal example_preference_called
 | |
|             example_preference_called = True
 | |
|             assert isinstance(provider, PoTokenProvider)
 | |
|             assert isinstance(request, PoTokenRequest)
 | |
|             if provider.PROVIDER_KEY == example_provider.PROVIDER_KEY:
 | |
|                 return -100
 | |
|             return 0
 | |
| 
 | |
|         # test that it can handle multiple preferences
 | |
|         director.register_preference(example_preference)
 | |
|         director.register_preference(standard_preference)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == provider_two_pot
 | |
|         assert example_provider.request_called_times == 1
 | |
|         assert example_provider_two.request_called_times == 1
 | |
|         assert standard_preference_called
 | |
|         assert example_preference_called
 | |
| 
 | |
|     def test_unsupported_request_no_fallback(self, ie, logger, pot_cache, pot_request):
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = UnsupportedPTP(ie, logger, {})
 | |
|         director.register_provider(provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response is None
 | |
|         assert provider.request_called_times == 1
 | |
| 
 | |
|     def test_unsupported_request_fallback(self, ie, logger, pot_cache, pot_request, pot_provider):
 | |
|         # Should fallback to the next provider if the first one does not support the request
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = UnsupportedPTP(ie, logger, {})
 | |
|         director.register_provider(provider)
 | |
|         director.register_provider(pot_provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert provider.request_called_times == 1
 | |
|         assert pot_provider.request_called_times == 1
 | |
|         assert 'PO Token Provider "unsupported" rejected this request, trying next available provider. Reason: unsupported request' in logger.messages['trace']
 | |
| 
 | |
|     def test_unavailable_request_no_fallback(self, ie, logger, pot_cache, pot_request):
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = UnavailablePTP(ie, logger, {})
 | |
|         director.register_provider(provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response is None
 | |
|         assert provider.request_called_times == 0
 | |
|         assert provider.available_called_times
 | |
| 
 | |
|     def test_unavailable_request_fallback(self, ie, logger, pot_cache, pot_request, pot_provider):
 | |
|         # Should fallback to the next provider if the first one is unavailable
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = UnavailablePTP(ie, logger, {})
 | |
|         director.register_provider(provider)
 | |
|         director.register_provider(pot_provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert provider.request_called_times == 0
 | |
|         assert provider.available_called_times
 | |
|         assert pot_provider.request_called_times == 1
 | |
|         assert pot_provider.available_called_times
 | |
|         # should not even try use the provider for the request
 | |
|         assert 'Attempting to fetch a PO Token from "unavailable" provider' not in logger.messages['trace']
 | |
|         assert 'Attempting to fetch a PO Token from "success" provider' in logger.messages['trace']
 | |
| 
 | |
|     def test_available_not_called(self, ie, logger, pot_cache, pot_request, pot_provider):
 | |
|         # Test that the available method is not called when provider higher in the list is available
 | |
|         logger.log_level = logger.LogLevel.INFO
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = UnavailablePTP(ie, logger, {})
 | |
|         director.register_provider(pot_provider)
 | |
|         director.register_provider(provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert provider.request_called_times == 0
 | |
|         assert provider.available_called_times == 0
 | |
|         assert pot_provider.request_called_times == 1
 | |
|         assert pot_provider.available_called_times == 2
 | |
|         assert 'PO Token Providers: success-0.0.1 (external), unavailable-0.0.0 (external, unavailable)' not in logger.messages.get('trace', [])
 | |
| 
 | |
|     def test_available_called_trace(self, ie, logger, pot_cache, pot_request, pot_provider):
 | |
|         # But if logging level is trace should call available (as part of debug logging)
 | |
|         logger.log_level = logger.LogLevel.TRACE
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = UnavailablePTP(ie, logger, {})
 | |
|         director.register_provider(pot_provider)
 | |
|         director.register_provider(provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert provider.request_called_times == 0
 | |
|         assert provider.available_called_times == 1
 | |
|         assert pot_provider.request_called_times == 1
 | |
|         assert pot_provider.available_called_times == 3
 | |
|         assert 'PO Token Providers: success-0.0.1 (external), unavailable-0.0.0 (external, unavailable)' in logger.messages['trace']
 | |
| 
 | |
|     def test_provider_error_no_fallback_unexpected(self, ie, logger, pot_cache, pot_request):
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = ErrorPTP(ie, logger, {})
 | |
|         director.register_provider(provider)
 | |
|         pot_request.video_id = 'unexpected'
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response is None
 | |
|         assert provider.request_called_times == 1
 | |
|         assert "Error fetching PO Token from \"error\" provider: PoTokenProviderError('an error occurred'); please report this issue to the provider developer at  https://error.example.com/issues  ." in logger.messages['warning']
 | |
| 
 | |
|     def test_provider_error_no_fallback_expected(self, ie, logger, pot_cache, pot_request):
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = ErrorPTP(ie, logger, {})
 | |
|         director.register_provider(provider)
 | |
|         pot_request.video_id = 'expected'
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response is None
 | |
|         assert provider.request_called_times == 1
 | |
|         assert "Error fetching PO Token from \"error\" provider: PoTokenProviderError('an error occurred')" in logger.messages['warning']
 | |
| 
 | |
|     def test_provider_error_fallback(self, ie, logger, pot_cache, pot_request, pot_provider):
 | |
|         # Should fallback to the next provider if the first one raises an error
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = ErrorPTP(ie, logger, {})
 | |
|         director.register_provider(provider)
 | |
|         director.register_provider(pot_provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert provider.request_called_times == 1
 | |
|         assert pot_provider.request_called_times == 1
 | |
|         assert "Error fetching PO Token from \"error\" provider: PoTokenProviderError('an error occurred'); please report this issue to the provider developer at  https://error.example.com/issues  ." in logger.messages['warning']
 | |
| 
 | |
|     def test_provider_unexpected_error_no_fallback(self, ie, logger, pot_cache, pot_request):
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = UnexpectedErrorPTP(ie, logger, {})
 | |
|         director.register_provider(provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response is None
 | |
|         assert provider.request_called_times == 1
 | |
|         assert "Unexpected error when fetching PO Token from \"unexpected_error\" provider: ValueError('an unexpected error occurred'); please report this issue to the provider developer at  https://unexpected.example.com/issues  ." in logger.messages['error']
 | |
| 
 | |
|     def test_provider_unexpected_error_fallback(self, ie, logger, pot_cache, pot_request, pot_provider):
 | |
|         # Should fallback to the next provider if the first one raises an unexpected error
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = UnexpectedErrorPTP(ie, logger, {})
 | |
|         director.register_provider(provider)
 | |
|         director.register_provider(pot_provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert provider.request_called_times == 1
 | |
|         assert pot_provider.request_called_times == 1
 | |
|         assert "Unexpected error when fetching PO Token from \"unexpected_error\" provider: ValueError('an unexpected error occurred'); please report this issue to the provider developer at  https://unexpected.example.com/issues  ." in logger.messages['error']
 | |
| 
 | |
|     def test_invalid_po_token_response_type(self, ie, logger, pot_cache, pot_request, pot_provider):
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = InvalidPTP(ie, logger, {})
 | |
|         director.register_provider(provider)
 | |
| 
 | |
|         pot_request.video_id = 'invalid_type'
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response is None
 | |
|         assert provider.request_called_times == 1
 | |
|         assert 'Invalid PO Token response received from "invalid" provider: invalid-response; please report this issue to the provider developer at  https://invalid.example.com/issues  .' in logger.messages['error']
 | |
| 
 | |
|         # Should fallback to next available provider
 | |
|         director.register_provider(pot_provider)
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert provider.request_called_times == 2
 | |
|         assert pot_provider.request_called_times == 1
 | |
| 
 | |
|     def test_invalid_po_token_response(self, ie, logger, pot_cache, pot_request, pot_provider):
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
|         provider = InvalidPTP(ie, logger, {})
 | |
|         director.register_provider(provider)
 | |
| 
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response is None
 | |
|         assert provider.request_called_times == 1
 | |
|         assert "Invalid PO Token response received from \"invalid\" provider: PoTokenResponse(po_token='example-token?', expires_at='123'); please report this issue to the provider developer at  https://invalid.example.com/issues  ." in logger.messages['error']
 | |
| 
 | |
|         # Should fallback to next available provider
 | |
|         director.register_provider(pot_provider)
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == EXAMPLE_PO_TOKEN
 | |
|         assert provider.request_called_times == 2
 | |
|         assert pot_provider.request_called_times == 1
 | |
| 
 | |
|     def test_copy_request_provider(self, ie, logger, pot_cache, pot_request):
 | |
| 
 | |
|         class BadProviderPTP(BaseMockPoTokenProvider):
 | |
|             _SUPPORTED_CONTEXTS = None
 | |
|             _SUPPORTED_CLIENTS = None
 | |
| 
 | |
|             def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
 | |
|                 # Providers should not modify the request object, but we should guard against it
 | |
|                 request.video_id = 'bad'
 | |
|                 raise PoTokenProviderRejectedRequest('bad request')
 | |
| 
 | |
|         class GoodProviderPTP(BaseMockPoTokenProvider):
 | |
|             _SUPPORTED_CONTEXTS = None
 | |
|             _SUPPORTED_CLIENTS = None
 | |
| 
 | |
|             def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
 | |
|                 return PoTokenResponse(base64.urlsafe_b64encode(request.video_id.encode()).decode())
 | |
| 
 | |
|         director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
 | |
| 
 | |
|         bad_provider = BadProviderPTP(ie, logger, {})
 | |
|         good_provider = GoodProviderPTP(ie, logger, {})
 | |
| 
 | |
|         director.register_provider(bad_provider)
 | |
|         director.register_provider(good_provider)
 | |
| 
 | |
|         pot_request.video_id = 'good'
 | |
|         response = director.get_po_token(pot_request)
 | |
|         assert response == base64.urlsafe_b64encode(b'good').decode()
 | |
|         assert bad_provider.request_called_times == 1
 | |
|         assert good_provider.request_called_times == 1
 | |
|         assert pot_request.video_id == 'good'
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize(
    'spec, expected',
    [
        # Cases expected to be rejected
        (None, False),
        (PoTokenCacheSpec(key_bindings={'v': 'video-id'}, default_ttl=60, write_policy=None), False),  # type: ignore
        (PoTokenCacheSpec(key_bindings={'v': 'video-id'}, default_ttl='invalid'), False),  # type: ignore
        (PoTokenCacheSpec(key_bindings='invalid', default_ttl=60), False),  # type: ignore
        (PoTokenCacheSpec(key_bindings={2: 'video-id'}, default_ttl=60), False),  # type: ignore
        (PoTokenCacheSpec(key_bindings={'v': 2}, default_ttl=60), False),  # type: ignore
        (PoTokenCacheSpec(key_bindings={'v': None}, default_ttl=60), False),  # type: ignore

        # Cases expected to be accepted
        (PoTokenCacheSpec(key_bindings={'v': 'video_id', 'e': None}, default_ttl=60), True),
        (PoTokenCacheSpec(key_bindings={'v': 'video_id'}, default_ttl=60, write_policy=CacheProviderWritePolicy.WRITE_FIRST), True),
    ],
)
def test_validate_cache_spec(spec, expected):
    """validate_cache_spec should return the expected verdict for each candidate spec."""
    verdict = validate_cache_spec(spec)
    assert verdict == expected
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize(
    'po_token',
    ['invalid-token?', '123'],
)
def test_clean_pot_fail(po_token):
    """clean_pot should raise ValueError mentioning 'Invalid PO Token' for these inputs."""
    expect_rejection = pytest.raises(ValueError, match='Invalid PO Token')
    with expect_rejection:
        clean_pot(po_token)
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize(
    'po_token,expected',
    [
        # '/' and '+' in the input come back as '_' and '-'
        ('TwAA/+8=', 'TwAA_-8='),
        # percent-encoded input with doubled padding and trailing '?...' text
        ('TwAA%5F%2D9VA6Q92v%5FvEQ4==?extra-param=2', 'TwAA_-9VA6Q92v_vEQ4='),
    ],
)
def test_clean_pot(po_token, expected):
    """clean_pot should normalise each input token to the expected form."""
    cleaned = clean_pot(po_token)
    assert cleaned == expected
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize('response, expected', [
    # Cases expected to be rejected
    (None, False),
    (PoTokenResponse(None), False),
    (PoTokenResponse(1), False),
    (PoTokenResponse('invalid-token?'), False),
    (PoTokenResponse(EXAMPLE_PO_TOKEN, expires_at='abc'), False),  # type: ignore
    (PoTokenResponse(EXAMPLE_PO_TOKEN, expires_at=100), False),
    # float expiry, unlike the int variant in the accepted cases
    (PoTokenResponse(EXAMPLE_PO_TOKEN, expires_at=time.time() + 10000.0), False),  # type: ignore

    # Cases expected to be accepted
    (PoTokenResponse(EXAMPLE_PO_TOKEN), True),
    (PoTokenResponse(EXAMPLE_PO_TOKEN, expires_at=-1), True),
    (PoTokenResponse(EXAMPLE_PO_TOKEN, expires_at=0), True),
    (PoTokenResponse(EXAMPLE_PO_TOKEN, expires_at=int(time.time()) + 10000), True),
])
def test_validate_pot_response(response, expected):
    """validate_response should return the expected verdict for each candidate response."""
    verdict = validate_response(response)
    assert verdict == expected
 | |
| 
 | |
| 
 | |
def test_built_in_provider(ie, logger):
    """Check how provider_display_list renders built-in, external and unavailable providers."""

    class BuiltinProviderDefaultT(BuiltinIEContentProvider, suffix='T'):
        def is_available(self) -> bool:
            return True

    class BuiltinProviderCustomNameT(BuiltinIEContentProvider, suffix='T'):
        PROVIDER_NAME = 'CustomName'

        def is_available(self) -> bool:
            return True

    class ExternalProviderDefaultT(IEContentProvider, suffix='T'):
        def is_available(self) -> bool:
            return True

    class ExternalProviderCustomT(IEContentProvider, suffix='T'):
        PROVIDER_NAME = 'custom'
        PROVIDER_VERSION = '5.4b2'

        def is_available(self) -> bool:
            return True

    class ExternalProviderUnavailableT(IEContentProvider, suffix='T'):
        def is_available(self) -> bool:
            return False

    # NOTE(review): despite its name this class derives from IEContentProvider, and the
    # expected listing below shows it tagged as external - confirm this is intentional
    class BuiltinProviderUnavailableT(IEContentProvider, suffix='T'):
        def is_available(self) -> bool:
            return False

    def instantiate(provider_cls):
        # Every provider gets the same fixtures and its own empty settings dict
        return provider_cls(ie=ie, logger=logger, settings={})

    built_in_default = instantiate(BuiltinProviderDefaultT)
    built_in_custom_name = instantiate(BuiltinProviderCustomNameT)
    built_in_unavailable = instantiate(BuiltinProviderUnavailableT)
    external_default = instantiate(ExternalProviderDefaultT)
    external_custom = instantiate(ExternalProviderCustomT)
    external_unavailable = instantiate(ExternalProviderUnavailableT)

    # Empty and single-provider listings
    assert provider_display_list([]) == 'none'
    assert provider_display_list([built_in_default]) == 'BuiltinProviderDefault'
    assert provider_display_list([external_unavailable]) == 'ExternalProviderUnavailable-0.0.0 (external, unavailable)'

    # Full listing: entries appear in input order, separated by ', '
    every_provider = [
        built_in_default,
        built_in_custom_name,
        external_default,
        external_custom,
        external_unavailable,
        built_in_unavailable,
    ]
    expected_listing = 'BuiltinProviderDefault, CustomName, ExternalProviderDefault-0.0.0 (external), custom-5.4b2 (external), ExternalProviderUnavailable-0.0.0 (external, unavailable), BuiltinProviderUnavailable-0.0.0 (external, unavailable)'
    assert provider_display_list(every_provider) == expected_listing
 |