From 09078190b0f33d14ae2b402913c64b724acf4bcb Mon Sep 17 00:00:00 2001 From: sepro Date: Tue, 30 Dec 2025 15:02:35 +0100 Subject: [PATCH] [ie/iqiyi] Remove broken login support (#15441) Authored by: seproDev --- test/test_iqiyi_sdk_interpreter.py | 44 ------- yt_dlp/extractor/iqiyi.py | 184 ----------------------------- 2 files changed, 228 deletions(-) delete mode 100644 test/test_iqiyi_sdk_interpreter.py diff --git a/test/test_iqiyi_sdk_interpreter.py b/test/test_iqiyi_sdk_interpreter.py deleted file mode 100644 index 4e41007c82..0000000000 --- a/test/test_iqiyi_sdk_interpreter.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env python3 - -# Allow direct execution -import os -import sys -import unittest - -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - - -from test.helper import FakeYDL, is_download_test -from yt_dlp.extractor import IqiyiIE - - -class WarningLogger: - def __init__(self): - self.messages = [] - - def warning(self, msg): - self.messages.append(msg) - - def debug(self, msg): - pass - - def error(self, msg): - pass - - -@is_download_test -class TestIqiyiSDKInterpreter(unittest.TestCase): - def test_iqiyi_sdk_interpreter(self): - """ - Test the functionality of IqiyiSDKInterpreter by trying to log in - - If `sign` is incorrect, /validate call throws an HTTP 556 error - """ - logger = WarningLogger() - ie = IqiyiIE(FakeYDL({'logger': logger})) - ie._perform_login('foo', 'bar') - self.assertTrue('unable to log in:' in logger.messages[0]) - - -if __name__ == '__main__': - unittest.main() diff --git a/yt_dlp/extractor/iqiyi.py b/yt_dlp/extractor/iqiyi.py index 735b44637c..f8b4afa9f9 100644 --- a/yt_dlp/extractor/iqiyi.py +++ b/yt_dlp/extractor/iqiyi.py @@ -9,14 +9,12 @@ from .openload import PhantomJSwrapper from ..utils import ( ExtractorError, clean_html, - decode_packed_codes, float_or_none, format_field, get_element_by_attribute, get_element_by_id, int_or_none, js_to_json, - ohdave_rsa_encrypt, parse_age_limit, parse_duration, parse_iso8601, @@ -33,143 +31,12 @@ def md5_text(text): return hashlib.md5(text.encode()).hexdigest() -class IqiyiSDK: - def __init__(self, target, ip, timestamp): - self.target = target - self.ip = ip - self.timestamp = timestamp - - @staticmethod - def split_sum(data): - return str(sum(int(p, 16) for p in data)) - - @staticmethod - def digit_sum(num): - if isinstance(num, int): - num = str(num) - return str(sum(map(int, num))) - - def even_odd(self): - even = self.digit_sum(str(self.timestamp)[::2]) - odd = self.digit_sum(str(self.timestamp)[1::2]) - return even, odd - - def preprocess(self, chunksize): - self.target = md5_text(self.target) - chunks = [] - for i in range(32 // chunksize): - chunks.append(self.target[chunksize * i:chunksize * (i + 1)]) - if 32 % chunksize: - chunks.append(self.target[32 - 32 % chunksize:]) - return chunks, list(map(int, self.ip.split('.'))) - - def mod(self, modulus): - chunks, ip = self.preprocess(32) - self.target = chunks[0] + ''.join(str(p % modulus) for p in ip) - - def split(self, chunksize): - modulus_map = { - 4: 256, - 5: 10, - 8: 100, - } - - chunks, ip = self.preprocess(chunksize) - ret = '' - for i in range(len(chunks)): - ip_part = str(ip[i] % modulus_map[chunksize]) if i < 4 else '' - if chunksize == 8: - ret += ip_part + chunks[i] - else: - ret += chunks[i] + ip_part - self.target = ret - - def handle_input16(self): - self.target = md5_text(self.target) - self.target = self.split_sum(self.target[:16]) + self.target + self.split_sum(self.target[16:]) - - def handle_input8(self): - self.target = md5_text(self.target) - ret = '' - for i in range(4): - part = self.target[8 * i:8 * (i + 1)] - ret += self.split_sum(part) + part - self.target = ret - - def handleSum(self): - self.target = md5_text(self.target) - self.target = self.split_sum(self.target) + self.target - - def date(self, scheme): - self.target = md5_text(self.target) - d = time.localtime(self.timestamp) - strings = { - 'y': str(d.tm_year), - 'm': '%02d' % d.tm_mon, - 'd': '%02d' % d.tm_mday, - } - self.target += ''.join(strings[c] for c in scheme) - - def split_time_even_odd(self): - even, odd = self.even_odd() - self.target = odd + md5_text(self.target) + even - - def split_time_odd_even(self): - even, odd = self.even_odd() - self.target = even + md5_text(self.target) + odd - - def split_ip_time_sum(self): - chunks, ip = self.preprocess(32) - self.target = str(sum(ip)) + chunks[0] + self.digit_sum(self.timestamp) - - def split_time_ip_sum(self): - chunks, ip = self.preprocess(32) - self.target = self.digit_sum(self.timestamp) + chunks[0] + str(sum(ip)) - - -class IqiyiSDKInterpreter: - def __init__(self, sdk_code): - self.sdk_code = sdk_code - - def run(self, target, ip, timestamp): - self.sdk_code = decode_packed_codes(self.sdk_code) - - functions = re.findall(r'input=([a-zA-Z0-9]+)\(input', self.sdk_code) - - sdk = IqiyiSDK(target, ip, timestamp) - - other_functions = { - 'handleSum': sdk.handleSum, - 'handleInput8': sdk.handle_input8, - 'handleInput16': sdk.handle_input16, - 'splitTimeEvenOdd': sdk.split_time_even_odd, - 'splitTimeOddEven': sdk.split_time_odd_even, - 'splitIpTimeSum': sdk.split_ip_time_sum, - 'splitTimeIpSum': sdk.split_time_ip_sum, - } - for function in functions: - if re.match(r'mod\d+', function): - sdk.mod(int(function[3:])) - elif re.match(r'date[ymd]{3}', function): - sdk.date(function[4:]) - elif re.match(r'split\d+', function): - sdk.split(int(function[5:])) - elif function in other_functions: - other_functions[function]() - else: - raise ExtractorError(f'Unknown function {function}') - - return sdk.target - - class IqiyiIE(InfoExtractor): IE_NAME = 'iqiyi' IE_DESC = '爱奇艺' _VALID_URL = r'https?://(?:(?:[^.]+\.)?iqiyi\.com|www\.pps\.tv)/.+\.html' - _NETRC_MACHINE = 'iqiyi' - _TESTS = [{ 'url': 'http://www.iqiyi.com/v_19rrojlavg.html', # MD5 checksum differs on my machine and Travis CI @@ -234,57 +101,6 @@ class IqiyiIE(InfoExtractor): '18': 7, # 1080p } - @staticmethod - def _rsa_fun(data): - # public key extracted from http://static.iqiyi.com/js/qiyiV2/20160129180840/jobs/i18n/i18nIndex.js - N = 0xab86b6371b5318aaa1d3c9e612a9f1264f372323c8c0f19875b5fc3b3fd3afcc1e5bec527aa94bfa85bffc157e4245aebda05389a5357b75115ac94f074aefcd - e = 65537 - - return ohdave_rsa_encrypt(data, e, N) - - def _perform_login(self, username, password): - - data = self._download_json( - 'http://kylin.iqiyi.com/get_token', None, - note='Get token for logging', errnote='Unable to get token for logging') - sdk = data['sdk'] - timestamp = int(time.time()) - target = ( - f'/apis/reglogin/login.action?lang=zh_TW&area_code=null&email={username}' - f'&passwd={self._rsa_fun(password.encode())}&agenttype=1&from=undefined&keeplogin=0&piccode=&fromurl=&_pos=1') - - interp = IqiyiSDKInterpreter(sdk) - sign = interp.run(target, data['ip'], timestamp) - - validation_params = { - 'target': target, - 'server': 'BEA3AA1908656AABCCFF76582C4C6660', - 'token': data['token'], - 'bird_src': 'f8d91d57af224da7893dd397d52d811a', - 'sign': sign, - 'bird_t': timestamp, - } - validation_result = self._download_json( - 'http://kylin.iqiyi.com/validate?' + urllib.parse.urlencode(validation_params), None, - note='Validate credentials', errnote='Unable to validate credentials') - - MSG_MAP = { - 'P00107': 'please login via the web interface and enter the CAPTCHA code', - 'P00117': 'bad username or password', - } - - code = validation_result['code'] - if code != 'A00000': - msg = MSG_MAP.get(code) - if not msg: - msg = f'error {code}' - if validation_result.get('msg'): - msg += ': ' + validation_result['msg'] - self.report_warning('unable to log in: ' + msg) - return False - - return True - def get_raw_data(self, tvid, video_id): tm = int(time.time() * 1000)