import collections
import email.message
import functools
import itertools
import json
import logging
import os
import urllib.parse
import urllib.request
from html.parser import HTMLParser
from optparse import Values
from typing import (
TYPE_CHECKING,
Callable,
Dict,
Iterable,
List,
MutableMapping,
NamedTuple,
Optional,
Sequence,
Tuple,
Union,
)
from pip._vendor import requests
from pip._vendor.requests import Response
from pip._vendor.requests.exceptions import RetryError, SSLError
from pip._internal.exceptions import NetworkConnectionError
from pip._internal.models.link import Link
from pip._internal.models.search_scope import SearchScope
from pip._internal.network.session import PipSession
from pip._internal.network.utils import raise_for_status
from pip._internal.utils.filetypes import is_archive_file
from pip._internal.utils.misc import redact_auth_from_url
from pip._internal.vcs import vcs
from .sources import CandidatesFromPage, LinkSource, build_source
# ``Protocol`` is imported from ``typing`` only while type checking; at
# runtime the name is bound to ``object`` so classes that list it as a base
# are ordinary classes.
if TYPE_CHECKING:
    from typing import Protocol
else:
    Protocol = object

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Alias used to annotate response headers: a mutable str -> str mapping.
ResponseHeaders = MutableMapping[str, str]
def _match_vcs_scheme(url: str) -> Optional[str]:
    """Look for a VCS scheme at the start of ``url``.

    Returns the first entry of ``vcs.schemes`` that ``url`` starts with
    (compared case-insensitively) and that is immediately followed by ``+``
    or ``:``. Returns None when no scheme matches, including when ``url`` is
    nothing more than a scheme name.
    """
    lowered = url.lower()
    for scheme in vcs.schemes:
        if not lowered.startswith(scheme):
            continue
        # Slice instead of indexing: when the URL ends right after the scheme
        # there is no following character and ``url[len(scheme)]`` would raise
        # IndexError. Membership is tested against a tuple because an empty
        # slice would otherwise count as being "in" the string "+:".
        separator = url[len(scheme) : len(scheme) + 1]
        if separator in ("+", ":"):
            return scheme
    return None
class _NotAPIContent(Exception):
def __init__(self, content_type: str, request_desc: str) -> None:
super().__init__(content_type, request_desc)
self.content_type = content_type
self.request_desc = request_desc
def _ensure_api_header(response: Response) -> None:
    """Check the Content-Type header of ``response``.

    Returns silently when the header (lower-cased) starts with one of the
    accepted index content types. Otherwise raises ``_NotAPIContent`` with
    the header value as received and the HTTP method of the request. A
    missing header is reported as "Unknown".
    """
    accepted_prefixes = (
        "text/html",
        "application/vnd.pypi.simple.v1+html",
        "application/vnd.pypi.simple.v1+json",
    )
    content_type = response.headers.get("Content-Type", "Unknown")
    if not content_type.lower().startswith(accepted_prefixes):
        raise _NotAPIContent(content_type, response.request.method)
class _NotHTTP(Exception):
pass
def _ensure_api_response(url: str, session: PipSession) -> None:
    """Issue a HEAD request for ``url`` and validate the response headers.

    Raises ``_NotHTTP`` if the URL scheme is neither "http" nor "https", and
    ``_NotAPIContent`` if the response's Content-Type is not accepted.
    Whatever ``raise_for_status`` raises for the response also propagates.
    """
    parts = urllib.parse.urlsplit(url)
    if parts.scheme not in {"http", "https"}:
        raise _NotHTTP()

    head_response = session.head(url, allow_redirects=True)
    raise_for_status(head_response)
    _ensure_api_header(head_response)
def _get_simple_response(url: str, session: PipSession) -> Response:
    """Fetch ``url`` with a GET request and return the validated response.

    When the URL's filename looks like an archive, a HEAD request is made
    first (via ``_ensure_api_response``) so the content type can be checked
    before the body is requested. The GET asks for the JSON index format
    first, then the HTML variants with lower preference, and sends
    ``Cache-Control: max-age=0``.

    Raises whatever ``_ensure_api_response``, ``raise_for_status`` or
    ``_ensure_api_header`` raise for an unacceptable response.
    """
    if is_archive_file(Link(url).filename):
        _ensure_api_response(url, session=session)

    accept = ", ".join(
        (
            "application/vnd.pypi.simple.v1+json",
            "application/vnd.pypi.simple.v1+html; q=0.1",
            "text/html; q=0.01",
        )
    )
    request_headers = {"Accept": accept, "Cache-Control": "max-age=0"}

    logger.debug("Getting page %s", redact_auth_from_url(url))
    resp = session.get(url, headers=request_headers)
    raise_for_status(resp)

    # The status check alone is not enough: the body must also be declared as
    # one of the accepted index content types.
    _ensure_api_header(resp)

    fetched_type = resp.headers.get("Content-Type", "Unknown")
    logger.debug("Fetched page %s as %s", redact_auth_from_url(url), fetched_type)
    return resp
