import csv
import email.message
import functools
import json
import logging
import pathlib
import re
import zipfile
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Collection,
    Container,
    Dict,
    Iterable,
    Iterator,
    List,
    NamedTuple,
    Optional,
    Tuple,
    Union,
)

from pip._vendor.packaging.requirements import Requirement
from pip._vendor.packaging.specifiers import InvalidSpecifier, SpecifierSet
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
from pip._vendor.packaging.version import LegacyVersion, Version

from pip._internal.exceptions import NoneMetadataError
from pip._internal.locations import site_packages, user_site
from pip._internal.models.direct_url import (
    DIRECT_URL_METADATA_NAME,
    DirectUrl,
    DirectUrlValidationError,
)
from pip._internal.utils.compat import stdlib_pkgs
from pip._internal.utils.egg_link import egg_link_path_from_sys_path
from pip._internal.utils.misc import is_local, normalize_path
from pip._internal.utils.urls import url_to_path

from ._json import msg_to_json
# ``typing.Protocol`` is imported only for static type checking; at runtime
# the name ``Protocol`` is bound to plain ``object`` so the classes below can
# still list it as a base class.
if TYPE_CHECKING:
    from typing import Protocol
else:
    Protocol = object

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# A distribution version is an instance of either imported version class.
DistributionVersion = Union[LegacyVersion, Version]

# Type of the ``path`` argument taken by ``is_file()`` and ``read_text()``.
InfoPath = Union[str, pathlib.PurePath]
class BaseEntryPoint(Protocol):
    """Interface of an entry point: three read-only string properties.

    Every property raises ``NotImplementedError`` here and must be supplied
    by a concrete implementation.
    """

    @property
    def name(self) -> str:
        """Return the entry point's name."""
        raise NotImplementedError

    @property
    def value(self) -> str:
        """Return the entry point's value."""
        raise NotImplementedError

    @property
    def group(self) -> str:
        """Return the group the entry point belongs to."""
        raise NotImplementedError
def _convert_installed_files_path(
entry: Tuple[str, ...],
info: Tuple[str, ...],
) -> str:
while entry and entry[0] == "..":
if not info or info[-1] == "..":
info += ("..",)
else:
info = info[:-1]
entry = entry[1:]
return str(pathlib.Path(*info, *entry))
class RequiresEntry(NamedTuple):
    """One requirement line from ``requires.txt`` plus its section context."""

    # The stripped requirement line itself.
    requirement: str
    # Text before ":" in the enclosing "[...]" header ("" before any header).
    extra: str
    # Text after ":" in the enclosing "[...]" header ("" when absent).
    marker: str
