import contextlib
import functools
import logging
import os
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast
from pip._vendor.packaging.utils import canonicalize_name
from pip._vendor.resolvelib import BaseReporter, ResolutionImpossible
from pip._vendor.resolvelib import Resolver as RLResolver
from pip._vendor.resolvelib.structs import DirectedGraph
from pip._internal.cache import WheelCache
from pip._internal.index.package_finder import PackageFinder
from pip._internal.operations.prepare import RequirementPreparer
from pip._internal.req.constructors import install_req_extend_extras
from pip._internal.req.req_install import InstallRequirement
from pip._internal.req.req_set import RequirementSet
from pip._internal.resolution.base import BaseResolver, InstallRequirementProvider
from pip._internal.resolution.resolvelib.provider import PipProvider
from pip._internal.resolution.resolvelib.reporter import (
PipDebuggingReporter,
PipReporter,
)
from pip._internal.utils.packaging import get_requirement
from .base import Candidate, Requirement
from .factory import Factory
if TYPE_CHECKING:
from pip._vendor.resolvelib.resolvers import Result as RLResult
Result = RLResult[Requirement, Candidate, str]
logger = logging.getLogger(__name__)
class Resolver(BaseResolver):
_allowed_strategies = {"eager", "only-if-needed", "to-satisfy-only"}
def __init__(
self,
preparer: RequirementPreparer,
finder: PackageFinder,
wheel_cache: Optional[WheelCache],
make_install_req: InstallRequirementProvider,
use_user_site: bool,
ignore_dependencies: bool,
ignore_installed: bool,
ignore_requires_python: bool,
force_reinstall: bool,
upgrade_strategy: str,
py_version_info: Optional[Tuple[int, ...]] = None,
):
super().__init__()
assert upgrade_strategy in self._allowed_strategies
self.factory = Factory(
finder=finder,
preparer=preparer,
make_install_req=make_install_req,
wheel_cache=wheel_cache,
use_user_site=use_user_site,
force_reinstall=force_reinstall,
ignore_installed=ignore_installed,
ignore_requires_python=ignore_requires_python,
py_version_info=py_version_info,
)
self.ignore_dependencies = ignore_dependencies
self.upgrade_strategy = upgrade_strategy
self._result: Optional[Result] = None
def resolve(
self, root_reqs: List[InstallRequirement], check_supported_wheels: bool
) -> RequirementSet:
collected = self.factory.collect_root_requirements(root_reqs)
provider = PipProvider(
factory=self.factory,
constraints=collected.constraints,
ignore_dependencies=self.ignore_dependencies,
upgrade_strategy=self.upgrade_strategy,
user_requested=collected.user_requested,
)
if "PIP_RESOLVER_DEBUG" in os.environ:
reporter: BaseReporter = PipDebuggingReporter()
else:
reporter = PipReporter()
resolver: RLResolver[Requirement, Candidate, str] = RLResolver(
provider,
reporter,
)
try:
limit_how_complex_resolution_can_be = 200000
result = self._result = resolver.resolve(
collected.requirements, max_rounds=limit_how_complex_resolution_can_be
)
except ResolutionImpossible as e:
error = self.factory.get_installation_error(
cast("ResolutionImpossible[Requirement, Candidate]", e),
collected.constraints,
)
raise error from e
req_set = RequirementSet(check_supported_wheels=check_supported_wheels)
for candidate in sorted(
result.mapping.values(), key=lambda c: c.name != c.project_name
):
ireq = candidate.get_install_requirement()
if ireq is None:
if candidate.name != candidate.project_name:
with contextlib.suppress(KeyError):
req = req_set.get_requirement(candidate.project_name)
req_set.add_named_requirement(
install_req_extend_extras(
req, get_requirement(candidate.name).extras
)
)
continue
installed_dist = self.factory.get_dist_to_uninstall(candidate)
if installed_dist is None:
ireq.should_reinstall = False
elif self.factory.force_reinstall:
ireq.should_reinstall = True
elif installed_dist.version != candidate.version:
ireq.should_reinstall = True
elif candidate.is_editable or installed_dist.editable:
ireq.should_reinstall = True
elif candidate.source_link and candidate.source_link.is_file:
if candidate.source_link.is_wheel:
logger.info(
"%s is already installed with the same version as the "
"provided wheel. Use --force-reinstall to force an "
"installation of the wheel.",
ireq.name,
)
continue
ireq.should_reinstall = True
else:
continue
link = candidate.source_link
if link and link.is_yanked:
msg = (
"The candidate selected for download or install is a "
"yanked version: {name!r} candidate (version {version} "
"at {link})\nReason for being yanked: {reason}"
).format(
name=candidate.name,
version=candidate.version,
link=link,
reason=link.yanked_reason or "<none given>",
)
logger.warning(msg)
req_set.add_named_requirement(ireq)
reqs = req_set.all_requirements
self.factory.preparer.prepare_linked_requirements_more(reqs)
for req in reqs:
req.prepared = True
req.needs_more_preparation = False
return req_set
def get_installation_order(
self, req_set: RequirementSet
) -> List[InstallRequirement]:
assert self._result is not None, "must call resolve() first"
if not req_set.requirements:
return []
graph = self._result.graph
weights = get_topological_weights(graph, set(req_set.requirements.keys()))
sorted_items = sorted(
req_set.requirements.items(),
key=functools.partial(_req_set_item_sorter, weights=weights),
reverse=True,
)
return [ireq for _, ireq in sorted_items]
def get_topological_weights(
graph: "DirectedGraph[Optional[str]]", requirement_keys: Set[str]
) -> Dict[Optional[str], int]:
path: Set[Optional[str]] = set()
weights: Dict[Optional[str], int] = {}
def visit(node: Optional[str]) -> None:
if node in path:
return
path.add(node)
for child in graph.iter_children(node):
visit(child)
path.remove(node)
if node not in requirement_keys:
return
last_known_parent_count = weights.get(node, 0)
weights[node] = max(last_known_parent_count, len(path))
while True:
leaves = set()
for key in graph:
if key is None:
continue
for _child in graph.iter_children(key):
break
else:
leaves.add(key)
if not leaves:
break
weight = len(graph) - 1
for leaf in leaves:
if leaf not in requirement_keys:
continue
weights[leaf] = weight
for leaf in leaves:
graph.remove(leaf)
visit(None)
difference = set(weights.keys()).difference(requirement_keys)
assert not difference, difference
return weights
def _req_set_item_sorter(
item: Tuple[str, InstallRequirement],
weights: Dict[Optional[str], int],
) -> Tuple[int, str]:
name = canonicalize_name(item[0])
return weights[name], name