import functools
import itertools
import logging
import os
import posixpath
import re
import urllib.parse
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Mapping,
NamedTuple,
Optional,
Tuple,
Union,
)
from pip._internal.utils.deprecation import deprecated
from pip._internal.utils.filetypes import WHEEL_EXTENSION
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.misc import (
pairwise,
redact_auth_from_url,
split_auth_from_netloc,
splitext,
)
from pip._internal.utils.models import KeyBasedCompareMixin
from pip._internal.utils.urls import path_to_url, url_to_path
if TYPE_CHECKING:
from pip._internal.index.collector import IndexContent
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Hash algorithm names recognized by this module. The tuple is used to build
# the URL-fragment regex in LinkHash, to validate LinkHash/MetadataFile
# contents, and to filter hash mappings in supported_hashes() and
# _clean_link().
_SUPPORTED_HASHES = ("sha512", "sha384", "sha256", "sha224", "sha1", "md5")
@dataclass(frozen=True)
class LinkHash:
    """A single ``name=value`` hash extracted from a URL.

    ``name`` is asserted to be one of ``_SUPPORTED_HASHES``. ``value`` is kept
    exactly as it appeared in the URL; this class does not validate it.
    """

    name: str
    value: str

    # Matches "<algorithm>=<value>" introduced by "#" or "&", where
    # <algorithm> is one of the supported hash names and <value> runs up to
    # (but not including) the next "&". Deliberately unannotated so that the
    # dataclass machinery does not treat it as a field.
    _hash_url_fragment_re = re.compile(
        r"[#&]({choices})=([^&]*)".format(
            choices="|".join(map(re.escape, _SUPPORTED_HASHES))
        ),
    )

    def __post_init__(self) -> None:
        """Check that the algorithm name is a supported one."""
        assert self.name in _SUPPORTED_HASHES

    @classmethod
    @functools.lru_cache(maxsize=None)
    def find_hash_url_fragment(cls, url: str) -> Optional["LinkHash"]:
        """Return the first supported hash found in ``url``, or ``None``.

        Results are memoized (unbounded ``lru_cache``) per ``(cls, url)``.
        """
        found = cls._hash_url_fragment_re.search(url)
        if found is None:
            return None
        return cls(name=found.group(1), value=found.group(2))

    def as_dict(self) -> Dict[str, str]:
        """Return this hash as a one-entry ``{name: value}`` mapping."""
        return {self.name: self.value}

    def as_hashes(self) -> Hashes:
        """Return a ``Hashes`` built from only this hash."""
        single_entry = {self.name: [self.value]}
        return Hashes(single_entry)

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """Return whether ``hashes`` allows this hash.

        ``False`` is returned when ``hashes`` is ``None``; otherwise the
        decision is delegated to ``hashes.is_hash_allowed``.
        """
        return hashes is not None and hashes.is_hash_allowed(
            self.name, hex_digest=self.value
        )
@dataclass(frozen=True)
class MetadataFile:
    """Details about a metadata file that accompanies a link.

    ``hashes`` maps hash algorithm names to values, or is ``None`` when no
    hashes are known. Every name present is asserted to be one of
    ``_SUPPORTED_HASHES``.
    """

    hashes: Optional[Dict[str, str]]

    def __post_init__(self) -> None:
        """Check that only supported algorithm names are present."""
        declared = self.hashes
        if declared is None:
            return
        assert all(algorithm in _SUPPORTED_HASHES for algorithm in declared)
