import logging
import sys
from collections import defaultdict
from itertools import chain
from typing import DefaultDict, Iterable, List, Optional, Set, Tuple
from pip._vendor.packaging import specifiers
from pip._vendor.packaging.requirements import Requirement
from pip._internal.cache import WheelCache
from pip._internal.exceptions import (
BestVersionAlreadyInstalled,
DistributionNotFound,
HashError,
HashErrors,
InstallationError,
NoneMetadataError,
UnsupportedPythonVersion,
)
from pip._internal.index.package_finder import PackageFinder
from pip._internal.metadata import BaseDistribution
from pip._internal.models.link import Link
from pip._internal.models.wheel import Wheel
from pip._internal.operations.prepare import RequirementPreparer
from pip._internal.req.req_install import (
InstallRequirement,
check_invalid_constraint_type,
)
from pip._internal.req.req_set import RequirementSet
from pip._internal.resolution.base import BaseResolver, InstallRequirementProvider
from pip._internal.utils import compatibility_tags
from pip._internal.utils.compatibility_tags import get_supported
from pip._internal.utils.direct_url_helpers import direct_url_from_link
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import normalize_version_info
from pip._internal.utils.packaging import check_requires_python
logger = logging.getLogger(__name__)
DiscoveredDependencies = DefaultDict[str, List[InstallRequirement]]
def _check_dist_requires_python(
dist: BaseDistribution,
version_info: Tuple[int, int, int],
ignore_requires_python: bool = False,
) -> None:
try:
requires_python = str(dist.requires_python)
except FileNotFoundError as e:
raise NoneMetadataError(dist, str(e))
try:
is_compatible = check_requires_python(
requires_python,
version_info=version_info,
)
except specifiers.InvalidSpecifier as exc:
logger.warning(
"Package %r has an invalid Requires-Python: %s", dist.raw_name, exc
)
return
if is_compatible:
return
version = ".".join(map(str, version_info))
if ignore_requires_python:
logger.debug(
"Ignoring failed Requires-Python check for package %r: %s not in %r",
dist.raw_name,
version,
requires_python,
)
return
raise UnsupportedPythonVersion(
"Package {!r} requires a different Python: {} not in {!r}".format(
dist.raw_name, version, requires_python
)
)
class Resolver(BaseResolver):
_allowed_strategies = {"eager", "only-if-needed", "to-satisfy-only"}
def __init__(
self,
preparer: RequirementPreparer,
finder: PackageFinder,
wheel_cache: Optional[WheelCache],
make_install_req: InstallRequirementProvider,
use_user_site: bool,
ignore_dependencies: bool,
ignore_installed: bool,
ignore_requires_python: bool,
force_reinstall: bool,
upgrade_strategy: str,
py_version_info: Optional[Tuple[int, ...]] = None,
) -> None:
super().__init__()
assert upgrade_strategy in self._allowed_strategies
if py_version_info is None:
py_version_info = sys.version_info[:3]
else:
py_version_info = normalize_version_info(py_version_info)
self._py_version_info = py_version_info
self.preparer = preparer
self.finder = finder
self.wheel_cache = wheel_cache
self.upgrade_strategy = upgrade_strategy
self.force_reinstall = force_reinstall
self.ignore_dependencies = ignore_dependencies
self.ignore_installed = ignore_installed
self.ignore_requires_python = ignore_requires_python
self.use_user_site = use_user_site
self._make_install_req = make_install_req
self._discovered_dependencies: DiscoveredDependencies = defaultdict(list)
def resolve(
self, root_reqs: List[InstallRequirement], check_supported_wheels: bool
) -> RequirementSet:
requirement_set = RequirementSet(check_supported_wheels=check_supported_wheels)
for req in root_reqs:
if req.constraint:
check_invalid_constraint_type(req)
self._add_requirement_to_set(requirement_set, req)
discovered_reqs: List[InstallRequirement] = []
hash_errors = HashErrors()
for req in chain(requirement_set.all_requirements, discovered_reqs):
try:
discovered_reqs.extend(self._resolve_one(requirement_set, req))
except HashError as exc:
exc.req = req
hash_errors.append(exc)
if hash_errors:
raise hash_errors
return requirement_set
def _add_requirement_to_set(
self,
requirement_set: RequirementSet,
install_req: InstallRequirement,
parent_req_name: Optional[str] = None,
extras_requested: Optional[Iterable[str]] = None,
) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]]:
if not install_req.match_markers(extras_requested):
logger.info(
"Ignoring %s: markers '%s' don't match your environment",
install_req.name,
install_req.markers,
)
return [], None
if install_req.link and install_req.link.is_wheel:
wheel = Wheel(install_req.link.filename)
tags = compatibility_tags.get_supported()
if requirement_set.check_supported_wheels and not wheel.supported(tags):
raise InstallationError(
f"{wheel.filename} is not a supported wheel on this platform."
)
assert (
not install_req.user_supplied or parent_req_name is None
), "a user supplied req shouldn't have a parent"
if not install_req.name:
requirement_set.add_unnamed_requirement(install_req)
return [install_req], None
try:
existing_req: Optional[
InstallRequirement
] = requirement_set.get_requirement(install_req.name)
except KeyError:
existing_req = None
has_conflicting_requirement = (
parent_req_name is None
and existing_req
and not existing_req.constraint
and existing_req.extras == install_req.extras
and existing_req.req
and install_req.req
and existing_req.req.specifier != install_req.req.specifier
)
if has_conflicting_requirement:
raise InstallationError(
"Double requirement given: {} (already in {}, name={!r})".format(
install_req, existing_req, install_req.name
)
)
if not existing_req:
requirement_set.add_named_requirement(install_req)
return [install_req], install_req
if install_req.constraint or not existing_req.constraint:
return [], existing_req
does_not_satisfy_constraint = install_req.link and not (
existing_req.link and install_req.link.path == existing_req.link.path
)
if does_not_satisfy_constraint:
raise InstallationError(
f"Could not satisfy constraints for '{install_req.name}': "
"installation from path or url cannot be "
"constrained to a version"
)
existing_req.constraint = False
if install_req.user_supplied:
existing_req.user_supplied = True
existing_req.extras = tuple(
sorted(set(existing_req.extras) | set(install_req.extras))
)
logger.debug(
"Setting %s extras to: %s",
existing_req,
existing_req.extras,
)
return [existing_req], existing_req
def _is_upgrade_allowed(self, req: InstallRequirement) -> bool:
if self.upgrade_strategy == "to-satisfy-only":
return False
elif self.upgrade_strategy == "eager":
return True
else:
assert self.upgrade_strategy == "only-if-needed"
return req.user_supplied or req.constraint
def _set_req_to_reinstall(self, req: InstallRequirement) -> None:
if not self.use_user_site or req.satisfied_by.in_usersite:
req.should_reinstall = True
req.satisfied_by = None
def _check_skip_installed(
self, req_to_install: InstallRequirement
) -> Optional[str]:
if self.ignore_installed:
return None
req_to_install.check_if_exists(self.use_user_site)
if not req_to_install.satisfied_by:
return None
if self.force_reinstall:
self._set_req_to_reinstall(req_to_install)
return None
if not self._is_upgrade_allowed(req_to_install):
if self.upgrade_strategy == "only-if-needed":
return "already satisfied, skipping upgrade"
return "already satisfied"
if not req_to_install.link:
try:
self.finder.find_requirement(req_to_install, upgrade=True)
except BestVersionAlreadyInstalled:
return "already up-to-date"
except DistributionNotFound:
pass
self._set_req_to_reinstall(req_to_install)
return None
def _find_requirement_link(self, req: InstallRequirement) -> Optional[Link]:
upgrade = self._is_upgrade_allowed(req)
best_candidate = self.finder.find_requirement(req, upgrade)
if not best_candidate:
return None
link = best_candidate.link
if link.is_yanked:
reason = link.yanked_reason or "<none given>"
msg = (
"The candidate selected for download or install is a "
f"yanked version: {best_candidate}\n"
f"Reason for being yanked: {reason}"
)
logger.warning(msg)
return link
def _populate_link(self, req: InstallRequirement) -> None:
if req.link is None:
req.link = self._find_requirement_link(req)
if self.wheel_cache is None or self.preparer.require_hashes:
return
cache_entry = self.wheel_cache.get_cache_entry(
link=req.link,
package_name=req.name,
supported_tags=get_supported(),
)
if cache_entry is not None:
logger.debug("Using cached wheel link: %s", cache_entry.link)
if req.link is req.original_link and cache_entry.persistent:
req.cached_wheel_source_link = req.link
if cache_entry.origin is not None:
req.download_info = cache_entry.origin
else:
req.download_info = direct_url_from_link(
req.link, link_is_in_wheel_cache=cache_entry.persistent
)
req.link = cache_entry.link
def _get_dist_for(self, req: InstallRequirement) -> BaseDistribution:
if req.editable:
return self.preparer.prepare_editable_requirement(req)
assert req.satisfied_by is None
skip_reason = self._check_skip_installed(req)
if req.satisfied_by:
return self.preparer.prepare_installed_requirement(req, skip_reason)
self._populate_link(req)
dist = self.preparer.prepare_linked_requirement(req)
if not self.ignore_installed:
req.check_if_exists(self.use_user_site)
if req.satisfied_by:
should_modify = (
self.upgrade_strategy != "to-satisfy-only"
or self.force_reinstall
or self.ignore_installed
or req.link.scheme == "file"
)
if should_modify:
self._set_req_to_reinstall(req)
else:
logger.info(
"Requirement already satisfied (use --upgrade to upgrade): %s",
req,
)
return dist
def _resolve_one(
self,
requirement_set: RequirementSet,
req_to_install: InstallRequirement,
) -> List[InstallRequirement]:
if req_to_install.constraint or req_to_install.prepared:
return []
req_to_install.prepared = True
dist = self._get_dist_for(req_to_install)
_check_dist_requires_python(
dist,
version_info=self._py_version_info,
ignore_requires_python=self.ignore_requires_python,
)
more_reqs: List[InstallRequirement] = []
def add_req(subreq: Requirement, extras_requested: Iterable[str]) -> None:
sub_install_req = self._make_install_req(str(subreq), req_to_install)
parent_req_name = req_to_install.name
to_scan_again, add_to_parent = self._add_requirement_to_set(
requirement_set,
sub_install_req,
parent_req_name=parent_req_name,
extras_requested=extras_requested,
)
if parent_req_name and add_to_parent:
self._discovered_dependencies[parent_req_name].append(add_to_parent)
more_reqs.extend(to_scan_again)
with indent_log():
if not requirement_set.has_requirement(req_to_install.name):
assert req_to_install.user_supplied
self._add_requirement_to_set(
requirement_set, req_to_install, parent_req_name=None
)
if not self.ignore_dependencies:
if req_to_install.extras:
logger.debug(
"Installing extra requirements: %r",
",".join(req_to_install.extras),
)
missing_requested = sorted(
set(req_to_install.extras) - set(dist.iter_provided_extras())
)
for missing in missing_requested:
logger.warning(
"%s %s does not provide the extra '%s'",
dist.raw_name,
dist.version,
missing,
)
available_requested = sorted(
set(dist.iter_provided_extras()) & set(req_to_install.extras)
)
for subreq in dist.iter_dependencies(available_requested):
add_req(subreq, extras_requested=available_requested)
return more_reqs
def get_installation_order(
self, req_set: RequirementSet
) -> List[InstallRequirement]:
order = []
ordered_reqs: Set[InstallRequirement] = set()
def schedule(req: InstallRequirement) -> None:
if req.satisfied_by or req in ordered_reqs:
return
if req.constraint:
return
ordered_reqs.add(req)
for dep in self._discovered_dependencies[req.name]:
schedule(dep)
order.append(req)
for install_req in req_set.requirements.values():
schedule(install_req)
return order