import enum
import functools
import itertools
import logging
import re
from typing import TYPE_CHECKING, FrozenSet, Iterable, List, Optional, Set, Tuple, Union
from pip._vendor.packaging import specifiers
from pip._vendor.packaging.tags import Tag
from pip._vendor.packaging.utils import canonicalize_name
from pip._vendor.packaging.version import _BaseVersion
from pip._vendor.packaging.version import parse as parse_version
from pip._internal.exceptions import (
BestVersionAlreadyInstalled,
DistributionNotFound,
InvalidWheelFilename,
UnsupportedWheel,
)
from pip._internal.index.collector import LinkCollector, parse_links
from pip._internal.models.candidate import InstallationCandidate
from pip._internal.models.format_control import FormatControl
from pip._internal.models.link import Link
from pip._internal.models.search_scope import SearchScope
from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.models.target_python import TargetPython
from pip._internal.models.wheel import Wheel
from pip._internal.req import InstallRequirement
from pip._internal.utils._log import getLogger
from pip._internal.utils.filetypes import WHEEL_EXTENSION
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import build_netloc
from pip._internal.utils.packaging import check_requires_python
from pip._internal.utils.unpacking import SUPPORTED_EXTENSIONS
if TYPE_CHECKING:
from pip._vendor.typing_extensions import TypeGuard
# Names exported by this module (FormatControl is re-exported from its import).
__all__ = ["FormatControl", "BestCandidateResult", "PackageFinder"]
# Module logger; obtained through pip's getLogger helper because the code
# below also calls ``logger.verbose(...)``.
logger = getLogger(__name__)
# A wheel build tag as used in sort keys: ``()`` when absent, otherwise
# ``(number, remainder)`` as parsed in ``CandidateEvaluator._sort_key``.
BuildTag = Union[Tuple[()], Tuple[int, str]]
# Shape of the tuple returned by ``CandidateEvaluator._sort_key``:
# (has_allowed_hash, yank_value, binary_preference, version, pri, build_tag).
CandidateSortingKey = Tuple[int, int, int, _BaseVersion, Optional[int], BuildTag]
def _check_link_requires_python(
    link: Link,
    version_info: Tuple[int, int, int],
    ignore_requires_python: bool = False,
) -> bool:
    """Return whether *link* is acceptable for the given Python version.

    :param link: The link whose ``requires_python`` value is checked.
    :param version_info: The Python version to check against, as a 3-tuple.
    :param ignore_requires_python: When true, a failed check is only logged
        and the link is still accepted.
    :return: False only when the Requires-Python check fails and
        ``ignore_requires_python`` is false. An invalid Requires-Python
        specifier is logged and treated as acceptable.
    """
    try:
        compatible = check_requires_python(
            link.requires_python,
            version_info=version_info,
        )
    except specifiers.InvalidSpecifier:
        # An unparsable specifier does not disqualify the link.
        logger.debug(
            "Ignoring invalid Requires-Python (%r) for link: %s",
            link.requires_python,
            link,
        )
        return True

    if compatible:
        return True

    dotted_version = ".".join(map(str, version_info))
    if ignore_requires_python:
        logger.debug(
            "Ignoring failed Requires-Python check (%s not in: %r) for link: %s",
            dotted_version,
            link.requires_python,
            link,
        )
        return True

    logger.verbose(
        "Link requires a different Python (%s not in: %r): %s",
        dotted_version,
        link.requires_python,
        link,
    )
    return False
class LinkType(enum.Enum):
    """Outcome categories produced when a link is evaluated.

    ``candidate`` marks a usable link; every other member names a reason
    the link was skipped.
    """

    # The link can be used as an installation candidate.
    candidate = enum.auto()
    # The file belongs to another project.
    different_project = enum.auto()
    # The link is yanked and yanked links are not allowed.
    yanked = enum.auto()
    # Not a file, an unsupported archive type, or a disallowed format.
    format_unsupported = enum.auto()
    # Invalid wheel filename or missing project version.
    format_invalid = enum.auto()
    # Wheel tags or the "-pyX.Y" suffix do not match the target Python.
    platform_mismatch = enum.auto()
    # The Requires-Python check failed.
    requires_python_mismatch = enum.auto()
