import logging
import os
import shutil
import subprocess
import sysconfig
import typing
import urllib.parse
from abc import ABC, abstractmethod
from functools import lru_cache
from os.path import commonprefix
from pathlib import Path
from typing import Any, Dict, List, NamedTuple, Optional, Tuple
from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
from pip._vendor.requests.models import Request, Response
from pip._vendor.requests.utils import get_netrc_auth
from pip._internal.utils.logging import getLogger
from pip._internal.utils.misc import (
ask,
ask_input,
ask_password,
remove_auth_from_url,
split_auth_netloc_from_url,
)
from pip._internal.vcs.versioncontrol import AuthInfo
logger = getLogger(__name__)
KEYRING_DISABLED = False
class Credentials(NamedTuple):
url: str
username: str
password: str
class KeyRingBaseProvider(ABC):
has_keyring: bool
@abstractmethod
def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
...
@abstractmethod
def save_auth_info(self, url: str, username: str, password: str) -> None:
...
class KeyRingNullProvider(KeyRingBaseProvider):
has_keyring = False
def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
return None
def save_auth_info(self, url: str, username: str, password: str) -> None:
return None
class KeyRingPythonProvider(KeyRingBaseProvider):
has_keyring = True
def __init__(self) -> None:
import keyring
self.keyring = keyring
def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
if hasattr(self.keyring, "get_credential"):
logger.debug("Getting credentials from keyring for %s", url)
cred = self.keyring.get_credential(url, username)
if cred is not None:
return cred.username, cred.password
return None
if username is not None:
logger.debug("Getting password from keyring for %s", url)
password = self.keyring.get_password(url, username)
if password:
return username, password
return None
def save_auth_info(self, url: str, username: str, password: str) -> None:
self.keyring.set_password(url, username, password)
class KeyRingCliProvider(KeyRingBaseProvider):
has_keyring = True
def __init__(self, cmd: str) -> None:
self.keyring = cmd
def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
if username is not None:
password = self._get_password(url, username)
if password is not None:
return username, password
return None
def save_auth_info(self, url: str, username: str, password: str) -> None:
return self._set_password(url, username, password)
def _get_password(self, service_name: str, username: str) -> Optional[str]:
if self.keyring is None:
return None
cmd = [self.keyring, "get", service_name, username]
env = os.environ.copy()
env["PYTHONIOENCODING"] = "utf-8"
res = subprocess.run(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
env=env,
)
if res.returncode:
return None
return res.stdout.decode("utf-8").strip(os.linesep)
def _set_password(self, service_name: str, username: str, password: str) -> None:
if self.keyring is None:
return None
env = os.environ.copy()
env["PYTHONIOENCODING"] = "utf-8"
subprocess.run(
[self.keyring, "set", service_name, username],
input=f"{password}{os.linesep}".encode("utf-8"),
env=env,
check=True,
)
return None
@lru_cache(maxsize=None)
def get_keyring_provider(provider: str) -> KeyRingBaseProvider:
logger.verbose("Keyring provider requested: %s", provider)
if KEYRING_DISABLED:
provider = "disabled"
if provider in ["import", "auto"]:
try:
impl = KeyRingPythonProvider()
logger.verbose("Keyring provider set: import")
return impl
except ImportError:
pass
except Exception as exc:
msg = "Installed copy of keyring fails with exception %s"
if provider == "auto":
msg = msg + ", trying to find a keyring executable as a fallback"
logger.warning(msg, exc, exc_info=logger.isEnabledFor(logging.DEBUG))
if provider in ["subprocess", "auto"]:
cli = shutil.which("keyring")
if cli and cli.startswith(sysconfig.get_path("scripts")):
@typing.no_type_check
def PATH_as_shutil_which_determines_it() -> str:
path = os.environ.get("PATH", None)
if path is None:
try:
path = os.confstr("CS_PATH")
except (AttributeError, ValueError):
path = os.defpath
return path
scripts = Path(sysconfig.get_path("scripts"))
paths = []
for path in PATH_as_shutil_which_determines_it().split(os.pathsep):
p = Path(path)
try:
if not p.samefile(scripts):
paths.append(path)
except FileNotFoundError:
pass
path = os.pathsep.join(paths)
cli = shutil.which("keyring", path=path)
if cli:
logger.verbose("Keyring provider set: subprocess with executable %s", cli)
return KeyRingCliProvider(cli)
logger.verbose("Keyring provider set: disabled")
return KeyRingNullProvider()
class MultiDomainBasicAuth(AuthBase):
def __init__(
self,
prompting: bool = True,
index_urls: Optional[List[str]] = None,
keyring_provider: str = "auto",
) -> None:
self.prompting = prompting
self.index_urls = index_urls
self.keyring_provider = keyring_provider self.passwords: Dict[str, AuthInfo] = {}
self._credentials_to_save: Optional[Credentials] = None
@property
def keyring_provider(self) -> KeyRingBaseProvider:
return get_keyring_provider(self._keyring_provider)
@keyring_provider.setter
def keyring_provider(self, provider: str) -> None:
self._keyring_provider = provider
@property
def use_keyring(self) -> bool:
return self.prompting or self._keyring_provider not in ["auto", "disabled"]
def _get_keyring_auth(
self,
url: Optional[str],
username: Optional[str],
) -> Optional[AuthInfo]:
if not url:
return None
try:
return self.keyring_provider.get_auth_info(url, username)
except Exception as exc:
logger.warning(
"Keyring is skipped due to an exception: %s",
str(exc),
)
global KEYRING_DISABLED
KEYRING_DISABLED = True
get_keyring_provider.cache_clear()
return None
def _get_index_url(self, url: str) -> Optional[str]:
if not url or not self.index_urls:
return None
url = remove_auth_from_url(url).rstrip("/") + "/"
parsed_url = urllib.parse.urlsplit(url)
candidates = []
for index in self.index_urls:
index = index.rstrip("/") + "/"
parsed_index = urllib.parse.urlsplit(remove_auth_from_url(index))
if parsed_url == parsed_index:
return index
if parsed_url.netloc != parsed_index.netloc:
continue
candidate = urllib.parse.urlsplit(index)
candidates.append(candidate)
if not candidates:
return None
candidates.sort(
reverse=True,
key=lambda candidate: commonprefix(
[
parsed_url.path,
candidate.path,
]
).rfind("/"),
)
return urllib.parse.urlunsplit(candidates[0])
def _get_new_credentials(
self,
original_url: str,
*,
allow_netrc: bool = True,
allow_keyring: bool = False,
) -> AuthInfo:
url, netloc, url_user_password = split_auth_netloc_from_url(
original_url,
)
username, password = url_user_password
if username is not None and password is not None:
logger.debug("Found credentials in url for %s", netloc)
return url_user_password
index_url = self._get_index_url(url)
if index_url:
index_info = split_auth_netloc_from_url(index_url)
if index_info:
index_url, _, index_url_user_password = index_info
logger.debug("Found index url %s", index_url)
if index_url and index_url_user_password[0] is not None:
username, password = index_url_user_password
if username is not None and password is not None:
logger.debug("Found credentials in index url for %s", netloc)
return index_url_user_password
if allow_netrc:
netrc_auth = get_netrc_auth(original_url)
if netrc_auth:
logger.debug("Found credentials in netrc for %s", netloc)
return netrc_auth
if allow_keyring:
kr_auth = (
self._get_keyring_auth(index_url, username) or
self._get_keyring_auth(netloc, username)
)
if kr_auth:
logger.debug("Found credentials in keyring for %s", netloc)
return kr_auth
return username, password
def _get_url_and_credentials(
self, original_url: str
) -> Tuple[str, Optional[str], Optional[str]]:
url, netloc, _ = split_auth_netloc_from_url(original_url)
username, password = self._get_new_credentials(original_url)
if (username is None or password is None) and netloc in self.passwords:
un, pw = self.passwords[netloc]
if username is None or username == un:
username, password = un, pw
if username is not None or password is not None:
username = username or ""
password = password or ""
self.passwords[netloc] = (username, password)
assert (
(username is not None and password is not None)
or (username is None and password is None)
), f"Could not load credentials from url: {original_url}"
return url, username, password
def __call__(self, req: Request) -> Request:
url, username, password = self._get_url_and_credentials(req.url)
req.url = url
if username is not None and password is not None:
req = HTTPBasicAuth(username, password)(req)
req.register_hook("response", self.handle_401)
return req
def _prompt_for_password(
self, netloc: str
) -> Tuple[Optional[str], Optional[str], bool]:
username = ask_input(f"User for {netloc}: ") if self.prompting else None
if not username:
return None, None, False
if self.use_keyring:
auth = self._get_keyring_auth(netloc, username)
if auth and auth[0] is not None and auth[1] is not None:
return auth[0], auth[1], False
password = ask_password("Password: ")
return username, password, True
def _should_save_password_to_keyring(self) -> bool:
if (
not self.prompting
or not self.use_keyring
or not self.keyring_provider.has_keyring
):
return False
return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
def handle_401(self, resp: Response, **kwargs: Any) -> Response:
if resp.status_code != 401:
return resp
username, password = None, None
if self.use_keyring:
username, password = self._get_new_credentials(
resp.url,
allow_netrc=False,
allow_keyring=True,
)
if not self.prompting and not username and not password:
return resp
parsed = urllib.parse.urlparse(resp.url)
save = False
if not username and not password:
username, password, save = self._prompt_for_password(parsed.netloc)
self._credentials_to_save = None
if username is not None and password is not None:
self.passwords[parsed.netloc] = (username, password)
if save and self._should_save_password_to_keyring():
self._credentials_to_save = Credentials(
url=parsed.netloc,
username=username,
password=password,
)
_ = resp.content
resp.raw.release_conn()
req = HTTPBasicAuth(username or "", password or "")(resp.request)
req.register_hook("response", self.warn_on_401)
if self._credentials_to_save:
req.register_hook("response", self.save_credentials)
new_resp = resp.connection.send(req, **kwargs)
new_resp.history.append(resp)
return new_resp
def warn_on_401(self, resp: Response, **kwargs: Any) -> None:
if resp.status_code == 401:
logger.warning(
"401 Error, Credentials not correct for %s",
resp.request.url,
)
def save_credentials(self, resp: Response, **kwargs: Any) -> None:
assert (
self.keyring_provider.has_keyring
), "should never reach here without keyring"
creds = self._credentials_to_save
self._credentials_to_save = None
if creds and resp.status_code < 400:
try:
logger.info("Saving credentials to keyring")
self.keyring_provider.save_auth_info(
creds.url, creds.username, creds.password
)
except Exception:
logger.exception("Failed to save credentials")