def _get_encoding_from_headers(headers: ResponseHeaders) -> Optional[str]:
    """Return the ``charset`` parameter of the Content-Type header, if any.

    Returns None when ``headers`` is empty, has no "Content-Type" entry, or
    the header carries no (non-empty) charset parameter.
    """
    if not headers or "Content-Type" not in headers:
        return None

    # Let the email package parse the header parameters.
    message = email.message.Message()
    message["content-type"] = headers["Content-Type"]
    charset = message.get_param("charset")
    return str(charset) if charset else None
class CacheablePageContent:
    """Hashable wrapper around a page, keyed on the page's URL.

    Two wrappers are equal when they are of the same type and their pages
    share a URL; the hash is derived from that URL as well.
    """

    def __init__(self, page: "IndexContent") -> None:
        # Only pages that opted into link-parse caching may be wrapped.
        assert page.cache_link_parsing
        self.page = page

    def __hash__(self) -> int:
        return hash(self.page.url)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return False
        return self.page.url == other.page.url
class ParseLinks(Protocol):
    """Call signature of a link parser: takes a page, returns its links."""

    def __call__(self, page: "IndexContent") -> Iterable[Link]:
        """Produce the links found in ``page``."""
        ...
def with_cached_index_content(fn: ParseLinks) -> ParseLinks:
    """Decorate a link parser so results can be memoized per page URL.

    The returned function always materializes the parser's output as a list.
    For pages with ``cache_link_parsing`` set, the list is cached in an
    unbounded ``lru_cache`` keyed on a ``CacheablePageContent`` wrapper;
    other pages are parsed afresh on every call.
    """

    @functools.lru_cache(maxsize=None)
    def _parse_cached(cacheable_page: CacheablePageContent) -> List[Link]:
        return list(fn(cacheable_page.page))

    @functools.wraps(fn)
    def _dispatch(page: "IndexContent") -> List[Link]:
        if not page.cache_link_parsing:
            return list(fn(page))
        return _parse_cached(CacheablePageContent(page))

    return _dispatch
@with_cached_index_content
def parse_links(page: "IndexContent") -> Iterable[Link]:
    """Yield a ``Link`` for every usable entry on an index page.

    Pages whose content type starts with the JSON index media type are read
    as JSON and each item of the top-level "files" list is converted with
    ``Link.from_json``. Any other page is decoded (falling back to UTF-8 when
    the page has no encoding) and parsed as HTML, converting each anchor with
    ``Link.from_element``. Entries for which the conversion returns None are
    skipped.
    """
    if page.content_type.lower().startswith("application/vnd.pypi.simple.v1+json"):
        payload = json.loads(page.content)
        for file_entry in payload.get("files", []):
            json_link = Link.from_json(file_entry, page.url)
            if json_link is not None:
                yield json_link
        return

    html_parser = HTMLLinkParser(page.url)
    html_parser.feed(page.content.decode(page.encoding or "utf-8"))

    page_url = page.url
    # A <base href> found in the document takes precedence over the page URL.
    base_url = html_parser.base_url or page_url
    for anchor in html_parser.anchors:
        html_link = Link.from_element(anchor, page_url=page_url, base_url=base_url)
        if html_link is not None:
            yield html_link
class IndexContent:
    """Holds the body of one fetched page together with its metadata."""

    def __init__(
        self,
        content: bytes,
        content_type: str,
        encoding: Optional[str],
        url: str,
        cache_link_parsing: bool = True,
    ) -> None:
        """Store the page data.

        :param content: the raw bytes of the page.
        :param content_type: the Content-Type value reported for the page.
        :param encoding: the encoding to decode ``content`` with, or None
            when it is not known.
        :param url: the URL the content belongs to.
        :param cache_link_parsing: whether links parsed from this page may
            be cached (consulted by ``with_cached_index_content``).
        """
        self.url = url
        self.content = content
        self.content_type = content_type
        self.encoding = encoding
        self.cache_link_parsing = cache_link_parsing

    def __str__(self) -> str:
        # Pass the URL through the auth-redaction helper before displaying it.
        return redact_auth_from_url(self.url)