class LinkEvaluator:
    """Decide whether a link is a usable candidate for one project."""

    # Matches a trailing "-py<version>" suffix on a version string; the
    # captured group is compared with the target Python's ``py_version``.
    _py_version_re = re.compile(r"-py([123]\.?[0-9]?)$")

    def __init__(
        self,
        project_name: str,
        canonical_name: str,
        formats: FrozenSet[str],
        target_python: TargetPython,
        allow_yanked: bool,
        ignore_requires_python: Optional[bool] = None,
    ) -> None:
        """Store the evaluation settings.

        :param project_name: The project name, used in reason messages.
        :param canonical_name: The canonicalized project name, compared
            against names parsed from links.
        :param formats: Allowed formats; checked for the members
            ``"binary"`` and ``"source"``.
        :param target_python: The Python the links are evaluated against.
        :param allow_yanked: Whether yanked links may be candidates.
        :param ignore_requires_python: Whether a failed Requires-Python
            check is ignored. ``None`` is treated as False.
        """
        self.project_name = project_name
        self._canonical_name = canonical_name
        self._formats = formats
        self._target_python = target_python
        self._allow_yanked = allow_yanked
        self._ignore_requires_python = (
            False if ignore_requires_python is None else ignore_requires_python
        )

    def evaluate_link(self, link: Link) -> Tuple[LinkType, str]:
        """Classify *link*.

        :return: A ``(result, detail)`` pair. For ``LinkType.candidate`` the
            detail is the version string; for every other result it is a
            human-readable reason for skipping the link.
        """
        version = None

        if link.is_yanked and not self._allow_yanked:
            yanked_reason = link.yanked_reason or "<none given>"
            return (LinkType.yanked, f"yanked for reason: {yanked_reason}")

        if link.egg_fragment:
            # The fragment supplies the name/version text; the extension
            # and wheel checks below are skipped for such links.
            egg_info = link.egg_fragment
            ext = link.ext
        else:
            egg_info, ext = link.splitext()
            if not ext:
                return (LinkType.format_unsupported, "not a file")
            if ext not in SUPPORTED_EXTENSIONS:
                return (
                    LinkType.format_unsupported,
                    f"unsupported archive format: {ext}",
                )
            if "binary" not in self._formats and ext == WHEEL_EXTENSION:
                return (
                    LinkType.format_unsupported,
                    f"No binaries permitted for {self.project_name}",
                )
            if "macosx10" in link.path and ext == ".zip":
                return (LinkType.format_unsupported, "macosx10 one")
            if ext == WHEEL_EXTENSION:
                try:
                    wheel = Wheel(link.filename)
                except InvalidWheelFilename:
                    return (
                        LinkType.format_invalid,
                        "invalid wheel filename",
                    )
                if canonicalize_name(wheel.name) != self._canonical_name:
                    return (
                        LinkType.different_project,
                        f"wrong project name (not {self.project_name})",
                    )

                if not wheel.supported(self._target_python.get_unsorted_tags()):
                    # Put the wheel's own tags in the reason to make the
                    # mismatch easier to diagnose.
                    file_tags = ", ".join(wheel.get_formatted_file_tags())
                    return (
                        LinkType.platform_mismatch,
                        f"none of the wheel's tags ({file_tags}) are compatible "
                        f"(run pip debug --verbose to show compatible tags)",
                    )

                version = wheel.version

        if "source" not in self._formats and ext != WHEEL_EXTENSION:
            return (
                LinkType.format_unsupported,
                f"No sources permitted for {self.project_name}",
            )

        if not version:
            # No wheel version available: derive it from the name fragment.
            version = _extract_version_from_fragment(
                egg_info,
                self._canonical_name,
            )
        if not version:
            return (
                LinkType.format_invalid,
                f"Missing project version for {self.project_name}",
            )

        py_suffix = self._py_version_re.search(version)
        if py_suffix:
            # Strip the "-pyX.Y" suffix and require it to match the target.
            version = version[: py_suffix.start()]
            if py_suffix.group(1) != self._target_python.py_version:
                return (
                    LinkType.platform_mismatch,
                    "Python version is incorrect",
                )

        if not _check_link_requires_python(
            link,
            version_info=self._target_python.py_version_info,
            ignore_requires_python=self._ignore_requires_python,
        ):
            return (
                LinkType.requires_python_mismatch,
                f"{version} Requires-Python {link.requires_python}",
            )

        logger.debug("Found link %s, version: %s", link, version)
        return (LinkType.candidate, version)
