import functools
import importlib.metadata
import logging
import os
import pathlib
import sys
import zipfile
import zipimport
from typing import Iterator, List, Optional, Sequence, Set, Tuple
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
from pip._internal.metadata.base import BaseDistribution, BaseEnvironment
from pip._internal.models.wheel import Wheel
from pip._internal.utils.deprecation import deprecated
from pip._internal.utils.filetypes import WHEEL_EXTENSION
from ._compat import BadMetadata, BasePath, get_dist_name, get_info_location
from ._dists import Distribution
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def _looks_like_wheel(location: str) -> bool:
    """Tell whether ``location`` is a wheel archive on disk.

    All of the following must hold, checked in this order (a failing check
    short-circuits, so later checks never touch the filesystem needlessly):

    1. the path ends with the wheel file extension,
    2. the path is an existing regular file,
    3. the file's base name matches ``Wheel.wheel_file_re``,
    4. the file is a ZIP archive according to ``zipfile.is_zipfile``.
    """
    return (
        location.endswith(WHEEL_EXTENSION)
        and os.path.isfile(location)
        and Wheel.wheel_file_re.match(os.path.basename(location)) is not None
        and zipfile.is_zipfile(location)
    )
class _DistributionFinder:
    """Discover distributions location by location.

    The finder remembers the canonical name of every distribution it has
    yielded through ``find`` / ``find_linked`` and will not yield another
    distribution with the same canonical name afterwards. Egg discovery
    (``find_eggs``) does not consult or update that record.
    """

    # What ``_find_impl`` yields: the importlib.metadata distribution and the
    # location of its metadata (which may be None).
    FoundResult = Tuple[importlib.metadata.Distribution, Optional[BasePath]]

    def __init__(self) -> None:
        # Canonical names already yielded by ``_find_impl``.
        self._found_names: Set[NormalizedName] = set()

    def _find_impl(self, location: str) -> Iterator[FoundResult]:
        """Yield ``(distribution, metadata location)`` pairs for ``location``.

        Nothing is yielded when ``location`` looks like a wheel file.
        Distributions whose name cannot be read (``BadMetadata``) are logged
        and skipped, as are names that were already yielded earlier.
        """
        if _looks_like_wheel(location):
            return
        # Query a single location per call so each result is tied to it.
        for candidate in importlib.metadata.distributions(path=[location]):
            metadata_path = get_info_location(candidate)
            try:
                declared_name = get_dist_name(candidate)
            except BadMetadata as exc:
                logger.warning("Skipping %s due to %s", metadata_path, exc.reason)
                continue
            key = canonicalize_name(declared_name)
            if key not in self._found_names:
                self._found_names.add(key)
                yield candidate, metadata_path

    def find(self, location: str) -> Iterator[BaseDistribution]:
        """Yield the distributions found directly in ``location``.

        Each distribution's installed location is the parent of its metadata
        location, or None when the metadata location is unknown.
        """
        for found, metadata_path in self._find_impl(location):
            parent_dir: Optional[BasePath] = (
                None if metadata_path is None else metadata_path.parent
            )
            yield Distribution(found, metadata_path, parent_dir)

    def find_linked(self, location: str) -> Iterator[BaseDistribution]:
        """Yield distributions referenced by ``*.egg-link`` files in ``location``.

        For every ``.egg-link`` file in the directory, the first non-blank
        line (stripped) is joined onto ``location`` and searched for
        distributions. Those are yielded with the directory holding the
        egg-link as their installed location. Does nothing when ``location``
        is not a directory.
        """
        link_dir = pathlib.Path(location)
        if not link_dir.is_dir():
            return
        for link_file in link_dir.iterdir():
            if link_file.suffix != ".egg-link":
                continue
            # Pick the first line that is non-empty after stripping.
            target_rel = ""
            with link_file.open() as stream:
                for raw_line in stream:
                    stripped = raw_line.strip()
                    if stripped:
                        target_rel = stripped
                        break
            if target_rel:
                linked_target = str(link_dir.joinpath(target_rel))
                for found, metadata_path in self._find_impl(linked_target):
                    yield Distribution(found, metadata_path, link_dir)

    def _find_eggs_in_dir(self, location: str) -> Iterator[BaseDistribution]:
        """Yield legacy distributions for each ``*.egg`` entry in a directory."""
        from pip._vendor.pkg_resources import find_distributions
        from pip._internal.metadata import pkg_resources as legacy

        with os.scandir(location) as listing:
            for item in listing:
                if item.name.endswith(".egg"):
                    for egg in find_distributions(item.path):
                        yield legacy.Distribution(egg)

    def _find_eggs_in_zip(self, location: str) -> Iterator[BaseDistribution]:
        """Yield legacy distributions for eggs inside a ZIP archive.

        Yields nothing if a ``zipimporter`` cannot be created for ``location``.
        """
        from pip._vendor.pkg_resources import find_eggs_in_zip
        from pip._internal.metadata import pkg_resources as legacy

        try:
            archive_importer = zipimport.zipimporter(location)
        except zipimport.ZipImportError:
            return
        for egg in find_eggs_in_zip(archive_importer, location):
            yield legacy.Distribution(egg)

    def find_eggs(self, location: str) -> Iterator[BaseDistribution]:
        """Yield egg distributions found in ``location``.

        ``location`` is searched as a directory when it is one, and as a ZIP
        archive when ``zipfile.is_zipfile`` says it is one.
        """
        if os.path.isdir(location):
            yield from self._find_eggs_in_dir(location)
        if zipfile.is_zipfile(location):
            yield from self._find_eggs_in_zip(location)