class HTMLLinkParser(HTMLParser):
    """HTMLParser that records anchor tags and the first usable <base> href.

    After feeding a document, ``anchors`` holds one attribute dict per ``<a>``
    start tag in document order, and ``base_url`` holds the href of the first
    ``<base>`` tag that supplied a non-None href (or None if there was none).
    """

    def __init__(self, url: str) -> None:
        super().__init__(convert_charrefs=True)

        self.url: str = url
        self.base_url: Optional[str] = None
        self.anchors: List[Dict[str, Optional[str]]] = []

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
        if tag == "a":
            self.anchors.append(dict(attrs))
            return
        if tag != "base" or self.base_url is not None:
            # Not a tag of interest, or a base URL has already been recorded.
            return
        href = self.get_href(attrs)
        if href is not None:
            self.base_url = href

    def get_href(self, attrs: List[Tuple[str, Optional[str]]]) -> Optional[str]:
        """Return the value of the first "href" attribute, or None if absent."""
        return next((value for name, value in attrs if name == "href"), None)
def _handle_get_simple_fail(
    link: Link,
    reason: Union[str, Exception],
    meth: Optional[Callable[..., None]] = None,
) -> None:
    """Report that ``link`` could not be fetched and is being skipped.

    :param link: the link that failed.
    :param reason: text or exception describing the failure.
    :param meth: logging callable to report with; ``logger.debug`` is used
        when omitted.
    """
    log = logger.debug if meth is None else meth
    log("Could not fetch URL %s: %s - skipping", link, reason)
def _make_index_content(
    response: Response, cache_link_parsing: bool = True
) -> IndexContent:
    """Build an ``IndexContent`` from a fetched response.

    The encoding is taken from the charset of the response's Content-Type
    header (None if it has none). The response must carry a "Content-Type"
    header, since it is read by direct indexing.
    """
    detected_encoding = _get_encoding_from_headers(response.headers)
    return IndexContent(
        content=response.content,
        content_type=response.headers["Content-Type"],
        encoding=detected_encoding,
        url=response.url,
        cache_link_parsing=cache_link_parsing,
    )
def _get_index_content(link: Link, *, session: PipSession) -> Optional["IndexContent"]:
    """Fetch the page behind ``link`` and wrap it as ``IndexContent``.

    The URL fragment is dropped before fetching. VCS URLs are refused with a
    warning. A ``file:`` URL that points at a directory is redirected to the
    ``index.html`` inside it. Every fetch failure handled here is logged and
    results in None; only a successful fetch returns an ``IndexContent``.
    """
    url, _, _ = link.url.partition("#")

    # VCS URLs cannot be browsed as pages.
    matched_vcs = _match_vcs_scheme(url)
    if matched_vcs:
        logger.warning(
            "Cannot look at %s URL %s because it does not support lookup as web pages.",
            matched_vcs,
            link,
        )
        return None

    parsed = urllib.parse.urlparse(url)
    if parsed.scheme == "file" and os.path.isdir(
        urllib.request.url2pathname(parsed.path)
    ):
        # Make sure the join below appends to the directory rather than
        # replacing its last path segment.
        if not url.endswith("/"):
            url += "/"
        url = urllib.parse.urljoin(url, "index.html")
        logger.debug(" file: URL is directory, getting %s", url)

    # NOTE: the order of the handlers is significant and is kept as-is; some
    # of these exception types may be subclasses of later ones.
    try:
        resp = _get_simple_response(url, session=session)
    except _NotHTTP:
        logger.warning(
            "Skipping page %s because it looks like an archive, and cannot "
            "be checked by a HTTP HEAD request.",
            link,
        )
        return None
    except _NotAPIContent as exc:
        logger.warning(
            "Skipping page %s because the %s request got Content-Type: %s. "
            "The only supported Content-Types are application/vnd.pypi.simple.v1+json, "
            "application/vnd.pypi.simple.v1+html, and text/html",
            link,
            exc.request_desc,
            exc.content_type,
        )
        return None
    except NetworkConnectionError as exc:
        _handle_get_simple_fail(link, exc)
        return None
    except RetryError as exc:
        _handle_get_simple_fail(link, exc)
        return None
    except SSLError as exc:
        # Certificate problems are reported at info level rather than debug.
        reason = "There was a problem confirming the ssl certificate: " + str(exc)
        _handle_get_simple_fail(link, reason, meth=logger.info)
        return None
    except requests.ConnectionError as exc:
        _handle_get_simple_fail(link, f"connection error: {exc}")
        return None
    except requests.Timeout:
        _handle_get_simple_fail(link, "timed out")
        return None

    return _make_index_content(resp, cache_link_parsing=link.cache_link_parsing)