def filter_unallowed_hashes(
    candidates: List[InstallationCandidate],
    hashes: Optional[Hashes],
    project_name: str,
) -> List[InstallationCandidate]:
    """Drop candidates whose link hash is not allowed, when that is useful.

    If *hashes* is empty or None, or if no candidate has an allowed hash,
    every candidate is kept. Otherwise the result holds the candidates whose
    link hash is allowed plus those whose link carries no hash at all.

    :param candidates: The candidates to filter.
    :param hashes: The allowed hashes, if any.
    :param project_name: Used only in log messages.
    :return: A new list; the input list object is never returned.
    """
    if not hashes:
        logger.debug(
            "Given no hashes to check %s links for project %r: "
            "discarding no candidates",
            len(candidates),
            project_name,
        )
        return list(candidates)

    kept = []
    # Rejected candidates are collected only for the log message.
    rejected = []
    match_count = 0
    for cand in candidates:
        cand_link = cand.link
        if cand_link.has_hash:
            if not cand_link.is_hash_allowed(hashes=hashes):
                rejected.append(cand)
                continue
            match_count += 1
        kept.append(cand)

    # Without a single positive match, filtering is skipped entirely.
    filtered = kept if match_count else list(candidates)

    if len(filtered) != len(candidates):
        discard_message = "discarding {} non-matches:\n {}".format(
            len(rejected),
            "\n ".join(str(cand.link) for cand in rejected),
        )
    else:
        discard_message = "discarding no candidates"

    logger.debug(
        "Checked %s links for project %r against %s hashes "
        "(%s matches, %s no digest): %s",
        len(candidates),
        project_name,
        hashes.digest_count,
        match_count,
        len(kept) - match_count,
        discard_message,
    )
    return filtered
class CandidatePreferences:
    """Mutable holder for the candidate-selection preferences."""

    def __init__(
        self,
        prefer_binary: bool = False,
        allow_all_prereleases: bool = False,
    ) -> None:
        """
        :param prefer_binary: Stored as ``prefer_binary``.
        :param allow_all_prereleases: Stored as ``allow_all_prereleases``.
        """
        self.prefer_binary = prefer_binary
        self.allow_all_prereleases = allow_all_prereleases
class BestCandidateResult:
    """Result of a best-candidate computation.

    Holds all candidates found, the applicable subset, and the chosen best
    candidate (None when nothing is applicable).
    """

    def __init__(
        self,
        candidates: List[InstallationCandidate],
        applicable_candidates: List[InstallationCandidate],
        best_candidate: Optional[InstallationCandidate],
    ) -> None:
        """
        :param candidates: Every candidate that was found.
        :param applicable_candidates: The applicable candidates; must be a
            subset of *candidates*.
        :param best_candidate: The selected candidate; must be one of
            *applicable_candidates*, or None when that list is empty.
        """
        # Internal consistency checks on the three arguments.
        assert set(applicable_candidates).issubset(set(candidates))
        if best_candidate is not None:
            assert best_candidate in applicable_candidates
        else:
            assert not applicable_candidates

        self._candidates = candidates
        self._applicable_candidates = applicable_candidates
        self.best_candidate = best_candidate

    def iter_all(self) -> Iterable[InstallationCandidate]:
        """Return an iterator over all candidates."""
        return iter(self._candidates)

    def iter_applicable(self) -> Iterable[InstallationCandidate]:
        """Return an iterator over the applicable candidates."""
        return iter(self._applicable_candidates)