@functools.lru_cache(maxsize=None)
def _emit_egg_deprecation(location: Optional[str]) -> None:
    """Report that loading an egg from ``location`` is deprecated.

    The unbounded ``lru_cache`` memoizes by ``location``, so once a call for
    a given location has completed, repeating it does not invoke
    ``deprecated`` again.
    """
    message = f"Loading egg at {location} is deprecated."
    deprecated(
        reason=message,
        replacement="to use pip for package installation.",
        gone_in="24.3",
        issue=12330,
    )
class Environment(BaseEnvironment):
    """Environment whose distributions are discovered on a sequence of paths."""

    def __init__(self, paths: Sequence[str]) -> None:
        # The sequence is stored as given (not copied).
        self._paths = paths

    @classmethod
    def default(cls) -> BaseEnvironment:
        """Return an environment that searches ``sys.path``."""
        return cls(sys.path)

    @classmethod
    def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
        """Return an environment for ``paths``, or for ``sys.path`` if None."""
        if paths is None:
            return cls(sys.path)
        return cls(paths)

    def _iter_distributions(self) -> Iterator[BaseDistribution]:
        """Yield every distribution found on the configured paths.

        A single finder is shared across all paths, so a canonical name
        yielded by ``find`` / ``find_linked`` for one path is not yielded
        again for a later one. Per path the order is: regular distributions,
        then eggs (each triggering the egg deprecation notice), then
        egg-link targets.
        """
        finder = _DistributionFinder()
        for location in self._paths:
            yield from finder.find(location)
            for dist in finder.find_eggs(location):
                _emit_egg_deprecation(dist.location)
                yield dist
            # Linked distributions come last: anything already found under the
            # same canonical name takes precedence over an egg-link target.
            yield from finder.find_linked(location)

    def get_distribution(self, name: str) -> Optional[BaseDistribution]:
        """Return the first distribution matching ``name``, or None.

        Names are compared in canonical form. Candidates come from
        ``self.iter_all_distributions()``, which is not defined in this
        class (presumably inherited from ``BaseEnvironment``).
        """
        # Canonicalize once up front instead of once per candidate.
        canonical_name = canonicalize_name(name)
        matches = (
            distribution
            for distribution in self.iter_all_distributions()
            if distribution.canonical_name == canonical_name
        )
        return next(matches, None)