def supported_hashes(hashes: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
    """Keep only the entries of ``hashes`` whose name is supported.

    :param hashes: Mapping of hash algorithm name to value, or ``None``.
    :return: A new dict restricted to names in ``_SUPPORTED_HASHES``, or
        ``None`` when ``hashes`` is ``None`` or nothing survives the filter.
    """
    if hashes is None:
        return None

    recognized = {}
    for algorithm, digest in hashes.items():
        if algorithm in _SUPPORTED_HASHES:
            recognized[algorithm] = digest

    # An empty result is reported as None rather than as an empty dict.
    return recognized if recognized else None
def _clean_url_path_part(part: str) -> str:
return urllib.parse.quote(urllib.parse.unquote(part))
def _clean_file_url_path(part: str) -> str:
return urllib.request.pathname2url(urllib.request.url2pathname(part))
# Reserved sequences that _clean_url_path() splits on: a literal "@" or a
# percent-encoded slash ("%2F", matched case-insensitively). The capturing
# group makes re.split() keep each matched separator in its output list.
_reserved_chars_re = re.compile("(@|%2F)", re.IGNORECASE)
def _clean_url_path(path: str, is_local_path: bool) -> str:
    """Normalize the percent-quoting of a URL path.

    The path is split on the reserved sequences "@" and "%2F" so that those
    separators are never re-quoted. Each piece between separators is cleaned,
    and each separator is upper-cased (e.g. "%2f" becomes "%2F").

    :param path: The path component of a URL.
    :param is_local_path: Selects ``_clean_file_url_path`` for cleaning the
        pieces when true, ``_clean_url_path_part`` otherwise.
    :return: The reassembled, cleaned path.
    """
    clean_func = _clean_file_url_path if is_local_path else _clean_url_path_part

    # Because the pattern has a capturing group, split() alternates
    # text, separator, text, ..., text. Appending "" gives the final text
    # piece a (blank) separator partner.
    # NOTE(review): assumes pairwise() yields non-overlapping
    # (text, separator) pairs — confirm against pip._internal.utils.misc.
    segments = _reserved_chars_re.split(path)

    pieces: List[str] = []
    for text, separator in pairwise(itertools.chain(segments, [""])):
        pieces.extend((clean_func(text), separator.upper()))

    return "".join(pieces)
def _ensure_quoted_url(url: str) -> str:
    """Return ``url`` with its path component cleaned by ``_clean_url_path``.

    Only the path is rewritten; all other parsed components are passed
    through unchanged. A URL with an empty netloc is treated as a local path.
    """
    components = urllib.parse.urlparse(url)
    cleaned_path = _clean_url_path(
        components.path,
        is_local_path=not components.netloc,
    )
    return urllib.parse.urlunparse(components._replace(path=cleaned_path))
class Link(KeyBasedCompareMixin):
    """A URL together with the details an index supplied alongside it.

    The URL is handed to ``KeyBasedCompareMixin`` as the comparison key.
    """

    __slots__ = [
        "_parsed_url",
        "_url",
        "_hashes",
        "comes_from",
        "requires_python",
        "yanked_reason",
        "metadata_file_data",
        "cache_link_parsing",
        "egg_fragment",
    ]

    def __init__(
        self,
        url: str,
        comes_from: Optional[Union[str, "IndexContent"]] = None,
        requires_python: Optional[str] = None,
        yanked_reason: Optional[str] = None,
        metadata_file_data: Optional[MetadataFile] = None,
        cache_link_parsing: bool = True,
        hashes: Optional[Mapping[str, str]] = None,
    ) -> None:
        """Build a link.

        :param url: The link target. A value starting with two backslashes is
            first converted with ``path_to_url`` (presumably a Windows UNC
            share path — confirm).
        :param comes_from: Where the link was found; shown by ``__str__``.
        :param requires_python: Stored as given; falsy values become ``None``.
        :param yanked_reason: ``None`` means not yanked (see ``is_yanked``);
            any other value, including ``""``, marks the link as yanked.
        :param metadata_file_data: Description of an accompanying metadata
            file, or ``None``; used by ``metadata_link``.
        :param cache_link_parsing: Stored on the instance; not otherwise used
            by this class.
        :param hashes: Hashes known for the target. A hash found in ``url``
            itself overrides an entry here that has the same algorithm name.
        """
        if url.startswith("\\\\"):
            url = path_to_url(url)

        self._parsed_url = urllib.parse.urlsplit(url)
        # Kept private and exposed read-only through the ``url`` property.
        self._url = url

        found_hash = LinkHash.find_hash_url_fragment(url)
        url_hashes = found_hash.as_dict() if found_hash is not None else {}
        self._hashes = url_hashes if hashes is None else {**hashes, **url_hashes}

        self.comes_from = comes_from
        self.requires_python = requires_python or None
        self.yanked_reason = yanked_reason
        self.metadata_file_data = metadata_file_data

        super().__init__(key=url, defining_class=Link)

        self.cache_link_parsing = cache_link_parsing
        # Computed last: _egg_fragment() may format ``self`` in a deprecation
        # message, which reads the attributes assigned above.
        self.egg_fragment = self._egg_fragment()

    @classmethod
    def from_json(
        cls,
        file_data: Dict[str, Any],
        page_url: str,
    ) -> Optional["Link"]:
        """Build a link from one file entry of a JSON index page.

        :param file_data: The mapping describing the file.
        :param page_url: URL of the page the entry came from; ``"url"`` in
            ``file_data`` is resolved relative to it.
        :return: The link, or ``None`` when ``file_data`` has no ``"url"``.
        """
        file_url = file_data.get("url")
        if file_url is None:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(page_url, file_url))

        # "core-metadata" is preferred; "dist-info-metadata" is the fallback.
        metadata_info = file_data.get("core-metadata")
        if metadata_info is None:
            metadata_info = file_data.get("dist-info-metadata")

        metadata_file_data: Optional[MetadataFile]
        if isinstance(metadata_info, dict):
            # A dict carries the metadata file's hashes.
            metadata_hashes = supported_hashes(metadata_info)
            metadata_file_data = MetadataFile(metadata_hashes)
        else:
            # Any other truthy value: the file exists, hashes are unknown.
            # A falsy or missing value: there is no metadata file.
            metadata_file_data = MetadataFile(None) if metadata_info else None

        # Normalize "yanked": falsy -> None (not yanked), a non-empty string
        # is kept as the reason, any other truthy value -> "" (yanked, no
        # reason given).
        raw_yanked = file_data.get("yanked")
        yanked_reason: Optional[str]
        if not raw_yanked:
            yanked_reason = None
        elif isinstance(raw_yanked, str):
            yanked_reason = raw_yanked
        else:
            yanked_reason = ""

        return cls(
            url,
            comes_from=page_url,
            requires_python=file_data.get("requires-python"),
            yanked_reason=yanked_reason,
            hashes=file_data.get("hashes", {}),
            metadata_file_data=metadata_file_data,
        )

    @classmethod
    def from_element(
        cls,
        anchor_attribs: Dict[str, Optional[str]],
        page_url: str,
        base_url: str,
    ) -> Optional["Link"]:
        """Build a link from the attributes of an anchor element.

        :param anchor_attribs: Attribute name to value mapping of the anchor.
        :param page_url: URL of the page the anchor came from; recorded as
            ``comes_from``.
        :param base_url: URL that ``href`` is resolved against.
        :return: The link, or ``None`` when ``href`` is missing or empty.
        """
        href = anchor_attribs.get("href")
        if not href:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(base_url, href))

        # "data-core-metadata" is preferred; "data-dist-info-metadata" is the
        # fallback.
        metadata_info = anchor_attribs.get("data-core-metadata")
        if metadata_info is None:
            metadata_info = anchor_attribs.get("data-dist-info-metadata")

        metadata_file_data: Optional[MetadataFile]
        if metadata_info is None:
            # No metadata file.
            metadata_file_data = None
        elif metadata_info == "true":
            # The file exists, hashes are unknown.
            metadata_file_data = MetadataFile(None)
        elif "=" in metadata_info:
            # "hashname=hashval": the file exists and a hash was supplied.
            hashname, _, hashval = metadata_info.partition("=")
            metadata_file_data = MetadataFile(supported_hashes({hashname: hashval}))
        else:
            # Unrecognized value: treat as "file exists, hashes unknown".
            logger.debug(
                "Index returned invalid data-dist-info-metadata value: %s",
                metadata_info,
            )
            metadata_file_data = MetadataFile(None)

        return cls(
            url,
            comes_from=page_url,
            requires_python=anchor_attribs.get("data-requires-python"),
            yanked_reason=anchor_attribs.get("data-yanked"),
            metadata_file_data=metadata_file_data,
        )

    def __str__(self) -> str:
        """Return the URL passed through ``redact_auth_from_url``.

        When ``comes_from`` is set, the origin is appended, and after it the
        requires-python value if there is one.
        """
        if not self.comes_from:
            return redact_auth_from_url(str(self._url))

        if self.requires_python:
            suffix = f" (requires-python:{self.requires_python})"
        else:
            suffix = ""
        return f"{redact_auth_from_url(self._url)} (from {self.comes_from}){suffix}"

    def __repr__(self) -> str:
        """Return ``<Link ...>`` wrapping the ``str()`` form."""
        return f"<Link {self}>"

    @property
    def url(self) -> str:
        """The URL this link was created with (after any UNC conversion)."""
        return self._url

    @property
    def filename(self) -> str:
        """The unquoted last path segment, or the host when the path is empty.

        When the path has no final segment, the netloc with its auth part
        split off is returned instead, so credentials are not exposed.
        """
        last_segment = posixpath.basename(self.path.rstrip("/"))
        if not last_segment:
            netloc, _auth = split_auth_from_netloc(self.netloc)
            return netloc

        decoded = urllib.parse.unquote(last_segment)
        assert decoded, f"URL {self._url!r} produced no filename"
        return decoded

    @property
    def file_path(self) -> str:
        """The URL converted to a filesystem path via ``url_to_path``."""
        return url_to_path(self.url)

    @property
    def scheme(self) -> str:
        """The scheme component of the parsed URL."""
        return self._parsed_url.scheme

    @property
    def netloc(self) -> str:
        """The netloc component of the parsed URL."""
        return self._parsed_url.netloc

    @property
    def path(self) -> str:
        """The percent-decoded path component of the parsed URL."""
        return urllib.parse.unquote(self._parsed_url.path)

    def splitext(self) -> Tuple[str, str]:
        """Split the last path segment with the module-level ``splitext``."""
        last_segment = posixpath.basename(self.path.rstrip("/"))
        return splitext(last_segment)

    @property
    def ext(self) -> str:
        """The extension part of ``splitext()``."""
        parts = self.splitext()
        return parts[1]

    @property
    def url_without_fragment(self) -> str:
        """The URL reassembled with an empty fragment."""
        # First four components (scheme, netloc, path, query) are kept as-is.
        leading = tuple(self._parsed_url[:4])
        return urllib.parse.urlunsplit(leading + ("",))

    # Captures the value of an "egg=" item introduced by "#" or "&".
    _egg_fragment_re = re.compile(r"[#&]egg=([^&]*)")

    # Accepted shape of a project name: alphanumeric at both ends, with
    # ".", "_" and "-" allowed in between (case-insensitive).
    _project_name_re = re.compile(
        r"^([A-Z0-9]|[A-Z0-9][A-Z0-9._-]*[A-Z0-9])$", re.IGNORECASE
    )

    def _egg_fragment(self) -> Optional[str]:
        """Return the ``egg=`` value from the URL, or ``None`` if absent.

        A value that does not match ``_project_name_re`` is still returned,
        but ``deprecated`` is called for it first.
        """
        found = self._egg_fragment_re.search(self._url)
        if found is None:
            return None

        project_name = found.group(1)
        name_is_valid = self._project_name_re.match(project_name) is not None
        if not name_is_valid:
            deprecated(
                reason=f"{self} contains an egg fragment with a non-PEP 508 name",
                replacement="to use the req @ url syntax, and remove the egg fragment",
                gone_in="25.0",
                issue=11617,
            )

        return project_name

    # Captures the value of a "subdirectory=" item introduced by "#" or "&".
    _subdirectory_fragment_re = re.compile(r"[#&]subdirectory=([^&]*)")

    @property
    def subdirectory_fragment(self) -> Optional[str]:
        """The first ``subdirectory=`` value in the URL, or ``None``."""
        found = self._subdirectory_fragment_re.search(self._url)
        return found.group(1) if found else None

    def metadata_link(self) -> Optional["Link"]:
        """Return a link to this link's metadata file, if one is described.

        The metadata URL is this URL without its fragment, plus ".metadata".
        ``None`` is returned when ``metadata_file_data`` is ``None``.
        """
        info = self.metadata_file_data
        if info is None:
            return None
        metadata_url = f"{self.url_without_fragment}.metadata"
        # ``hashes`` defaults to None, so passing None equals omitting it.
        return Link(metadata_url, hashes=info.hashes)

    def as_hashes(self) -> Hashes:
        """Return a ``Hashes`` holding every hash known for this link."""
        by_algorithm = {name: [digest] for name, digest in self._hashes.items()}
        return Hashes(by_algorithm)

    @property
    def hash(self) -> Optional[str]:
        """The first stored hash value, or ``None`` when there are none."""
        for digest in self._hashes.values():
            return digest
        return None

    @property
    def hash_name(self) -> Optional[str]:
        """The first stored hash algorithm name, or ``None``."""
        for name in self._hashes:
            return name
        return None

    @property
    def show_url(self) -> str:
        """The last path segment of the URL, minus fragment and query."""
        without_fragment = self._url.partition("#")[0]
        without_query = without_fragment.partition("?")[0]
        return posixpath.basename(without_query)

    @property
    def is_file(self) -> bool:
        """Whether the URL scheme is ``file``."""
        return self.scheme == "file"

    def is_existing_dir(self) -> bool:
        """Whether this is a ``file`` link to an existing directory."""
        return self.is_file and os.path.isdir(self.file_path)

    @property
    def is_wheel(self) -> bool:
        """Whether the extension equals ``WHEEL_EXTENSION``."""
        return self.ext == WHEEL_EXTENSION

    @property
    def is_vcs(self) -> bool:
        """Whether the URL scheme is one of ``vcs.all_schemes``."""
        # Imported here rather than at module top (presumably to avoid an
        # import cycle — confirm).
        from pip._internal.vcs import vcs

        return self.scheme in vcs.all_schemes

    @property
    def is_yanked(self) -> bool:
        """Whether a yanked reason (possibly empty) is set."""
        return self.yanked_reason is not None

    @property
    def has_hash(self) -> bool:
        """Whether at least one hash is stored for this link."""
        return bool(self._hashes)

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """Return whether any stored hash is allowed by ``hashes``.

        ``False`` is returned when ``hashes`` is ``None`` or when the link
        has no hashes.
        """
        if hashes is None:
            return False
        for name, digest in self._hashes.items():
            if hashes.is_hash_allowed(name, digest):
                return True
        return False
