import fnmatch
import os
import os.path
import random
import sys
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
from typing import Any, BinaryIO, Generator, List, Union, cast
from pip._vendor.tenacity import retry, stop_after_delay, wait_fixed
from pip._internal.utils.compat import get_path_uid
from pip._internal.utils.misc import format_size
def check_path_owner(path: str) -> bool:
if sys.platform == "win32" or not hasattr(os, "geteuid"):
return True
assert os.path.isabs(path)
previous = None
while path != previous:
if os.path.lexists(path):
if os.geteuid() == 0:
try:
path_uid = get_path_uid(path)
except OSError:
return False
return path_uid == 0
else:
return os.access(path, os.W_OK)
else:
previous, path = path, os.path.dirname(path)
return False
@contextmanager
def adjacent_tmp_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
with NamedTemporaryFile(
delete=False,
dir=os.path.dirname(path),
prefix=os.path.basename(path),
suffix=".tmp",
**kwargs,
) as f:
result = cast(BinaryIO, f)
try:
yield result
finally:
result.flush()
os.fsync(result.fileno())
_replace_retry = retry(reraise=True, stop=stop_after_delay(1), wait=wait_fixed(0.25))
replace = _replace_retry(os.replace)
def test_writable_dir(path: str) -> bool:
while not os.path.isdir(path):
parent = os.path.dirname(path)
if parent == path:
break path = parent
if os.name == "posix":
return os.access(path, os.W_OK)
return _test_writable_dir_win(path)
def _test_writable_dir_win(path: str) -> bool:
basename = "accesstest_deleteme_fishfingers_custard_"
alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
for _ in range(10):
name = basename + "".join(random.choice(alphabet) for _ in range(6))
file = os.path.join(path, name)
try:
fd = os.open(file, os.O_RDWR | os.O_CREAT | os.O_EXCL)
except FileExistsError:
pass
except PermissionError:
return False
else:
os.close(fd)
os.unlink(file)
return True
raise OSError("Unexpected condition testing for writable directory")
def find_files(path: str, pattern: str) -> List[str]:
result: List[str] = []
for root, _, files in os.walk(path):
matches = fnmatch.filter(files, pattern)
result.extend(os.path.join(root, f) for f in matches)
return result
def file_size(path: str) -> Union[int, float]:
if os.path.islink(path):
return 0
return os.path.getsize(path)
def format_file_size(path: str) -> str:
return format_size(file_size(path))
def directory_size(path: str) -> Union[int, float]:
size = 0.0
for root, _dirs, files in os.walk(path):
for filename in files:
file_path = os.path.join(root, filename)
size += file_size(file_path)
return size
def format_directory_size(path: str) -> str:
return format_size(directory_size(path))