class BaseDistribution(Protocol):
    """Interface of a distribution together with helpers shared by backends.

    Members that raise ``NotImplementedError`` are hooks a concrete
    implementation must provide. Every other member is derived from those
    hooks (mainly ``read_text()``, ``is_file()``, ``location``,
    ``info_location``, ``installed_location`` and ``_metadata_impl()``).
    """

    @classmethod
    def from_directory(cls, directory: str) -> "BaseDistribution":
        """Build a distribution from a directory path (implementation hook)."""
        raise NotImplementedError()

    @classmethod
    def from_metadata_file_contents(
        cls,
        metadata_contents: bytes,
        filename: str,
        project_name: str,
    ) -> "BaseDistribution":
        """Build a distribution from raw metadata bytes (implementation hook)."""
        raise NotImplementedError()

    @classmethod
    def from_wheel(cls, wheel: "Wheel", name: str) -> "BaseDistribution":
        """Build a distribution from a ``Wheel`` object (implementation hook)."""
        raise NotImplementedError()

    def __repr__(self) -> str:
        return f"{self.raw_name} {self.version} ({self.location})"

    def __str__(self) -> str:
        return f"{self.raw_name} {self.version}"

    @property
    def location(self) -> Optional[str]:
        """Location of the distribution, if any (implementation hook)."""
        raise NotImplementedError()

    @property
    def editable_project_location(self) -> Optional[str]:
        """Location of the editable project, or ``None`` if not editable.

        With a ``direct_url`` present, its URL converted to a path is returned
        when ``is_local_editable()`` is true. Without one, ``location`` is
        returned if an egg-link for ``raw_name`` is found via ``sys.path``.
        """
        direct_url = self.direct_url
        if direct_url:
            if direct_url.is_local_editable():
                return url_to_path(direct_url.url)
        else:
            # No direct URL record: fall back to the egg-link lookup.
            egg_link_path = egg_link_path_from_sys_path(self.raw_name)
            if egg_link_path:
                return self.location
        return None

    @property
    def installed_location(self) -> Optional[str]:
        """Location the distribution is installed at (implementation hook)."""
        raise NotImplementedError()

    @property
    def info_location(self) -> Optional[str]:
        """Location of the metadata file or directory (implementation hook)."""
        raise NotImplementedError()

    @property
    def installed_by_distutils(self) -> bool:
        """Whether ``info_location`` is set and refers to a regular file."""
        info_location = self.info_location
        if not info_location:
            return False
        return pathlib.Path(info_location).is_file()

    @property
    def installed_as_egg(self) -> bool:
        """Whether ``location`` is set and ends with ``.egg``."""
        location = self.location
        if not location:
            return False
        return location.endswith(".egg")

    @property
    def installed_with_setuptools_egg_info(self) -> bool:
        """Whether ``info_location`` is a directory named ``*.egg-info``."""
        info_location = self.info_location
        if not info_location:
            return False
        if not info_location.endswith(".egg-info"):
            return False
        return pathlib.Path(info_location).is_dir()

    @property
    def installed_with_dist_info(self) -> bool:
        """Whether ``info_location`` is a directory named ``*.dist-info``."""
        info_location = self.info_location
        if not info_location:
            return False
        if not info_location.endswith(".dist-info"):
            return False
        return pathlib.Path(info_location).is_dir()

    @property
    def canonical_name(self) -> NormalizedName:
        """Normalized project name (implementation hook)."""
        raise NotImplementedError()

    @property
    def version(self) -> DistributionVersion:
        """Version of the distribution (implementation hook)."""
        raise NotImplementedError()

    @property
    def setuptools_filename(self) -> str:
        """``raw_name`` with every ``-`` replaced by ``_``."""
        return self.raw_name.replace("-", "_")

    @property
    def direct_url(self) -> Optional[DirectUrl]:
        """Parsed ``DIRECT_URL_METADATA_NAME`` file, or ``None``.

        ``None`` is returned when the file is missing, and also (after a
        logged warning) when its content cannot be decoded or validated.
        """
        try:
            content = self.read_text(DIRECT_URL_METADATA_NAME)
        except FileNotFoundError:
            return None
        try:
            return DirectUrl.from_json(content)
        except (
            UnicodeDecodeError,
            json.JSONDecodeError,
            DirectUrlValidationError,
        ) as e:
            logger.warning(
                "Error parsing %s for %s: %s",
                DIRECT_URL_METADATA_NAME,
                self.canonical_name,
                e,
            )
            return None

    @property
    def installer(self) -> str:
        """First non-blank, stripped line of the ``INSTALLER`` file.

        Returns ``""`` when the file cannot be read or has no such line.
        """
        try:
            installer_text = self.read_text("INSTALLER")
        except (OSError, ValueError, NoneMetadataError):
            return ""
        for line in installer_text.splitlines():
            cleaned_line = line.strip()
            if cleaned_line:
                return cleaned_line
        return ""

    @property
    def requested(self) -> bool:
        """Whether a ``REQUESTED`` file exists according to ``is_file()``."""
        return self.is_file("REQUESTED")

    @property
    def editable(self) -> bool:
        """Whether ``editable_project_location`` is truthy."""
        return bool(self.editable_project_location)

    @property
    def local(self) -> bool:
        """Whether ``installed_location`` is set and ``is_local()`` accepts it."""
        if self.installed_location is None:
            return False
        return is_local(self.installed_location)

    @property
    def in_usersite(self) -> bool:
        """Whether ``installed_location`` starts with the normalized user site."""
        if self.installed_location is None or user_site is None:
            return False
        # NOTE(review): plain string-prefix test, so a sibling directory that
        # merely shares the prefix would also match - confirm this is intended.
        return self.installed_location.startswith(normalize_path(user_site))

    @property
    def in_site_packages(self) -> bool:
        """Whether ``installed_location`` starts with normalized site-packages."""
        if self.installed_location is None or site_packages is None:
            return False
        # NOTE(review): same string-prefix caveat as ``in_usersite``.
        return self.installed_location.startswith(normalize_path(site_packages))

    def is_file(self, path: InfoPath) -> bool:
        """Check whether a metadata entry exists as a file (implementation hook)."""
        raise NotImplementedError()

    def iter_distutils_script_names(self) -> Iterator[str]:
        """Iterate over distutils script names (implementation hook)."""
        raise NotImplementedError()

    def read_text(self, path: InfoPath) -> str:
        """Read a metadata entry as text (implementation hook).

        Callers in this class expect ``FileNotFoundError`` when the entry
        does not exist.
        """
        raise NotImplementedError()

    def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
        """Iterate over the distribution's entry points (implementation hook)."""
        raise NotImplementedError()

    def _metadata_impl(self) -> email.message.Message:
        """Load the raw metadata message (implementation hook)."""
        raise NotImplementedError()

    # NOTE(review): ``lru_cache`` wraps the function object, so this one-entry
    # cache is shared by all instances, keyed on ``self``; it requires
    # instances to be hashable and keeps the cached one referenced.
    @functools.lru_cache(maxsize=1)
    def _metadata_cached(self) -> email.message.Message:
        """Return ``_metadata_impl()`` after adding ``requires.txt`` data."""
        metadata = self._metadata_impl()
        self._add_egg_info_requires(metadata)
        return metadata

    @property
    def metadata(self) -> email.message.Message:
        """Metadata message, including data merged from ``requires.txt``."""
        return self._metadata_cached()

    @property
    def metadata_dict(self) -> Dict[str, Any]:
        """``metadata`` converted with ``msg_to_json()``."""
        return msg_to_json(self.metadata)

    @property
    def metadata_version(self) -> Optional[str]:
        """Value of the ``Metadata-Version`` header, or ``None`` if absent."""
        return self.metadata.get("Metadata-Version")

    @property
    def raw_name(self) -> str:
        """Value of the ``Name`` header, falling back to ``canonical_name``."""
        return self.metadata.get("Name", self.canonical_name)

    @property
    def requires_python(self) -> SpecifierSet:
        """``Requires-Python`` header parsed as a ``SpecifierSet``.

        An empty ``SpecifierSet`` is returned when the header is absent, or
        (after a logged warning) when it is not a valid specifier.
        """
        value = self.metadata.get("Requires-Python")
        if value is None:
            return SpecifierSet()
        try:
            spec = SpecifierSet(str(value))
        except InvalidSpecifier as e:
            message = "Package %r has an invalid Requires-Python: %s"
            logger.warning(message, self.raw_name, e)
            return SpecifierSet()
        return spec

    def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
        """Iterate over dependencies for the given extras (implementation hook)."""
        raise NotImplementedError()

    def iter_provided_extras(self) -> Iterable[str]:
        """Iterate over the extras the distribution provides (implementation hook)."""
        raise NotImplementedError()

    def is_extra_provided(self, extra: str) -> bool:
        """Check whether an extra is provided (implementation hook)."""
        raise NotImplementedError()

    def _iter_declared_entries_from_record(self) -> Optional[Iterator[str]]:
        """Yield the first column of every ``RECORD`` row as a path string.

        Returns ``None`` when ``RECORD`` does not exist.
        """
        try:
            text = self.read_text("RECORD")
        except FileNotFoundError:
            return None
        # csv.reader produces an empty list for a blank line; skip those rows
        # instead of failing with IndexError on ``row[0]``.
        return (
            str(pathlib.Path(row[0]))
            for row in csv.reader(text.splitlines())
            if row
        )

    def _iter_declared_entries_from_legacy(self) -> Optional[Iterator[str]]:
        """Yield the non-empty lines of ``installed-files.txt``.

        Returns ``None`` when the file does not exist. When ``info_location``
        lies strictly below ``location``, each line is rewritten with
        ``_convert_installed_files_path()`` using that relative position;
        otherwise the lines are yielded unchanged.
        """
        try:
            text = self.read_text("installed-files.txt")
        except FileNotFoundError:
            return None
        paths = (p for p in text.splitlines(keepends=False) if p)
        root = self.location
        info = self.info_location
        if root is None or info is None:
            return paths
        try:
            info_rel = pathlib.Path(info).relative_to(root)
        except ValueError:
            # info_location is not inside location; nothing to rebase against.
            return paths
        if not info_rel.parts:
            # info_location and location are the same path.
            return paths
        return (
            _convert_installed_files_path(pathlib.Path(p).parts, info_rel.parts)
            for p in paths
        )

    def iter_declared_entries(self) -> Optional[Iterator[str]]:
        """Iterate over declared files, preferring ``RECORD``.

        Falls back to ``installed-files.txt`` when ``RECORD`` is missing, and
        returns ``None`` when neither file exists.
        """
        return (
            self._iter_declared_entries_from_record()
            or self._iter_declared_entries_from_legacy()
        )

    def _iter_requires_txt_entries(self) -> Iterator[RequiresEntry]:
        """Parse ``requires.txt`` into ``RequiresEntry`` items.

        A ``[extra:marker]`` line sets the ``extra`` and ``marker`` applied to
        the requirement lines that follow it. Blank lines and lines starting
        with ``#`` are ignored. Yields nothing when the file does not exist.
        """
        try:
            content = self.read_text("requires.txt")
        except FileNotFoundError:
            return
        # Lines before the first section header carry no extra and no marker.
        extra = marker = ""
        for line in content.splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            if line.startswith("[") and line.endswith("]"):
                # Section header: remember its context and move on.
                extra, _, marker = line.strip("[]").partition(":")
                continue
            yield RequiresEntry(requirement=line, extra=extra, marker=marker)

    def _iter_egg_info_extras(self) -> Iterable[str]:
        """Yield each distinct canonicalized non-empty extra in ``requires.txt``."""
        # Seeded with "" so entries outside any extra section are skipped.
        known_extras = {""}
        for entry in self._iter_requires_txt_entries():
            extra = canonicalize_name(entry.extra)
            if extra in known_extras:
                continue
            known_extras.add(extra)
            yield extra

    def _iter_egg_info_dependencies(self) -> Iterable[str]:
        """Yield ``requires.txt`` entries as requirement strings with markers.

        The section's extra and marker are combined into one marker
        expression and appended after `` ; ``. Entries with neither are
        yielded unchanged.
        """
        for entry in self._iter_requires_txt_entries():
            extra = canonicalize_name(entry.extra)
            if extra and entry.marker:
                marker = f'({entry.marker}) and extra == "{extra}"'
            elif extra:
                marker = f'extra == "{extra}"'
            elif entry.marker:
                marker = entry.marker
            else:
                marker = ""
            if marker:
                yield f"{entry.requirement} ; {marker}"
            else:
                yield entry.requirement

    def _add_egg_info_requires(self, metadata: email.message.Message) -> None:
        """Fill missing dependency headers of ``metadata`` from ``requires.txt``.

        ``Requires-Dist`` and ``Provides-Extra`` are each populated only when
        the message has no header of that name yet. ``metadata`` is modified
        in place.
        """
        # Item assignment on a Message appends a header, it does not replace.
        if not metadata.get_all("Requires-Dist"):
            for dep in self._iter_egg_info_dependencies():
                metadata["Requires-Dist"] = dep
        if not metadata.get_all("Provides-Extra"):
            for extra in self._iter_egg_info_extras():
                metadata["Provides-Extra"] = extra