class CandidateEvaluator:
    """Filter and rank installation candidates for one project."""

    @classmethod
    def create(
        cls,
        project_name: str,
        target_python: Optional[TargetPython] = None,
        prefer_binary: bool = False,
        allow_all_prereleases: bool = False,
        specifier: Optional[specifiers.BaseSpecifier] = None,
        hashes: Optional[Hashes] = None,
    ) -> "CandidateEvaluator":
        """Build an evaluator, filling in defaults.

        :param target_python: Defaults to ``TargetPython()``; its sorted
            tags become the evaluator's supported tags.
        :param specifier: Defaults to an empty ``SpecifierSet``.
        :param hashes: Allowed hashes, passed through unchanged.
        """
        if target_python is None:
            target_python = TargetPython()
        if specifier is None:
            specifier = specifiers.SpecifierSet()

        supported_tags = target_python.get_sorted_tags()

        return cls(
            project_name=project_name,
            supported_tags=supported_tags,
            specifier=specifier,
            prefer_binary=prefer_binary,
            allow_all_prereleases=allow_all_prereleases,
            hashes=hashes,
        )

    def __init__(
        self,
        project_name: str,
        supported_tags: List[Tag],
        specifier: specifiers.BaseSpecifier,
        prefer_binary: bool = False,
        allow_all_prereleases: bool = False,
        hashes: Optional[Hashes] = None,
    ) -> None:
        """
        :param project_name: Used in log messages when filtering by hash.
        :param supported_tags: Supported tags; a tag's position in this list
            is recorded as its preference index.
        :param specifier: Version specifier used to filter candidates.
        :param prefer_binary: Whether wheels rank above other links.
        :param allow_all_prereleases: Whether pre-releases are always allowed.
        :param hashes: Allowed hashes, if any.
        """
        self._allow_all_prereleases = allow_all_prereleases
        self._hashes = hashes
        self._prefer_binary = prefer_binary
        self._project_name = project_name
        self._specifier = specifier
        self._supported_tags = supported_tags
        # Tag -> position in supported_tags, precomputed for _sort_key.
        self._wheel_tag_preferences = {
            tag: idx for idx, tag in enumerate(supported_tags)
        }

    def get_applicable_candidates(
        self,
        candidates: List[InstallationCandidate],
    ) -> List[InstallationCandidate]:
        """Return the candidates that satisfy the specifier and hash rules.

        The result is sorted in ascending order of ``_sort_key``.
        """
        # None (rather than False) is passed when pre-releases are not
        # forced, leaving the decision to the specifier.
        allow_prereleases = self._allow_all_prereleases or None
        specifier = self._specifier
        # Versions are exchanged with the specifier as strings and matched
        # back to candidates as strings.
        versions = {
            str(v)
            for v in specifier.filter(
                (str(c.version) for c in candidates),
                prereleases=allow_prereleases,
            )
        }

        applicable_candidates = [c for c in candidates if str(c.version) in versions]

        filtered_applicable_candidates = filter_unallowed_hashes(
            candidates=applicable_candidates,
            hashes=self._hashes,
            project_name=self._project_name,
        )

        return sorted(filtered_applicable_candidates, key=self._sort_key)

    def _sort_key(self, candidate: InstallationCandidate) -> CandidateSortingKey:
        """Return the ranking key for *candidate*; larger keys are preferred.

        Key components, most significant first: whether the link's hash is
        allowed, whether the link is not yanked, binary preference, version,
        tag priority, and wheel build tag.

        :raises UnsupportedWheel: If the link is a wheel with no tag among
            the supported tags.
        """
        valid_tags = self._supported_tags
        support_num = len(valid_tags)
        build_tag: BuildTag = ()
        binary_preference = 0
        link = candidate.link
        if link.is_wheel:
            wheel = Wheel(link.filename)
            try:
                # Negated so that a smaller preferred-tag value ranks higher.
                pri = -(
                    wheel.find_most_preferred_tag(
                        valid_tags, self._wheel_tag_preferences
                    )
                )
            except ValueError as exc:
                raise UnsupportedWheel(
                    f"{wheel.filename} is not a supported wheel for this platform. It "
                    "can't be sorted."
                ) from exc
            if self._prefer_binary:
                binary_preference = 1
            if wheel.build_tag is not None:
                match = re.match(r"^(\d+)(.*)$", wheel.build_tag)
                assert match is not None, "guaranteed by filename validation"
                build_tag_groups = match.groups()
                build_tag = (int(build_tag_groups[0]), build_tag_groups[1])
        else:
            # Non-wheel link: priority is minus the number of supported tags.
            pri = -(support_num)
        has_allowed_hash = int(link.is_hash_allowed(self._hashes))
        # -1 for yanked links, 0 otherwise.
        yank_value = -1 * int(link.is_yanked)
        return (
            has_allowed_hash,
            yank_value,
            binary_preference,
            candidate.version,
            pri,
            build_tag,
        )

    def sort_best_candidate(
        self,
        candidates: List[InstallationCandidate],
    ) -> Optional[InstallationCandidate]:
        """Return the candidate with the largest sort key, or None if empty."""
        if not candidates:
            return None
        best_candidate = max(candidates, key=self._sort_key)
        return best_candidate

    def compute_best_candidate(
        self,
        candidates: List[InstallationCandidate],
    ) -> BestCandidateResult:
        """Filter *candidates* and bundle the outcome in a BestCandidateResult."""
        applicable_candidates = self.get_applicable_candidates(candidates)

        best_candidate = self.sort_best_candidate(applicable_candidates)

        return BestCandidateResult(
            candidates,
            applicable_candidates=applicable_candidates,
            best_candidate=best_candidate,
        )
