mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2025-12-09 17:55:28 +00:00
[cleanup] Deprecate more compat functions (#11439)
Authored by: seproDev
This commit is contained in:
@@ -26,7 +26,7 @@ import unicodedata
|
||||
|
||||
from .cache import Cache
|
||||
from .compat import urllib # isort: split
|
||||
from .compat import compat_os_name, urllib_req_to_req
|
||||
from .compat import urllib_req_to_req
|
||||
from .cookies import CookieLoadError, LenientSimpleCookie, load_cookies
|
||||
from .downloader import FFmpegFD, get_suitable_downloader, shorten_protocol_name
|
||||
from .downloader.rtmp import rtmpdump_version
|
||||
@@ -109,7 +109,6 @@ from .utils import (
|
||||
determine_ext,
|
||||
determine_protocol,
|
||||
encode_compat_str,
|
||||
encodeFilename,
|
||||
escapeHTML,
|
||||
expand_path,
|
||||
extract_basic_auth,
|
||||
@@ -167,7 +166,7 @@ from .utils.networking import (
|
||||
)
|
||||
from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
|
||||
|
||||
if compat_os_name == 'nt':
|
||||
if os.name == 'nt':
|
||||
import ctypes
|
||||
|
||||
|
||||
@@ -643,7 +642,7 @@ class YoutubeDL:
|
||||
out=stdout,
|
||||
error=sys.stderr,
|
||||
screen=sys.stderr if self.params.get('quiet') else stdout,
|
||||
console=None if compat_os_name == 'nt' else next(
|
||||
console=None if os.name == 'nt' else next(
|
||||
filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None),
|
||||
)
|
||||
|
||||
@@ -952,7 +951,7 @@ class YoutubeDL:
|
||||
self._write_string(f'{self._bidi_workaround(message)}\n', self._out_files.error, only_once=only_once)
|
||||
|
||||
def _send_console_code(self, code):
|
||||
if compat_os_name == 'nt' or not self._out_files.console:
|
||||
if os.name == 'nt' or not self._out_files.console:
|
||||
return
|
||||
self._write_string(code, self._out_files.console)
|
||||
|
||||
@@ -960,7 +959,7 @@ class YoutubeDL:
|
||||
if not self.params.get('consoletitle', False):
|
||||
return
|
||||
message = remove_terminal_sequences(message)
|
||||
if compat_os_name == 'nt':
|
||||
if os.name == 'nt':
|
||||
if ctypes.windll.kernel32.GetConsoleWindow():
|
||||
# c_wchar_p() might not be necessary if `message` is
|
||||
# already of type unicode()
|
||||
@@ -3255,9 +3254,9 @@ class YoutubeDL:
|
||||
|
||||
if full_filename is None:
|
||||
return
|
||||
if not self._ensure_dir_exists(encodeFilename(full_filename)):
|
||||
if not self._ensure_dir_exists(full_filename):
|
||||
return
|
||||
if not self._ensure_dir_exists(encodeFilename(temp_filename)):
|
||||
if not self._ensure_dir_exists(temp_filename):
|
||||
return
|
||||
|
||||
if self._write_description('video', info_dict,
|
||||
@@ -3289,16 +3288,16 @@ class YoutubeDL:
|
||||
if self.params.get('writeannotations', False):
|
||||
annofn = self.prepare_filename(info_dict, 'annotation')
|
||||
if annofn:
|
||||
if not self._ensure_dir_exists(encodeFilename(annofn)):
|
||||
if not self._ensure_dir_exists(annofn):
|
||||
return
|
||||
if not self.params.get('overwrites', True) and os.path.exists(encodeFilename(annofn)):
|
||||
if not self.params.get('overwrites', True) and os.path.exists(annofn):
|
||||
self.to_screen('[info] Video annotations are already present')
|
||||
elif not info_dict.get('annotations'):
|
||||
self.report_warning('There are no annotations to write.')
|
||||
else:
|
||||
try:
|
||||
self.to_screen('[info] Writing video annotations to: ' + annofn)
|
||||
with open(encodeFilename(annofn), 'w', encoding='utf-8') as annofile:
|
||||
with open(annofn, 'w', encoding='utf-8') as annofile:
|
||||
annofile.write(info_dict['annotations'])
|
||||
except (KeyError, TypeError):
|
||||
self.report_warning('There are no annotations to write.')
|
||||
@@ -3314,14 +3313,14 @@ class YoutubeDL:
|
||||
f'Cannot write internet shortcut file because the actual URL of "{info_dict["webpage_url"]}" is unknown')
|
||||
return True
|
||||
linkfn = replace_extension(self.prepare_filename(info_dict, 'link'), link_type, info_dict.get('ext'))
|
||||
if not self._ensure_dir_exists(encodeFilename(linkfn)):
|
||||
if not self._ensure_dir_exists(linkfn):
|
||||
return False
|
||||
if self.params.get('overwrites', True) and os.path.exists(encodeFilename(linkfn)):
|
||||
if self.params.get('overwrites', True) and os.path.exists(linkfn):
|
||||
self.to_screen(f'[info] Internet shortcut (.{link_type}) is already present')
|
||||
return True
|
||||
try:
|
||||
self.to_screen(f'[info] Writing internet shortcut (.{link_type}) to: {linkfn}')
|
||||
with open(encodeFilename(to_high_limit_path(linkfn)), 'w', encoding='utf-8',
|
||||
with open(to_high_limit_path(linkfn), 'w', encoding='utf-8',
|
||||
newline='\r\n' if link_type == 'url' else '\n') as linkfile:
|
||||
template_vars = {'url': url}
|
||||
if link_type == 'desktop':
|
||||
@@ -3352,7 +3351,7 @@ class YoutubeDL:
|
||||
|
||||
if self.params.get('skip_download'):
|
||||
info_dict['filepath'] = temp_filename
|
||||
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(encodeFilename(full_filename)))
|
||||
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(full_filename))
|
||||
info_dict['__files_to_move'] = files_to_move
|
||||
replace_info_dict(self.run_pp(MoveFilesAfterDownloadPP(self, False), info_dict))
|
||||
info_dict['__write_download_archive'] = self.params.get('force_write_download_archive')
|
||||
@@ -3482,7 +3481,7 @@ class YoutubeDL:
|
||||
self.report_file_already_downloaded(dl_filename)
|
||||
|
||||
dl_filename = dl_filename or temp_filename
|
||||
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(encodeFilename(full_filename)))
|
||||
info_dict['__finaldir'] = os.path.dirname(os.path.abspath(full_filename))
|
||||
|
||||
except network_exceptions as err:
|
||||
self.report_error(f'unable to download video data: {err}')
|
||||
@@ -4297,7 +4296,7 @@ class YoutubeDL:
|
||||
else:
|
||||
try:
|
||||
self.to_screen(f'[info] Writing {label} description to: {descfn}')
|
||||
with open(encodeFilename(descfn), 'w', encoding='utf-8') as descfile:
|
||||
with open(descfn, 'w', encoding='utf-8') as descfile:
|
||||
descfile.write(ie_result['description'])
|
||||
except OSError:
|
||||
self.report_error(f'Cannot write {label} description file {descfn}')
|
||||
@@ -4399,7 +4398,7 @@ class YoutubeDL:
|
||||
try:
|
||||
uf = self.urlopen(Request(t['url'], headers=t.get('http_headers', {})))
|
||||
self.to_screen(f'[info] Writing {thumb_display_id} to: {thumb_filename}')
|
||||
with open(encodeFilename(thumb_filename), 'wb') as thumbf:
|
||||
with open(thumb_filename, 'wb') as thumbf:
|
||||
shutil.copyfileobj(uf, thumbf)
|
||||
ret.append((thumb_filename, thumb_filename_final))
|
||||
t['filepath'] = thumb_filename
|
||||
|
||||
@@ -14,7 +14,6 @@ import os
|
||||
import re
|
||||
import traceback
|
||||
|
||||
from .compat import compat_os_name
|
||||
from .cookies import SUPPORTED_BROWSERS, SUPPORTED_KEYRINGS, CookieLoadError
|
||||
from .downloader.external import get_external_downloader
|
||||
from .extractor import list_extractor_classes
|
||||
@@ -44,7 +43,6 @@ from .utils import (
|
||||
GeoUtils,
|
||||
PlaylistEntries,
|
||||
SameFileError,
|
||||
decodeOption,
|
||||
download_range_func,
|
||||
expand_path,
|
||||
float_or_none,
|
||||
@@ -883,8 +881,8 @@ def parse_options(argv=None):
|
||||
'listsubtitles': opts.listsubtitles,
|
||||
'subtitlesformat': opts.subtitlesformat,
|
||||
'subtitleslangs': opts.subtitleslangs,
|
||||
'matchtitle': decodeOption(opts.matchtitle),
|
||||
'rejecttitle': decodeOption(opts.rejecttitle),
|
||||
'matchtitle': opts.matchtitle,
|
||||
'rejecttitle': opts.rejecttitle,
|
||||
'max_downloads': opts.max_downloads,
|
||||
'prefer_free_formats': opts.prefer_free_formats,
|
||||
'trim_file_name': opts.trim_file_name,
|
||||
@@ -1053,7 +1051,7 @@ def _real_main(argv=None):
|
||||
ydl.warn_if_short_id(args)
|
||||
|
||||
# Show a useful error message and wait for keypress if not launched from shell on Windows
|
||||
if not args and compat_os_name == 'nt' and getattr(sys, 'frozen', False):
|
||||
if not args and os.name == 'nt' and getattr(sys, 'frozen', False):
|
||||
import ctypes.wintypes
|
||||
import msvcrt
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@ from math import ceil
|
||||
|
||||
from .compat import compat_ord
|
||||
from .dependencies import Cryptodome
|
||||
from .utils import bytes_to_intlist, intlist_to_bytes
|
||||
|
||||
if Cryptodome.AES:
|
||||
def aes_cbc_decrypt_bytes(data, key, iv):
|
||||
@@ -17,15 +16,15 @@ if Cryptodome.AES:
|
||||
else:
|
||||
def aes_cbc_decrypt_bytes(data, key, iv):
|
||||
""" Decrypt bytes with AES-CBC using native implementation since pycryptodome is unavailable """
|
||||
return intlist_to_bytes(aes_cbc_decrypt(*map(bytes_to_intlist, (data, key, iv))))
|
||||
return bytes(aes_cbc_decrypt(*map(list, (data, key, iv))))
|
||||
|
||||
def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
|
||||
""" Decrypt bytes with AES-GCM using native implementation since pycryptodome is unavailable """
|
||||
return intlist_to_bytes(aes_gcm_decrypt_and_verify(*map(bytes_to_intlist, (data, key, tag, nonce))))
|
||||
return bytes(aes_gcm_decrypt_and_verify(*map(list, (data, key, tag, nonce))))
|
||||
|
||||
|
||||
def aes_cbc_encrypt_bytes(data, key, iv, **kwargs):
|
||||
return intlist_to_bytes(aes_cbc_encrypt(*map(bytes_to_intlist, (data, key, iv)), **kwargs))
|
||||
return bytes(aes_cbc_encrypt(*map(list, (data, key, iv)), **kwargs))
|
||||
|
||||
|
||||
BLOCK_SIZE_BYTES = 16
|
||||
@@ -221,7 +220,7 @@ def aes_gcm_decrypt_and_verify(data, key, tag, nonce):
|
||||
j0 = [*nonce, 0, 0, 0, 1]
|
||||
else:
|
||||
fill = (BLOCK_SIZE_BYTES - (len(nonce) % BLOCK_SIZE_BYTES)) % BLOCK_SIZE_BYTES + 8
|
||||
ghash_in = nonce + [0] * fill + bytes_to_intlist((8 * len(nonce)).to_bytes(8, 'big'))
|
||||
ghash_in = nonce + [0] * fill + list((8 * len(nonce)).to_bytes(8, 'big'))
|
||||
j0 = ghash(hash_subkey, ghash_in)
|
||||
|
||||
# TODO: add nonce support to aes_ctr_decrypt
|
||||
@@ -234,9 +233,9 @@ def aes_gcm_decrypt_and_verify(data, key, tag, nonce):
|
||||
s_tag = ghash(
|
||||
hash_subkey,
|
||||
data
|
||||
+ [0] * pad_len # pad
|
||||
+ bytes_to_intlist((0 * 8).to_bytes(8, 'big') # length of associated data
|
||||
+ ((len(data) * 8).to_bytes(8, 'big'))), # length of data
|
||||
+ [0] * pad_len # pad
|
||||
+ list((0 * 8).to_bytes(8, 'big') # length of associated data
|
||||
+ ((len(data) * 8).to_bytes(8, 'big'))), # length of data
|
||||
)
|
||||
|
||||
if tag != aes_ctr_encrypt(s_tag, key, j0):
|
||||
@@ -300,8 +299,8 @@ def aes_decrypt_text(data, password, key_size_bytes):
|
||||
"""
|
||||
NONCE_LENGTH_BYTES = 8
|
||||
|
||||
data = bytes_to_intlist(base64.b64decode(data))
|
||||
password = bytes_to_intlist(password.encode())
|
||||
data = list(base64.b64decode(data))
|
||||
password = list(password.encode())
|
||||
|
||||
key = password[:key_size_bytes] + [0] * (key_size_bytes - len(password))
|
||||
key = aes_encrypt(key[:BLOCK_SIZE_BYTES], key_expansion(key)) * (key_size_bytes // BLOCK_SIZE_BYTES)
|
||||
@@ -310,7 +309,7 @@ def aes_decrypt_text(data, password, key_size_bytes):
|
||||
cipher = data[NONCE_LENGTH_BYTES:]
|
||||
|
||||
decrypted_data = aes_ctr_decrypt(cipher, key, nonce + [0] * (BLOCK_SIZE_BYTES - NONCE_LENGTH_BYTES))
|
||||
return intlist_to_bytes(decrypted_data)
|
||||
return bytes(decrypted_data)
|
||||
|
||||
|
||||
RCON = (0x8d, 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80, 0x1b, 0x36)
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import os
|
||||
import sys
|
||||
import xml.etree.ElementTree as etree
|
||||
|
||||
from .compat_utils import passthrough_module
|
||||
@@ -24,33 +23,14 @@ def compat_etree_fromstring(text):
|
||||
return etree.XML(text, parser=etree.XMLParser(target=_TreeBuilder()))
|
||||
|
||||
|
||||
compat_os_name = os._name if os.name == 'java' else os.name
|
||||
|
||||
|
||||
def compat_shlex_quote(s):
|
||||
from ..utils import shell_quote
|
||||
return shell_quote(s)
|
||||
|
||||
|
||||
def compat_ord(c):
|
||||
return c if isinstance(c, int) else ord(c)
|
||||
|
||||
|
||||
if compat_os_name == 'nt' and sys.version_info < (3, 8):
|
||||
# os.path.realpath on Windows does not follow symbolic links
|
||||
# prior to Python 3.8 (see https://bugs.python.org/issue9949)
|
||||
def compat_realpath(path):
|
||||
while os.path.islink(path):
|
||||
path = os.path.abspath(os.readlink(path))
|
||||
return os.path.realpath(path)
|
||||
else:
|
||||
compat_realpath = os.path.realpath
|
||||
|
||||
|
||||
# Python 3.8+ does not honor %HOME% on windows, but this breaks compatibility with youtube-dl
|
||||
# See https://github.com/yt-dlp/yt-dlp/issues/792
|
||||
# https://docs.python.org/3/library/os.path.html#os.path.expanduser
|
||||
if compat_os_name in ('nt', 'ce'):
|
||||
if os.name in ('nt', 'ce'):
|
||||
def compat_expanduser(path):
|
||||
HOME = os.environ.get('HOME')
|
||||
if not HOME:
|
||||
|
||||
@@ -8,16 +8,14 @@ passthrough_module(__name__, '.._legacy', callback=lambda attr: warnings.warn(
|
||||
DeprecationWarning(f'{__name__}.{attr} is deprecated'), stacklevel=6))
|
||||
del passthrough_module
|
||||
|
||||
import base64
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import functools # noqa: F401
|
||||
import os
|
||||
|
||||
compat_str = str
|
||||
|
||||
compat_b64decode = base64.b64decode
|
||||
compat_os_name = os.name
|
||||
compat_realpath = os.path.realpath
|
||||
|
||||
compat_urlparse = urllib.parse
|
||||
compat_parse_qs = urllib.parse.parse_qs
|
||||
compat_urllib_parse_unquote = urllib.parse.unquote
|
||||
compat_urllib_parse_urlencode = urllib.parse.urlencode
|
||||
compat_urllib_parse_urlparse = urllib.parse.urlparse
|
||||
|
||||
def compat_shlex_quote(s):
|
||||
from ..utils import shell_quote
|
||||
return shell_quote(s)
|
||||
|
||||
@@ -30,7 +30,7 @@ from asyncio import run as compat_asyncio_run # noqa: F401
|
||||
from re import Pattern as compat_Pattern # noqa: F401
|
||||
from re import match as compat_Match # noqa: F401
|
||||
|
||||
from . import compat_expanduser, compat_HTMLParseError, compat_realpath
|
||||
from . import compat_expanduser, compat_HTMLParseError
|
||||
from .compat_utils import passthrough_module
|
||||
from ..dependencies import brotli as compat_brotli # noqa: F401
|
||||
from ..dependencies import websockets as compat_websockets # noqa: F401
|
||||
@@ -78,7 +78,7 @@ compat_kwargs = lambda kwargs: kwargs
|
||||
compat_map = map
|
||||
compat_numeric_types = (int, float, complex)
|
||||
compat_os_path_expanduser = compat_expanduser
|
||||
compat_os_path_realpath = compat_realpath
|
||||
compat_os_path_realpath = os.path.realpath
|
||||
compat_print = print
|
||||
compat_shlex_split = shlex.split
|
||||
compat_socket_create_connection = socket.create_connection
|
||||
@@ -104,5 +104,12 @@ compat_xml_parse_error = compat_xml_etree_ElementTree_ParseError = etree.ParseEr
|
||||
compat_xpath = lambda xpath: xpath
|
||||
compat_zip = zip
|
||||
workaround_optparse_bug9161 = lambda: None
|
||||
compat_str = str
|
||||
compat_b64decode = base64.b64decode
|
||||
compat_urlparse = urllib.parse
|
||||
compat_parse_qs = urllib.parse.parse_qs
|
||||
compat_urllib_parse_unquote = urllib.parse.unquote
|
||||
compat_urllib_parse_urlencode = urllib.parse.urlencode
|
||||
compat_urllib_parse_urlparse = urllib.parse.urlparse
|
||||
|
||||
legacy = []
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
# flake8: noqa: F405
|
||||
from functools import * # noqa: F403
|
||||
|
||||
from .compat_utils import passthrough_module
|
||||
|
||||
passthrough_module(__name__, 'functools')
|
||||
del passthrough_module
|
||||
@@ -7,9 +7,9 @@ passthrough_module(__name__, 'urllib.request')
|
||||
del passthrough_module
|
||||
|
||||
|
||||
from .. import compat_os_name
|
||||
import os
|
||||
|
||||
if compat_os_name == 'nt':
|
||||
if os.name == 'nt':
|
||||
# On older Python versions, proxies are extracted from Windows registry erroneously. [1]
|
||||
# If the https proxy in the registry does not have a scheme, urllib will incorrectly add https:// to it. [2]
|
||||
# It is unlikely that the user has actually set it to be https, so we should be fine to safely downgrade
|
||||
@@ -37,4 +37,4 @@ if compat_os_name == 'nt':
|
||||
def getproxies():
|
||||
return getproxies_environment() or getproxies_registry_patched()
|
||||
|
||||
del compat_os_name
|
||||
del os
|
||||
|
||||
@@ -25,7 +25,6 @@ from .aes import (
|
||||
aes_gcm_decrypt_and_verify_bytes,
|
||||
unpad_pkcs7,
|
||||
)
|
||||
from .compat import compat_os_name
|
||||
from .dependencies import (
|
||||
_SECRETSTORAGE_UNAVAILABLE_REASON,
|
||||
secretstorage,
|
||||
@@ -343,7 +342,7 @@ def _extract_chrome_cookies(browser_name, profile, keyring, logger):
|
||||
logger.debug(f'cookie version breakdown: {counts}')
|
||||
return jar
|
||||
except PermissionError as error:
|
||||
if compat_os_name == 'nt' and error.errno == 13:
|
||||
if os.name == 'nt' and error.errno == 13:
|
||||
message = 'Could not copy Chrome cookie database. See https://github.com/yt-dlp/yt-dlp/issues/7271 for more info'
|
||||
logger.error(message)
|
||||
raise DownloadError(message) # force exit
|
||||
|
||||
@@ -20,9 +20,7 @@ from ..utils import (
|
||||
Namespace,
|
||||
RetryManager,
|
||||
classproperty,
|
||||
decodeArgument,
|
||||
deprecation_warning,
|
||||
encodeFilename,
|
||||
format_bytes,
|
||||
join_nonempty,
|
||||
parse_bytes,
|
||||
@@ -219,7 +217,7 @@ class FileDownloader:
|
||||
def temp_name(self, filename):
|
||||
"""Returns a temporary filename for the given filename."""
|
||||
if self.params.get('nopart', False) or filename == '-' or \
|
||||
(os.path.exists(encodeFilename(filename)) and not os.path.isfile(encodeFilename(filename))):
|
||||
(os.path.exists(filename) and not os.path.isfile(filename)):
|
||||
return filename
|
||||
return filename + '.part'
|
||||
|
||||
@@ -273,7 +271,7 @@ class FileDownloader:
|
||||
"""Try to set the last-modified time of the given file."""
|
||||
if last_modified_hdr is None:
|
||||
return
|
||||
if not os.path.isfile(encodeFilename(filename)):
|
||||
if not os.path.isfile(filename):
|
||||
return
|
||||
timestr = last_modified_hdr
|
||||
if timestr is None:
|
||||
@@ -432,13 +430,13 @@ class FileDownloader:
|
||||
"""
|
||||
nooverwrites_and_exists = (
|
||||
not self.params.get('overwrites', True)
|
||||
and os.path.exists(encodeFilename(filename))
|
||||
and os.path.exists(filename)
|
||||
)
|
||||
|
||||
if not hasattr(filename, 'write'):
|
||||
continuedl_and_exists = (
|
||||
self.params.get('continuedl', True)
|
||||
and os.path.isfile(encodeFilename(filename))
|
||||
and os.path.isfile(filename)
|
||||
and not self.params.get('nopart', False)
|
||||
)
|
||||
|
||||
@@ -448,7 +446,7 @@ class FileDownloader:
|
||||
self._hook_progress({
|
||||
'filename': filename,
|
||||
'status': 'finished',
|
||||
'total_bytes': os.path.getsize(encodeFilename(filename)),
|
||||
'total_bytes': os.path.getsize(filename),
|
||||
}, info_dict)
|
||||
self._finish_multiline_status()
|
||||
return True, False
|
||||
@@ -489,9 +487,7 @@ class FileDownloader:
|
||||
if not self.params.get('verbose', False):
|
||||
return
|
||||
|
||||
str_args = [decodeArgument(a) for a in args]
|
||||
|
||||
if exe is None:
|
||||
exe = os.path.basename(str_args[0])
|
||||
exe = os.path.basename(args[0])
|
||||
|
||||
self.write_debug(f'{exe} command line: {shell_quote(str_args)}')
|
||||
self.write_debug(f'{exe} command line: {shell_quote(args)}')
|
||||
|
||||
@@ -23,7 +23,6 @@ from ..utils import (
|
||||
cli_valueless_option,
|
||||
determine_ext,
|
||||
encodeArgument,
|
||||
encodeFilename,
|
||||
find_available_port,
|
||||
remove_end,
|
||||
traverse_obj,
|
||||
@@ -67,7 +66,7 @@ class ExternalFD(FragmentFD):
|
||||
'elapsed': time.time() - started,
|
||||
}
|
||||
if filename != '-':
|
||||
fsize = os.path.getsize(encodeFilename(tmpfilename))
|
||||
fsize = os.path.getsize(tmpfilename)
|
||||
self.try_rename(tmpfilename, filename)
|
||||
status.update({
|
||||
'downloaded_bytes': fsize,
|
||||
@@ -184,9 +183,9 @@ class ExternalFD(FragmentFD):
|
||||
dest.write(decrypt_fragment(fragment, src.read()))
|
||||
src.close()
|
||||
if not self.params.get('keep_fragments', False):
|
||||
self.try_remove(encodeFilename(fragment_filename))
|
||||
self.try_remove(fragment_filename)
|
||||
dest.close()
|
||||
self.try_remove(encodeFilename(f'{tmpfilename}.frag.urls'))
|
||||
self.try_remove(f'{tmpfilename}.frag.urls')
|
||||
return 0
|
||||
|
||||
def _call_process(self, cmd, info_dict):
|
||||
@@ -620,7 +619,7 @@ class FFmpegFD(ExternalFD):
|
||||
args += self._configuration_args(('_o1', '_o', ''))
|
||||
|
||||
args = [encodeArgument(opt) for opt in args]
|
||||
args.append(encodeFilename(ffpp._ffmpeg_filename_argument(tmpfilename), True))
|
||||
args.append(ffpp._ffmpeg_filename_argument(tmpfilename))
|
||||
self._debug_cmd(args)
|
||||
|
||||
piped = any(fmt['url'] in ('-', 'pipe:') for fmt in selected_formats)
|
||||
|
||||
@@ -9,10 +9,9 @@ import time
|
||||
from .common import FileDownloader
|
||||
from .http import HttpFD
|
||||
from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7
|
||||
from ..compat import compat_os_name
|
||||
from ..networking import Request
|
||||
from ..networking.exceptions import HTTPError, IncompleteRead
|
||||
from ..utils import DownloadError, RetryManager, encodeFilename, traverse_obj
|
||||
from ..utils import DownloadError, RetryManager, traverse_obj
|
||||
from ..utils.networking import HTTPHeaderDict
|
||||
from ..utils.progress import ProgressCalculator
|
||||
|
||||
@@ -152,7 +151,7 @@ class FragmentFD(FileDownloader):
|
||||
if self.__do_ytdl_file(ctx):
|
||||
self._write_ytdl_file(ctx)
|
||||
if not self.params.get('keep_fragments', False):
|
||||
self.try_remove(encodeFilename(ctx['fragment_filename_sanitized']))
|
||||
self.try_remove(ctx['fragment_filename_sanitized'])
|
||||
del ctx['fragment_filename_sanitized']
|
||||
|
||||
def _prepare_frag_download(self, ctx):
|
||||
@@ -188,7 +187,7 @@ class FragmentFD(FileDownloader):
|
||||
})
|
||||
|
||||
if self.__do_ytdl_file(ctx):
|
||||
ytdl_file_exists = os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename'])))
|
||||
ytdl_file_exists = os.path.isfile(self.ytdl_filename(ctx['filename']))
|
||||
continuedl = self.params.get('continuedl', True)
|
||||
if continuedl and ytdl_file_exists:
|
||||
self._read_ytdl_file(ctx)
|
||||
@@ -390,7 +389,7 @@ class FragmentFD(FileDownloader):
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
pass
|
||||
|
||||
if compat_os_name == 'nt':
|
||||
if os.name == 'nt':
|
||||
def future_result(future):
|
||||
while True:
|
||||
try:
|
||||
|
||||
@@ -15,7 +15,6 @@ from ..utils import (
|
||||
ThrottledDownload,
|
||||
XAttrMetadataError,
|
||||
XAttrUnavailableError,
|
||||
encodeFilename,
|
||||
int_or_none,
|
||||
parse_http_range,
|
||||
try_call,
|
||||
@@ -58,9 +57,8 @@ class HttpFD(FileDownloader):
|
||||
|
||||
if self.params.get('continuedl', True):
|
||||
# Establish possible resume length
|
||||
if os.path.isfile(encodeFilename(ctx.tmpfilename)):
|
||||
ctx.resume_len = os.path.getsize(
|
||||
encodeFilename(ctx.tmpfilename))
|
||||
if os.path.isfile(ctx.tmpfilename):
|
||||
ctx.resume_len = os.path.getsize(ctx.tmpfilename)
|
||||
|
||||
ctx.is_resume = ctx.resume_len > 0
|
||||
|
||||
@@ -241,7 +239,7 @@ class HttpFD(FileDownloader):
|
||||
ctx.resume_len = byte_counter
|
||||
else:
|
||||
try:
|
||||
ctx.resume_len = os.path.getsize(encodeFilename(ctx.tmpfilename))
|
||||
ctx.resume_len = os.path.getsize(ctx.tmpfilename)
|
||||
except FileNotFoundError:
|
||||
ctx.resume_len = 0
|
||||
raise RetryDownload(e)
|
||||
|
||||
@@ -8,7 +8,6 @@ from ..utils import (
|
||||
Popen,
|
||||
check_executable,
|
||||
encodeArgument,
|
||||
encodeFilename,
|
||||
get_exe_version,
|
||||
)
|
||||
|
||||
@@ -179,7 +178,7 @@ class RtmpFD(FileDownloader):
|
||||
return False
|
||||
|
||||
while retval in (RD_INCOMPLETE, RD_FAILED) and not test and not live:
|
||||
prevsize = os.path.getsize(encodeFilename(tmpfilename))
|
||||
prevsize = os.path.getsize(tmpfilename)
|
||||
self.to_screen(f'[rtmpdump] Downloaded {prevsize} bytes')
|
||||
time.sleep(5.0) # This seems to be needed
|
||||
args = [*basic_args, '--resume']
|
||||
@@ -187,7 +186,7 @@ class RtmpFD(FileDownloader):
|
||||
args += ['--skip', '1']
|
||||
args = [encodeArgument(a) for a in args]
|
||||
retval = run_rtmpdump(args)
|
||||
cursize = os.path.getsize(encodeFilename(tmpfilename))
|
||||
cursize = os.path.getsize(tmpfilename)
|
||||
if prevsize == cursize and retval == RD_FAILED:
|
||||
break
|
||||
# Some rtmp streams seem abort after ~ 99.8%. Don't complain for those
|
||||
@@ -196,7 +195,7 @@ class RtmpFD(FileDownloader):
|
||||
retval = RD_SUCCESS
|
||||
break
|
||||
if retval == RD_SUCCESS or (test and retval == RD_INCOMPLETE):
|
||||
fsize = os.path.getsize(encodeFilename(tmpfilename))
|
||||
fsize = os.path.getsize(tmpfilename)
|
||||
self.to_screen(f'[rtmpdump] Downloaded {fsize} bytes')
|
||||
self.try_rename(tmpfilename, filename)
|
||||
self._hook_progress({
|
||||
|
||||
@@ -2,7 +2,7 @@ import os
|
||||
import subprocess
|
||||
|
||||
from .common import FileDownloader
|
||||
from ..utils import check_executable, encodeFilename
|
||||
from ..utils import check_executable
|
||||
|
||||
|
||||
class RtspFD(FileDownloader):
|
||||
@@ -26,7 +26,7 @@ class RtspFD(FileDownloader):
|
||||
|
||||
retval = subprocess.call(args)
|
||||
if retval == 0:
|
||||
fsize = os.path.getsize(encodeFilename(tmpfilename))
|
||||
fsize = os.path.getsize(tmpfilename)
|
||||
self.to_screen(f'\r[{args[0]}] {fsize} bytes')
|
||||
self.try_rename(tmpfilename, filename)
|
||||
self._hook_progress({
|
||||
|
||||
@@ -6,7 +6,6 @@ import hmac
|
||||
import io
|
||||
import json
|
||||
import re
|
||||
import struct
|
||||
import time
|
||||
import urllib.parse
|
||||
import uuid
|
||||
@@ -18,10 +17,8 @@ from ..networking.exceptions import TransportError
|
||||
from ..utils import (
|
||||
ExtractorError,
|
||||
OnDemandPagedList,
|
||||
bytes_to_intlist,
|
||||
decode_base_n,
|
||||
int_or_none,
|
||||
intlist_to_bytes,
|
||||
time_seconds,
|
||||
traverse_obj,
|
||||
update_url_query,
|
||||
@@ -72,15 +69,15 @@ class AbemaLicenseRH(RequestHandler):
|
||||
})
|
||||
|
||||
res = decode_base_n(license_response['k'], table=self._STRTABLE)
|
||||
encvideokey = bytes_to_intlist(struct.pack('>QQ', res >> 64, res & 0xffffffffffffffff))
|
||||
encvideokey = list(res.to_bytes(16, 'big'))
|
||||
|
||||
h = hmac.new(
|
||||
binascii.unhexlify(self._HKEY),
|
||||
(license_response['cid'] + self.ie._DEVICE_ID).encode(),
|
||||
digestmod=hashlib.sha256)
|
||||
enckey = bytes_to_intlist(h.digest())
|
||||
enckey = list(h.digest())
|
||||
|
||||
return intlist_to_bytes(aes_ecb_decrypt(encvideokey, enckey))
|
||||
return bytes(aes_ecb_decrypt(encvideokey, enckey))
|
||||
|
||||
|
||||
class AbemaTVBaseIE(InfoExtractor):
|
||||
|
||||
@@ -11,11 +11,9 @@ from ..networking.exceptions import HTTPError
|
||||
from ..utils import (
|
||||
ExtractorError,
|
||||
ass_subtitles_timecode,
|
||||
bytes_to_intlist,
|
||||
bytes_to_long,
|
||||
float_or_none,
|
||||
int_or_none,
|
||||
intlist_to_bytes,
|
||||
join_nonempty,
|
||||
long_to_bytes,
|
||||
parse_iso8601,
|
||||
@@ -198,16 +196,16 @@ Format: Marked,Start,End,Style,Name,MarginL,MarginR,MarginV,Effect,Text'''
|
||||
|
||||
links_url = try_get(options, lambda x: x['video']['url']) or (video_base_url + 'link')
|
||||
self._K = ''.join(random.choices('0123456789abcdef', k=16))
|
||||
message = bytes_to_intlist(json.dumps({
|
||||
message = list(json.dumps({
|
||||
'k': self._K,
|
||||
't': token,
|
||||
}))
|
||||
}).encode())
|
||||
|
||||
# Sometimes authentication fails for no good reason, retry with
|
||||
# a different random padding
|
||||
links_data = None
|
||||
for _ in range(3):
|
||||
padded_message = intlist_to_bytes(pkcs1pad(message, 128))
|
||||
padded_message = bytes(pkcs1pad(message, 128))
|
||||
n, e = self._RSA_KEY
|
||||
encrypted_message = long_to_bytes(pow(bytes_to_long(padded_message), e, n))
|
||||
authorization = base64.b64encode(encrypted_message).decode()
|
||||
|
||||
@@ -8,10 +8,8 @@ import time
|
||||
from .common import InfoExtractor
|
||||
from ..aes import aes_encrypt
|
||||
from ..utils import (
|
||||
bytes_to_intlist,
|
||||
determine_ext,
|
||||
int_or_none,
|
||||
intlist_to_bytes,
|
||||
join_nonempty,
|
||||
smuggle_url,
|
||||
strip_jsonp,
|
||||
@@ -234,8 +232,8 @@ class AnvatoIE(InfoExtractor):
|
||||
server_time = self._server_time(access_key, video_id)
|
||||
input_data = f'{server_time}~{md5_text(video_data_url)}~{md5_text(server_time)}'
|
||||
|
||||
auth_secret = intlist_to_bytes(aes_encrypt(
|
||||
bytes_to_intlist(input_data[:64]), bytes_to_intlist(self._AUTH_KEY)))
|
||||
auth_secret = bytes(aes_encrypt(
|
||||
list(input_data[:64].encode()), list(self._AUTH_KEY)))
|
||||
query = {
|
||||
'X-Anvato-Adst-Auth': base64.b64encode(auth_secret).decode('ascii'),
|
||||
'rtyp': 'fp',
|
||||
|
||||
@@ -25,7 +25,6 @@ import xml.etree.ElementTree
|
||||
from ..compat import (
|
||||
compat_etree_fromstring,
|
||||
compat_expanduser,
|
||||
compat_os_name,
|
||||
urllib_req_to_req,
|
||||
)
|
||||
from ..cookies import LenientSimpleCookie
|
||||
@@ -1029,7 +1028,7 @@ class InfoExtractor:
|
||||
filename = sanitize_filename(f'{basen}.dump', restricted=True)
|
||||
# Working around MAX_PATH limitation on Windows (see
|
||||
# http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx)
|
||||
if compat_os_name == 'nt':
|
||||
if os.name == 'nt':
|
||||
absfilepath = os.path.abspath(filename)
|
||||
if len(absfilepath) > 259:
|
||||
filename = fR'\\?\{absfilepath}'
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
import base64
|
||||
|
||||
from .common import InfoExtractor
|
||||
from ..aes import aes_cbc_decrypt, unpad_pkcs7
|
||||
from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7
|
||||
from ..utils import (
|
||||
ExtractorError,
|
||||
bytes_to_intlist,
|
||||
intlist_to_bytes,
|
||||
unified_strdate,
|
||||
)
|
||||
|
||||
@@ -68,10 +66,10 @@ class ShemarooMeIE(InfoExtractor):
|
||||
data_json = self._download_json('https://www.shemaroome.com/users/user_all_lists', video_id, data=data.encode())
|
||||
if not data_json.get('status'):
|
||||
raise ExtractorError('Premium videos cannot be downloaded yet.', expected=True)
|
||||
url_data = bytes_to_intlist(base64.b64decode(data_json['new_play_url']))
|
||||
key = bytes_to_intlist(base64.b64decode(data_json['key']))
|
||||
iv = [0] * 16
|
||||
m3u8_url = unpad_pkcs7(intlist_to_bytes(aes_cbc_decrypt(url_data, key, iv))).decode('ascii')
|
||||
url_data = base64.b64decode(data_json['new_play_url'])
|
||||
key = base64.b64decode(data_json['key'])
|
||||
iv = bytes(16)
|
||||
m3u8_url = unpad_pkcs7(aes_cbc_decrypt_bytes(url_data, key, iv)).decode('ascii')
|
||||
headers = {'stream_key': data_json['stream_key']}
|
||||
formats, m3u8_subs = self._extract_m3u8_formats_and_subtitles(m3u8_url, video_id, fatal=False, headers=headers)
|
||||
for fmt in formats:
|
||||
|
||||
@@ -9,7 +9,6 @@ from ..utils import (
|
||||
RetryManager,
|
||||
_configuration_args,
|
||||
deprecation_warning,
|
||||
encodeFilename,
|
||||
)
|
||||
|
||||
|
||||
@@ -151,7 +150,7 @@ class PostProcessor(metaclass=PostProcessorMetaClass):
|
||||
|
||||
def try_utime(self, path, atime, mtime, errnote='Cannot update utime of file'):
|
||||
try:
|
||||
os.utime(encodeFilename(path), (atime, mtime))
|
||||
os.utime(path, (atime, mtime))
|
||||
except Exception:
|
||||
self.report_warning(errnote)
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@ from ..utils import (
|
||||
PostProcessingError,
|
||||
check_executable,
|
||||
encodeArgument,
|
||||
encodeFilename,
|
||||
prepend_extension,
|
||||
shell_quote,
|
||||
)
|
||||
@@ -68,7 +67,7 @@ class EmbedThumbnailPP(FFmpegPostProcessor):
|
||||
self.to_screen('There are no thumbnails on disk')
|
||||
return [], info
|
||||
thumbnail_filename = info['thumbnails'][idx]['filepath']
|
||||
if not os.path.exists(encodeFilename(thumbnail_filename)):
|
||||
if not os.path.exists(thumbnail_filename):
|
||||
self.report_warning('Skipping embedding the thumbnail because the file is missing.')
|
||||
return [], info
|
||||
|
||||
@@ -85,7 +84,7 @@ class EmbedThumbnailPP(FFmpegPostProcessor):
|
||||
thumbnail_filename = convertor.convert_thumbnail(thumbnail_filename, 'png')
|
||||
thumbnail_ext = 'png'
|
||||
|
||||
mtime = os.stat(encodeFilename(filename)).st_mtime
|
||||
mtime = os.stat(filename).st_mtime
|
||||
|
||||
success = True
|
||||
if info['ext'] == 'mp3':
|
||||
@@ -154,12 +153,12 @@ class EmbedThumbnailPP(FFmpegPostProcessor):
|
||||
else:
|
||||
if not prefer_atomicparsley:
|
||||
self.to_screen('mutagen was not found. Falling back to AtomicParsley')
|
||||
cmd = [encodeFilename(atomicparsley, True),
|
||||
encodeFilename(filename, True),
|
||||
cmd = [atomicparsley,
|
||||
filename,
|
||||
encodeArgument('--artwork'),
|
||||
encodeFilename(thumbnail_filename, True),
|
||||
thumbnail_filename,
|
||||
encodeArgument('-o'),
|
||||
encodeFilename(temp_filename, True)]
|
||||
temp_filename]
|
||||
cmd += [encodeArgument(o) for o in self._configuration_args('AtomicParsley')]
|
||||
|
||||
self._report_run('atomicparsley', filename)
|
||||
|
||||
@@ -21,7 +21,6 @@ from ..utils import (
|
||||
determine_ext,
|
||||
dfxp2srt,
|
||||
encodeArgument,
|
||||
encodeFilename,
|
||||
filter_dict,
|
||||
float_or_none,
|
||||
is_outdated_version,
|
||||
@@ -243,13 +242,13 @@ class FFmpegPostProcessor(PostProcessor):
|
||||
try:
|
||||
if self.probe_available:
|
||||
cmd = [
|
||||
encodeFilename(self.probe_executable, True),
|
||||
self.probe_executable,
|
||||
encodeArgument('-show_streams')]
|
||||
else:
|
||||
cmd = [
|
||||
encodeFilename(self.executable, True),
|
||||
self.executable,
|
||||
encodeArgument('-i')]
|
||||
cmd.append(encodeFilename(self._ffmpeg_filename_argument(path), True))
|
||||
cmd.append(self._ffmpeg_filename_argument(path))
|
||||
self.write_debug(f'{self.basename} command line: {shell_quote(cmd)}')
|
||||
stdout, stderr, returncode = Popen.run(
|
||||
cmd, text=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
@@ -282,7 +281,7 @@ class FFmpegPostProcessor(PostProcessor):
|
||||
self.check_version()
|
||||
|
||||
cmd = [
|
||||
encodeFilename(self.probe_executable, True),
|
||||
self.probe_executable,
|
||||
encodeArgument('-hide_banner'),
|
||||
encodeArgument('-show_format'),
|
||||
encodeArgument('-show_streams'),
|
||||
@@ -335,9 +334,9 @@ class FFmpegPostProcessor(PostProcessor):
|
||||
self.check_version()
|
||||
|
||||
oldest_mtime = min(
|
||||
os.stat(encodeFilename(path)).st_mtime for path, _ in input_path_opts if path)
|
||||
os.stat(path).st_mtime for path, _ in input_path_opts if path)
|
||||
|
||||
cmd = [encodeFilename(self.executable, True), encodeArgument('-y')]
|
||||
cmd = [self.executable, encodeArgument('-y')]
|
||||
# avconv does not have repeat option
|
||||
if self.basename == 'ffmpeg':
|
||||
cmd += [encodeArgument('-loglevel'), encodeArgument('repeat+info')]
|
||||
@@ -353,7 +352,7 @@ class FFmpegPostProcessor(PostProcessor):
|
||||
args.append('-i')
|
||||
return (
|
||||
[encodeArgument(arg) for arg in args]
|
||||
+ [encodeFilename(self._ffmpeg_filename_argument(file), True)])
|
||||
+ [self._ffmpeg_filename_argument(file)])
|
||||
|
||||
for arg_type, path_opts in (('i', input_path_opts), ('o', output_path_opts)):
|
||||
cmd += itertools.chain.from_iterable(
|
||||
@@ -522,8 +521,8 @@ class FFmpegExtractAudioPP(FFmpegPostProcessor):
|
||||
return [], information
|
||||
orig_path = prepend_extension(path, 'orig')
|
||||
temp_path = prepend_extension(path, 'temp')
|
||||
if (self._nopostoverwrites and os.path.exists(encodeFilename(new_path))
|
||||
and os.path.exists(encodeFilename(orig_path))):
|
||||
if (self._nopostoverwrites and os.path.exists(new_path)
|
||||
and os.path.exists(orig_path)):
|
||||
self.to_screen(f'Post-process file {new_path} exists, skipping')
|
||||
return [], information
|
||||
|
||||
@@ -838,7 +837,7 @@ class FFmpegMergerPP(FFmpegPostProcessor):
|
||||
args.extend(['-map', f'{i}:v:0'])
|
||||
self.to_screen(f'Merging formats into "{filename}"')
|
||||
self.run_ffmpeg_multiple_files(info['__files_to_merge'], temp_filename, args)
|
||||
os.rename(encodeFilename(temp_filename), encodeFilename(filename))
|
||||
os.rename(temp_filename, filename)
|
||||
return info['__files_to_merge'], info
|
||||
|
||||
def can_merge(self):
|
||||
@@ -1039,7 +1038,7 @@ class FFmpegSplitChaptersPP(FFmpegPostProcessor):
|
||||
|
||||
def _ffmpeg_args_for_chapter(self, number, chapter, info):
|
||||
destination = self._prepare_filename(number, chapter, info)
|
||||
if not self._downloader._ensure_dir_exists(encodeFilename(destination)):
|
||||
if not self._downloader._ensure_dir_exists(destination):
|
||||
return
|
||||
|
||||
chapter['filepath'] = destination
|
||||
|
||||
@@ -4,8 +4,6 @@ from .common import PostProcessor
|
||||
from ..compat import shutil
|
||||
from ..utils import (
|
||||
PostProcessingError,
|
||||
decodeFilename,
|
||||
encodeFilename,
|
||||
make_dir,
|
||||
)
|
||||
|
||||
@@ -21,25 +19,25 @@ class MoveFilesAfterDownloadPP(PostProcessor):
|
||||
return 'MoveFiles'
|
||||
|
||||
def run(self, info):
|
||||
dl_path, dl_name = os.path.split(encodeFilename(info['filepath']))
|
||||
dl_path, dl_name = os.path.split(info['filepath'])
|
||||
finaldir = info.get('__finaldir', dl_path)
|
||||
finalpath = os.path.join(finaldir, dl_name)
|
||||
if self._downloaded:
|
||||
info['__files_to_move'][info['filepath']] = decodeFilename(finalpath)
|
||||
info['__files_to_move'][info['filepath']] = finalpath
|
||||
|
||||
make_newfilename = lambda old: decodeFilename(os.path.join(finaldir, os.path.basename(encodeFilename(old))))
|
||||
make_newfilename = lambda old: os.path.join(finaldir, os.path.basename(old))
|
||||
for oldfile, newfile in info['__files_to_move'].items():
|
||||
if not newfile:
|
||||
newfile = make_newfilename(oldfile)
|
||||
if os.path.abspath(encodeFilename(oldfile)) == os.path.abspath(encodeFilename(newfile)):
|
||||
if os.path.abspath(oldfile) == os.path.abspath(newfile):
|
||||
continue
|
||||
if not os.path.exists(encodeFilename(oldfile)):
|
||||
if not os.path.exists(oldfile):
|
||||
self.report_warning(f'File "{oldfile}" cannot be found')
|
||||
continue
|
||||
if os.path.exists(encodeFilename(newfile)):
|
||||
if os.path.exists(newfile):
|
||||
if self.get_param('overwrites', True):
|
||||
self.report_warning(f'Replacing existing file "{newfile}"')
|
||||
os.remove(encodeFilename(newfile))
|
||||
os.remove(newfile)
|
||||
else:
|
||||
self.report_warning(
|
||||
f'Cannot move file "{oldfile}" out of temporary directory since "{newfile}" already exists. ')
|
||||
|
||||
@@ -9,7 +9,6 @@ from ..utils import (
|
||||
check_executable,
|
||||
cli_option,
|
||||
encodeArgument,
|
||||
encodeFilename,
|
||||
prepend_extension,
|
||||
shell_quote,
|
||||
str_or_none,
|
||||
@@ -52,7 +51,7 @@ class SponSkrubPP(PostProcessor):
|
||||
return [], information
|
||||
|
||||
filename = information['filepath']
|
||||
if not os.path.exists(encodeFilename(filename)): # no download
|
||||
if not os.path.exists(filename): # no download
|
||||
return [], information
|
||||
|
||||
if information['extractor_key'].lower() != 'youtube':
|
||||
@@ -71,8 +70,8 @@ class SponSkrubPP(PostProcessor):
|
||||
self.report_warning('If sponskrub is run multiple times, unintended parts of the video could be cut out.')
|
||||
|
||||
temp_filename = prepend_extension(filename, self._temp_ext)
|
||||
if os.path.exists(encodeFilename(temp_filename)):
|
||||
os.remove(encodeFilename(temp_filename))
|
||||
if os.path.exists(temp_filename):
|
||||
os.remove(temp_filename)
|
||||
|
||||
cmd = [self.path]
|
||||
if not self.cutout:
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import os
|
||||
|
||||
from .common import PostProcessor
|
||||
from ..compat import compat_os_name
|
||||
from ..utils import (
|
||||
PostProcessingError,
|
||||
XAttrMetadataError,
|
||||
@@ -57,7 +56,7 @@ class XAttrMetadataPP(PostProcessor):
|
||||
elif e.reason == 'VALUE_TOO_LONG':
|
||||
self.report_warning(f'Unable to write extended attribute "{xattrname}" due to too long values.')
|
||||
else:
|
||||
tip = ('You need to use NTFS' if compat_os_name == 'nt'
|
||||
tip = ('You need to use NTFS' if os.name == 'nt'
|
||||
else 'You may have to enable them in your "/etc/fstab"')
|
||||
raise PostProcessingError(f'This filesystem doesn\'t support extended attributes. {tip}')
|
||||
|
||||
|
||||
@@ -13,7 +13,6 @@ import sys
|
||||
from dataclasses import dataclass
|
||||
from zipimport import zipimporter
|
||||
|
||||
from .compat import compat_realpath
|
||||
from .networking import Request
|
||||
from .networking.exceptions import HTTPError, network_exceptions
|
||||
from .utils import (
|
||||
@@ -201,8 +200,6 @@ class UpdateInfo:
|
||||
binary_name: str | None = _get_binary_name() # noqa: RUF009: Always returns the same value
|
||||
checksum: str | None = None
|
||||
|
||||
_has_update = True
|
||||
|
||||
|
||||
class Updater:
|
||||
# XXX: use class variables to simplify testing
|
||||
@@ -523,7 +520,7 @@ class Updater:
|
||||
@functools.cached_property
|
||||
def filename(self):
|
||||
"""Filename of the executable"""
|
||||
return compat_realpath(_get_variant_and_executable_path()[1])
|
||||
return os.path.realpath(_get_variant_and_executable_path()[1])
|
||||
|
||||
@functools.cached_property
|
||||
def cmd(self):
|
||||
@@ -562,62 +559,14 @@ class Updater:
|
||||
f'Unable to {action}{delim} visit '
|
||||
f'https://github.com/{self.requested_repo}/releases/{path}', True)
|
||||
|
||||
# XXX: Everything below this line in this class is deprecated / for compat only
|
||||
@property
|
||||
def _target_tag(self):
|
||||
"""Deprecated; requested tag with 'tags/' prepended when necessary for API calls"""
|
||||
return f'tags/{self.requested_tag}' if self.requested_tag != 'latest' else self.requested_tag
|
||||
|
||||
def _check_update(self):
|
||||
"""Deprecated; report whether there is an update available"""
|
||||
return bool(self.query_update(_output=True))
|
||||
|
||||
def __getattr__(self, attribute: str):
|
||||
"""Compat getter function for deprecated attributes"""
|
||||
deprecated_props_map = {
|
||||
'check_update': '_check_update',
|
||||
'target_tag': '_target_tag',
|
||||
'target_channel': 'requested_channel',
|
||||
}
|
||||
update_info_props_map = {
|
||||
'has_update': '_has_update',
|
||||
'new_version': 'version',
|
||||
'latest_version': 'requested_version',
|
||||
'release_name': 'binary_name',
|
||||
'release_hash': 'checksum',
|
||||
}
|
||||
|
||||
if attribute not in deprecated_props_map and attribute not in update_info_props_map:
|
||||
raise AttributeError(f'{type(self).__name__!r} object has no attribute {attribute!r}')
|
||||
|
||||
msg = f'{type(self).__name__}.{attribute} is deprecated and will be removed in a future version'
|
||||
if attribute in deprecated_props_map:
|
||||
source_name = deprecated_props_map[attribute]
|
||||
if not source_name.startswith('_'):
|
||||
msg += f'. Please use {source_name!r} instead'
|
||||
source = self
|
||||
mapping = deprecated_props_map
|
||||
|
||||
else: # attribute in update_info_props_map
|
||||
msg += '. Please call query_update() instead'
|
||||
source = self.query_update()
|
||||
if source is None:
|
||||
source = UpdateInfo('', None, None, None)
|
||||
source._has_update = False
|
||||
mapping = update_info_props_map
|
||||
|
||||
deprecation_warning(msg)
|
||||
for target_name, source_name in mapping.items():
|
||||
value = getattr(source, source_name)
|
||||
setattr(self, target_name, value)
|
||||
|
||||
return getattr(self, attribute)
|
||||
|
||||
|
||||
def run_update(ydl):
|
||||
"""Update the program file with the latest version from the repository
|
||||
@returns Whether there was a successful update (No update = False)
|
||||
"""
|
||||
deprecation_warning(
|
||||
'"yt_dlp.update.run_update(ydl)" is deprecated and may be removed in a future version. '
|
||||
'Use "yt_dlp.update.Updater(ydl).update()" instead')
|
||||
return Updater(ydl).update()
|
||||
|
||||
|
||||
|
||||
@@ -9,31 +9,23 @@ passthrough_module(__name__, '.._legacy', callback=lambda attr: warnings.warn(
|
||||
del passthrough_module
|
||||
|
||||
|
||||
from ._utils import preferredencoding
|
||||
import re
|
||||
import struct
|
||||
|
||||
|
||||
def encodeFilename(s, for_subprocess=False):
|
||||
assert isinstance(s, str)
|
||||
return s
|
||||
def bytes_to_intlist(bs):
|
||||
if not bs:
|
||||
return []
|
||||
if isinstance(bs[0], int): # Python 3
|
||||
return list(bs)
|
||||
else:
|
||||
return [ord(c) for c in bs]
|
||||
|
||||
|
||||
def decodeFilename(b, for_subprocess=False):
|
||||
return b
|
||||
def intlist_to_bytes(xs):
|
||||
if not xs:
|
||||
return b''
|
||||
return struct.pack('%dB' % len(xs), *xs)
|
||||
|
||||
|
||||
def decodeArgument(b):
|
||||
return b
|
||||
|
||||
|
||||
def decodeOption(optval):
|
||||
if optval is None:
|
||||
return optval
|
||||
if isinstance(optval, bytes):
|
||||
optval = optval.decode(preferredencoding())
|
||||
|
||||
assert isinstance(optval, str)
|
||||
return optval
|
||||
|
||||
|
||||
def error_to_compat_str(err):
|
||||
return str(err)
|
||||
compiled_regex_type = type(re.compile(''))
|
||||
|
||||
@@ -313,3 +313,30 @@ def make_HTTPS_handler(params, **kwargs):
|
||||
|
||||
def process_communicate_or_kill(p, *args, **kwargs):
|
||||
return Popen.communicate_or_kill(p, *args, **kwargs)
|
||||
|
||||
|
||||
def encodeFilename(s, for_subprocess=False):
|
||||
assert isinstance(s, str)
|
||||
return s
|
||||
|
||||
|
||||
def decodeFilename(b, for_subprocess=False):
|
||||
return b
|
||||
|
||||
|
||||
def decodeArgument(b):
|
||||
return b
|
||||
|
||||
|
||||
def decodeOption(optval):
|
||||
if optval is None:
|
||||
return optval
|
||||
if isinstance(optval, bytes):
|
||||
optval = optval.decode(preferredencoding())
|
||||
|
||||
assert isinstance(optval, str)
|
||||
return optval
|
||||
|
||||
|
||||
def error_to_compat_str(err):
|
||||
return str(err)
|
||||
|
||||
@@ -49,15 +49,11 @@ from ..compat import (
|
||||
compat_etree_fromstring,
|
||||
compat_expanduser,
|
||||
compat_HTMLParseError,
|
||||
compat_os_name,
|
||||
)
|
||||
from ..dependencies import xattr
|
||||
|
||||
__name__ = __name__.rsplit('.', 1)[0] # noqa: A001: Pretend to be the parent module
|
||||
|
||||
# This is not clearly defined otherwise
|
||||
compiled_regex_type = type(re.compile(''))
|
||||
|
||||
|
||||
class NO_DEFAULT:
|
||||
pass
|
||||
@@ -874,7 +870,7 @@ class Popen(subprocess.Popen):
|
||||
kwargs.setdefault('encoding', 'utf-8')
|
||||
kwargs.setdefault('errors', 'replace')
|
||||
|
||||
if shell and compat_os_name == 'nt' and kwargs.get('executable') is None:
|
||||
if shell and os.name == 'nt' and kwargs.get('executable') is None:
|
||||
if not isinstance(args, str):
|
||||
args = shell_quote(args, shell=True)
|
||||
shell = False
|
||||
@@ -1457,7 +1453,7 @@ def system_identifier():
|
||||
@functools.cache
|
||||
def get_windows_version():
|
||||
""" Get Windows version. returns () if it's not running on Windows """
|
||||
if compat_os_name == 'nt':
|
||||
if os.name == 'nt':
|
||||
return version_tuple(platform.win32_ver()[1])
|
||||
else:
|
||||
return ()
|
||||
@@ -1470,7 +1466,7 @@ def write_string(s, out=None, encoding=None):
|
||||
if not out:
|
||||
return
|
||||
|
||||
if compat_os_name == 'nt' and supports_terminal_sequences(out):
|
||||
if os.name == 'nt' and supports_terminal_sequences(out):
|
||||
s = re.sub(r'([\r\n]+)', r' \1', s)
|
||||
|
||||
enc, buffer = None, out
|
||||
@@ -1503,21 +1499,6 @@ def deprecation_warning(msg, *, printer=None, stacklevel=0, **kwargs):
|
||||
deprecation_warning._cache = set()
|
||||
|
||||
|
||||
def bytes_to_intlist(bs):
|
||||
if not bs:
|
||||
return []
|
||||
if isinstance(bs[0], int): # Python 3
|
||||
return list(bs)
|
||||
else:
|
||||
return [ord(c) for c in bs]
|
||||
|
||||
|
||||
def intlist_to_bytes(xs):
|
||||
if not xs:
|
||||
return b''
|
||||
return struct.pack('%dB' % len(xs), *xs)
|
||||
|
||||
|
||||
class LockingUnsupportedError(OSError):
|
||||
msg = 'File locking is not supported'
|
||||
|
||||
@@ -1701,7 +1682,7 @@ _CMD_QUOTE_TRANS = str.maketrans({
|
||||
def shell_quote(args, *, shell=False):
|
||||
args = list(variadic(args))
|
||||
|
||||
if compat_os_name != 'nt':
|
||||
if os.name != 'nt':
|
||||
return shlex.join(args)
|
||||
|
||||
trans = _CMD_QUOTE_TRANS if shell else _WINDOWS_QUOTE_TRANS
|
||||
@@ -4516,7 +4497,7 @@ def urshift(val, n):
|
||||
def write_xattr(path, key, value):
|
||||
# Windows: Write xattrs to NTFS Alternate Data Streams:
|
||||
# http://en.wikipedia.org/wiki/NTFS#Alternate_data_streams_.28ADS.29
|
||||
if compat_os_name == 'nt':
|
||||
if os.name == 'nt':
|
||||
assert ':' not in key
|
||||
assert os.path.exists(path)
|
||||
|
||||
@@ -4778,12 +4759,12 @@ def jwt_decode_hs256(jwt):
|
||||
return json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
|
||||
|
||||
|
||||
WINDOWS_VT_MODE = False if compat_os_name == 'nt' else None
|
||||
WINDOWS_VT_MODE = False if os.name == 'nt' else None
|
||||
|
||||
|
||||
@functools.cache
|
||||
def supports_terminal_sequences(stream):
|
||||
if compat_os_name == 'nt':
|
||||
if os.name == 'nt':
|
||||
if not WINDOWS_VT_MODE:
|
||||
return False
|
||||
elif not os.getenv('TERM'):
|
||||
@@ -4877,7 +4858,7 @@ def parse_http_range(range):
|
||||
|
||||
def read_stdin(what):
|
||||
if what:
|
||||
eof = 'Ctrl+Z' if compat_os_name == 'nt' else 'Ctrl+D'
|
||||
eof = 'Ctrl+Z' if os.name == 'nt' else 'Ctrl+D'
|
||||
write_string(f'Reading {what} from STDIN - EOF ({eof}) to end:\n')
|
||||
return sys.stdin
|
||||
|
||||
|
||||
Reference in New Issue
Block a user