class BaseEnvironment:
    """A source of distributions that can be looked up and iterated.

    ``default()``, ``from_paths()``, ``get_distribution()`` and
    ``_iter_distributions()`` are hooks for concrete implementations; the
    public iteration helpers are built on ``_iter_distributions()``.
    """

    @classmethod
    def default(cls) -> "BaseEnvironment":
        """Create the default environment (implementation hook)."""
        raise NotImplementedError

    @classmethod
    def from_paths(cls, paths: Optional[List[str]]) -> "BaseEnvironment":
        """Create an environment from a list of paths (implementation hook)."""
        raise NotImplementedError

    def get_distribution(self, name: str) -> Optional["BaseDistribution"]:
        """Look up a distribution by name (implementation hook)."""
        raise NotImplementedError

    def _iter_distributions(self) -> Iterator["BaseDistribution"]:
        """Iterate over every distribution found (implementation hook)."""
        raise NotImplementedError

    def iter_all_distributions(self) -> Iterator[BaseDistribution]:
        """Yield distributions whose canonical name is a valid project name.

        A name is valid when it consists of ASCII letters, digits, ``.``,
        ``_`` and ``-`` and both starts and ends with a letter or digit.
        Others are skipped with a logged warning.
        """
        for candidate in self._iter_distributions():
            name_ok = re.match(
                r"^([A-Z0-9]|[A-Z0-9][A-Z0-9._-]*[A-Z0-9])$",
                candidate.canonical_name,
                flags=re.IGNORECASE,
            )
            if name_ok:
                yield candidate
            else:
                logger.warning(
                    "Ignoring invalid distribution %s (%s)",
                    candidate.canonical_name,
                    candidate.location,
                )

    def iter_installed_distributions(
        self,
        local_only: bool = True,
        skip: Container[str] = stdlib_pkgs,
        include_editables: bool = True,
        editables_only: bool = False,
        user_only: bool = False,
    ) -> Iterator[BaseDistribution]:
        """Lazily filter ``iter_all_distributions()`` by the given criteria.

        :param local_only: Keep only distributions whose ``local`` is true.
        :param skip: Canonical names to leave out of the result.
        :param include_editables: When false, drop editable distributions.
        :param editables_only: When true, keep only editable distributions.
        :param user_only: When true, keep only those with ``in_usersite``.
        """
        # Tests are collected in the order they must be applied per item.
        checks = []
        if local_only:
            checks.append(lambda d: d.local)
        if not include_editables:
            checks.append(lambda d: not d.editable)
        if editables_only:
            checks.append(lambda d: d.editable)
        if user_only:
            checks.append(lambda d: d.in_usersite)
        checks.append(lambda d: d.canonical_name not in skip)

        selected = self.iter_all_distributions()
        for check in checks:
            selected = filter(check, selected)
        return selected
class Wheel(Protocol):
    """Interface of a wheel: a ``location`` string and zip archive access."""

    # Where the wheel comes from, as a string.
    location: str

    def as_zipfile(self) -> zipfile.ZipFile:
        """Open the wheel as a ``zipfile.ZipFile`` (implementation hook)."""
        raise NotImplementedError
class FilesystemWheel(Wheel):
    """A wheel opened directly from the path stored in ``location``."""

    def __init__(self, location: str) -> None:
        """Remember ``location``; nothing is opened until ``as_zipfile()``."""
        self.location = location

    def as_zipfile(self) -> zipfile.ZipFile:
        """Open ``location`` as a zip archive with ZIP64 allowed."""
        archive = zipfile.ZipFile(self.location, allowZip64=True)
        return archive
class MemoryWheel(Wheel):
    """A wheel read from a binary stream, with ``location`` kept alongside."""

    def __init__(self, location: str, stream: IO[bytes]) -> None:
        """Store the ``location`` string and the byte stream to read from."""
        self.stream = stream
        self.location = location

    def as_zipfile(self) -> zipfile.ZipFile:
        """Open ``stream`` as a zip archive with ZIP64 allowed."""
        archive = zipfile.ZipFile(self.stream, allowZip64=True)
        return archive