class PackageFinder:
    """Find installation candidates for projects and select the best one.

    Links come from the given link collector, are classified by a
    ``LinkEvaluator``, and are ranked by a ``CandidateEvaluator``.
    """

    def __init__(
        self,
        link_collector: LinkCollector,
        target_python: TargetPython,
        allow_yanked: bool,
        format_control: Optional[FormatControl] = None,
        candidate_prefs: Optional[CandidatePreferences] = None,
        ignore_requires_python: Optional[bool] = None,
    ) -> None:
        """
        :param link_collector: Source of links and of the search scope.
        :param target_python: The Python that links are evaluated against.
        :param allow_yanked: Whether yanked links may be candidates.
        :param format_control: Allowed formats per project; defaults to
            ``FormatControl(set(), set())``.
        :param candidate_prefs: Selection preferences; defaults to
            ``CandidatePreferences()``.
        :param ignore_requires_python: Passed on to each LinkEvaluator.
        """
        if candidate_prefs is None:
            candidate_prefs = CandidatePreferences()

        format_control = format_control or FormatControl(set(), set())

        self._allow_yanked = allow_yanked
        self._candidate_prefs = candidate_prefs
        self._ignore_requires_python = ignore_requires_python
        self._link_collector = link_collector
        self._target_python = target_python

        self.format_control = format_control

        # (link, result, detail) entries already logged as skipped.
        self._logged_links: Set[Tuple[Link, LinkType, str]] = set()

    @classmethod
    def create(
        cls,
        link_collector: LinkCollector,
        selection_prefs: SelectionPreferences,
        target_python: Optional[TargetPython] = None,
    ) -> "PackageFinder":
        """Build a finder from selection preferences.

        :param target_python: Defaults to ``TargetPython()``.
        """
        if target_python is None:
            target_python = TargetPython()

        candidate_prefs = CandidatePreferences(
            prefer_binary=selection_prefs.prefer_binary,
            allow_all_prereleases=selection_prefs.allow_all_prereleases,
        )

        return cls(
            candidate_prefs=candidate_prefs,
            link_collector=link_collector,
            target_python=target_python,
            allow_yanked=selection_prefs.allow_yanked,
            format_control=selection_prefs.format_control,
            ignore_requires_python=selection_prefs.ignore_requires_python,
        )

    @property
    def target_python(self) -> TargetPython:
        """The target Python given at construction."""
        return self._target_python

    @property
    def search_scope(self) -> SearchScope:
        """The link collector's search scope."""
        return self._link_collector.search_scope

    @search_scope.setter
    def search_scope(self, search_scope: SearchScope) -> None:
        self._link_collector.search_scope = search_scope

    @property
    def find_links(self) -> List[str]:
        """The link collector's ``find_links`` value."""
        return self._link_collector.find_links

    @property
    def index_urls(self) -> List[str]:
        """The index URLs of the current search scope."""
        return self.search_scope.index_urls

    @property
    def trusted_hosts(self) -> Iterable[str]:
        """Yield a netloc for each of the session's trusted origins."""
        for host_port in self._link_collector.session.pip_trusted_origins:
            yield build_netloc(*host_port)

    @property
    def allow_all_prereleases(self) -> bool:
        """Whether pre-releases are always allowed."""
        return self._candidate_prefs.allow_all_prereleases

    def set_allow_all_prereleases(self) -> None:
        """Turn on ``allow_all_prereleases`` in the candidate preferences."""
        self._candidate_prefs.allow_all_prereleases = True

    @property
    def prefer_binary(self) -> bool:
        """Whether wheels are preferred when ranking candidates."""
        return self._candidate_prefs.prefer_binary

    def set_prefer_binary(self) -> None:
        """Turn on ``prefer_binary`` in the candidate preferences."""
        self._candidate_prefs.prefer_binary = True

    def requires_python_skipped_reasons(self) -> List[str]:
        """Return the sorted, de-duplicated Requires-Python skip reasons."""
        reasons = {
            detail
            for _, result, detail in self._logged_links
            if result == LinkType.requires_python_mismatch
        }
        return sorted(reasons)

    def make_link_evaluator(self, project_name: str) -> LinkEvaluator:
        """Create a LinkEvaluator for *project_name* from this finder's settings."""
        canonical_name = canonicalize_name(project_name)
        formats = self.format_control.get_allowed_formats(canonical_name)

        return LinkEvaluator(
            project_name=project_name,
            canonical_name=canonical_name,
            formats=formats,
            target_python=self._target_python,
            allow_yanked=self._allow_yanked,
            ignore_requires_python=self._ignore_requires_python,
        )

    def _sort_links(self, links: Iterable[Link]) -> List[Link]:
        """Return *links* without duplicates, egg-fragment links last.

        Relative order within each of the two groups is preserved.
        """
        eggs, no_eggs = [], []
        seen: Set[Link] = set()
        for link in links:
            if link not in seen:
                seen.add(link)
                if link.egg_fragment:
                    eggs.append(link)
                else:
                    no_eggs.append(link)
        return no_eggs + eggs

    def _log_skipped_link(self, link: Link, result: LinkType, detail: str) -> None:
        """Log a skipped link once and record it in ``_logged_links``."""
        entry = (link, result, detail)
        if entry not in self._logged_links:
            logger.debug("Skipping link: %s: %s", detail, link)
            self._logged_links.add(entry)

    def get_install_candidate(
        self, link_evaluator: LinkEvaluator, link: Link
    ) -> Optional[InstallationCandidate]:
        """Return an InstallationCandidate for *link*, or None if it is skipped."""
        result, detail = link_evaluator.evaluate_link(link)
        if result != LinkType.candidate:
            self._log_skipped_link(link, result, detail)
            return None

        # For a candidate result, the detail is the version.
        return InstallationCandidate(
            name=link_evaluator.project_name,
            link=link,
            version=detail,
        )

    def evaluate_links(
        self, link_evaluator: LinkEvaluator, links: Iterable[Link]
    ) -> List[InstallationCandidate]:
        """Turn *links* into candidates, dropping the links that are skipped."""
        candidates = []
        for link in self._sort_links(links):
            candidate = self.get_install_candidate(link_evaluator, link)
            if candidate is not None:
                candidates.append(candidate)

        return candidates

    def process_project_url(
        self, project_url: Link, link_evaluator: LinkEvaluator
    ) -> List[InstallationCandidate]:
        """Fetch a project page and evaluate the links parsed from it.

        Returns an empty list when the collector returns no response.
        """
        logger.debug(
            "Fetching project page and analyzing links: %s",
            project_url,
        )
        index_response = self._link_collector.fetch_response(project_url)
        if index_response is None:
            return []

        page_links = list(parse_links(index_response))

        with indent_log():
            package_links = self.evaluate_links(
                link_evaluator,
                links=page_links,
            )

        return package_links

    # NOTE(review): lru_cache on an instance method keys on ``self`` and
    # keeps the finder alive for as long as the cache holds the entry.
    @functools.lru_cache(maxsize=None)
    def find_all_candidates(self, project_name: str) -> List[InstallationCandidate]:
        """Return every candidate found for *project_name*.

        Candidates from file links come first, followed by candidates from
        project pages. Results are cached per ``(self, project_name)``.
        """
        link_evaluator = self.make_link_evaluator(project_name)

        collected_sources = self._link_collector.collect_sources(
            project_name=project_name,
            candidates_from_page=functools.partial(
                self.process_project_url,
                link_evaluator=link_evaluator,
            ),
        )

        page_candidates_it = itertools.chain.from_iterable(
            source.page_candidates()
            for sources in collected_sources
            for source in sources
            if source is not None
        )
        page_candidates = list(page_candidates_it)

        file_links_it = itertools.chain.from_iterable(
            source.file_links()
            for sources in collected_sources
            for source in sources
            if source is not None
        )
        file_candidates = self.evaluate_links(
            link_evaluator,
            sorted(file_links_it, reverse=True),
        )

        if logger.isEnabledFor(logging.DEBUG) and file_candidates:
            paths = []
            for candidate in file_candidates:
                # A URL is required as the fallback text below.
                assert candidate.link.url
                try:
                    paths.append(candidate.link.file_path)
                except Exception:
                    # Presumably not a local file; log the URL instead.
                    paths.append(candidate.link.url)

            logger.debug("Local files found: %s", ", ".join(paths))

        # File candidates deliberately precede page candidates.
        return file_candidates + page_candidates

    def make_candidate_evaluator(
        self,
        project_name: str,
        specifier: Optional[specifiers.BaseSpecifier] = None,
        hashes: Optional[Hashes] = None,
    ) -> CandidateEvaluator:
        """Create a CandidateEvaluator using this finder's preferences."""
        candidate_prefs = self._candidate_prefs
        return CandidateEvaluator.create(
            project_name=project_name,
            target_python=self._target_python,
            prefer_binary=candidate_prefs.prefer_binary,
            allow_all_prereleases=candidate_prefs.allow_all_prereleases,
            specifier=specifier,
            hashes=hashes,
        )

    # NOTE(review): cached per (self, project_name, specifier, hashes), so
    # all arguments must be hashable.
    @functools.lru_cache(maxsize=None)
    def find_best_candidate(
        self,
        project_name: str,
        specifier: Optional[specifiers.BaseSpecifier] = None,
        hashes: Optional[Hashes] = None,
    ) -> BestCandidateResult:
        """Find all candidates for *project_name* and compute the best one.

        :param specifier: Optional version specifier used for filtering.
        :param hashes: Optional allowed hashes used for filtering.
        """
        candidates = self.find_all_candidates(project_name)
        candidate_evaluator = self.make_candidate_evaluator(
            project_name=project_name,
            specifier=specifier,
            hashes=hashes,
        )
        return candidate_evaluator.compute_best_candidate(candidates)

    def find_requirement(
        self, req: InstallRequirement, upgrade: bool
    ) -> Optional[InstallationCandidate]:
        """Return the candidate to install for *req*, if any.

        :param req: The requirement to satisfy.
        :param upgrade: Whether an already-installed version may be replaced.
        :return: The best candidate, or None when *upgrade* is false and a
            version is already installed.
        :raises DistributionNotFound: If nothing is installed and no
            candidate matches.
        :raises BestVersionAlreadyInstalled: If the installed version is at
            least as new as the best candidate.
        """
        hashes = req.hashes(trust_internet=False)
        best_candidate_result = self.find_best_candidate(
            req.name,
            specifier=req.specifier,
            hashes=hashes,
        )
        best_candidate = best_candidate_result.best_candidate

        installed_version: Optional[_BaseVersion] = None
        if req.satisfied_by is not None:
            installed_version = req.satisfied_by.version

        def _format_versions(cand_iter: Iterable[InstallationCandidate]) -> str:
            # Unique versions in ascending order, or "none" when empty.
            return (
                ", ".join(
                    sorted(
                        {str(c.version) for c in cand_iter},
                        key=parse_version,
                    )
                )
                or "none"
            )

        if installed_version is None and best_candidate is None:
            logger.critical(
                "Could not find a version that satisfies the requirement %s "
                "(from versions: %s)",
                req,
                _format_versions(best_candidate_result.iter_all()),
            )

            raise DistributionNotFound(f"No matching distribution found for {req}")

        def _should_install_candidate(
            candidate: Optional[InstallationCandidate],
        ) -> "TypeGuard[InstallationCandidate]":
            # The "nothing installed and no candidate" case has already
            # raised above, so a missing installed version implies that a
            # candidate exists.
            if installed_version is None:
                return True
            if candidate is None:
                return False
            return candidate.version > installed_version

        if not upgrade and installed_version is not None:
            if _should_install_candidate(best_candidate):
                logger.debug(
                    "Existing installed version (%s) satisfies requirement "
                    "(most up-to-date version is %s)",
                    installed_version,
                    best_candidate.version,
                )
            else:
                logger.debug(
                    "Existing installed version (%s) is most up-to-date and "
                    "satisfies requirement",
                    installed_version,
                )
            return None

        if _should_install_candidate(best_candidate):
            logger.debug(
                "Using version %s (newest of versions: %s)",
                best_candidate.version,
                _format_versions(best_candidate_result.iter_applicable()),
            )
            return best_candidate

        # A version is installed and no candidate is newer.
        logger.debug(
            "Installed version (%s) is most up-to-date (past versions: %s)",
            installed_version,
            _format_versions(best_candidate_result.iter_applicable()),
        )
        raise BestVersionAlreadyInstalled
def _find_name_version_sep(fragment: str, canonical_name: str) -> int:
    """Return the index of the dash separating project name from version.

    The fragment is scanned left to right; the first dash whose preceding
    text canonicalizes to *canonical_name* is the separator.

    :raises ValueError: If no dash in *fragment* satisfies that condition.
    """
    for idx, char in enumerate(fragment):
        if char == "-" and canonicalize_name(fragment[:idx]) == canonical_name:
            return idx
    raise ValueError(f"{fragment} does not match {canonical_name}")
def _extract_version_from_fragment(fragment: str, canonical_name: str) -> Optional[str]:
    """Return the version part of *fragment*, or None if it cannot be found.

    The version is everything after the name/version separator dash. None is
    returned when no separator matches *canonical_name* or when nothing
    follows the separator.
    """
    try:
        sep_index = _find_name_version_sep(fragment, canonical_name)
    except ValueError:
        return None
    # An empty remainder is reported as None.
    return fragment[sep_index + 1 :] or None