class CollectedSources(NamedTuple):
    """Pair of source sequences returned by ``LinkCollector.collect_sources``."""

    # Sources built from the find-links locations; entries may be None.
    find_links: Sequence[Optional[LinkSource]]
    # Sources built from the index URL locations; entries may be None.
    index_urls: Sequence[Optional[LinkSource]]
class LinkCollector:
    """Collects link sources for a project from index URLs and find-links.

    Holds the session used for fetching and the search scope that supplies
    the locations to look at.
    """

    def __init__(
        self,
        session: PipSession,
        search_scope: SearchScope,
    ) -> None:
        self.session = session
        self.search_scope = search_scope

    @classmethod
    def create(
        cls,
        session: PipSession,
        options: Values,
        suppress_no_index: bool = False,
    ) -> "LinkCollector":
        """Build a collector from parsed command-line options.

        :param session: the session the collector will fetch with.
        :param options: option values; ``index_url``, ``extra_index_urls``,
            ``no_index`` and ``find_links`` are read from it.
        :param suppress_no_index: when true, ``options.no_index`` does not
            cause the index URLs to be dropped.
        """
        index_urls = [options.index_url] + options.extra_index_urls
        ignore_indexes = options.no_index and not suppress_no_index
        if ignore_indexes:
            redacted = ",".join(redact_auth_from_url(url) for url in index_urls)
            logger.debug("Ignoring indexes: %s", redacted)
            index_urls = []

        search_scope = SearchScope.create(
            find_links=options.find_links or [],
            index_urls=index_urls,
            no_index=options.no_index,
        )
        return LinkCollector(session=session, search_scope=search_scope)

    @property
    def find_links(self) -> List[str]:
        """The find-links locations of the underlying search scope."""
        return self.search_scope.find_links

    def fetch_response(self, location: Link) -> Optional[IndexContent]:
        """Fetch the page at ``location``; returns None when it is not retrieved."""
        return _get_index_content(location, session=self.session)

    def collect_sources(
        self,
        project_name: str,
        candidates_from_page: CandidatesFromPage,
    ) -> CollectedSources:
        """Build the link sources to search for ``project_name``.

        Index URL sources are built first, then find-links sources. Index
        locations are built with ``expand_dir=False`` and
        ``cache_link_parsing=False``; find-links locations use True for both.
        When debug logging is enabled, the locations that have a link are
        listed in one multi-line message.
        """
        make_source = functools.partial(
            build_source,
            candidates_from_page=candidates_from_page,
            page_validator=self.session.is_secure_origin,
            project_name=project_name,
        )

        # build_source results are fed to an OrderedDict and only the values
        # are kept -- presumably (key, source) pairs, so that repeated keys
        # collapse while first-seen order is retained; verify against
        # build_source.
        index_url_sources = collections.OrderedDict(
            make_source(loc, expand_dir=False, cache_link_parsing=False)
            for loc in self.search_scope.get_index_urls_locations(project_name)
        ).values()
        find_links_sources = collections.OrderedDict(
            make_source(loc, expand_dir=True, cache_link_parsing=True)
            for loc in self.find_links
        ).values()

        if logger.isEnabledFor(logging.DEBUG):
            found = [
                f"* {source.link}"
                for source in itertools.chain(find_links_sources, index_url_sources)
                if source is not None and source.link is not None
            ]
            header = (
                f"{len(found)} location(s) to search "
                f"for versions of {project_name}:"
            )
            logger.debug("\n".join([header, *found]))

        return CollectedSources(
            find_links=list(find_links_sources),
            index_urls=list(index_url_sources),
        )