class _CleanResult(NamedTuple):
    """Normalized pieces of a link, as produced by ``_clean_link``.

    ``links_equivalent`` treats two links as equivalent when their
    ``_CleanResult`` values compare equal.
    """

    # Split URL whose netloc is reduced to the text after its last "@"
    # ("localhost" for a file: URL with an empty host), with the query and
    # fragment blanked out.
    parsed: urllib.parse.SplitResult
    # The URL's query string as parsed by urllib.parse.parse_qs.
    query: Dict[str, List[str]]
    # First "subdirectory" value from the fragment, or "" when absent.
    subdirectory: str
    # First fragment value for each supported hash algorithm that is present.
    hashes: Dict[str, str]
def _clean_link(link: Link) -> _CleanResult:
    """Reduce ``link`` to the parts that matter when comparing two links.

    - Anything up to the last "@" in the netloc is dropped.
    - A ``file`` URL with an empty host is given the host "localhost".
    - The fragment is parsed as a query string: ``egg`` is ignored (with a
      debug log), and only the first ``subdirectory`` value and the first
      value of each supported hash are kept.
    - The query string is parsed so that it is compared by content.

    :param link: The link to normalize.
    :return: The normalized pieces.
    """
    parsed = link._parsed_url

    host = parsed.netloc.rsplit("@", 1)[-1]
    if not host and parsed.scheme == "file":
        host = "localhost"

    fragment = urllib.parse.parse_qs(parsed.fragment)
    if "egg" in fragment:
        logger.debug("Ignoring egg= fragment in %s", link)

    # Missing key and empty value list both fall back to "".
    subdirectory = (fragment.get("subdirectory") or [""])[0]

    hashes: Dict[str, str] = {}
    for algorithm in _SUPPORTED_HASHES:
        if algorithm in fragment:
            hashes[algorithm] = fragment[algorithm][0]

    stripped = parsed._replace(netloc=host, query="", fragment="")
    return _CleanResult(
        parsed=stripped,
        query=urllib.parse.parse_qs(parsed.query),
        subdirectory=subdirectory,
        hashes=hashes,
    )
@functools.lru_cache(maxsize=None)
def links_equivalent(link1: Link, link2: Link) -> bool:
    """Return whether two links normalize to the same ``_clean_link`` result.

    Results are memoized per ``(link1, link2)`` pair in an unbounded
    ``lru_cache``.
    """
    first = _clean_link(link1)
    second = _clean_link(link2)
    return first == second