import collections
import math
from typing import (
TYPE_CHECKING,
Dict,
Iterable,
Iterator,
Mapping,
Sequence,
TypeVar,
Union,
)
from pip._vendor.resolvelib.providers import AbstractProvider
from .base import Candidate, Constraint, Requirement
from .candidates import REQUIRES_PYTHON_IDENTIFIER
from .factory import Factory
# Names that exist purely for annotations are imported/defined only while a
# static type checker is analysing the module; at runtime this branch is
# skipped and the provider derives from the plain, unsubscripted base class.
if TYPE_CHECKING:
    from pip._vendor.resolvelib.providers import Preference
    from pip._vendor.resolvelib.resolvers import RequirementInformation
    # Alias referenced (as a string annotation) by the preference methods.
    PreferenceInformation = RequirementInformation[Requirement, Candidate]
    # Subscripted base so type checkers see the concrete type parameters.
    _ProviderBase = AbstractProvider[Requirement, Candidate, str]
else:
    _ProviderBase = AbstractProvider
# Type variables for ``_get_with_identifier``: ``D`` is the type of the
# fallback value, ``V`` the type of the values stored in the mapping.
D = TypeVar("D")
V = TypeVar("V")
def _get_with_identifier(
mapping: Mapping[str, V],
identifier: str,
default: D,
) -> Union[D, V]:
if identifier in mapping:
return mapping[identifier]
name, open_bracket, _ = identifier.partition("[")
if open_bracket and name in mapping:
return mapping[name]
return default
class PipProvider(_ProviderBase):
    """Provider that plugs pip's requirement/candidate objects into resolvelib.

    :param factory: Object whose ``find_candidates`` is used by
        ``find_matches`` to produce candidates.
    :param constraints: Mapping from identifier to the constraint that is
        passed along when looking for candidates of that identifier.
    :param ignore_dependencies: When true, ``get_dependencies`` asks each
        candidate for its dependencies with ``with_requires=False``.
    :param upgrade_strategy: ``"eager"`` and ``"only-if-needed"`` are
        recognised by ``find_matches``; any other value makes it always
        prefer installed candidates.
    :param user_requested: Mapping from identifier to an integer that is
        used as the "requested order" component of the preference key
        (presumably the position at which the user listed the requirement).
    """

    def __init__(
        self,
        factory: Factory,
        constraints: Dict[str, Constraint],
        ignore_dependencies: bool,
        upgrade_strategy: str,
        user_requested: Dict[str, int],
    ) -> None:
        self._factory = factory
        self._constraints = constraints
        self._ignore_dependencies = ignore_dependencies
        self._upgrade_strategy = upgrade_strategy
        self._user_requested = user_requested
        # Depth recorded for each identifier by ``get_preference``. Looking
        # up an identifier that has not been recorded yet yields infinity.
        self._known_depths: Dict[str, float] = collections.defaultdict(
            lambda: math.inf
        )

    def identify(self, requirement_or_candidate: Union[Requirement, Candidate]) -> str:
        """Return the ``name`` of the given requirement or candidate."""
        return requirement_or_candidate.name

    def get_preference(
        self,
        identifier: str,
        resolutions: Mapping[str, Candidate],
        candidates: Mapping[str, Iterator[Candidate]],
        information: Mapping[str, Iterable["PreferenceInformation"]],
        backtrack_causes: Sequence["PreferenceInformation"],
    ) -> "Preference":
        """Build the sort key used to decide which identifier to work on.

        The returned tuple is compared element by element (per resolvelib's
        documented contract, the identifier with the lowest key is preferred).
        Boolean properties are negated so that an identifier having the
        property sorts first. In order, the components are:

        1. whether the identifier is the Requires-Python pseudo requirement;
        2. whether any requirement on it resolves directly to a candidate;
        3. whether any specifier on it uses ``==`` / ``===``;
        4. whether it is involved in one of the ``backtrack_causes``;
        5. its inferred depth (1.0 for user-requested identifiers, otherwise
           one more than the smallest recorded depth of its parents);
        6. the user-requested order, or infinity if not user-requested;
        7. whether it carries any version specifier at all;
        8. the identifier itself, as a final tie-breaker.

        As a side effect, the inferred depth is stored for ``identifier`` so
        later calls can derive the depth of its dependents.

        ``resolutions`` and ``candidates`` are accepted for interface
        compatibility and are not used.
        """
        # Probe for a first entry instead of materialising the iterable.
        try:
            next(iter(information[identifier]))
        except StopIteration:
            has_information = False
        else:
            has_information = True

        if has_information:
            lookups = (r.get_candidate_lookup() for r, _ in information[identifier])
            # Transpose the (candidate, ireq) pairs into two parallel tuples.
            explicit_candidates, ireqs = zip(*lookups)
        else:
            explicit_candidates, ireqs = (), ()

        operators = [
            specifier.operator
            for specifier_set in (ireq.specifier for ireq in ireqs if ireq)
            for specifier in specifier_set
        ]

        # ``explicit_candidates`` is a tuple, so it must be inspected
        # element-wise: comparing the tuple itself against None would report
        # every identifier with information as "direct".
        direct = any(c is not None for c in explicit_candidates)
        # Slicing to two characters makes "===" count as pinned as well.
        pinned = any(op[:2] == "==" for op in operators)
        unfree = bool(operators)

        requested_order: Union[int, float] = self._user_requested.get(
            identifier, math.inf
        )

        if identifier in self._user_requested:
            inferred_depth = 1.0
        elif has_information:
            # A requirement without a parent counts as depth 0, so the
            # identifier it asks for ends up at depth 1.
            parent_depths = (
                self._known_depths[parent.name] if parent is not None else 0.0
                for _, parent in information[identifier]
            )
            inferred_depth = min(parent_depths) + 1.0
        else:
            inferred_depth = math.inf
        self._known_depths[identifier] = inferred_depth

        requires_python = identifier == REQUIRES_PYTHON_IDENTIFIER
        backtrack_cause = self.is_backtrack_cause(identifier, backtrack_causes)

        return (
            not requires_python,
            not direct,
            not pinned,
            not backtrack_cause,
            inferred_depth,
            requested_order,
            not unfree,
            identifier,
        )

    def find_matches(
        self,
        identifier: str,
        requirements: Mapping[str, Iterator[Requirement]],
        incompatibilities: Mapping[str, Iterator[Candidate]],
    ) -> Iterable[Candidate]:
        """Ask the factory for the candidates that may satisfy ``identifier``.

        The constraint registered for the identifier (or an empty constraint
        when there is none) is forwarded together with ``requirements`` and
        ``incompatibilities``. Installed candidates are preferred unless the
        identifier is eligible for upgrade under the configured strategy.
        """

        def _eligible_for_upgrade(identifier: str) -> bool:
            """Tell whether the upgrade strategy allows upgrading this one.

            ``"eager"`` allows everything; ``"only-if-needed"`` allows only
            identifiers found in the user-requested mapping; any other
            strategy allows nothing.
            """
            if self._upgrade_strategy == "eager":
                return True
            elif self._upgrade_strategy == "only-if-needed":
                user_order = _get_with_identifier(
                    self._user_requested,
                    identifier,
                    default=None,
                )
                return user_order is not None
            return False

        constraint = _get_with_identifier(
            self._constraints,
            identifier,
            default=Constraint.empty(),
        )
        return self._factory.find_candidates(
            identifier=identifier,
            requirements=requirements,
            constraint=constraint,
            prefers_installed=(not _eligible_for_upgrade(identifier)),
            incompatibilities=incompatibilities,
        )

    def is_satisfied_by(self, requirement: Requirement, candidate: Candidate) -> bool:
        """Delegate to ``requirement.is_satisfied_by(candidate)``."""
        return requirement.is_satisfied_by(candidate)

    def get_dependencies(self, candidate: Candidate) -> Sequence[Requirement]:
        """Return the candidate's dependencies, dropping ``None`` entries.

        ``with_requires`` is passed as false when the provider was created
        with ``ignore_dependencies`` set.
        """
        with_requires = not self._ignore_dependencies
        return [r for r in candidate.iter_dependencies(with_requires) if r is not None]

    @staticmethod
    def is_backtrack_cause(
        identifier: str, backtrack_causes: Sequence["PreferenceInformation"]
    ) -> bool:
        """Tell whether ``identifier`` appears in any of ``backtrack_causes``.

        An identifier matches a cause when it equals the name of the cause's
        requirement, or the name of the cause's parent when one is present.
        """
        for backtrack_cause in backtrack_causes:
            if identifier == backtrack_cause.requirement.name:
                return True
            if backtrack_cause.parent and identifier == backtrack_cause.parent.name:
                return True
        return False