import collections
import compileall
import contextlib
import csv
import importlib
import logging
import os.path
import re
import shutil
import sys
import warnings
from base64 import urlsafe_b64encode
from email.message import Message
from itertools import chain, filterfalse, starmap
from typing import (
IO,
TYPE_CHECKING,
Any,
BinaryIO,
Callable,
Dict,
Generator,
Iterable,
Iterator,
List,
NewType,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
from zipfile import ZipFile, ZipInfo
from pip._vendor.distlib.scripts import ScriptMaker
from pip._vendor.distlib.util import get_export_entry
from pip._vendor.packaging.utils import canonicalize_name
from pip._internal.exceptions import InstallationError
from pip._internal.locations import get_major_minor_version
from pip._internal.metadata import (
BaseDistribution,
FilesystemWheel,
get_wheel_distribution,
)
from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl
from pip._internal.models.scheme import SCHEME_KEYS, Scheme
from pip._internal.utils.filesystem import adjacent_tmp_file, replace
from pip._internal.utils.misc import captured_stdout, ensure_dir, hash_file, partition
from pip._internal.utils.unpacking import (
current_umask,
is_within_directory,
set_extracted_file_to_default_mode_plus_executable,
zip_item_is_executable,
)
from pip._internal.utils.wheel import parse_wheel
if TYPE_CHECKING:
from typing import Protocol
class File(Protocol):
src_record_path: "RecordPath"
dest_path: str
changed: bool
def save(self) -> None:
pass
logger = logging.getLogger(__name__)
RecordPath = NewType("RecordPath", str)
InstalledCSVRow = Tuple[RecordPath, str, Union[int, str]]
def rehash(path: str, blocksize: int = 1 << 20) -> Tuple[str, str]:
h, length = hash_file(path, blocksize)
digest = "sha256=" + urlsafe_b64encode(h.digest()).decode("latin1").rstrip("=")
return (digest, str(length))
def csv_io_kwargs(mode: str) -> Dict[str, Any]:
return {"mode": mode, "newline": "", "encoding": "utf-8"}
def fix_script(path: str) -> bool:
assert os.path.isfile(path)
with open(path, "rb") as script:
firstline = script.readline()
if not firstline.startswith(b"#!python"):
return False
exename = sys.executable.encode(sys.getfilesystemencoding())
firstline = b"#!" + exename + os.linesep.encode("ascii")
rest = script.read()
with open(path, "wb") as script:
script.write(firstline)
script.write(rest)
return True
def wheel_root_is_purelib(metadata: Message) -> bool:
return metadata.get("Root-Is-Purelib", "").lower() == "true"
def get_entrypoints(dist: BaseDistribution) -> Tuple[Dict[str, str], Dict[str, str]]:
console_scripts = {}
gui_scripts = {}
for entry_point in dist.iter_entry_points():
if entry_point.group == "console_scripts":
console_scripts[entry_point.name] = entry_point.value
elif entry_point.group == "gui_scripts":
gui_scripts[entry_point.name] = entry_point.value
return console_scripts, gui_scripts
def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> Optional[str]:
if not scripts:
return None
grouped_by_dir: Dict[str, Set[str]] = collections.defaultdict(set)
for destfile in scripts:
parent_dir = os.path.dirname(destfile)
script_name = os.path.basename(destfile)
grouped_by_dir[parent_dir].add(script_name)
not_warn_dirs = [
os.path.normcase(os.path.normpath(i)).rstrip(os.sep)
for i in os.environ.get("PATH", "").split(os.pathsep)
]
not_warn_dirs.append(
os.path.normcase(os.path.normpath(os.path.dirname(sys.executable)))
)
warn_for: Dict[str, Set[str]] = {
parent_dir: scripts
for parent_dir, scripts in grouped_by_dir.items()
if os.path.normcase(os.path.normpath(parent_dir)) not in not_warn_dirs
}
if not warn_for:
return None
msg_lines = []
for parent_dir, dir_scripts in warn_for.items():
sorted_scripts: List[str] = sorted(dir_scripts)
if len(sorted_scripts) == 1:
start_text = f"script {sorted_scripts[0]} is"
else:
start_text = "scripts {} are".format(
", ".join(sorted_scripts[:-1]) + " and " + sorted_scripts[-1]
)
msg_lines.append(
f"The {start_text} installed in '{parent_dir}' which is not on PATH."
)
last_line_fmt = (
"Consider adding {} to PATH or, if you prefer "
"to suppress this warning, use --no-warn-script-location."
)
if len(msg_lines) == 1:
msg_lines.append(last_line_fmt.format("this directory"))
else:
msg_lines.append(last_line_fmt.format("these directories"))
warn_for_tilde = any(
i[0] == "~" for i in os.environ.get("PATH", "").split(os.pathsep) if i
)
if warn_for_tilde:
tilde_warning_msg = (
"NOTE: The current PATH contains path(s) starting with `~`, "
"which may not be expanded by all applications."
)
msg_lines.append(tilde_warning_msg)
return "\n".join(msg_lines)
def _normalized_outrows(
outrows: Iterable[InstalledCSVRow],
) -> List[Tuple[str, str, str]]:
return sorted(
(record_path, hash_, str(size)) for record_path, hash_, size in outrows
)
def _record_to_fs_path(record_path: RecordPath, lib_dir: str) -> str:
return os.path.join(lib_dir, record_path)
def _fs_to_record_path(path: str, lib_dir: str) -> RecordPath:
if os.path.splitdrive(path)[0].lower() == os.path.splitdrive(lib_dir)[0].lower():
path = os.path.relpath(path, lib_dir)
path = path.replace(os.path.sep, "/")
return cast("RecordPath", path)
def get_csv_rows_for_installed(
old_csv_rows: List[List[str]],
installed: Dict[RecordPath, RecordPath],
changed: Set[RecordPath],
generated: List[str],
lib_dir: str,
) -> List[InstalledCSVRow]:
installed_rows: List[InstalledCSVRow] = []
for row in old_csv_rows:
if len(row) > 3:
logger.warning("RECORD line has more than three elements: %s", row)
old_record_path = cast("RecordPath", row[0])
new_record_path = installed.pop(old_record_path, old_record_path)
if new_record_path in changed:
digest, length = rehash(_record_to_fs_path(new_record_path, lib_dir))
else:
digest = row[1] if len(row) > 1 else ""
length = row[2] if len(row) > 2 else ""
installed_rows.append((new_record_path, digest, length))
for f in generated:
path = _fs_to_record_path(f, lib_dir)
digest, length = rehash(f)
installed_rows.append((path, digest, length))
return installed_rows + [
(installed_record_path, "", "") for installed_record_path in installed.values()
]
def get_console_script_specs(console: Dict[str, str]) -> List[str]:
console = console.copy()
scripts_to_generate = []
pip_script = console.pop("pip", None)
if pip_script:
if "ENSUREPIP_OPTIONS" not in os.environ:
scripts_to_generate.append("pip = " + pip_script)
if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall":
scripts_to_generate.append(f"pip{sys.version_info[0]} = {pip_script}")
scripts_to_generate.append(f"pip{get_major_minor_version()} = {pip_script}")
pip_ep = [k for k in console if re.match(r"pip(\d+(\.\d+)?)?$", k)]
for k in pip_ep:
del console[k]
easy_install_script = console.pop("easy_install", None)
if easy_install_script:
if "ENSUREPIP_OPTIONS" not in os.environ:
scripts_to_generate.append("easy_install = " + easy_install_script)
scripts_to_generate.append(
f"easy_install-{get_major_minor_version()} = {easy_install_script}"
)
easy_install_ep = [
k for k in console if re.match(r"easy_install(-\d+\.\d+)?$", k)
]
for k in easy_install_ep:
del console[k]
scripts_to_generate.extend(starmap("{} = {}".format, console.items()))
return scripts_to_generate
class ZipBackedFile:
def __init__(
self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile
) -> None:
self.src_record_path = src_record_path
self.dest_path = dest_path
self._zip_file = zip_file
self.changed = False
def _getinfo(self) -> ZipInfo:
return self._zip_file.getinfo(self.src_record_path)
def save(self) -> None:
parent_dir = os.path.dirname(self.dest_path)
ensure_dir(parent_dir)
if os.path.exists(self.dest_path):
os.unlink(self.dest_path)
zipinfo = self._getinfo()
with self._zip_file.open(zipinfo) as f:
with open(self.dest_path, "wb") as dest:
shutil.copyfileobj(f, dest)
if zip_item_is_executable(zipinfo):
set_extracted_file_to_default_mode_plus_executable(self.dest_path)
class ScriptFile:
def __init__(self, file: "File") -> None:
self._file = file
self.src_record_path = self._file.src_record_path
self.dest_path = self._file.dest_path
self.changed = False
def save(self) -> None:
self._file.save()
self.changed = fix_script(self.dest_path)
class MissingCallableSuffix(InstallationError):
def __init__(self, entry_point: str) -> None:
super().__init__(
f"Invalid script entry point: {entry_point} - A callable "
"suffix is required. Cf https://packaging.python.org/"
"specifications/entry-points/#use-for-scripts for more "
"information."
)
def _raise_for_invalid_entrypoint(specification: str) -> None:
entry = get_export_entry(specification)
if entry is not None and entry.suffix is None:
raise MissingCallableSuffix(str(entry))
class PipScriptMaker(ScriptMaker):
def make(
self, specification: str, options: Optional[Dict[str, Any]] = None
) -> List[str]:
_raise_for_invalid_entrypoint(specification)
return super().make(specification, options)
def _install_wheel(
name: str,
wheel_zip: ZipFile,
wheel_path: str,
scheme: Scheme,
pycompile: bool = True,
warn_script_location: bool = True,
direct_url: Optional[DirectUrl] = None,
requested: bool = False,
) -> None:
info_dir, metadata = parse_wheel(wheel_zip, name)
if wheel_root_is_purelib(metadata):
lib_dir = scheme.purelib
else:
lib_dir = scheme.platlib
installed: Dict[RecordPath, RecordPath] = {}
changed: Set[RecordPath] = set()
generated: List[str] = []
def record_installed(
srcfile: RecordPath, destfile: str, modified: bool = False
) -> None:
newpath = _fs_to_record_path(destfile, lib_dir)
installed[srcfile] = newpath
if modified:
changed.add(newpath)
def is_dir_path(path: RecordPath) -> bool:
return path.endswith("/")
def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None:
if not is_within_directory(dest_dir_path, target_path):
message = (
"The wheel {!r} has a file {!r} trying to install"
" outside the target directory {!r}"
)
raise InstallationError(
message.format(wheel_path, target_path, dest_dir_path)
)
def root_scheme_file_maker(
zip_file: ZipFile, dest: str
) -> Callable[[RecordPath], "File"]:
def make_root_scheme_file(record_path: RecordPath) -> "File":
normed_path = os.path.normpath(record_path)
dest_path = os.path.join(dest, normed_path)
assert_no_path_traversal(dest, dest_path)
return ZipBackedFile(record_path, dest_path, zip_file)
return make_root_scheme_file
def data_scheme_file_maker(
zip_file: ZipFile, scheme: Scheme
) -> Callable[[RecordPath], "File"]:
scheme_paths = {key: getattr(scheme, key) for key in SCHEME_KEYS}
def make_data_scheme_file(record_path: RecordPath) -> "File":
normed_path = os.path.normpath(record_path)
try:
_, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
except ValueError:
message = (
"Unexpected file in {}: {!r}. .data directory contents"
" should be named like: '<scheme key>/<path>'."
).format(wheel_path, record_path)
raise InstallationError(message)
try:
scheme_path = scheme_paths[scheme_key]
except KeyError:
valid_scheme_keys = ", ".join(sorted(scheme_paths))
message = (
"Unknown scheme key used in {}: {} (for file {!r}). .data"
" directory contents should be in subdirectories named"
" with a valid scheme key ({})"
).format(wheel_path, scheme_key, record_path, valid_scheme_keys)
raise InstallationError(message)
dest_path = os.path.join(scheme_path, dest_subpath)
assert_no_path_traversal(scheme_path, dest_path)
return ZipBackedFile(record_path, dest_path, zip_file)
return make_data_scheme_file
def is_data_scheme_path(path: RecordPath) -> bool:
return path.split("/", 1)[0].endswith(".data")
paths = cast(List[RecordPath], wheel_zip.namelist())
file_paths = filterfalse(is_dir_path, paths)
root_scheme_paths, data_scheme_paths = partition(is_data_scheme_path, file_paths)
make_root_scheme_file = root_scheme_file_maker(wheel_zip, lib_dir)
files: Iterator[File] = map(make_root_scheme_file, root_scheme_paths)
def is_script_scheme_path(path: RecordPath) -> bool:
parts = path.split("/", 2)
return len(parts) > 2 and parts[0].endswith(".data") and parts[1] == "scripts"
other_scheme_paths, script_scheme_paths = partition(
is_script_scheme_path, data_scheme_paths
)
make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme)
other_scheme_files = map(make_data_scheme_file, other_scheme_paths)
files = chain(files, other_scheme_files)
distribution = get_wheel_distribution(
FilesystemWheel(wheel_path),
canonicalize_name(name),
)
console, gui = get_entrypoints(distribution)
def is_entrypoint_wrapper(file: "File") -> bool:
path = file.dest_path
name = os.path.basename(path)
if name.lower().endswith(".exe"):
matchname = name[:-4]
elif name.lower().endswith("-script.py"):
matchname = name[:-10]
elif name.lower().endswith(".pya"):
matchname = name[:-4]
else:
matchname = name
return matchname in console or matchname in gui
script_scheme_files: Iterator[File] = map(
make_data_scheme_file, script_scheme_paths
)
script_scheme_files = filterfalse(is_entrypoint_wrapper, script_scheme_files)
script_scheme_files = map(ScriptFile, script_scheme_files)
files = chain(files, script_scheme_files)
for file in files:
file.save()
record_installed(file.src_record_path, file.dest_path, file.changed)
def pyc_source_file_paths() -> Generator[str, None, None]:
for installed_path in sorted(set(installed.values())):
full_installed_path = os.path.join(lib_dir, installed_path)
if not os.path.isfile(full_installed_path):
continue
if not full_installed_path.endswith(".py"):
continue
yield full_installed_path
def pyc_output_path(path: str) -> str:
return importlib.util.cache_from_source(path)
if pycompile:
with captured_stdout() as stdout:
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
for path in pyc_source_file_paths():
success = compileall.compile_file(path, force=True, quiet=True)
if success:
pyc_path = pyc_output_path(path)
assert os.path.exists(pyc_path)
pyc_record_path = cast(
"RecordPath", pyc_path.replace(os.path.sep, "/")
)
record_installed(pyc_record_path, pyc_path)
logger.debug(stdout.getvalue())
maker = PipScriptMaker(None, scheme.scripts)
maker.clobber = True
maker.variants = {""}
maker.set_mode = True
scripts_to_generate = get_console_script_specs(console)
gui_scripts_to_generate = list(starmap("{} = {}".format, gui.items()))
generated_console_scripts = maker.make_multiple(scripts_to_generate)
generated.extend(generated_console_scripts)
generated.extend(maker.make_multiple(gui_scripts_to_generate, {"gui": True}))
if warn_script_location:
msg = message_about_scripts_not_on_PATH(generated_console_scripts)
if msg is not None:
logger.warning(msg)
generated_file_mode = 0o666 & ~current_umask()
@contextlib.contextmanager
def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
with adjacent_tmp_file(path, **kwargs) as f:
yield f
os.chmod(f.name, generated_file_mode)
replace(f.name, path)
dest_info_dir = os.path.join(lib_dir, info_dir)
installer_path = os.path.join(dest_info_dir, "INSTALLER")
with _generate_file(installer_path) as installer_file:
installer_file.write(b"pip\n")
generated.append(installer_path)
if direct_url is not None:
direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME)
with _generate_file(direct_url_path) as direct_url_file:
direct_url_file.write(direct_url.to_json().encode("utf-8"))
generated.append(direct_url_path)
if requested:
requested_path = os.path.join(dest_info_dir, "REQUESTED")
with open(requested_path, "wb"):
pass
generated.append(requested_path)
record_text = distribution.read_text("RECORD")
record_rows = list(csv.reader(record_text.splitlines()))
rows = get_csv_rows_for_installed(
record_rows,
installed=installed,
changed=changed,
generated=generated,
lib_dir=lib_dir,
)
record_path = os.path.join(dest_info_dir, "RECORD")
with _generate_file(record_path, **csv_io_kwargs("w")) as record_file:
writer = csv.writer(cast("IO[str]", record_file))
writer.writerows(_normalized_outrows(rows))
@contextlib.contextmanager
def req_error_context(req_description: str) -> Generator[None, None, None]:
try:
yield
except InstallationError as e:
message = f"For req: {req_description}. {e.args[0]}"
raise InstallationError(message) from e
def install_wheel(
name: str,
wheel_path: str,
scheme: Scheme,
req_description: str,
pycompile: bool = True,
warn_script_location: bool = True,
direct_url: Optional[DirectUrl] = None,
requested: bool = False,
) -> None:
with ZipFile(wheel_path, allowZip64=True) as z:
with req_error_context(req_description):
_install_wheel(
name=name,
wheel_zip=z,
wheel_path=wheel_path,
scheme=scheme,
pycompile=pycompile,
warn_script_location=warn_script_location,
direct_url=direct_url,
requested=requested,
)